Pinned thread on Vault fetch secret #41313

Closed
vsevel opened this issue Jun 19, 2024 · 13 comments
Labels
kind/bug Something isn't working

Comments

@vsevel
Contributor

vsevel commented Jun 19, 2024

Describe the bug

I detected the following thread pinning:

Thread[#831,ForkJoinPool-1-worker-23,5,CarrierThreads]
java.base/java.lang.VirtualThread$VThreadContinuation.onPinned(VirtualThread.java:183)
java.base/jdk.internal.vm.Continuation.onPinned0(Continuation.java:393)
java.base/java.lang.VirtualThread.park(VirtualThread.java:582)
java.base/java.lang.System$2.parkVirtualThread(System.java:2643)
java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:54)
java.base/java.util.concurrent.locks.LockSupport.park(LockSupport.java:219)
java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:754)
java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1099)
java.base/java.util.concurrent.CountDownLatch.await(CountDownLatch.java:230)
io.smallrye.mutiny.operators.uni.UniBlockingAwait.await(UniBlockingAwait.java:67)
io.smallrye.mutiny.groups.UniAwait.atMost(UniAwait.java:65)
io.smallrye.mutiny.groups.UniAwait.indefinitely(UniAwait.java:46)
io.quarkus.vault.runtime.config.VaultConfigSource.fetchSecrets(VaultConfigSource.java:141)
io.quarkus.vault.runtime.config.VaultConfigSource.lambda$fetchSecrets$2(VaultConfigSource.java:136)
java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
io.quarkus.vault.runtime.config.VaultConfigSource.fetchSecrets(VaultConfigSource.java:136)
io.quarkus.vault.runtime.config.VaultConfigSource.lambda$fetchSecrets$1(VaultConfigSource.java:132)
java.base/java.util.HashMap.forEach(HashMap.java:1429)
io.quarkus.vault.runtime.config.VaultConfigSource.fetchSecrets(VaultConfigSource.java:132)
io.quarkus.vault.runtime.config.VaultConfigSource.getSecretConfig(VaultConfigSource.java:87)
io.quarkus.vault.runtime.config.VaultConfigSource.getValue(VaultConfigSource.java:64)
io.smallrye.config.SmallRyeConfigSources$ConfigValueConfigSourceWrapper.getConfigValue(SmallRyeConfigSources.java:73)
io.smallrye.config.SmallRyeConfigSources.getValue(SmallRyeConfigSources.java:36)
io.smallrye.config.SmallRyeConfigSourceInterceptorContext.proceed(SmallRyeConfigSourceInterceptorContext.java:25)
io.smallrye.config.SecretKeysConfigSourceInterceptor.getValue(SecretKeysConfigSourceInterceptor.java:26)
io.smallrye.config.SmallRyeConfigSourceInterceptorContext.proceed(SmallRyeConfigSourceInterceptorContext.java:25)
io.smallrye.config.RelocateConfigSourceInterceptor.getValue(RelocateConfigSourceInterceptor.java:25)
io.smallrye.config.SmallRyeConfigSourceInterceptorContext.proceed(SmallRyeConfigSourceInterceptorContext.java:25)
io.smallrye.config.RelocateConfigSourceInterceptor.getValue(RelocateConfigSourceInterceptor.java:25)
io.smallrye.config.SmallRyeConfigSourceInterceptorContext.proceed(SmallRyeConfigSourceInterceptorContext.java:25)
io.smallrye.config.RelocateConfigSourceInterceptor.getValue(RelocateConfigSourceInterceptor.java:25)
io.smallrye.config.SmallRyeConfigSourceInterceptorContext.proceed(SmallRyeConfigSourceInterceptorContext.java:25)
io.smallrye.config.ProfileConfigSourceInterceptor.getProfileValue(ProfileConfigSourceInterceptor.java:51)
io.smallrye.config.ProfileConfigSourceInterceptor.getValue(ProfileConfigSourceInterceptor.java:36)
io.smallrye.config.SmallRyeConfigSourceInterceptorContext.proceed(SmallRyeConfigSourceInterceptorContext.java:25)
io.smallrye.config.LoggingConfigSourceInterceptor.getValue(LoggingConfigSourceInterceptor.java:26)
io.smallrye.config.SmallRyeConfigSourceInterceptorContext.proceed(SmallRyeConfigSourceInterceptorContext.java:25)
io.smallrye.config.RelocateConfigSourceInterceptor.getValue(RelocateConfigSourceInterceptor.java:25)
io.smallrye.config.SmallRyeConfigSourceInterceptorContext.proceed(SmallRyeConfigSourceInterceptorContext.java:25)
io.smallrye.config.RelocateConfigSourceInterceptor.getValue(RelocateConfigSourceInterceptor.java:25)
io.smallrye.config.SmallRyeConfigSourceInterceptorContext.proceed(SmallRyeConfigSourceInterceptorContext.java:25)
io.smallrye.config.RelocateConfigSourceInterceptor.getValue(RelocateConfigSourceInterceptor.java:25)
io.quarkus.smallrye.openapi.runtime.OpenApiConfigMapping.getValue(OpenApiConfigMapping.java:32)
io.smallrye.config.SmallRyeConfigSourceInterceptorContext.proceed(SmallRyeConfigSourceInterceptorContext.java:25)
io.smallrye.config.RelocateConfigSourceInterceptor.getValue(RelocateConfigSourceInterceptor.java:25)
io.smallrye.config.SmallRyeConfigSourceInterceptorContext.proceed(SmallRyeConfigSourceInterceptorContext.java:25)
io.smallrye.config.RelocateConfigSourceInterceptor.getValue(RelocateConfigSourceInterceptor.java:25)
io.smallrye.config.SmallRyeConfigSourceInterceptorContext.proceed(SmallRyeConfigSourceInterceptorContext.java:25)
io.smallrye.config.RelocateConfigSourceInterceptor.getValue(RelocateConfigSourceInterceptor.java:25)
io.quarkus.smallrye.openapi.runtime.OpenApiConfigMapping.getValue(OpenApiConfigMapping.java:32)
io.smallrye.config.SmallRyeConfigSourceInterceptorContext.proceed(SmallRyeConfigSourceInterceptorContext.java:25)
io.smallrye.config.ExpressionConfigSourceInterceptor.getValue(ExpressionConfigSourceInterceptor.java:43)
io.smallrye.config.ExpressionConfigSourceInterceptor.getValue(ExpressionConfigSourceInterceptor.java:35)
io.smallrye.config.SmallRyeConfigSourceInterceptorContext.proceed(SmallRyeConfigSourceInterceptorContext.java:25)
io.smallrye.config.FallbackConfigSourceInterceptor.getValue(FallbackConfigSourceInterceptor.java:24)
io.quarkus.opentelemetry.runtime.config.OTelFallbackConfigSourceInterceptor.getValue(OTelFallbackConfigSourceInterceptor.java:45)
io.smallrye.config.SmallRyeConfigSourceInterceptorContext.proceed(SmallRyeConfigSourceInterceptorContext.java:25)
io.smallrye.config.SecretKeysHandlerConfigSourceInterceptor.getValue(SecretKeysHandlerConfigSourceInterceptor.java:26)
io.smallrye.config.SmallRyeConfigSourceInterceptorContext.proceed(SmallRyeConfigSourceInterceptorContext.java:25)
io.smallrye.config.FallbackConfigSourceInterceptor.getValue(FallbackConfigSourceInterceptor.java:24)
io.smallrye.config.SmallRyeConfigSourceInterceptorContext.proceed(SmallRyeConfigSourceInterceptorContext.java:25)
io.smallrye.config.FallbackConfigSourceInterceptor.getValue(FallbackConfigSourceInterceptor.java:24)
io.smallrye.config.SmallRyeConfigSourceInterceptorContext.proceed(SmallRyeConfigSourceInterceptorContext.java:25)
io.smallrye.config.FallbackConfigSourceInterceptor.getValue(FallbackConfigSourceInterceptor.java:24)
io.smallrye.config.SmallRyeConfigSourceInterceptorContext.proceed(SmallRyeConfigSourceInterceptorContext.java:25)
io.smallrye.config.FallbackConfigSourceInterceptor.getValue(FallbackConfigSourceInterceptor.java:24)
io.smallrye.config.SmallRyeConfigSourceInterceptorContext.proceed(SmallRyeConfigSourceInterceptorContext.java:25)
io.smallrye.config.FallbackConfigSourceInterceptor.getValue(FallbackConfigSourceInterceptor.java:24)
io.smallrye.config.SmallRyeConfigSourceInterceptorContext.proceed(SmallRyeConfigSourceInterceptorContext.java:25)
io.smallrye.config.FallbackConfigSourceInterceptor.getValue(FallbackConfigSourceInterceptor.java:24)
io.smallrye.config.SmallRyeConfigSourceInterceptorContext.proceed(SmallRyeConfigSourceInterceptorContext.java:25)
io.quarkus.opentelemetry.runtime.config.MpTelemetryRelocateConfigSourceInterceptor$1.getValue(MpTelemetryRelocateConfigSourceInterceptor.java:26)
io.smallrye.config.SmallRyeConfigSourceInterceptorContext.proceed(SmallRyeConfigSourceInterceptorContext.java:25)
io.smallrye.config.SmallRyeConfig.getConfigValue(SmallRyeConfig.java:382)
io.smallrye.config.SmallRyeConfig.getValue(SmallRyeConfig.java:302)
io.smallrye.config.SmallRyeConfig.getOptionalValue(SmallRyeConfig.java:399)
io.quarkus.restclient.config.RestClientConfig.getConfigValue(RestClientConfig.java:390)
io.quarkus.restclient.config.RestClientConfig.load(RestClientConfig.java:340)
io.quarkus.restclient.config.RestClientsConfig.lambda$getClientConfig$0(RestClientsConfig.java:318)
java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1708) <== monitors:1
io.quarkus.restclient.config.RestClientsConfig.getClientConfig(RestClientsConfig.java:318)
io.quarkus.rest.client.reactive.runtime.RestClientCDIDelegateBuilder.clientConfigByClassName(RestClientCDIDelegateBuilder.java:409)
io.quarkus.rest.client.reactive.runtime.RestClientCDIDelegateBuilder.configureBaseUrl(RestClientCDIDelegateBuilder.java:377)
io.quarkus.rest.client.reactive.runtime.RestClientCDIDelegateBuilder.configureBuilder(RestClientCDIDelegateBuilder.java:68)
io.quarkus.rest.client.reactive.runtime.RestClientCDIDelegateBuilder.build(RestClientCDIDelegateBuilder.java:63)
io.quarkus.rest.client.reactive.runtime.RestClientCDIDelegateBuilder.createDelegate(RestClientCDIDelegateBuilder.java:46)
io.quarkus.rest.client.reactive.runtime.RestClientReactiveCDIWrapperBase.delegate(RestClientReactiveCDIWrapperBase.java:76)
io.quarkus.rest.client.reactive.runtime.RestClientReactiveCDIWrapperBase.<init>(RestClientReactiveCDIWrapperBase.java:30)
... // business code
io.quarkus.arc.impl.AbstractSharedContext.createInstanceHandle(AbstractSharedContext.java:119)
io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:38)
io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:35)
io.quarkus.arc.generator.Default_jakarta_enterprise_context_ApplicationScoped_ContextInstances.c54(Unknown Source)
io.quarkus.arc.generator.Default_jakarta_enterprise_context_ApplicationScoped_ContextInstances.computeIfAbsent(Unknown Source)
io.quarkus.arc.impl.AbstractSharedContext.get(AbstractSharedContext.java:35)
io.quarkus.arc.impl.ClientProxies.getApplicationScopedDelegate(ClientProxies.java:21)
... // business code
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:1024)
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
... // business code
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
... // business code
io.smallrye.reactive.messaging.providers.AbstractMediator.lambda$invokeBlocking$15(AbstractMediator.java:190)
io.smallrye.context.impl.wrappers.SlowContextualSupplier.get(SlowContextualSupplier.java:21)
io.smallrye.mutiny.operators.uni.builders.UniCreateFromDeferredSupplier.subscribe(UniCreateFromDeferredSupplier.java:25)
io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
io.smallrye.mutiny.operators.uni.UniRunSubscribeOn.lambda$subscribe$0(UniRunSubscribeOn.java:27)
io.quarkus.virtual.threads.ContextPreservingExecutorService$ContextPreservingRunnable.run(ContextPreservingExecutorService.java:45)
java.base/java.util.concurrent.ThreadPerTaskExecutor$TaskRunner.run(ThreadPerTaskExecutor.java:314)
java.base/java.lang.VirtualThread.run(VirtualThread.java:309)

We have not identified any native blocking calls in our business code.
What could cause this pinning?
I tried creating a reproducer, with no luck.
Maybe it is an issue with the Vault extension, but I do not see any native call there. Or is it in the business code?
How can we investigate this issue?

Expected behavior

No response

Actual behavior

No response

How to Reproduce?

No response

Output of uname -a or ver

No response

Output of java -version

No response

Quarkus version or git rev

3.9.5

Build tool (ie. output of mvnw --version or gradlew --version)

No response

Additional information

No response

@mariofusco
Contributor

This issue is caused by the synchronized block in the internal implementation of the computeIfAbsent method of the ConcurrentHashMap that is invoked here and here.

I honestly don't know whether it makes sense at all to try to fix this issue. If it does, I don't think we can simply get rid of that ConcurrentHashMap; at the very least we would need to replace it with an equivalent data structure that does not use synchronization. Any advice on this?

@vsevel
Contributor Author

vsevel commented Jul 12, 2024

Thanks for the investigation, @mariofusco.
I did not realize ConcurrentHashMap actually relied on synchronized; for some reason I thought it used lock-free data structures.
That said, this is a general issue with every config source that fetches data from an external system (such as Vault) when it is triggered from a virtual thread.
One way to solve this is to replace the computeIfAbsent with something like this:

  • check whether the key is absent
  • if it is absent, load the values
  • call computeIfAbsent with the already loaded values (instead of a lambda that fetches the data)

In the worst case you fetch the data twice from Vault (or another config source).
If we want to load it only once, we could use a ReentrantLock in RestClientsConfig and do the test+load+set while holding that lock, still avoiding the long-running call inside a lambda passed to computeIfAbsent.
Ideally, I would like to go ahead and improve the situation. We have started evaluating virtual threads on numerous Java 21 applications, and we get an alarm every time thread pinning occurs in an app deployed in integration; then we investigate. That way we hope to chase down all of the issues and build high confidence.
If we can close these issues as they come up, it will be easier to stay focused on new occurrences; if we have to filter out false positives, it is harder to stay focused.
And this is actually a textbook case of thread pinning: synchronized + a long-running I/O operation.
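A minimal sketch of the two approaches described above, assuming a hypothetical `ConfigCache` class and a blocking `loader` function standing in for the Vault lookup (neither exists in Quarkus). `getOptimistic` follows the check/load/store steps, using `putIfAbsent` with the already loaded value as the equivalent of `computeIfAbsent` with a precomputed value; `getOnce` does the test+load+set while holding a `ReentrantLock`. In both variants the slow call runs outside ConcurrentHashMap's internal synchronized block. The sketch assumes the loader never returns null, since ConcurrentHashMap rejects null values.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

/** Hypothetical cache: the slow loader never runs inside ConcurrentHashMap's synchronized block. */
class ConfigCache<V> {
    private final Map<String, V> cache = new ConcurrentHashMap<>();
    private final ReentrantLock loadLock = new ReentrantLock();
    private final Function<String, V> loader; // hypothetical slow, blocking loader (e.g. a Vault call)

    ConfigCache(Function<String, V> loader) {
        this.loader = loader;
    }

    /** Check, load outside the map, then store. Under contention the loader may run more than once. */
    V getOptimistic(String key) {
        V existing = cache.get(key);
        if (existing != null) {
            return existing;
        }
        V loaded = loader.apply(key);             // slow call, no monitor held
        V raced = cache.putIfAbsent(key, loaded); // short internal critical section only
        return raced != null ? raced : loaded;    // keep whichever value was stored first
    }

    /** Test + load + set under a ReentrantLock, so each key is loaded at most once. */
    V getOnce(String key) {
        V existing = cache.get(key);
        if (existing != null) {
            return existing;
        }
        loadLock.lock(); // parking on a ReentrantLock does not pin a virtual thread
        try {
            existing = cache.get(key); // re-check: another thread may have loaded it meanwhile
            if (existing != null) {
                return existing;
            }
            V loaded = loader.apply(key);
            cache.put(key, loaded);
            return loaded;
        } finally {
            loadLock.unlock();
        }
    }
}
```

The single lock in `getOnce` serializes loading across all keys; a per-key lock would avoid that at the cost of more bookkeeping.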

@vsevel
Contributor Author

vsevel commented Jul 12, 2024

WDYT @radcortez ?

@vsevel
Contributor Author

vsevel commented Jul 12, 2024

(the much better solution would be for ConcurrentHashMap to stop using synchronized)

@mariofusco
Contributor

> one way to solve this is to replace the computeIfAbsent by something like this:

Keep in mind that computeIfAbsent is only the tip of the iceberg: it is perhaps the method where the problem is most evident, especially when generating the value for a missing key is time-consuming and/or resource-intensive. However, if you look at the actual implementation of ConcurrentHashMap, you will see that synchronized blocks are used in many other places, including the more innocent-looking put.

@vsevel
Contributor Author

vsevel commented Jul 12, 2024

It is not an issue to have a synchronized block in a virtual thread, as long as what you do inside it is fast. Pinning is only reported once it lasts longer than 20 ms (the default threshold of the JFR jdk.VirtualThreadPinned event). The issue is when you have a synchronized block with a potentially long-running operation inside it, which is the case for externally loaded config sources.
So calling put from a virtual thread is OK, but calling computeIfAbsent with a lambda that calls an external service is an anti-pattern.
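The anti-pattern can be reproduced in isolation with a sketch like the following, assuming JDK 21 with JFR available. `slowRemoteFetch` is a hypothetical stand-in for the Vault call: like `UniBlockingAwait` in the trace above, it blocks on a `CountDownLatch`, here released by another thread after about 100 ms. The hypothetical helper `countPinnedEvents` records `jdk.VirtualThreadPinned` events (20 ms threshold) while a virtual thread runs the task, once with the fetch inside `computeIfAbsent` and once as fetch-then-`putIfAbsent`. On JDK 24 and later, JEP 491 lets virtual threads unmount while holding a monitor, so the first variant is not expected to report pinning there either.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import jdk.jfr.Recording;
import jdk.jfr.consumer.RecordingFile;

class PinningReproducer {

    /** Hypothetical remote call: blocks on a latch that another thread releases after ~100 ms. */
    static String slowRemoteFetch(String key) {
        CountDownLatch latch = new CountDownLatch(1);
        Thread.ofPlatform().start(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            latch.countDown();
        });
        try {
            latch.await(); // the same kind of wait UniBlockingAwait performs
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "value-of-" + key;
    }

    /** Runs the task on a virtual thread and counts jdk.VirtualThreadPinned events of at least 20 ms. */
    static long countPinnedEvents(Runnable task) {
        Path file = null;
        try {
            file = Files.createTempFile("pinning", ".jfr");
            try (Recording recording = new Recording()) {
                recording.enable("jdk.VirtualThreadPinned").withThreshold(Duration.ofMillis(20));
                recording.start();
                Thread.ofVirtual().start(task).join();
                recording.stop();
                recording.dump(file);
            }
            return RecordingFile.readAllEvents(file).stream()
                    .filter(event -> event.getEventType().getName().equals("jdk.VirtualThreadPinned"))
                    .count();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            if (file != null) {
                try {
                    Files.deleteIfExists(file);
                } catch (IOException ignored) {
                    // best-effort cleanup of the temporary recording
                }
            }
        }
    }

    /** Anti-pattern: the slow call runs inside computeIfAbsent's internal synchronized block. */
    static long pinnedWithComputeIfAbsent() {
        Map<String, String> cache = new ConcurrentHashMap<>();
        return countPinnedEvents(() -> cache.computeIfAbsent("k", PinningReproducer::slowRemoteFetch));
    }

    /** Alternative: fetch first with no monitor held, then a short putIfAbsent. */
    static long pinnedWithFetchThenPut() {
        Map<String, String> cache = new ConcurrentHashMap<>();
        return countPinnedEvents(() -> cache.putIfAbsent("k", slowRemoteFetch("k")));
    }
}
```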

@radcortez
Copy link
Member

We should reconsider how the REST Client builds / retrieves the configuration.

Usually, a config object is populated at the start of the application. This allows us to validate the configuration and ensure we have everything we need. In this case, the boundary seems blurred because CDI creates injected REST Clients before the application starts, but this creates an invisible dependency between Config and CDI, which shouldn't exist.

If we get rid of that, there is no need to keep a ConcurrentHashMap and the Vault source shouldn't be queried during runtime.
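A minimal, stdlib-only sketch of that idea, assuming hypothetical `RestClientSettings` and `RestClientSettingsRegistry` types and a plain `Function<String, String>` in place of the real config lookup. It is not SmallRye's `@ConfigMapping` mechanism, only an illustration of the shape: every client's settings are read and validated once at startup, so a REST client created later reads an immutable map and never triggers a config source (such as Vault) lookup. The `quarkus.rest-client.<key>.` property layout and the 30 000 ms default are placeholders chosen for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

/** Hypothetical typed settings for one REST client, validated when constructed. */
record RestClientSettings(String url, long connectTimeoutMs) {
    RestClientSettings {
        Objects.requireNonNull(url, "url is required");
        if (connectTimeoutMs <= 0) {
            throw new IllegalArgumentException("connect-timeout must be positive");
        }
    }
}

/** Hypothetical registry populated once at startup; later lookups never touch a config source. */
final class RestClientSettingsRegistry {
    // Placeholder default, chosen only for this example.
    private static final long DEFAULT_CONNECT_TIMEOUT_MS = 30_000L;

    private final Map<String, RestClientSettings> byClient;

    private RestClientSettingsRegistry(Map<String, RestClientSettings> byClient) {
        this.byClient = Map.copyOf(byClient); // immutable: safe to read from any thread
    }

    /** Reads and validates every client's settings up front, e.g. during application startup. */
    static RestClientSettingsRegistry loadAtStartup(Iterable<String> clientKeys,
                                                    Function<String, String> configLookup) {
        Map<String, RestClientSettings> result = new HashMap<>();
        for (String key : clientKeys) {
            String prefix = "quarkus.rest-client." + key + "."; // assumed property layout
            String url = configLookup.apply(prefix + "url");
            String timeout = configLookup.apply(prefix + "connect-timeout");
            long timeoutMs = timeout == null ? DEFAULT_CONNECT_TIMEOUT_MS : Long.parseLong(timeout);
            result.put(key, new RestClientSettings(url, timeoutMs)); // fails fast on bad config
        }
        return new RestClientSettingsRegistry(result);
    }

    /** Runtime access: a read from an immutable map, no remote call and no computeIfAbsent. */
    RestClientSettings get(String clientKey) {
        RestClientSettings settings = byClient.get(clientKey);
        if (settings == null) {
            throw new IllegalArgumentException("Unknown REST client: " + clientKey);
        }
        return settings;
    }
}
```

A missing or invalid value surfaces as an exception during startup rather than the first time a client is injected.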

@mariofusco
Copy link
Contributor

@radcortez I'd like to help fix this issue, but

> this creates an invisible dependency between Config and CDI, which shouldn't exist.

it is not clear at all to me how this is happening. Are you aware of any integration test that reproduces this behavior, or could you give me a few hints on how to develop a reproducer on my own?

@radcortez
Copy link
Member

radcortez commented Jul 15, 2024

Yes, check here:

These are the CDI parts that shouldn't be there:

Here the configuration is being created manually:

    public static RestClientConfig load(String configKey) {
        final RestClientConfig instance = new RestClientConfig();
        instance.url = getConfigValue(configKey, "url", String.class);
        instance.uri = getConfigValue(configKey, "uri", String.class);
        instance.providers = getConfigValue(configKey, "providers", String.class);
        instance.connectTimeout = getConfigValue(configKey, "connect-timeout", Long.class);
        instance.readTimeout = getConfigValue(configKey, "read-timeout", Long.class);
        instance.followRedirects = getConfigValue(configKey, "follow-redirects", Boolean.class);
        instance.multipartPostEncoderMode = getConfigValue(configKey, "multipart-post-encoder-mode", String.class);
        instance.proxyAddress = getConfigValue(configKey, "proxy-address", String.class);
        instance.proxyUser = getConfigValue(configKey, "proxy-user", String.class);
        instance.proxyPassword = getConfigValue(configKey, "proxy-password", String.class);
        instance.nonProxyHosts = getConfigValue(configKey, "non-proxy-hosts", String.class);
        instance.queryParamStyle = getConfigValue(configKey, "query-param-style", QueryParamStyle.class);
        instance.verifyHost = getConfigValue(configKey, "verify-host", Boolean.class);
        instance.trustStore = getConfigValue(configKey, "trust-store", String.class);
        instance.trustStorePassword = getConfigValue(configKey, "trust-store-password", String.class);
        instance.trustStoreType = getConfigValue(configKey, "trust-store-type", String.class);
        instance.keyStore = getConfigValue(configKey, "key-store", String.class);
        instance.keyStorePassword = getConfigValue(configKey, "key-store-password", String.class);
        instance.keyStoreType = getConfigValue(configKey, "key-store-type", String.class);
        instance.hostnameVerifier = getConfigValue(configKey, "hostname-verifier", String.class);
        instance.tlsConfigurationName = getConfigValue(configKey, "tls-configuration-name", String.class);
        instance.connectionTTL = getConfigValue(configKey, "connection-ttl", Integer.class);
        instance.connectionPoolSize = getConfigValue(configKey, "connection-pool-size", Integer.class);
        instance.keepAliveEnabled = getConfigValue(configKey, "keep-alive-enabled", Boolean.class);
        instance.maxRedirects = getConfigValue(configKey, "max-redirects", Integer.class);
        instance.headers = getConfigValues(configKey, "headers", String.class, String.class);
        instance.shared = getConfigValue(configKey, "shared", Boolean.class);
        instance.name = getConfigValue(configKey, "name", String.class);
        instance.userAgent = getConfigValue(configKey, "user-agent", String.class);
        instance.http2 = getConfigValue(configKey, "http2", Boolean.class);
        instance.maxChunkSize = getConfigValue(configKey, "max-chunk-size", MemorySize.class);
        instance.alpn = getConfigValue(configKey, "alpn", Boolean.class);
        instance.captureStacktrace = getConfigValue(configKey, "capture-stacktrace", Boolean.class);
        instance.multipart = new RestClientMultipartConfig();
        instance.multipart.maxChunkSize = getConfigValue(configKey, "multipart.max-chunk-size", Integer.class);
        return instance;
    }

What we should do is remove any manual configuration creation and let the config system handle it. If we need to access the configuration values, we should use ConfigProvider.getConfig().unwrap(SmallRyeConfig.class).getConfigMapping(Class). The configuration classes for the REST clients have to be migrated to use @ConfigMapping.

@franz1981
Contributor

The pinned event is troublesome not because of the synchronized block itself but because of the wait on the CountDownLatch inside it. I suggest fixing the pinning problem now with a small (roughly three-line) change, and then taking the time to properly address the problem pointed out by @radcortez.

@radcortez
Copy link
Member

Yes, it will take a while to redo the REST Client configuration.

@mariofusco
Copy link
Contributor

I believe this issue can be closed since it has been structurally fixed with this commit 4f31e0d

@vsevel
Copy link
Contributor Author

vsevel commented Aug 2, 2024

I agree. We will reopen if we see new occurrences after we upgrade. Thanks for the effort, @radcortez and @mariofusco.


4 participants