Skip to content

Commit

Permalink
*: Add progress notify request watch request
Browse files Browse the repository at this point in the history
  • Loading branch information
jpbetz committed Jun 20, 2018
1 parent 55a05d9 commit 03a8503
Show file tree
Hide file tree
Showing 11 changed files with 737 additions and 414 deletions.
11 changes: 10 additions & 1 deletion Documentation/dev-guide/api_reference_v3.md
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@ Empty field.
| ----- | ----------- | ---- |
| cluster_id | cluster_id is the ID of the cluster which sent the response. | uint64 |
| member_id | member_id is the ID of the member which sent the response. | uint64 |
| revision | revision is the key-value store revision when the request was applied. | int64 |
| revision | revision is the key-value store revision when the request was applied. For watch progress responses, the header.revision indicates progress. All future events recieved in this stream are guarenteed to have a higher revision number than the header.revision number. | int64 |
| raft_term | raft_term is the raft term when the request was applied. | uint64 |


Expand Down Expand Up @@ -840,13 +840,22 @@ From google paxosdb paper: Our implementation hinges around a powerful primitive



##### message `WatchProgressRequest` (etcdserver/etcdserverpb/rpc.proto)

Requests the a watch stream progress status be sent in the watch response stream as soon as possible.

Empty field.



##### message `WatchRequest` (etcdserver/etcdserverpb/rpc.proto)

| Field | Description | Type |
| ----- | ----------- | ---- |
| request_union | request_union is a request to either create a new watcher or cancel an existing watcher. | oneof |
| create_request | | WatchCreateRequest |
| cancel_request | | WatchCancelRequest |
| progress_request | | WatchProgressRequest |



Expand Down
9 changes: 8 additions & 1 deletion Documentation/dev-guide/apispec/swagger/rpc.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -2195,7 +2195,7 @@
"format": "uint64"
},
"revision": {
"description": "revision is the key-value store revision when the request was applied.",
"description": "revision is the key-value store revision when the request was applied.\nFor watch progress responses, the header.revision indicates progress. All future events\nrecieved in this stream are guarenteed to have a higher revision number than the\nheader.revision number.",
"type": "string",
"format": "int64"
}
Expand Down Expand Up @@ -2396,6 +2396,10 @@
}
}
},
"etcdserverpbWatchProgressRequest": {
"description": "Requests the a watch stream progress status be sent in the watch response stream as soon as\npossible.",
"type": "object"
},
"etcdserverpbWatchRequest": {
"type": "object",
"properties": {
Expand All @@ -2404,6 +2408,9 @@
},
"create_request": {
"$ref": "#/definitions/etcdserverpbWatchCreateRequest"
},
"progress_request": {
"$ref": "#/definitions/etcdserverpbWatchProgressRequest"
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@
"revision": {
"type": "string",
"format": "int64",
"description": "revision is the key-value store revision when the request was applied."
"description": "revision is the key-value store revision when the request was applied.\nFor watch progress responses, the header.revision indicates progress. All future events\nrecieved in this stream are guarenteed to have a higher revision number than the\nheader.revision number."
},
"raft_term": {
"type": "string",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
"revision": {
"type": "string",
"format": "int64",
"description": "revision is the key-value store revision when the request was applied."
"description": "revision is the key-value store revision when the request was applied.\nFor watch progress responses, the header.revision indicates progress. All future events\nrecieved in this stream are guarenteed to have a higher revision number than the\nheader.revision number."
},
"raft_term": {
"type": "string",
Expand Down
46 changes: 46 additions & 0 deletions clientv3/integration/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,52 @@ func testWatchWithProgressNotify(t *testing.T, watchOnPut bool) {
}
}

func TestWatchRequestProgress(t *testing.T) {
defer testutil.AfterTest(t)

watchTimeout := 3 * time.Second

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)

wc := clus.RandClient()

rch := wc.Watch(context.Background(), "/", clientv3.WithPrefix())

_, err := wc.Put(context.Background(), "/a", "1")
if err != nil {
t.Fatal(err)
}

select {
case resp := <-rch: // wait for notification
if len(resp.Events) != 1 {
t.Fatalf("resp.Events expected 1, got %d", len(resp.Events))
}
case <-time.After(watchTimeout):
t.Fatalf("watch response expected in %v, but timed out", watchTimeout)
}

for i := 0; i < 2; i++ { // ensure progress does not change if the only requests are for progress
err = wc.RequestProgress(context.Background())
if err != nil {
t.Fatal(err)
}

select {
case resp := <-rch:
if !resp.IsProgressNotify() {
t.Fatalf("expected resp.IsProgressNotify() == true")
}
if resp.Header.Revision != 2 {
t.Fatalf("resp.Header.Revision expected 2, got %d", resp.Header.Revision)
}
case <-time.After(watchTimeout):
t.Fatalf("watch response expected in %v, but timed out", watchTimeout)
}
}
}

func TestWatchEventType(t *testing.T) {
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
Expand Down
102 changes: 82 additions & 20 deletions clientv3/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ type Watcher interface {

// Close closes the watcher and cancels all watch requests.
Close() error

// RequestProgress requests a progress notify response be sent in all WatchChans.
RequestProgress(ctx context.Context) error
}

type WatchResponse struct {
Expand Down Expand Up @@ -150,7 +153,7 @@ type watchGrpcStream struct {
resuming []*watcherStream

// reqc sends a watch request from Watch() to the main goroutine
reqc chan *watchRequest
reqc chan watchStreamRequest
// respc receives data from the watch client
respc chan *pb.WatchResponse
// donec closes to broadcast shutdown
Expand All @@ -168,6 +171,11 @@ type watchGrpcStream struct {
closeErr error
}

// watchStreamRequest is a union of the supported watch request operation types
type watchStreamRequest interface {
toPB() *pb.WatchRequest
}

// watchRequest is issued by the subscriber to start a new watcher
type watchRequest struct {
ctx context.Context
Expand All @@ -192,6 +200,10 @@ type watchRequest struct {
retc chan chan WatchResponse
}

// progressRequest is issued by the subscriber to request watch progress
type progressRequest struct {
}

// watcherStream represents a registered watcher
type watcherStream struct {
// initReq is the request that initiated this request
Expand Down Expand Up @@ -249,7 +261,7 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
cancel: cancel,
substreams: make(map[int64]*watcherStream),
respc: make(chan *pb.WatchResponse),
reqc: make(chan *watchRequest),
reqc: make(chan watchStreamRequest),
donec: make(chan struct{}),
errc: make(chan error, 1),
closingc: make(chan *watcherStream),
Expand Down Expand Up @@ -355,6 +367,44 @@ func (w *watcher) Close() (err error) {
return err
}

// RequestProgress requests a progress notify response be sent in all WatchChans.
func (w *watcher) RequestProgress(ctx context.Context) (err error) {
ctxKey := streamKeyFromCtx(ctx)

w.mu.Lock()
if w.streams == nil {
return fmt.Errorf("No stream found for context")
}
wgs := w.streams[ctxKey]
if wgs == nil {
wgs = w.newWatcherGrpcStream(ctx)
w.streams[ctxKey] = wgs
}
donec := wgs.donec
reqc := wgs.reqc
w.mu.Unlock()

pr := &progressRequest{}

select {
case reqc <- pr:
return nil
case <-ctx.Done():
if err == nil {
return ctx.Err()
}
return err
case <-donec:
if wgs.closeErr != nil {
return wgs.closeErr
}
// retry; may have dropped stream from no ctxs
return w.RequestProgress(ctx)
}
// TODO: block for progress response?
return fmt.Errorf("Unexpected error submitting progress request")
}

func (w *watchGrpcStream) close() (err error) {
w.cancel()
<-w.donec
Expand Down Expand Up @@ -462,26 +512,31 @@ func (w *watchGrpcStream) run() {
for {
select {
// Watch() requested
case wreq := <-w.reqc:
outc := make(chan WatchResponse, 1)
// TODO: pass custom watch ID?
ws := &watcherStream{
initReq: *wreq,
id: -1,
outc: outc,
// unbuffered so resumes won't cause repeat events
recvc: make(chan *WatchResponse),
}
case req := <-w.reqc:
switch wreq := req.(type) {
case *watchRequest:
outc := make(chan WatchResponse, 1)
// TODO: pass custom watch ID?
ws := &watcherStream{
initReq: *wreq,
id: -1,
outc: outc,
// unbuffered so resumes won't cause repeat events
recvc: make(chan *WatchResponse),
}

ws.donec = make(chan struct{})
w.wg.Add(1)
go w.serveSubstream(ws, w.resumec)
ws.donec = make(chan struct{})
w.wg.Add(1)
go w.serveSubstream(ws, w.resumec)

// queue up for watcher creation/resume
w.resuming = append(w.resuming, ws)
if len(w.resuming) == 1 {
// head of resume queue, can register a new watcher
wc.Send(ws.initReq.toPB())
// queue up for watcher creation/resume
w.resuming = append(w.resuming, ws)
if len(w.resuming) == 1 {
// head of resume queue, can register a new watcher
wc.Send(ws.initReq.toPB())
}
case *progressRequest:
wc.Send(wreq.toPB())
}

// new events from the watch client
Expand Down Expand Up @@ -882,6 +937,13 @@ func (wr *watchRequest) toPB() *pb.WatchRequest {
return &pb.WatchRequest{RequestUnion: cr}
}

// toPB converts an internal progress request structure to its protobuf WatchRequest structure.
func (pr *progressRequest) toPB() *pb.WatchRequest {
req := &pb.WatchProgressRequest{}
cr := &pb.WatchRequest_ProgressRequest{ProgressRequest: req}
return &pb.WatchRequest{RequestUnion: cr}
}

func streamKeyFromCtx(ctx context.Context) string {
if md, ok := metadata.FromOutgoingContext(ctx); ok {
return fmt.Sprintf("%+v", md)
Expand Down
41 changes: 25 additions & 16 deletions etcdctl/ctlv3/command/watch_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,27 +101,33 @@ func watchInteractiveFunc(cmd *cobra.Command, osArgs []string, envKey, envRange
l = strings.TrimSuffix(l, "\n")

args := argify(l)
if len(args) < 2 && envKey == "" {
fmt.Fprintf(os.Stderr, "Invalid command %s (command type or key is not provided)\n", l)
continue
}
switch args[0] {
case "watch":
if len(args) < 2 && envKey == "" {
fmt.Fprintf(os.Stderr, "Invalid command %s (command type or key is not provided)\n", l)
continue
}
watchArgs, execArgs, perr := parseWatchArgs(osArgs, args, envKey, envRange, true)
if perr != nil {
ExitWithError(ExitBadArgs, perr)
}

if args[0] != "watch" {
ch, err := getWatchChan(c, watchArgs)
if err != nil {
fmt.Fprintf(os.Stderr, "Invalid command %s (%v)\n", l, err)
continue
}
go printWatchCh(c, ch, execArgs)
case "progress":
err := c.RequestProgress(clientv3.WithRequireLeader(context.Background()))
if err != nil {
ExitWithError(ExitError, err)
}
default:
fmt.Fprintf(os.Stderr, "Invalid command %s (only support watch)\n", l)
continue
}

watchArgs, execArgs, perr := parseWatchArgs(osArgs, args, envKey, envRange, true)
if perr != nil {
ExitWithError(ExitBadArgs, perr)
}

ch, err := getWatchChan(c, watchArgs)
if err != nil {
fmt.Fprintf(os.Stderr, "Invalid command %s (%v)\n", l, err)
continue
}
go printWatchCh(c, ch, execArgs)
}
}

Expand Down Expand Up @@ -152,6 +158,9 @@ func printWatchCh(c *clientv3.Client, ch clientv3.WatchChan, execArgs []string)
if resp.Canceled {
fmt.Fprintf(os.Stderr, "watch was canceled (%v)\n", resp.Err())
}
if resp.IsProgressNotify() {
fmt.Fprintf(os.Stdout, "progress notify: %d\n", resp.Header.Revision)
}
display.Watch(resp)

if len(execArgs) > 0 {
Expand Down
7 changes: 6 additions & 1 deletion etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,12 @@ func (sws *serverWatchStream) recvLoop() error {
sws.mu.Unlock()
}
}

case *pb.WatchRequest_ProgressRequest:
if uv.ProgressRequest != nil {
sws.ctrlStream <- &pb.WatchResponse{
Header: sws.newResponseHeader(sws.watchStream.Rev()),
}
}
default:
// we probably should not shutdown the entire stream when
// receive an valid command.
Expand Down
1 change: 1 addition & 0 deletions etcdserver/etcdserverpb/etcdserver.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 03a8503

Please sign in to comment.