diff --git a/bin/jgroups.sh b/bin/jgroups.sh index d35b080a69..a5d9514565 100755 --- a/bin/jgroups.sh +++ b/bin/jgroups.sh @@ -50,5 +50,8 @@ FLAGS="-server -Xmx1G -Xms500M -XX:+HeapDumpOnOutOfMemoryError" # SSL_FLAGS="-Djavax.net.debug=ssl:handshake" Z1=-XX:+UseZGC -java $Z1 -cp $CP $SSL_FLAGS $DEBUG $LOG $JG_FLAGS $FLAGS $JMX $JMC $* +# Dump with jcmd Thread.dump_to_file +# DUMP_VTHREADS=-Djdk.trackAllThreads=true + +java $DUMP_VTHREADS $Z1 -cp $CP $SSL_FLAGS $DEBUG $LOG $JG_FLAGS $FLAGS $JMX $JMC $* diff --git a/src/org/jgroups/protocols/FlowControl.java b/src/org/jgroups/protocols/FlowControl.java index 847a894678..bda99b61f9 100644 --- a/src/org/jgroups/protocols/FlowControl.java +++ b/src/org/jgroups/protocols/FlowControl.java @@ -44,7 +44,7 @@ public abstract class FlowControl extends Protocol { * a REPLENISHMENT request to the members from which we expect credits. A value <= 0 means to wait forever. */ @Property(description="Max time (in ms) to block",type=AttributeType.TIME) - protected long max_block_time=500; + protected long max_block_time=5000; /** @@ -55,7 +55,7 @@ public abstract class FlowControl extends Protocol { @Property(description="The threshold (as a percentage of max_credits) at which a receiver sends more credits to " + "a sender. Example: if max_credits is 1'000'000, and min_threshold 0.25, then we send ca. 250'000 credits " + "to P once we've got only 250'000 credits left for P (we've received 750'000 bytes from P)") - protected double min_threshold=0.40; + protected double min_threshold=0.20; /** * Computed as max_credits times min_theshold. If explicitly set, this will diff --git a/tests/perf/org/jgroups/tests/perf/MPerf.java b/tests/perf/org/jgroups/tests/perf/MPerf.java index 0b14dfa031..14c409b64b 100644 --- a/tests/perf/org/jgroups/tests/perf/MPerf.java +++ b/tests/perf/org/jgroups/tests/perf/MPerf.java @@ -49,6 +49,7 @@ public class MPerf implements Receiver { protected final Log log=LogFactory.getLog(getClass()); protected Path out_file_path; protected boolean looping=true; + protected long sleep; protected final ResponseCollector results=new ResponseCollector<>(); protected ThreadFactory thread_factory; protected static final short ID=ClassConfigurator.getProtocolId(MPerf.class); @@ -99,14 +100,14 @@ protected void eventLoop() { final String INPUT= "[1] Start test [2] View [4] Threads (%d) [6] Time (%,ds) [7] Msg size (%s)\n" + "[8] Number of senders (%s) [o] Toggle OOB (%s) [l] Toggle measure local messages (%s)\n" + - "[s] Display message sources (%s)\n" + + "[s] Display message sources (%s) [9] sleep (%d ms)\n" + "[x] Exit this [X] Exit all"; while(looping) { try { int c=Util.keyPress(String.format(INPUT, num_threads, time, Util.printBytes(msg_size), num_senders <= 0? "all" : String.valueOf(num_senders), - oob, log_local, display_msg_src)); + oob, log_local, display_msg_src, sleep)); switch(c) { case '1': startTest(); @@ -126,6 +127,9 @@ protected void eventLoop() { case '8': configChange("num_senders"); break; + case '9': + configChange("sleep"); + break; case 'o': ConfigChange change=new ConfigChange("oob", !oob); send(null, change, MPerfHeader.CONFIG_CHANGE, Message.Flag.RSVP); @@ -208,6 +212,7 @@ protected String printParameters() { sb.append("num_senders=").append(num_senders).append('\n'); sb.append("oob=").append(oob).append('\n'); sb.append("log_local=").append(log_local).append('\n'); + sb.append("sleep=").append(sleep).append('\n'); sb.append("display_msg_src=").append(display_msg_src).append('\n'); return sb.toString(); } @@ -267,6 +272,8 @@ public void receive(Message msg) { case MPerfHeader.DATA: if (log_local || !Objects.equals(msg.getSrc(), local_addr)) { received_msgs_map.addMessage(msg.getSrc()); + if(sleep > 0) + Util.sleep(sleep); } break; @@ -323,6 +330,8 @@ public void receive(MessageBatch batch) { if(type == MPerfHeader.DATA) { if (log_local || !Objects.equals(msg.getSrc(), local_addr)) { received_msgs_map.addMessage(msg.getSrc()); + if(sleep > 0) + Util.sleep(sleep); } } else { receive(msg); @@ -362,6 +371,7 @@ protected void handleConfigRequest(Address sender) throws Exception { cfg.addChange("num_senders", num_senders); cfg.addChange("oob", oob); cfg.addChange("log_local", log_local); + cfg.addChange("sleep", sleep); send(sender,cfg,MPerfHeader.CONFIG_RSP); }