Skip to content

Commit

Permalink
improve: use fixed thread pool explicitly (#2265)
Browse files Browse the repository at this point in the history

Signed-off-by: Attila Mészáros <csviri@gmail.com>
  • Loading branch information
csviri committed Mar 9, 2024
1 parent dc0b4ee commit ab7c01f
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

import org.slf4j.Logger;
Expand All @@ -24,8 +25,6 @@
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependent;
import io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflowFactory;

import static io.javaoperatorsdk.operator.api.config.ExecutorServiceManager.newThreadPoolExecutor;

/** An interface from which to retrieve configuration information. */
public interface ConfigurationService {

Expand Down Expand Up @@ -127,14 +126,18 @@ default boolean checkCRDAndValidateLocalModel() {
return false;
}

int DEFAULT_RECONCILIATION_THREADS_NUMBER = 200;
int DEFAULT_RECONCILIATION_THREADS_NUMBER = 50;
/**
* @deprecated Not used anymore in the default implementation
*/
@Deprecated(forRemoval = true)
int MIN_DEFAULT_RECONCILIATION_THREADS_NUMBER = 10;

/**
* The maximum number of threads the operator can spin out to dispatch reconciliation requests to
* reconcilers
* The number of threads the operator can spin out to dispatch reconciliation requests to
* reconcilers with the default executors
*
* @return the maximum number of concurrent reconciliation threads
* @return the number of concurrent reconciliation threads
*/
default int concurrentReconciliationThreads() {
return DEFAULT_RECONCILIATION_THREADS_NUMBER;
Expand All @@ -143,17 +146,24 @@ default int concurrentReconciliationThreads() {
/**
* The minimum number of threads the operator starts in the thread pool for reconciliations.
*
* @deprecated not used anymore by default executor implementation
* @return the minimum number of concurrent reconciliation threads
*/
@Deprecated(forRemoval = true)
default int minConcurrentReconciliationThreads() {
return MIN_DEFAULT_RECONCILIATION_THREADS_NUMBER;
}

int DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER = DEFAULT_RECONCILIATION_THREADS_NUMBER;
/**
* @deprecated Not used anymore in the default implementation
*/
@Deprecated(forRemoval = true)
int MIN_DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER = MIN_DEFAULT_RECONCILIATION_THREADS_NUMBER;

/**
* Retrieves the maximum number of threads the operator can spin out to be used in the workflows.
* Number of threads the operator can spin out to be used in the workflows with the default
* executor.
*
* @return the maximum number of concurrent workflow threads
*/
Expand All @@ -164,8 +174,10 @@ default int concurrentWorkflowExecutorThreads() {
/**
* The minimum number of threads the operator starts in the thread pool for workflows.
*
* @deprecated not used anymore by default executor implementation
* @return the minimum number of concurrent workflow threads
*/
@Deprecated(forRemoval = true)
default int minConcurrentWorkflowExecutorThreads() {
return MIN_DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER;
}
Expand All @@ -191,13 +203,11 @@ default Metrics getMetrics() {
}

default ExecutorService getExecutorService() {
return newThreadPoolExecutor(minConcurrentReconciliationThreads(),
concurrentReconciliationThreads());
return Executors.newFixedThreadPool(concurrentReconciliationThreads());
}

default ExecutorService getWorkflowExecutorService() {
return newThreadPoolExecutor(minConcurrentWorkflowExecutorThreads(),
concurrentWorkflowExecutorThreads());
return Executors.newFixedThreadPool(concurrentWorkflowExecutorThreads());
}

default boolean closeClientOnStop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,21 @@ public int concurrentWorkflowExecutorThreads() {
original.concurrentWorkflowExecutorThreads());
}

/**
* @deprecated Not used anymore in the default implementation
*/
@Deprecated(forRemoval = true)
@Override
public int minConcurrentReconciliationThreads() {
return minConcurrentReconciliationThreads != null ? minConcurrentReconciliationThreads
: original.minConcurrentReconciliationThreads();
}

/**
* @deprecated Not used anymore in the default implementation
*/
@Override
@Deprecated(forRemoval = true)
public int minConcurrentWorkflowExecutorThreads() {
return minConcurrentWorkflowExecutorThreads != null ? minConcurrentWorkflowExecutorThreads
: original.minConcurrentWorkflowExecutorThreads();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
Expand All @@ -35,14 +33,6 @@ public class ExecutorServiceManager {
start(configurationService);
}

public static ExecutorService newThreadPoolExecutor(int minThreads, int maxThreads) {
minThreads = Utils.ensureValid(minThreads, "minimum number of threads", MIN_THREAD_NUMBER);
maxThreads = Utils.ensureValid(maxThreads, "maximum number of threads", minThreads + 1);

return new ThreadPoolExecutor(minThreads, maxThreads, 1, TimeUnit.MINUTES,
new LinkedBlockingDeque<>());
}

/**
* Uses cachingExecutorService from this manager. Use this only for tasks, that don't have dynamic
* nature, in sense that won't grow with the number of inputs (thus kubernetes resources)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,6 @@ void executionOfReconciliationShouldNotStartIfProcessorStopped() throws Interrup
new BaseConfigurationService(),
o -> {
o.withConcurrentReconciliationThreads(1);
o.withMinConcurrentReconciliationThreads(1);
});
eventProcessor =
spy(new EventProcessor(controllerConfiguration(null, rateLimiterMock, configurationService),
Expand Down

0 comments on commit ab7c01f

Please sign in to comment.