Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add requestProgress function to Watch and Watcher interfaces #957

Merged
merged 1 commit into from
Jul 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion docs/Watch.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ The Watch provide methods to watch on a key interval and cancel a watcher. If th

4. Cancel watch request, the etcd client should process watch cancellation and filter all the notification after cancellation request.

5. The watch client should be able to make a progress notify request that propagates the latest revision number to all watches

# Implementation

The etcd client process watch request with [watch function](#watch-function), process notification with [processEvents function](#processevents-function) , process resume with [resume function](#resume-function) and process cancel with [cancelWatch function](#cancelwatch-function).
The etcd client process watch request with [watch function](#watch-function), process notification with [processEvents function](#processevents-function) , process resume with [resume function](#resume-function), process cancel with [cancelWatch function](#cancelwatch-function) and request progress with [requestProgress function](#requestProgress-function).

## watch function

Expand Down Expand Up @@ -44,6 +46,13 @@ Cancel the watch task with the watcher, the `onCanceled` will be called after su
1. The watcher will be removed from [watchers](#watchers-instance) map.
2. If the [watchers](#watchers-instance) map contain the watcher, it will be moved to [cancelWatchers](#cancelwatchers) and send cancel request to [requestStream](#requeststream-instance).

## requestProgress function

Send the latest revision processed to all active [watchers](#watchers-instance)

1. Send a progress request to [requestStream](#requeststream-instance).
2. Working watchers will receive a WatchResponse containing the latest revision number. All future revision numbers are guaranteed to be greater than or equal to the received revision number.

## requestStream instance

StreamObserver instance
Expand Down
10 changes: 10 additions & 0 deletions jetcd-core/src/main/java/io/etcd/jetcd/Watch.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ default Watcher watch(ByteSequence key, WatchOption option, Consumer<WatchRespon
return watch(key, option, listener(onNext, onError, onCompleted));
}

/**
* Requests the latest revision processed for all watcher instances
*/
void requestProgress();

static Listener listener(Consumer<WatchResponse> onNext) {
return listener(onNext, t -> {
}, () -> {
Expand Down Expand Up @@ -205,5 +210,10 @@ interface Watcher extends Closeable {
*/
@Override
void close();

/**
* Requests the latest revision processed and propagates it to listeners
*/
void requestProgress();
}
}
21 changes: 21 additions & 0 deletions jetcd-core/src/main/java/io/etcd/jetcd/WatchImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.etcd.jetcd.api.WatchCancelRequest;
import io.etcd.jetcd.api.WatchCreateRequest;
import io.etcd.jetcd.api.WatchGrpc;
import io.etcd.jetcd.api.WatchProgressRequest;
import io.etcd.jetcd.api.WatchRequest;
import io.etcd.jetcd.api.WatchResponse;
import io.etcd.jetcd.common.exception.ErrorCode;
Expand Down Expand Up @@ -98,6 +99,15 @@ public void close() {
}
}

@Override
public void requestProgress() {
if (!closed.get()) {
synchronized (this.lock) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively we can send a single RequestProgress update at this point (we'd need to add a FutureStub property if we go this route) and we can propagate the response to all watchers.

This'll save the etcd server from getting hit with multiple requests if there are multiple watchers

watchers.forEach(Watcher::requestProgress);
}
}
}

final class WatcherImpl implements Watcher, StreamObserver<WatchResponse> {
private final ByteSequence key;
private final WatchOption option;
Expand Down Expand Up @@ -192,6 +202,14 @@ public void close() {
}
}

@Override
public void requestProgress() {
if (!closed.get() && stream != null) {
WatchProgressRequest watchProgressRequest = WatchProgressRequest.newBuilder().build();
stream.onNext(WatchRequest.newBuilder().setProgressRequest(watchProgressRequest).build());
}
}

// ************************
//
// StreamObserver
Expand Down Expand Up @@ -245,6 +263,9 @@ public void onNext(WatchResponse response) {
}

handleError(toEtcdException(error), false);
} else if (io.etcd.jetcd.watch.WatchResponse.isProgressNotify(response)) {
listener.onNext(new io.etcd.jetcd.watch.WatchResponse(response));
revision = Math.max(revision, response.getHeader().getRevision());
} else if (response.getEventsCount() == 0 && option.isProgressNotify()) {

//
Expand Down
12 changes: 12 additions & 0 deletions jetcd-core/src/main/java/io/etcd/jetcd/watch/WatchResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,16 @@ public synchronized List<WatchEvent> getEvents() {

return events;
}

public boolean isProgressNotify() {
return isProgressNotify(getResponse());
}

/**
* returns true if the WatchResponse is progress notification.
*/
public static boolean isProgressNotify(io.etcd.jetcd.api.WatchResponse response) {
return response.getEventsCount() == 0 && !response.getCreated() && !response.getCanceled()
&& response.getCompactRevision() == 0 && response.getHeader().getRevision() != 0;
}
}
2 changes: 1 addition & 1 deletion jetcd-core/src/main/proto/kv.proto
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2016-2020 The jetcd authors
// Copyright 2016-2021 The jetcd authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
18 changes: 17 additions & 1 deletion jetcd-core/src/main/proto/rpc.proto
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2016-2020 The jetcd authors
// Copyright 2016-2021 The jetcd authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -53,6 +53,13 @@ service KV {
}

service Watch {
// Progress requests that a watch stream progress status
// be sent in the watch response stream as soon as possible.
// For watch progress responses, the header.revision indicates progress. All future events
// received in this stream are guaranteed to have a higher revision number than the
// header.revision number.
rpc Progress(WatchProgressRequest) returns (WatchResponse) {}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
rpc Progress(WatchProgressRequest) returns (WatchResponse) {}
rpc ProgressRequest(WatchProgressRequest) returns (WatchResponse) {}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if improvement

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll leave this for the maintainers to decide. I was following what looked like the proto pattern for RPCs in the file

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd go with Progress so it uses the same pattern


// Watch watches for events happening or that have happened. Both input and output
// are streams; the input stream is for creating and canceling watchers and the output
// stream sends events. One watch RPC can watch on multiple key ranges, streaming events
Expand Down Expand Up @@ -175,6 +182,9 @@ message ResponseHeader {
// member_id is the ID of the member which sent the response.
uint64 member_id = 2;
// revision is the key-value store revision when the request was applied.
// For watch progress responses, the header.revision indicates progress. All future events
// recieved in this stream are guaranteed to have a higher revision number than the
// header.revision number.
int64 revision = 3;
// raft_term is the raft term when the request was applied.
uint64 raft_term = 4;
Expand Down Expand Up @@ -458,6 +468,7 @@ message WatchRequest {
oneof request_union {
WatchCreateRequest create_request = 1;
WatchCancelRequest cancel_request = 2;
WatchProgressRequest progress_request = 3;
}
}

Expand Down Expand Up @@ -497,6 +508,11 @@ message WatchCancelRequest {
int64 watch_id = 1;
}

// Requests a watch stream progress status be sent in the
// watch response stream as soon as possible.
message WatchProgressRequest {
}

message WatchResponse {
ResponseHeader header = 1;
// watch_id is the ID of the watcher that corresponds to the response.
Expand Down
53 changes: 53 additions & 0 deletions jetcd-core/src/test/java/io/etcd/jetcd/WatchTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,59 @@ public void testWatchClose(final Client client) throws Exception {
assertThat(events.get(0).getEvents().get(0).getKeyValue().getValue()).isEqualTo(value);
}

@ParameterizedTest
@MethodSource("parameters")
public void testProgressRequest(final Client client) throws Exception {
final ByteSequence key = randomByteSequence();
final ByteSequence value = randomByteSequence();
final Watch watchClient = client.getWatchClient();
final AtomicReference<WatchResponse> emptyWatcherEventRef = new AtomicReference<>();
final AtomicReference<WatchResponse> activeWatcherEventRef = new AtomicReference<>();

try (Watcher activeWatcher = watchClient.watch(key, activeWatcherEventRef::set);
Watcher emptyWatcher = watchClient.watch(key.concat(randomByteSequence()), emptyWatcherEventRef::set)) {
// Check that a requestProgress returns identical revisions initially
watchClient.requestProgress();
await().atMost(TIME_OUT_SECONDS, TimeUnit.SECONDS).untilAsserted(() -> {
assertThat(activeWatcherEventRef.get()).isNotNull();
assertThat(emptyWatcherEventRef.get()).isNotNull();
});
WatchResponse activeEvent = activeWatcherEventRef.get();
WatchResponse emptyEvent = emptyWatcherEventRef.get();
assertThat(activeEvent).satisfies(WatchResponse::isProgressNotify);
assertThat(emptyEvent).satisfies(WatchResponse::isProgressNotify);
assertThat(activeEvent.getHeader().getRevision()).isEqualTo(emptyEvent.getHeader().getRevision());

// Put a value being watched by only the active watcher
activeWatcherEventRef.set(null);
emptyWatcherEventRef.set(null);
client.getKVClient().put(key, value).get();
await().atMost(TIME_OUT_SECONDS, TimeUnit.SECONDS).untilAsserted(() -> {
assertThat(activeWatcherEventRef.get()).isNotNull();
});
activeEvent = activeWatcherEventRef.get();
emptyEvent = emptyWatcherEventRef.get();
assertThat(emptyEvent).isNull();
assertThat(activeEvent).isNotNull();
long latestRevision = activeEvent.getHeader().getRevision();

// verify the next progress notify brings both watchers to the latest revision
activeWatcherEventRef.set(null);
emptyWatcherEventRef.set(null);
watchClient.requestProgress();
await().atMost(TIME_OUT_SECONDS, TimeUnit.SECONDS).untilAsserted(() -> {
assertThat(activeWatcherEventRef.get()).isNotNull();
assertThat(emptyWatcherEventRef.get()).isNotNull();
});
activeEvent = activeWatcherEventRef.get();
emptyEvent = emptyWatcherEventRef.get();
assertThat(activeEvent).satisfies(WatchResponse::isProgressNotify);
assertThat(emptyEvent).satisfies(WatchResponse::isProgressNotify);
assertThat(activeEvent.getHeader().getRevision()).isEqualTo(emptyEvent.getHeader().getRevision())
.isEqualTo(latestRevision);
}
}

@ParameterizedTest
@MethodSource("parameters")
public void testWatchFutureRevisionIsNotOverwrittenOnCreation(final Client client) throws Exception {
Expand Down