Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

System subject #195

Open
wants to merge 81 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
81 commits
Select commit Hold shift + click to select a range
3506973
Create ExecutionContext and show example with ActionPluginProxy
cwperks May 30, 2024
f8cf238
Only allow core to set the ExecutionContext
cwperks May 30, 2024
19155ca
Merge branch 'main' into plugin-aware-thread-context
cwperks May 31, 2024
b6b2a19
WIP on plugin aware thread context
cwperks May 31, 2024
8165f05
Plugin Aware API Handling
cwperks May 31, 2024
7e603e1
Merge branch 'main' into plugin-aware-thread-context
cwperks Jun 11, 2024
5b989d6
Add test to verify that ExecutionContext is being populated during Re…
cwperks Jun 11, 2024
444fde7
Merge branch 'main' into plugin-aware-thread-context
cwperks Jul 2, 2024
aaba063
Merge branch 'main' into plugin-aware-thread-context
cwperks Jul 9, 2024
eec8b96
Clear context in a finally block
cwperks Jul 10, 2024
8a98618
Create switchContext method in ThreadContext and make pluginExecution…
cwperks Jul 10, 2024
5f7a26f
Merge branch 'main' into plugin-aware-thread-context
cwperks Jul 11, 2024
6b6e955
WIP on plugin aware stash context
cwperks Jul 12, 2024
73835ed
Create class called PluginAwareNodeClient that provides a method call…
cwperks Jul 12, 2024
4d048b7
Remove ExecutionContext class
cwperks Jul 12, 2024
70c0c80
Update javadoc
cwperks Jul 12, 2024
dc5c5a8
Change createComponents to take in PluginAwareNodeClient
cwperks Jul 12, 2024
e26ac6c
Update all instances of createComponents
cwperks Jul 12, 2024
c2e0a64
Initialize clients
cwperks Jul 12, 2024
1b234b5
Merge branch 'main' into plugin-aware-thread-context
cwperks Jul 16, 2024
1d7df43
Remove casting
cwperks Jul 16, 2024
711d542
WIP on notion of ContextSwitcher
cwperks Jul 18, 2024
e2ebb36
Make stashContext package-private
cwperks Jul 23, 2024
ed71247
Make markAsSystemContext package-private
cwperks Jul 23, 2024
4b27f3a
Merge branch 'main' into plugin-aware-thread-context
cwperks Jul 23, 2024
b031db9
Add javadoc on param
cwperks Jul 23, 2024
54439a0
Remove SystemContextSwitcher
cwperks Aug 2, 2024
383fa83
Merge branch 'main' into plugin-aware-thread-context
cwperks Aug 2, 2024
dd8e7c3
Merge with main
cwperks Aug 2, 2024
2ff8f86
Cleanup
cwperks Aug 2, 2024
b006ed5
Remove SystemIndexFilter
cwperks Aug 2, 2024
f718299
Add notion of Forbidden Headers to the ThreadContext
cwperks Aug 2, 2024
526cd43
Merge branch 'main' into plugin-aware-thread-context
cwperks Aug 6, 2024
7458969
Fix tests
cwperks Aug 6, 2024
5c1d94e
Fix test
cwperks Aug 6, 2024
8fbf024
Add method to initialize plugins
cwperks Aug 6, 2024
76e8b04
Create concept of pluginNodeClient that can be used for executing tra…
cwperks Aug 7, 2024
84a325d
Add test
cwperks Aug 7, 2024
45c4a3e
Add another test for setPluginNodeClient
cwperks Aug 7, 2024
e41496a
Remove newline
cwperks Aug 7, 2024
9a89b64
Merge branch 'main' into plugin-aware-thread-context
cwperks Aug 7, 2024
bccf5eb
Add another test
cwperks Aug 8, 2024
cbeeb6a
Merge branch 'main' into plugin-aware-thread-context
cwperks Aug 8, 2024
3948acf
Merge branch 'main' into plugin-aware-thread-context
cwperks Aug 8, 2024
e685c5c
Subject.runAs and introduce PluginSubject
cwperks Aug 13, 2024
f7f245d
Do nothing when runAs is called for ShiroSubject and NoopSubject
cwperks Aug 13, 2024
6dd4153
Remove extraneous changes
cwperks Aug 13, 2024
38fe2e1
Test all methods in PluginSubject
cwperks Aug 14, 2024
7f2e545
Merge branch 'main' into plugin-aware-thread-context
cwperks Aug 14, 2024
17b4444
Pass a Callable to runAs
cwperks Aug 15, 2024
247fd59
Update import
cwperks Aug 15, 2024
8f38206
Merge branch 'main' into plugin-aware-thread-context
cwperks Aug 15, 2024
16f4251
Simplify PR, make NoopPluginSubject and introduce IdentityAwarePlugin
cwperks Aug 16, 2024
7b3b5ec
Add final
cwperks Aug 16, 2024
0a26f26
Remove server dependency
cwperks Aug 16, 2024
94a5f2c
Remove AbstractSubject
cwperks Aug 16, 2024
19dffb7
Remove unnecessary changes
cwperks Aug 16, 2024
17abb63
Add javadoc to NoopPluginSubject
cwperks Aug 16, 2024
d6be989
Merge branch 'main' into plugin-aware-thread-context
cwperks Aug 16, 2024
48bd43c
Rename to assignSubject
cwperks Aug 16, 2024
be9a89a
Add experimental label
cwperks Aug 16, 2024
ca1f297
Add getPluginSubject(plugin) to IdentityPlugin
cwperks Aug 16, 2024
84c0cc3
Make runAs generic
cwperks Aug 16, 2024
c200272
package-private constructor
cwperks Aug 16, 2024
be9b3cc
Move IdentityAwarePlugin initialization
cwperks Aug 16, 2024
9c07676
Create separate PluginSubject interface
cwperks Aug 16, 2024
901d53d
Remove authenticate method
cwperks Aug 16, 2024
55834df
Remove import
cwperks Aug 16, 2024
7374479
Separate UserSubject and PluginSubject
cwperks Aug 16, 2024
efac919
Terminate TestThreadPool
cwperks Aug 16, 2024
1536380
mock ThreadPool in RestSendToExtensionActionTests
cwperks Aug 16, 2024
e6eb37e
Fix Thread leak
cwperks Aug 17, 2024
2c9ba56
Add to CHANGELOG
cwperks Aug 19, 2024
9c5b2c4
Rename to getCurrentSubject
cwperks Aug 19, 2024
4b8c7bd
Add type check
cwperks Aug 19, 2024
0531871
WIP on system subject
cwperks Aug 19, 2024
9e30148
Merge branch 'plugin-aware-thread-context' into system-subject
cwperks Aug 19, 2024
29e3527
Introduce SystemSubject and replace usages of stashContext
cwperks Aug 20, 2024
18737dc
Merge branch 'main' into system-subject
cwperks Sep 19, 2024
4f4d758
Exception handling
cwperks Sep 19, 2024
6702707
Add tests for SystemSubject
cwperks Sep 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.common.util.concurrent.PrioritizedOpenSearchThreadPoolExecutor;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.concurrent.ThreadContextAccess;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.identity.SystemSubject;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;
import org.opensearch.threadpool.Scheduler;
Expand Down Expand Up @@ -396,31 +396,33 @@ private void submitStateUpdateTask(
}
final ThreadContext threadContext = threadPool.getThreadContext();
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(true);
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
ThreadContextAccess.doPrivilegedVoid(threadContext::markAsSystemContext);
final UpdateTask updateTask = new UpdateTask(
config.priority(),
source,
new SafeClusterApplyListener(listener, supplier, logger),
executor
);
if (config.timeout() != null) {
threadPoolExecutor.execute(
updateTask,
config.timeout(),
() -> threadPool.generic()
.execute(() -> listener.onFailure(source, new ProcessClusterEventTimeoutException(config.timeout(), source)))
SystemSubject.getInstance().runAs(() -> {
try {
final UpdateTask updateTask = new UpdateTask(
config.priority(),
source,
new SafeClusterApplyListener(listener, supplier, logger),
executor
);
} else {
threadPoolExecutor.execute(updateTask);
}
} catch (OpenSearchRejectedExecutionException e) {
// ignore cases where we are shutting down..., there is really nothing interesting
// to be done here...
if (!lifecycle.stoppedOrClosed()) {
throw e;
if (config.timeout() != null) {
threadPoolExecutor.execute(
updateTask,
config.timeout(),
() -> threadPool.generic()
.execute(() -> listener.onFailure(source, new ProcessClusterEventTimeoutException(config.timeout(), source)))
);
} else {
threadPoolExecutor.execute(updateTask);
}
} catch (OpenSearchRejectedExecutionException e) {
// ignore cases where we are shutting down..., there is really nothing interesting
// to be done here...
if (!lifecycle.stoppedOrClosed()) {
throw e;
}
}
}
return null;
});
}

/** asserts that the current thread is <b>NOT</b> the cluster state update thread */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.common.util.concurrent.PrioritizedOpenSearchThreadPoolExecutor;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.concurrent.ThreadContextAccess;
import org.opensearch.core.Assertions;
import org.opensearch.core.common.text.Text;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.discovery.Discovery;
import org.opensearch.identity.SystemSubject;
import org.opensearch.node.Node;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;
Expand Down Expand Up @@ -1022,21 +1022,23 @@ public <T> void submitStateUpdateTasks(
}
final ThreadContext threadContext = threadPool.getThreadContext();
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(true);
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
ThreadContextAccess.doPrivilegedVoid(threadContext::markAsSystemContext);

List<Batcher.UpdateTask> safeTasks = tasks.entrySet()
.stream()
.map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(), safe(e.getValue(), supplier), executor))
.collect(Collectors.toList());
taskBatcher.submitTasks(safeTasks, config.timeout());
} catch (OpenSearchRejectedExecutionException e) {
// ignore cases where we are shutting down..., there is really nothing interesting
// to be done here...
if (!lifecycle.stoppedOrClosed()) {
throw e;

SystemSubject.getInstance().runAs(() -> {
try {
List<Batcher.UpdateTask> safeTasks = tasks.entrySet()
.stream()
.map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(), safe(e.getValue(), supplier), executor))
.collect(Collectors.toList());
taskBatcher.submitTasks(safeTasks, config.timeout());
} catch (OpenSearchRejectedExecutionException e) {
// ignore cases where we are shutting down..., there is really nothing interesting
// to be done here...
if (!lifecycle.stoppedOrClosed()) {
throw e;
}
}
}
return null;
});
}

public ClusterStateStats getClusterStateStats() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.identity.SystemSubject;
import org.opensearch.rest.RestChannel;
import org.opensearch.rest.RestRequest;
import org.opensearch.telemetry.tracing.Span;
Expand Down Expand Up @@ -383,9 +384,9 @@ public void incomingRequest(final HttpRequest httpRequest, final HttpChannel htt

// Visible for testing
void dispatchRequest(final RestRequest restRequest, final RestChannel channel, final Throwable badRequestCause) {
RestChannel traceableRestChannel = channel;
final ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
SystemSubject.getInstance().runAs(() -> {
RestChannel traceableRestChannel = channel;
final Span span = tracer.startSpan(SpanBuilder.from(restRequest));
try (final SpanScope spanScope = tracer.withSpanInScope(span)) {
if (channel != null) {
Expand All @@ -397,8 +398,8 @@ void dispatchRequest(final RestRequest restRequest, final RestChannel channel, f
dispatcher.dispatchRequest(restRequest, traceableRestChannel, threadContext);
}
}
}

return null;
});
}

private void handleIncomingRequest(final HttpRequest httpRequest, final HttpChannel httpChannel, final Exception exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class IdentityService {

public IdentityService(final Settings settings, final ThreadPool threadPool, final List<IdentityPlugin> identityPlugins) {
this.settings = settings;
SystemSubject.getInstance().initialize(threadPool);

if (identityPlugins.size() == 0) {
log.debug("Identity plugins size is 0");
Expand Down
69 changes: 69 additions & 0 deletions server/src/main/java/org/opensearch/identity/SystemSubject.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.identity;

import org.opensearch.common.annotation.InternalApi;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.concurrent.ThreadContextAccess;
import org.opensearch.threadpool.ThreadPool;

import java.security.Principal;
import java.util.concurrent.Callable;

/**
* A special reserved {@link Subject} that represents the internal system subject
*
* @opensearch.internal
*/
@InternalApi
public class SystemSubject implements Subject {
private static final SystemSubject INSTANCE = new SystemSubject();

private static final SystemPrincipal SYSTEM_PRINCIPAL = new SystemPrincipal();

private ThreadPool threadPool;

private SystemSubject() {}

public static SystemSubject getInstance() {
return SystemSubject.INSTANCE;
}

void initialize(ThreadPool threadPool) {
this.threadPool = threadPool;
}

@Override
public Principal getPrincipal() {
return SYSTEM_PRINCIPAL;
}

@Override
public <T> T runAs(Callable<T> callable) throws RuntimeException {
ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ctx = threadPool.getThreadContext().stashContext()) {
ThreadContextAccess.doPrivilegedVoid(threadContext::markAsSystemContext);
try {
return callable.call();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

private static class SystemPrincipal implements Principal {

private SystemPrincipal() {}

@Override
public String getName() {
return "system";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,10 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.concurrent.ThreadContextAccess;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.identity.SystemSubject;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardClosedException;
import org.opensearch.index.translog.Translog;
Expand Down Expand Up @@ -97,15 +96,14 @@ public GlobalCheckpointSyncAction(
}

public void updateGlobalCheckpointForShard(final ShardId shardId) {
final ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
ThreadContextAccess.doPrivilegedVoid(threadContext::markAsSystemContext);
SystemSubject.getInstance().runAs(() -> {
execute(new Request(shardId), ActionListener.wrap(r -> {}, e -> {
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
logger.info(new ParameterizedMessage("{} global checkpoint sync failed", shardId), e);
}
}));
}
return null;
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,12 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.concurrent.ThreadContextAccess;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.identity.SystemSubject;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardClosedException;
Expand Down Expand Up @@ -120,10 +119,7 @@ protected void doExecute(Task task, Request request, ActionListener<ReplicationR
}

final void backgroundSync(ShardId shardId, String primaryAllocationId, long primaryTerm, RetentionLeases retentionLeases) {
final ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
// we have to execute under the system context so that if security is enabled the sync is authorized
ThreadContextAccess.doPrivilegedVoid(threadContext::markAsSystemContext);
SystemSubject.getInstance().runAs(() -> {
final Request request = new Request(shardId, retentionLeases);
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_background_sync", request);
transportService.sendChildRequest(
Expand Down Expand Up @@ -170,7 +166,8 @@ public void handleException(TransportException e) {
}
}
);
}
return null;
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,12 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.concurrent.ThreadContextAccess;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.identity.SystemSubject;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.index.shard.IndexShard;
Expand Down Expand Up @@ -135,10 +134,7 @@ final void sync(
RetentionLeases retentionLeases,
ActionListener<ReplicationResponse> listener
) {
final ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
// we have to execute under the system context so that if security is enabled the sync is authorized
ThreadContextAccess.doPrivilegedVoid(threadContext::markAsSystemContext);
SystemSubject.getInstance().runAs(() -> {
final Request request = new Request(shardId, retentionLeases);
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_sync", request);
transportService.sendChildRequest(
Expand Down Expand Up @@ -181,7 +177,8 @@ public void handleException(TransportException e) {
}
}
);
}
return null;
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.concurrent.ThreadContextAccess;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.identity.SystemSubject;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardClosedException;
Expand Down Expand Up @@ -111,10 +110,7 @@ public ReplicationMode getReplicationMode(IndexShard indexShard) {
final void publish(IndexShard indexShard, ReplicationCheckpoint checkpoint) {
String primaryAllocationId = indexShard.routingEntry().allocationId().getId();
long primaryTerm = indexShard.getPendingPrimaryTerm();
final ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
// we have to execute under the system context so that if security is enabled the sync is authorized
ThreadContextAccess.doPrivilegedVoid(threadContext::markAsSystemContext);
SystemSubject.getInstance().runAs(() -> {
PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint);
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "segrep_publish_checkpoint", request);
final ReplicationTimer timer = new ReplicationTimer();
Expand Down Expand Up @@ -182,7 +178,8 @@ public void handleException(TransportException e) {
checkpoint
)
);
}
return null;
});
}

@Override
Expand Down
Loading
Loading