Skip to content

Commit

Permalink
Clients now cache discovery responses until they've joined successful…
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Mar 24, 2020
1 parent 316ed64 commit 4c01a03
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 78 deletions.
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/Discovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ protected static String addressAsString(Address address) {
* @param left_mbrs The members which left. These are excluded from dissemination. Can be null if no members left
* @param new_mbrs The new members that we need to disseminate the information to. Will be all members if null.
*/
protected void disseminateDiscoveryInformation(List current_mbrs, List<Address> left_mbrs, List<Address> new_mbrs) {
protected void disseminateDiscoveryInformation(List<Address> current_mbrs, List<Address> left_mbrs, List<Address> new_mbrs) {
if(new_mbrs == null || new_mbrs.isEmpty())
return;

Expand Down
126 changes: 65 additions & 61 deletions src/org/jgroups/protocols/pbcast/ClientGmsImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,78 +46,86 @@ public void joinWithStateTransfer(Address local_addr, boolean useFlushIfPresent)
}

/**
* Joins this process to a group. Determines the coordinator and sends a
* unicast handleJoin() message to it. The coordinator returns a JoinRsp and
* then broadcasts the new view, which contains a message digest and the
* current membership (including the joiner). The joiner is then supposed to
* install the new view and the digest and starts accepting mcast messages.
* Previous mcast messages were discarded (this is done in PBCAST).
* <p>
* If successful, impl is changed to an instance of ParticipantGmsImpl.
* Otherwise, we continue trying to send join() messages to the coordinator,
* until we succeed (or there is no member in the group. In this case, we
* Makes this process join a group. Determines the coordinator and sends a JOIN request to it. The coordinator
* returns a JOIN response, then broadcasts the new view, which contains a message digest and the current membership
* (including the joiner). The joiner then installs the new view and the digest and starts accepting messages.
* <br/>
* If successful, impl is changed to an instance of ParticipantGmsImpl. Otherwise, we continue trying to send JOIN
* requests to the coordinator, until we succeed (or there is no member in the group. In this case, we
* create our own singleton group).
*
* @param mbr Our own address (assigned through SET_LOCAL_ADDRESS)
* @param mbr Our own address
*/
protected void joinInternal(Address mbr, boolean joinWithStateTransfer, boolean useFlushIfPresent) {
int join_attempts=0;
join_promise.reset();

while(!gms.isLeaving()) {
if(installViewIfValidJoinRsp(join_promise, false))
return;

long start=System.currentTimeMillis();
Responses responses=(Responses)gms.getDownProtocol().down(new Event(Event.FIND_INITIAL_MBRS, gms.getJoinTimeout()));

// Sept 2008 (bela): return if we got a belated JoinRsp (https://jira.jboss.org/jira/browse/JGRP-687)
if(installViewIfValidJoinRsp(join_promise, false))
return;

responses.waitFor(gms.join_timeout);
responses.done();
long diff=System.currentTimeMillis() - start;
boolean empty;
if((empty=responses.isEmpty()) || responses.isCoord(gms.local_addr)) {
String m=String.format("%s: %s: creating cluster as coordinator", gms.local_addr,
empty? String.format("no members discovered after %d ms", diff)
: "I'm the first member");
log.info(m);
becomeSingletonMember(mbr);
return;
}
log.trace("%s: discovery took %d ms, members: %s", gms.local_addr, diff, responses);
Responses responses=null; // caches responses from all discovery runs (usually there's only 1 run)
try {
while(!gms.isLeaving()) {
if(installViewIfValidJoinRsp(join_promise, false))
return;

List<Address> coords=getCoords(responses);
long start=System.currentTimeMillis();
if(responses == null)
responses=(Responses)gms.getDownProtocol().down(new Event(Event.FIND_INITIAL_MBRS, gms.getJoinTimeout()));
else {
Responses tmp=(Responses)gms.getDownProtocol().down(new Event(Event.FIND_INITIAL_MBRS, gms.getJoinTimeout()));
if(tmp != null) {
responses.add(tmp, gms.local_addr);
tmp.done();
}
}

// We didn't get any coord responses; all responses were clients. If I'm the first of the sorted clients
// I'll become coordinator. The others will wait and then retry the discovery and join process
if(coords == null) { // e.g. because we have all clients only
if(firstOfAllClients(mbr, responses))
// Sept 2008 (bela): return if we got a belated JoinRsp (https://jira.jboss.org/jira/browse/JGRP-687)
if(installViewIfValidJoinRsp(join_promise, false))
return;

responses.waitFor(gms.join_timeout);
long diff=System.currentTimeMillis() - start;
boolean empty;
if((empty=responses.isEmpty()) || responses.isCoord(gms.local_addr)) {
log.info("%s: %s: creating cluster as coordinator", gms.local_addr,
empty? String.format("no members discovered after %d ms", diff) : "I'm the first member");
becomeSingletonMember(mbr);
return;
}
else {
if(coords.size() > 1) {
log.debug("%s: found multiple coords: %s", gms.local_addr, coords);
Collections.shuffle(coords); // so the code below doesn't always pick the same coord
}
for(Address coord : coords) {
log.debug("%s: sending JOIN(%s) to %s", gms.local_addr, mbr, coord);
sendJoinMessage(coord, mbr, joinWithStateTransfer, useFlushIfPresent);
if(installViewIfValidJoinRsp(join_promise, true))
log.trace("%s: discovery took %d ms, members: %s", gms.local_addr, diff, responses);

List<Address> coords=getCoords(responses);

// We didn't get any coord responses; all responses were clients. If I'm the first of the sorted clients
// I'll become coordinator. The others will wait and then retry the discovery and join process
if(coords == null) { // e.g. because we have all clients only
if(firstOfAllClients(mbr, responses))
return;
log.warn("%s: JOIN(%s) sent to %s timed out (after %d ms), on try %d",
gms.local_addr, mbr, coord, gms.join_timeout, join_attempts);
}
}
else {
if(coords.size() > 1) {
log.debug("%s: found multiple coords: %s", gms.local_addr, coords);
Collections.shuffle(coords); // so the code below doesn't always pick the same coord
}
for(Address coord : coords) {
log.debug("%s: sending JOIN(%s) to %s", gms.local_addr, mbr, coord);
sendJoinMessage(coord, mbr, joinWithStateTransfer, useFlushIfPresent);
if(installViewIfValidJoinRsp(join_promise, true))
return;
log.warn("%s: JOIN(%s) sent to %s timed out (after %d ms), on try %d",
gms.local_addr, mbr, coord, gms.join_timeout, join_attempts);
}
}

if(gms.max_join_attempts > 0 && ++join_attempts >= gms.max_join_attempts) {
log.warn("%s: too many JOIN attempts (%d): becoming singleton", gms.local_addr, join_attempts);
becomeSingletonMember(mbr);
return;
if(gms.max_join_attempts > 0 && ++join_attempts >= gms.max_join_attempts) {
log.warn("%s: too many JOIN attempts (%d): becoming singleton", gms.local_addr, join_attempts);
becomeSingletonMember(mbr);
return;
}
}
}
finally {
if(responses != null)
responses.done();
}
}


Expand Down Expand Up @@ -202,10 +210,6 @@ protected boolean isJoinResponseValid(final JoinRsp rsp) {
}



/**
* Called by join(). Installs the view returned by calling Coord.handleJoin() and becomes coordinator.
*/
private boolean installView(View new_view, Digest digest) {
if(!new_view.containsMember(gms.local_addr)) {
log.error("%s: I'm not member of %s, will not install view", gms.local_addr, new_view);
Expand All @@ -226,7 +230,7 @@ void sendJoinMessage(Address coord, Address mbr,boolean joinWithTransfer, boolea
}


/** Returns all members whose PingData is flagged as coordinator */
/** Returns all members who are flagged as coordinator */
private static List<Address> getCoords(Iterable<PingData> mbrs) {
if(mbrs == null)
return null;
Expand Down
13 changes: 11 additions & 2 deletions src/org/jgroups/util/Responses.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ public Responses clear() {
}
}

public void addResponse(PingData rsp, boolean overwrite) {
public Responses addResponse(PingData rsp, boolean overwrite) {
if(rsp == null)
return;
return this;

boolean is_coord_rsp=rsp.isCoord(), changed=false;
lock.lock();
Expand All @@ -83,12 +83,21 @@ public void addResponse(PingData rsp, boolean overwrite) {
}
if(changed && ((num_expected_rsps > 0 && index >= num_expected_rsps) || break_on_coord_rsp && is_coord_rsp))
_done();
return this;
}
finally {
lock.unlock();
}
}

public Responses add(Responses rsps, Address local_addr) {
if(rsps != null) {
for(PingData rsp: rsps)
addResponse(rsp, Objects.equals(local_addr, rsp.getAddress()));
}
return this;
}

public boolean containsResponseFrom(Address mbr) {
if(mbr == null) return false;
for(int i=0; i < index; i++) {
Expand Down
39 changes: 27 additions & 12 deletions tests/junit-functional/org/jgroups/tests/ResponsesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,23 @@ public void testAddResponses() throws Exception {
assert rsps.waitFor(60000);
}

public void testAdd() {
Responses rsps=new Responses(0, true, 16);
System.out.println("rsps = " + rsps);
assert !rsps.isDone();
for(int i=0; i < 5; i++)
rsps.addResponse(new PingData(addrs[i], true, names[i], phys_addrs[i]), false);
assert !rsps.isDone();
System.out.println("rsps = " + rsps);

Responses rsps2=new Responses(0, true)
.addResponse(new PingData(addrs[2], true, names[2], phys_addrs[2]).coord(true), false);
assert rsps2.isDone();
rsps.add(rsps2, addrs[2]);
System.out.println("rsps = " + rsps);
assert rsps.isDone();
}

public void testContainsResponse() {
Responses rsps=new Responses(10, true);
assert !rsps.isDone();
Expand All @@ -79,14 +96,14 @@ public void testResize() throws Exception {
}

public void testSizeOfOne() {
Responses rsps=new Responses(1, true, 1);
rsps.addResponse(new PingData(addrs[0],true,names[0],phys_addrs[0]),false);
Responses rsps=new Responses(1, true, 1)
.addResponse(new PingData(addrs[0],true,names[0],phys_addrs[0]),false);
assert rsps.isDone();
}

public void testBreakOnCoordRsp() {
Responses rsps=new Responses(true);
rsps.addResponse(new PingData(addrs[0],true,names[0],phys_addrs[0]), false);
Responses rsps=new Responses(true)
.addResponse(new PingData(addrs[0],true,names[0],phys_addrs[0]), false);
assert !rsps.isDone();

rsps.addResponse(new PingData(addrs[1],true,names[1],phys_addrs[1]).coord(true), false);
Expand Down Expand Up @@ -133,14 +150,12 @@ public void testWaitFor2() throws Exception {
boolean done=rsps.waitFor(500);
assert !done;
long start=System.currentTimeMillis();
new Thread() {
public void run() {
Util.sleep(500);
for(int i=0; i < 2; i++)
rsps.addResponse(new PingData(addrs[i],true,names[i],phys_addrs[i]), false);
rsps.addResponse(new PingData(addrs[3], true, names[3], phys_addrs[3]).coord(true), false);
}
}.start();
new Thread(() -> {
Util.sleep(500);
for(int i=0; i < 2; i++)
rsps.addResponse(new PingData(addrs[i],true,names[i],phys_addrs[i]), false);
rsps.addResponse(new PingData(addrs[3], true, names[3], phys_addrs[3]).coord(true), false);
}).start();

done=rsps.waitFor(20000);
long time=System.currentTimeMillis() - start;
Expand Down
4 changes: 2 additions & 2 deletions tests/junit/org/jgroups/tests/ConcurrentStartupTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ public Object up(Event evt) {
}

// @Test(invocationCount=10)
/*public void testConcurrentJoinWithPING() throws Exception {
public void testConcurrentJoinWithPING() throws Exception {
setup(UDP.class, PING.class);
startThreads("withUDPandPING");
}*/
}

protected static JChannel create(Class<? extends TP> tp_cl, Class<? extends Discovery> discovery_cl,
String name) throws Exception {
Expand Down

0 comments on commit 4c01a03

Please sign in to comment.