diff --git a/src/org/jgroups/protocols/TP.java b/src/org/jgroups/protocols/TP.java index 400b92abfea..07aebef40ba 100644 --- a/src/org/jgroups/protocols/TP.java +++ b/src/org/jgroups/protocols/TP.java @@ -25,7 +25,7 @@ import java.net.SocketTimeoutException; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; @@ -225,8 +225,12 @@ public abstract class TP extends Protocol implements DiagnosticsHandler.ProbeHan protected MessageFactory msg_factory=new DefaultMessageFactory(); - // When true, the threads will be dumped at FATAL level when the thread pool is full - protected final AtomicBoolean thread_dump=new AtomicBoolean(true); + @Property(description="The number of times a thread pool needs to be full before a thread dump is logged") + protected int thread_dumps_threshold=1; + + // Incremented when a message is rejected due to a full thread pool. When this value exceeds thread_dumps_threshold, + // the threads will be dumped at FATAL level, and thread_dumps will be reset to 0 + protected final AtomicInteger thread_dumps=new AtomicInteger(); /** @@ -352,6 +356,9 @@ public T setMaxBundleSize(int size) { public String getBundlerWaitStrategy() {return bundler_wait_strategy;} public T setBundlerWaitStrategy(String b) {this.bundler_wait_strategy=b; return (T)this;} + public int getThreadDumpsThreshold() {return thread_dumps_threshold;} + public T setThreadDumpsThreshold(int t) {this.thread_dumps_threshold=t; return (T)this;} + @ManagedAttribute public String getMessageFactoryClass() { @@ -445,9 +452,11 @@ public T setLevel(String level) { return retval; } - @ManagedOperation(description="Resets the thread dump boolean; next time the thread is exhausted, " + - "the threads will be dumped at fatal level") - public void resetThreadDump() {thread_dump.compareAndSet(false, true);} + @ManagedAttribute(description="Number of thread dumps") + public int getNumberOfThreadDumps() {return thread_dumps.get();} + + @ManagedOperation(description="Resets the thread_dumps counter") + public void resetThreadDumps() {thread_dumps.set(0);} @ManagedOperation(description="Changes the message processing policy. The fully qualified name of a class " + "implementing MessageProcessingPolicy needs to be given") @@ -1606,7 +1615,7 @@ public boolean submitToThreadPool(Executor pool, Runnable task, boolean internal if(!internal) { msg_stats.incrNumRejectedMsgs(1); // https://issues.redhat.com/browse/JGRP-2403 - if(thread_dump.compareAndSet(true, false)) { + if(thread_dumps.incrementAndGet() == thread_dumps_threshold) { log.fatal("%s: thread pool is full (max=%d, active=%d); " + "thread dump (dumped once, until thread_dump is reset):\n%s", local_addr, getThreadPoolMaxThreads(), getThreadPoolSize(), Util.dumpThreads());