Skip to content

Commit

Permalink
Added a threshold (thread_dumps_threshold) to TP. The thread will onl…
Browse files Browse the repository at this point in the history
…y be dumped when this threshold has been reached (https://issues.redhat.com/browse/JGRP-2403)
  • Loading branch information
belaban committed Nov 10, 2020
1 parent a5bd5fc commit b1794f5
Showing 1 changed file with 16 additions and 7 deletions.
23 changes: 16 additions & 7 deletions src/org/jgroups/protocols/TP.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.net.SocketTimeoutException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;


Expand Down Expand Up @@ -225,8 +225,12 @@ public abstract class TP extends Protocol implements DiagnosticsHandler.ProbeHan

protected MessageFactory msg_factory=new DefaultMessageFactory();

// When true, the threads will be dumped at FATAL level when the thread pool is full
protected final AtomicBoolean thread_dump=new AtomicBoolean(true);
@Property(description="The number of times a thread pool needs to be full before a thread dump is logged")
protected int thread_dumps_threshold=1;

// Incremented when a message is rejected due to a full thread pool. When this value exceeds thread_dumps_threshold,
// the threads will be dumped at FATAL level, and thread_dumps will be reset to 0
protected final AtomicInteger thread_dumps=new AtomicInteger();


/**
Expand Down Expand Up @@ -352,6 +356,9 @@ public <T extends TP> T setMaxBundleSize(int size) {
public String getBundlerWaitStrategy() {return bundler_wait_strategy;}
public <T extends TP> T setBundlerWaitStrategy(String b) {this.bundler_wait_strategy=b; return (T)this;}

public int getThreadDumpsThreshold() {return thread_dumps_threshold;}
public <T extends TP> T setThreadDumpsThreshold(int t) {this.thread_dumps_threshold=t; return (T)this;}


@ManagedAttribute
public String getMessageFactoryClass() {
Expand Down Expand Up @@ -445,9 +452,11 @@ public <T extends Protocol> T setLevel(String level) {
return retval;
}

@ManagedOperation(description="Resets the thread dump boolean; next time the thread is exhausted, " +
"the threads will be dumped at fatal level")
public void resetThreadDump() {thread_dump.compareAndSet(false, true);}
@ManagedAttribute(description="Number of thread dumps")
public int getNumberOfThreadDumps() {return thread_dumps.get();}

@ManagedOperation(description="Resets the thread_dumps counter")
public void resetThreadDumps() {thread_dumps.set(0);}

@ManagedOperation(description="Changes the message processing policy. The fully qualified name of a class " +
"implementing MessageProcessingPolicy needs to be given")
Expand Down Expand Up @@ -1606,7 +1615,7 @@ public boolean submitToThreadPool(Executor pool, Runnable task, boolean internal
if(!internal) {
msg_stats.incrNumRejectedMsgs(1);
// https://issues.redhat.com/browse/JGRP-2403
if(thread_dump.compareAndSet(true, false)) {
if(thread_dumps.incrementAndGet() == thread_dumps_threshold) {
log.fatal("%s: thread pool is full (max=%d, active=%d); " +
"thread dump (dumped once, until thread_dump is reset):\n%s",
local_addr, getThreadPoolMaxThreads(), getThreadPoolSize(), Util.dumpThreads());
Expand Down

0 comments on commit b1794f5

Please sign in to comment.