Skip to content

Commit

Permalink
Added xmits_enabled to UNICAST3 (https://issues.redhat.com/browse/JGRP-2676)
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Mar 14, 2023
1 parent d720dbc commit 1e307bd
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 49 deletions.
112 changes: 71 additions & 41 deletions src/org/jgroups/protocols/UNICAST3.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@
import org.jgroups.util.*;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -37,7 +34,7 @@
@MBean(description="Reliable unicast layer")
public class UNICAST3 extends Protocol implements AgeOutCache.Handler<Address> {
protected static final long DEFAULT_FIRST_SEQNO=Global.DEFAULT_FIRST_UNICAST_SEQNO;

protected static final long DEFAULT_XMIT_INTERVAL=500;

/* ------------------------------------------ Properties ------------------------------------------ */

Expand Down Expand Up @@ -68,7 +65,12 @@ public class UNICAST3 extends Protocol implements AgeOutCache.Handler<Address> {
protected long max_retransmit_time=60 * 1000L;

@Property(description="Interval (in milliseconds) at which messages in the send windows are resent",type=AttributeType.TIME)
protected long xmit_interval=500;
protected long xmit_interval=DEFAULT_XMIT_INTERVAL;

@Property(description="When true, the sender retransmits messages until ack'ed and the receiver asks for missing " +
"messages. When false, this is not done, but ack'ing and stale connection testing is still done. " +
"https://issues.redhat.com/browse/JGRP-2676")
protected boolean xmits_enabled=true;

@Property(description="If true, trashes warnings about retransmission messages not found in the xmit_table (used for testing)")
protected boolean log_not_found_msgs=true;
Expand Down Expand Up @@ -203,6 +205,8 @@ public <T extends Protocol> T setLevel(String level) {
}
/**
 * Returns the interval (in milliseconds) at which messages in the send windows are resent.
 * @return the current value of {@code xmit_interval}
 */
public long getXmitInterval() {
    return this.xmit_interval;
}
/**
 * Sets the interval (in milliseconds) at which messages in the send windows are resent.
 * The value is stored as given; no range check is performed here.
 * @param i the new value for {@code xmit_interval}
 * @return this instance, to allow call chaining
 */
public UNICAST3 setXmitInterval(long i) {
    this.xmit_interval=i;
    return this;
}
/**
 * Tells whether retransmission is enabled: when true, the sender retransmits messages until ack'ed and the
 * receiver asks for missing messages (https://issues.redhat.com/browse/JGRP-2676).
 * @return the current value of {@code xmits_enabled}
 */
public boolean isXmitsEnabled() {
    return this.xmits_enabled;
}
/**
 * Enables or disables retransmission (https://issues.redhat.com/browse/JGRP-2676).
 * @param b the new value for {@code xmits_enabled}
 * @return this instance, to allow call chaining
 */
public UNICAST3 setXmitsEnabled(boolean b) {
    this.xmits_enabled=b;
    return this;
}
/**
 * Returns the configured value of {@code xmit_table_num_rows}.
 * @return the current value of {@code xmit_table_num_rows}
 */
public int getXmitTableNumRows() {
    return this.xmit_table_num_rows;
}
/**
 * Sets {@code xmit_table_num_rows}. The value is stored as given.
 * @param n the new value for {@code xmit_table_num_rows}
 * @return this instance, to allow call chaining
 */
public UNICAST3 setXmitTableNumRows(int n) {
    xmit_table_num_rows=n;
    return this;
}
/**
 * Returns the configured value of {@code xmit_table_msgs_per_row}.
 * @return the current value of {@code xmit_table_msgs_per_row}
 */
public int getXmitTableMsgsPerRow() {
    return this.xmit_table_msgs_per_row;
}
Expand Down Expand Up @@ -384,8 +388,25 @@ public void init() throws Exception {
max_xmit_req_size=Math.min(max_xmit_req_size, estimated_max_msgs_in_xmit_req);
if(old_max_xmit_size != max_xmit_req_size)
log.trace("%s: set max_xmit_req_size from %d to %d", local_addr, old_max_xmit_size, max_xmit_req_size);

if(xmit_interval <= 0) {
log.warn("%s: xmit_interval (%d) has to be > 0; setting it to the default of %d",
local_addr, xmit_interval, DEFAULT_XMIT_INTERVAL);
xmit_interval=DEFAULT_XMIT_INTERVAL;
}

if(xmits_enabled == false) {
// https://issues.redhat.com/browse/JGRP-2676
RejectedExecutionHandler handler=transport.getThreadPool().getRejectedExecutionHandler();
if(!isCallerRunsHandler(handler)) {
log.warn("%s: xmits_enabled == false requires a CallerRunsPolicy in the thread pool; replacing %s",
local_addr, handler.getClass().getSimpleName());
transport.getThreadPool().setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
}
}
}


public void start() throws Exception {
msg_cache.clear();
timer=getTransport().getTimer();
Expand Down Expand Up @@ -1101,6 +1122,11 @@ public void stopRetransmitTask() {
}
}

/**
 * Checks whether the given rejection handler is a {@link ThreadPoolExecutor.CallerRunsPolicy}, either directly
 * or as the handler wrapped by a {@code ShutdownRejectedExecutionHandler}. Only one level of wrapping is
 * inspected.
 * @param h the handler to inspect; {@code null} yields {@code false}
 * @return true if {@code h} is a CallerRunsPolicy or a ShutdownRejectedExecutionHandler wrapping one
 */
protected static boolean isCallerRunsHandler(RejectedExecutionHandler h) {
    if(h instanceof ThreadPoolExecutor.CallerRunsPolicy)
        return true;
    if(!(h instanceof ShutdownRejectedExecutionHandler))
        return false;
    // unwrap the delegate and test it
    RejectedExecutionHandler wrapped=((ShutdownRejectedExecutionHandler)h).handler();
    return wrapped instanceof ThreadPoolExecutor.CallerRunsPolicy;
}

protected void sendAck(Address dst, long seqno, short conn_id) {
if(!running) // if we are disconnected, then don't send any acks which throw exceptions on shutdown
Expand Down Expand Up @@ -1243,8 +1269,7 @@ public int removeConnections(boolean remove_send_connections, boolean remove_rec

@ManagedOperation(description="Triggers the retransmission task")
public void triggerXmit() {
SeqnoList missing;

// check for gaps in the received messages and ask senders to send them again
for(Map.Entry<Address,ReceiverEntry> entry: recv_table.entrySet()) {
Address target=entry.getKey(); // target to send retransmit requests to
ReceiverEntry val=entry.getValue();
Expand All @@ -1254,48 +1279,53 @@ public void triggerXmit() {
if(win != null && val.needToSendAck()) // sendAck() resets send_ack to false
sendAck(target, win.getHighestDeliverable(), val.connId());

// receiver: retransmit missing messages (getNumMissing() is fast)
if(win != null && win.getNumMissing() > 0 && (missing=win.getMissing(max_xmit_req_size)) != null) {
long highest=missing.getLast();
Long prev_seqno=xmit_task_map.get(target);
if(prev_seqno == null)
xmit_task_map.put(target, highest); // no retransmission
else {
missing.removeHigherThan(prev_seqno); // we only retransmit the 'previous batch'
if(highest > prev_seqno)
xmit_task_map.put(target, highest);
if(!missing.isEmpty()) {
// remove msgs that are <= highest-delivered (https://issues.redhat.com/browse/JGRP-2574)
long highest_deliverable=win.getHighestDeliverable(), first=missing.getFirst();
if(first < highest_deliverable)
missing.removeLowerThan(highest_deliverable + 1);
retransmit(missing, target);
if(xmits_enabled) {
// receiver: retransmit missing messages (getNumMissing() is fast)
SeqnoList missing;
if(win != null && win.getNumMissing() > 0 && (missing=win.getMissing(max_xmit_req_size)) != null) {
long highest=missing.getLast();
Long prev_seqno=xmit_task_map.get(target);
if(prev_seqno == null)
xmit_task_map.put(target, highest); // no retransmission
else {
missing.removeHigherThan(prev_seqno); // we only retransmit the 'previous batch'
if(highest > prev_seqno)
xmit_task_map.put(target, highest);
if(!missing.isEmpty()) {
// remove msgs that are <= highest-delivered (https://issues.redhat.com/browse/JGRP-2574)
long highest_deliverable=win.getHighestDeliverable(), first=missing.getFirst();
if(first < highest_deliverable)
missing.removeLowerThan(highest_deliverable + 1);
retransmit(missing, target);
}
}
}
else if(!xmit_task_map.isEmpty())
xmit_task_map.remove(target); // no current gaps for target
}
else if(!xmit_task_map.isEmpty())
xmit_task_map.remove(target); // no current gaps for target
}

// sender: only send the *highest sent* message if HA < HS and HA/HS didn't change from the prev run
for(SenderEntry val: send_table.values()) {
Table<Message> win=val != null? val.msgs : null;
if(win != null) {
long highest_acked=win.getHighestDelivered(); // highest delivered == highest ack (sender win)
long highest_sent=win.getHighestReceived(); // we use table as a *sender* win, so it's highest *sent*...

if(highest_acked < highest_sent && val.watermark[0] == highest_acked && val.watermark[1] == highest_sent) {
// highest acked and sent hasn't moved up - let's resend the HS
Message highest_sent_msg=win.get(highest_sent);
if(highest_sent_msg != null)
retransmit(highest_sent_msg);
if(xmits_enabled) {
// resend sent messages until ack'ed
// sender: only send the *highest sent* message if HA < HS and HA/HS didn't change from the prev run
for(SenderEntry val : send_table.values()) {
Table<Message> win=val != null? val.msgs : null;
if(win != null) {
long highest_acked=win.getHighestDelivered(); // highest delivered == highest ack (sender win)
long highest_sent=win.getHighestReceived(); // we use table as a *sender* win, so it's highest *sent*...

if(highest_acked < highest_sent && val.watermark[0] == highest_acked && val.watermark[1] == highest_sent) {
// highest acked and sent hasn't moved up - let's resend the HS
Message highest_sent_msg=win.get(highest_sent);
if(highest_sent_msg != null)
retransmit(highest_sent_msg);
}
else
val.watermark(highest_acked, highest_sent);
}
else
val.watermark(highest_acked, highest_sent);
}
}


// close idle connections
if(conn_expiry_timeout > 0)
closeIdleConnections();
Expand Down
10 changes: 7 additions & 3 deletions src/org/jgroups/protocols/pbcast/NAKACK2.java
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ public void init() throws Exception {
// https://issues.redhat.com/browse/JGRP-2675
become_server_queue=new ConcurrentLinkedQueue<>();
RejectedExecutionHandler handler=transport.getThreadPool().getRejectedExecutionHandler();
if(!(handler instanceof ThreadPoolExecutor.CallerRunsPolicy)) {
if(!isCallerRunsHandler(handler)) {
log.warn("%s: xmit_interval of %d requires a CallerRunsPolicy in the thread pool; replacing %s",
local_addr, xmit_interval, handler.getClass().getSimpleName());
transport.getThreadPool().setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
Expand Down Expand Up @@ -1060,8 +1060,6 @@ protected void cancelRebroadcasting() {
}




/**
* Sends a message msg to the requester. We have to wrap the original message into a retransmit message, as we need
* to preserve the original message's properties, such as src, headers etc.
Expand Down Expand Up @@ -1253,6 +1251,12 @@ protected static boolean isGreaterThanOrEqual(Digest first, Digest other) {
return true;
}

/**
 * Checks whether the given rejection handler is a {@link ThreadPoolExecutor.CallerRunsPolicy}, either directly
 * or as the handler wrapped by a {@code ShutdownRejectedExecutionHandler}. Only one level of wrapping is
 * inspected.
 * @param h the handler to inspect; {@code null} yields {@code false}
 * @return true if {@code h} is a CallerRunsPolicy or a ShutdownRejectedExecutionHandler wrapping one
 */
protected static boolean isCallerRunsHandler(RejectedExecutionHandler h) {
    if(h instanceof ThreadPoolExecutor.CallerRunsPolicy)
        return true;
    if(!(h instanceof ShutdownRejectedExecutionHandler))
        return false;
    // unwrap the delegate and test it
    RejectedExecutionHandler wrapped=((ShutdownRejectedExecutionHandler)h).handler();
    return wrapped instanceof ThreadPoolExecutor.CallerRunsPolicy;
}


/**
* Removes old members from xmit-table and adds new members to xmit-table (at seqnos hd=0, hr=0).
Expand Down
9 changes: 4 additions & 5 deletions src/org/jgroups/util/ShutdownRejectedExecutionHandler.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.jgroups.util;

import java.util.Objects;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

Expand All @@ -14,21 +15,19 @@
* @author Vladimir Blagojevic
* @see ThreadPoolExecutor
* @see RejectedExecutionHandler
* 14:49:05 belaban Exp $
*/
public class ShutdownRejectedExecutionHandler implements RejectedExecutionHandler {

RejectedExecutionHandler handler;

public ShutdownRejectedExecutionHandler(RejectedExecutionHandler handler) {
super();
if(handler == null)
throw new NullPointerException("RejectedExecutionHandler cannot be null");
this.handler=handler;
this.handler=Objects.requireNonNull(handler);
}

public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
/**
 * Returns the handler this instance delegates to.
 * @return the value of the {@code handler} field
 */
public RejectedExecutionHandler handler() {
    return this.handler;
}

public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if(!executor.isShutdown()) {
handler.rejectedExecution(r, executor);
}
Expand Down

0 comments on commit 1e307bd

Please sign in to comment.