Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve: use fixed thread pool explicitly #2265

Merged
merged 3 commits into from
Mar 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

import org.slf4j.Logger;
Expand All @@ -24,8 +25,6 @@
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependent;
import io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflowFactory;

import static io.javaoperatorsdk.operator.api.config.ExecutorServiceManager.newThreadPoolExecutor;

/** An interface from which to retrieve configuration information. */
public interface ConfigurationService {

Expand Down Expand Up @@ -127,14 +126,18 @@ default boolean checkCRDAndValidateLocalModel() {
return false;
}

int DEFAULT_RECONCILIATION_THREADS_NUMBER = 200;
int DEFAULT_RECONCILIATION_THREADS_NUMBER = 50;
/**
* @deprecated Not used anymore in the default implementation
*/
@Deprecated(forRemoval = true)
int MIN_DEFAULT_RECONCILIATION_THREADS_NUMBER = 10;

/**
* The maximum number of threads the operator can spin out to dispatch reconciliation requests to
* reconcilers
* The number of threads the operator can spin out to dispatch reconciliation requests to
* reconcilers with the default executors
*
* @return the maximum number of concurrent reconciliation threads
* @return the number of concurrent reconciliation threads
*/
default int concurrentReconciliationThreads() {
return DEFAULT_RECONCILIATION_THREADS_NUMBER;
Expand All @@ -143,17 +146,24 @@ default int concurrentReconciliationThreads() {
/**
* The minimum number of threads the operator starts in the thread pool for reconciliations.
*
* @deprecated not used anymore by default executor implementation
* @return the minimum number of concurrent reconciliation threads
*/
@Deprecated(forRemoval = true)
default int minConcurrentReconciliationThreads() {
return MIN_DEFAULT_RECONCILIATION_THREADS_NUMBER;
}

int DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER = DEFAULT_RECONCILIATION_THREADS_NUMBER;
/**
* @deprecated Not used anymore in the default implementation
*/
@Deprecated(forRemoval = true)
int MIN_DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER = MIN_DEFAULT_RECONCILIATION_THREADS_NUMBER;

/**
* Retrieves the maximum number of threads the operator can spin out to be used in the workflows.
* Number of threads the operator can spin out to be used in the workflows with the default
* executor.
*
* @return the maximum number of concurrent workflow threads
*/
Expand All @@ -164,8 +174,10 @@ default int concurrentWorkflowExecutorThreads() {
/**
* The minimum number of threads the operator starts in the thread pool for workflows.
*
* @deprecated not used anymore by default executor implementation
* @return the minimum number of concurrent workflow threads
*/
@Deprecated(forRemoval = true)
default int minConcurrentWorkflowExecutorThreads() {
return MIN_DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER;
}
Expand All @@ -191,13 +203,11 @@ default Metrics getMetrics() {
}

default ExecutorService getExecutorService() {
return newThreadPoolExecutor(minConcurrentReconciliationThreads(),
concurrentReconciliationThreads());
return Executors.newFixedThreadPool(concurrentReconciliationThreads());
}

default ExecutorService getWorkflowExecutorService() {
return newThreadPoolExecutor(minConcurrentWorkflowExecutorThreads(),
concurrentWorkflowExecutorThreads());
return Executors.newFixedThreadPool(concurrentWorkflowExecutorThreads());
}

default boolean closeClientOnStop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,21 @@ public int concurrentWorkflowExecutorThreads() {
original.concurrentWorkflowExecutorThreads());
}

/**
* @deprecated Not used anymore in the default implementation
*/
@Deprecated(forRemoval = true)
@Override
public int minConcurrentReconciliationThreads() {
return minConcurrentReconciliationThreads != null ? minConcurrentReconciliationThreads
: original.minConcurrentReconciliationThreads();
}

/**
* @deprecated Not used anymore in the default implementation
*/
@Override
@Deprecated(forRemoval = true)
public int minConcurrentWorkflowExecutorThreads() {
return minConcurrentWorkflowExecutorThreads != null ? minConcurrentWorkflowExecutorThreads
: original.minConcurrentWorkflowExecutorThreads();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
Expand All @@ -35,14 +33,6 @@ public class ExecutorServiceManager {
start(configurationService);
}

public static ExecutorService newThreadPoolExecutor(int minThreads, int maxThreads) {
minThreads = Utils.ensureValid(minThreads, "minimum number of threads", MIN_THREAD_NUMBER);
maxThreads = Utils.ensureValid(maxThreads, "maximum number of threads", minThreads + 1);

return new ThreadPoolExecutor(minThreads, maxThreads, 1, TimeUnit.MINUTES,
new LinkedBlockingDeque<>());
}

/**
* Uses cachingExecutorService from this manager. Use this only for tasks, that don't have dynamic
* nature, in sense that won't grow with the number of inputs (thus kubernetes resources)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,6 @@ void executionOfReconciliationShouldNotStartIfProcessorStopped() throws Interrup
new BaseConfigurationService(),
o -> {
o.withConcurrentReconciliationThreads(1);
o.withMinConcurrentReconciliationThreads(1);
});
eventProcessor =
spy(new EventProcessor(controllerConfiguration(null, rateLimiterMock, configurationService),
Expand Down
Loading