Skip to content

Commit

Permalink
[FAB-2606] Enable deliver_stdout to seek
Browse files Browse the repository at this point in the history
https://jira.hyperledger.org/browse/FAB-2606

The `deliver_stdout` client is useful for debugging. This changeset
introduces a `seek` flag that allows us to seek to oldest or newest
(using the convention in [1]), or extract a specific block.

[1] https://godoc.org/github.com/Shopify/sarama#pkg-constants

Change-Id: Iaf8d3c596cfc23768d2bfc257ee32ef9a4512c54
Signed-off-by: Kostas Christidis <kostas@christidis.io>
  • Loading branch information
kchristidis committed Mar 2, 2017
1 parent d4fa505 commit 28687ca
Showing 1 changed file with 32 additions and 7 deletions.
39 changes: 32 additions & 7 deletions orderer/sample_clients/deliver_stdout/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ import (
"google.golang.org/grpc"
)

var (
oldest = &ab.SeekPosition{Type: &ab.SeekPosition_Oldest{Oldest: &ab.SeekOldest{}}}
newest = &ab.SeekPosition{Type: &ab.SeekPosition_Newest{Newest: &ab.SeekNewest{}}}
maxStop = &ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: math.MaxUint64}}}
)

type deliverClient struct {
client ab.AtomicBroadcast_DeliverClient
chainID string
Expand All @@ -39,7 +45,7 @@ func newDeliverClient(client ab.AtomicBroadcast_DeliverClient, chainID string) *
return &deliverClient{client: client, chainID: chainID}
}

func seekHelper(chainID string, start *ab.SeekPosition) *cb.Envelope {
func seekHelper(chainID string, start *ab.SeekPosition, stop *ab.SeekPosition) *cb.Envelope {
return &cb.Envelope{
Payload: utils.MarshalOrPanic(&cb.Payload{
Header: &cb.Header{
Expand All @@ -51,23 +57,24 @@ func seekHelper(chainID string, start *ab.SeekPosition) *cb.Envelope {

Data: utils.MarshalOrPanic(&ab.SeekInfo{
Start: start,
Stop: &ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: math.MaxUint64}}},
Stop: stop,
Behavior: ab.SeekInfo_BLOCK_UNTIL_READY,
}),
}),
}
}

func (r *deliverClient) seekOldest() error {
return r.client.Send(seekHelper(r.chainID, &ab.SeekPosition{Type: &ab.SeekPosition_Oldest{Oldest: &ab.SeekOldest{}}}))
return r.client.Send(seekHelper(r.chainID, oldest, maxStop))
}

func (r *deliverClient) seekNewest() error {
return r.client.Send(seekHelper(r.chainID, &ab.SeekPosition{Type: &ab.SeekPosition_Newest{Newest: &ab.SeekNewest{}}}))
return r.client.Send(seekHelper(r.chainID, newest, maxStop))
}

func (r *deliverClient) seek(blockNumber uint64) error {
return r.client.Send(seekHelper(r.chainID, &ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: blockNumber}}}))
func (r *deliverClient) seekSingle(blockNumber uint64) error {
specific := &ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: blockNumber}}}
return r.client.Send(seekHelper(r.chainID, specific, specific))
}

func (r *deliverClient) readUntilClose() {
Expand All @@ -93,11 +100,21 @@ func main() {

var chainID string
var serverAddr string
var seek int

flag.StringVar(&serverAddr, "server", fmt.Sprintf("%s:%d", config.General.ListenAddress, config.General.ListenPort), "The RPC server to connect to.")
flag.StringVar(&chainID, "chainID", provisional.TestChainID, "The chain ID to deliver from.")
flag.IntVar(&seek, "seek", -2, "Specify the range of requested blocks."+
"Acceptable values:"+
"-2 (or -1) to start from oldest (or newest) and keep at it indefinitely."+
"N >= 0 to fetch block N only.")
flag.Parse()

if seek < -2 {
fmt.Println("Wrong seek value.")
flag.PrintDefaults()
}

conn, err := grpc.Dial(serverAddr, grpc.WithInsecure())
if err != nil {
fmt.Println("Error connecting:", err)
Expand All @@ -110,7 +127,15 @@ func main() {
}

s := newDeliverClient(client, chainID)
err = s.seekOldest()
switch seek {
case -2:
err = s.seekOldest()
case -1:
err = s.seekNewest()
default:
err = s.seekSingle(uint64(seek))
}

if err != nil {
fmt.Println("Received error:", err)
}
Expand Down

0 comments on commit 28687ca

Please sign in to comment.