Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[JENKINS-68092] Serialization of java.util.concurrent data structure in Pipeline: Groovy #518

Merged
merged 7 commits into from
Mar 23, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,19 @@ public final class CpsThreadGroup implements Serializable {
*/
private /*almost final*/ transient CpsFlowExecution execution;

/**
* Persistent version of {@link #runtimeThreads}.
*/
private volatile Map<Integer, CpsThread> threads;

/**
* All the member threads by their {@link CpsThread#id}.
*
* All mutation occurs only on the CPS VM thread. Read access through {@link CpsStepContext#doGet}
* and iteration through {@link CpsThreadDump#from(CpsThreadGroup)} may occur on other threads
* (e.g. non-blocking steps, thread dumps from the UI).
*/
private final NavigableMap<Integer,CpsThread> threads = new ConcurrentSkipListMap<>();
private transient NavigableMap<Integer, CpsThread> runtimeThreads;

/**
* Unique thread ID generator.
Expand Down Expand Up @@ -178,6 +183,7 @@ private Object readResolve() {
execution = CpsFlowExecution.PROGRAM_STATE_SERIALIZATION.get();
setupTransients();
assert execution!=null;
runtimeThreads.putAll(threads);
if (/* compatibility: the field will be null in old programs */ scripts != null && !scripts.isEmpty()) {
GroovyShell shell = execution.getShell();
// Take the canonical bindings from the main script and relink that object with that of the shell and all other loaded scripts which kept the same bindings.
Expand All @@ -193,15 +199,21 @@ private Object readResolve() {
}

private void setupTransients() {
runtimeThreads = new ConcurrentSkipListMap<>();
runner = new CpsVmExecutorService(this);
pausedByQuietMode = new AtomicBoolean();
}

private Object writeReplace() {
threads = new HashMap<>(runtimeThreads);
return this;
}

@CpsVmThreadOnly
public CpsThread addThread(@NonNull Continuable program, FlowHead head, ContextVariableSet contextVariables) {
assertVmThread();
CpsThread t = new CpsThread(this, iota++, program, head, contextVariables);
threads.put(t.id, t);
runtimeThreads.put(t.id, t);
return t;
}

Expand All @@ -223,9 +235,9 @@ private void assertVmThread() {
* null if the thread has finished executing.
*/
public CpsThread getThread(int id) {
CpsThread thread = threads.get(id);
CpsThread thread = runtimeThreads.get(id);
if (thread == null && LOGGER.isLoggable(Level.FINE)) {
LOGGER.log(Level.FINE, "no thread " + id + " among " + threads.keySet(), new IllegalStateException());
LOGGER.log(Level.FINE, "no thread " + id + " among " + runtimeThreads.keySet(), new IllegalStateException());
}
return thread;
}
Expand All @@ -234,7 +246,7 @@ public CpsThread getThread(int id) {
* Returns an unmodifiable snapshot of all threads in the thread group.
*/
public Iterable<CpsThread> getThreads() {
return threads.values();
return runtimeThreads.values();
}

@CpsVmThreadOnly("root")
Expand Down Expand Up @@ -327,7 +339,7 @@ public void run() {
// ensures that everything submitted in front of us has finished.
runner.submit(new Runnable() {
public void run() {
if (threads.isEmpty()) {
if (runtimeThreads.isEmpty()) {
runner.shutdown();
}
// the original promise of scheduleRun() is now complete
Expand Down Expand Up @@ -403,7 +415,7 @@ private boolean run() {
boolean stillRunnable = false;

// TODO: maybe instead of running all the thread, run just one thread in round robin
for (CpsThread t : threads.values().toArray(new CpsThread[threads.size()])) {
for (CpsThread t : runtimeThreads.values().toArray(new CpsThread[runtimeThreads.size()])) {
if (t.isRunnable()) {
Outcome o = t.runNextChunk();
if (o.isFailure()) {
Expand All @@ -426,9 +438,9 @@ private boolean run() {
LOGGER.fine("completed " + t);
t.fireCompletionHandlers(o); // do this after ErrorAction is set above

threads.remove(t.id);
runtimeThreads.remove(t.id);
t.cleanUp();
if (threads.isEmpty()) {
if (runtimeThreads.isEmpty()) {
execution.onProgramEnd(o);
try {
this.execution.saveOwner();
Expand Down Expand Up @@ -620,7 +632,7 @@ private void propagateErrorToWorkflow(Throwable t) {
// as that's the ony more likely to have caused the problem.
// TODO: when we start tracking which thread is just waiting for the body, then
// that information would help. or maybe we should just remember the thread that has run the last time
Map.Entry<Integer,CpsThread> lastEntry = threads.lastEntry();
Map.Entry<Integer,CpsThread> lastEntry = runtimeThreads.lastEntry();
if (lastEntry != null) {
lastEntry.getValue().resume(new Outcome(null,t));
} else {
Expand Down