diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java index 53bfc75df9..92859421eb 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java @@ -4,6 +4,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.function.Consumer; import org.slf4j.Logger; @@ -24,8 +25,6 @@ import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependent; import io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflowFactory; -import static io.javaoperatorsdk.operator.api.config.ExecutorServiceManager.newThreadPoolExecutor; - /** An interface from which to retrieve configuration information. */ public interface ConfigurationService { @@ -127,14 +126,18 @@ default boolean checkCRDAndValidateLocalModel() { return false; } - int DEFAULT_RECONCILIATION_THREADS_NUMBER = 200; + int DEFAULT_RECONCILIATION_THREADS_NUMBER = 50; + /** + * @deprecated Not used anymore in the default implementation + */ + @Deprecated(forRemoval = true) int MIN_DEFAULT_RECONCILIATION_THREADS_NUMBER = 10; /** - * The maximum number of threads the operator can spin out to dispatch reconciliation requests to - * reconcilers + * The number of threads the operator can spin out to dispatch reconciliation requests to + * reconcilers with the default executors * - * @return the maximum number of concurrent reconciliation threads + * @return the number of concurrent reconciliation threads */ default int concurrentReconciliationThreads() { return DEFAULT_RECONCILIATION_THREADS_NUMBER; @@ -143,17 +146,24 @@ default int concurrentReconciliationThreads() { /** * The minimum number of threads the operator starts in the thread pool for reconciliations. * + * @deprecated not used anymore by default executor implementation * @return the minimum number of concurrent reconciliation threads */ + @Deprecated(forRemoval = true) default int minConcurrentReconciliationThreads() { return MIN_DEFAULT_RECONCILIATION_THREADS_NUMBER; } int DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER = DEFAULT_RECONCILIATION_THREADS_NUMBER; + /** + * @deprecated Not used anymore in the default implementation + */ + @Deprecated(forRemoval = true) int MIN_DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER = MIN_DEFAULT_RECONCILIATION_THREADS_NUMBER; /** - * Retrieves the maximum number of threads the operator can spin out to be used in the workflows. + * Number of threads the operator can spin out to be used in the workflows with the default + * executor. * * @return the maximum number of concurrent workflow threads */ @@ -164,8 +174,10 @@ default int concurrentWorkflowExecutorThreads() { /** * The minimum number of threads the operator starts in the thread pool for workflows. * + * @deprecated not used anymore by default executor implementation * @return the minimum number of concurrent workflow threads */ + @Deprecated(forRemoval = true) default int minConcurrentWorkflowExecutorThreads() { return MIN_DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER; } @@ -191,13 +203,11 @@ default Metrics getMetrics() { } default ExecutorService getExecutorService() { - return newThreadPoolExecutor(minConcurrentReconciliationThreads(), - concurrentReconciliationThreads()); + return Executors.newFixedThreadPool(concurrentReconciliationThreads()); } default ExecutorService getWorkflowExecutorService() { - return newThreadPoolExecutor(minConcurrentWorkflowExecutorThreads(), - concurrentWorkflowExecutorThreads()); + return Executors.newFixedThreadPool(concurrentWorkflowExecutorThreads()); } default boolean closeClientOnStop() { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java index 5879383464..12a8a5c699 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java @@ -218,13 +218,21 @@ public int concurrentWorkflowExecutorThreads() { original.concurrentWorkflowExecutorThreads()); } + /** + * @deprecated Not used anymore in the default implementation + */ + @Deprecated(forRemoval = true) @Override public int minConcurrentReconciliationThreads() { return minConcurrentReconciliationThreads != null ? minConcurrentReconciliationThreads : original.minConcurrentReconciliationThreads(); } + /** + * @deprecated Not used anymore in the default implementation + */ @Override + @Deprecated(forRemoval = true) public int minConcurrentWorkflowExecutorThreads() { return minConcurrentWorkflowExecutorThreads != null ? minConcurrentWorkflowExecutorThreads : original.minConcurrentWorkflowExecutorThreads(); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index 3ea05e7e0d..112ab7188a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -8,8 +8,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Function; @@ -35,14 +33,6 @@ public class ExecutorServiceManager { start(configurationService); } - public static ExecutorService newThreadPoolExecutor(int minThreads, int maxThreads) { - minThreads = Utils.ensureValid(minThreads, "minimum number of threads", MIN_THREAD_NUMBER); - maxThreads = Utils.ensureValid(maxThreads, "maximum number of threads", minThreads + 1); - - return new ThreadPoolExecutor(minThreads, maxThreads, 1, TimeUnit.MINUTES, - new LinkedBlockingDeque<>()); - } - /** * Uses cachingExecutorService from this manager. Use this only for tasks, that don't have dynamic * nature, in sense that won't grow with the number of inputs (thus kubernetes resources) diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java index 9d538713c1..93e58a55c6 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java @@ -434,7 +434,6 @@ void executionOfReconciliationShouldNotStartIfProcessorStopped() throws Interrup new BaseConfigurationService(), o -> { o.withConcurrentReconciliationThreads(1); - o.withMinConcurrentReconciliationThreads(1); }); eventProcessor = spy(new EventProcessor(controllerConfiguration(null, rateLimiterMock, configurationService),