diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 4d990cf..1513af3 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -10,16 +10,30 @@ jobs: build: runs-on: ubuntu-latest + env: + working-directory: ./ steps: - uses: actions/checkout@v3 - name: Set up Go uses: actions/setup-go@v3 with: - go-version: 1.18 + go-version: 1.16 + + - name: Install cfssl + working-directory: ${{env.working-directory}} + run: | + sudo apt-get update + sudo apt-get -y install golang-cfssl + + - name: Create credentials + working-directory: ${{env.working-directory}} + run: | + make init + make gencert - name: Build run: go build -v ./... - name: Test - run: go test -v ./... + run: make test diff --git a/Makefile b/Makefile index 2034f02..597b667 100644 --- a/Makefile +++ b/Makefile @@ -49,7 +49,7 @@ $(CONFIG_PATH)/policy.csv: .PHONY: test test: $(CONFIG_PATH)/policy.csv $(CONFIG_PATH)/model.conf - go test -race ./... -v + go test -v ./... .PHONY: compile compile: diff --git a/api/streaming/v1/streaming.pb.go b/api/streaming/v1/streaming.pb.go index 7df1b98..54875ef 100644 --- a/api/streaming/v1/streaming.pb.go +++ b/api/streaming/v1/streaming.pb.go @@ -25,8 +25,7 @@ type ConsumeRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Locus string `protobuf:"bytes,1,opt,name=locus,proto3" json:"locus,omitempty"` - Point string `protobuf:"bytes,2,opt,name=point,proto3" json:"point,omitempty"` + Point string `protobuf:"bytes,1,opt,name=point,proto3" json:"point,omitempty"` } func (x *ConsumeRequest) Reset() { @@ -61,13 +60,6 @@ func (*ConsumeRequest) Descriptor() ([]byte, []int) { return file_api_streaming_v1_streaming_proto_rawDescGZIP(), []int{0} } -func (x *ConsumeRequest) GetLocus() string { - if x != nil { - return x.Locus - } - return "" -} - func (x *ConsumeRequest) GetPoint() string { if x != nil { return x.Point @@ -127,8 +119,7 @@ type ProduceRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Locus string `protobuf:"bytes,1,opt,name=locus,proto3" json:"locus,omitempty"` - Point string `protobuf:"bytes,2,opt,name=point,proto3" json:"point,omitempty"` + Point string `protobuf:"bytes,1,opt,name=point,proto3" json:"point,omitempty"` Frame []byte `protobuf:"bytes,3,opt,name=frame,proto3" json:"frame,omitempty"` } @@ -164,13 +155,6 @@ func (*ProduceRequest) Descriptor() ([]byte, []int) { return file_api_streaming_v1_streaming_proto_rawDescGZIP(), []int{2} } -func (x *ProduceRequest) GetLocus() string { - if x != nil { - return x.Locus - } - return "" -} - func (x *ProduceRequest) GetPoint() string { if x != nil { return x.Point @@ -190,7 +174,7 @@ type ProduceResponse struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Offset uint64 `protobuf:"varint,1,opt,name=offset,proto3" json:"offset,omitempty"` + Records []*Record `protobuf:"bytes,1,rep,name=records,proto3" json:"records,omitempty"` } func (x *ProduceResponse) Reset() { @@ -225,7 +209,62 @@ func (*ProduceResponse) Descriptor() ([]byte, []int) { return file_api_streaming_v1_streaming_proto_rawDescGZIP(), []int{3} } -func (x *ProduceResponse) GetOffset() uint64 { +func (x *ProduceResponse) GetRecords() []*Record { + if x != nil { + return x.Records + } + return nil +} + +type Record struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Point string `protobuf:"bytes,1,opt,name=point,proto3" json:"point,omitempty"` + Offset uint64 `protobuf:"varint,2,opt,name=offset,proto3" json:"offset,omitempty"` +} + +func (x *Record) Reset() { + *x = Record{} + if protoimpl.UnsafeEnabled { + mi := &file_api_streaming_v1_streaming_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Record) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Record) ProtoMessage() {} + +func (x *Record) ProtoReflect() protoreflect.Message { + mi := &file_api_streaming_v1_streaming_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Record.ProtoReflect.Descriptor instead. +func (*Record) Descriptor() ([]byte, []int) { + return file_api_streaming_v1_streaming_proto_rawDescGZIP(), []int{4} +} + +func (x *Record) GetPoint() string { + if x != nil { + return x.Point + } + return "" +} + +func (x *Record) GetOffset() uint64 { if x != nil { return x.Offset } @@ -238,35 +277,38 @@ var file_api_streaming_v1_streaming_proto_rawDesc = []byte{ 0x0a, 0x20, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2f, 0x76, 0x31, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0c, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, - 0x22, 0x3c, 0x0a, 0x0e, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x6c, 0x6f, 0x63, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x05, 0x6c, 0x6f, 0x63, 0x75, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x70, 0x6f, 0x69, 0x6e, - 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x22, 0x27, - 0x0a, 0x0f, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x12, 0x14, 0x0a, 0x05, 0x66, 0x72, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, - 0x52, 0x05, 0x66, 0x72, 0x61, 0x6d, 0x65, 0x22, 0x52, 0x0a, 0x0e, 0x50, 0x72, 0x6f, 0x64, 0x75, - 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x6c, 0x6f, 0x63, - 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x6c, 0x6f, 0x63, 0x75, 0x73, 0x12, - 0x14, 0x0a, 0x05, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, - 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x66, 0x72, 0x61, 0x6d, 0x65, 0x18, 0x03, - 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x66, 0x72, 0x61, 0x6d, 0x65, 0x22, 0x29, 0x0a, 0x0f, 0x50, - 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, - 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x06, - 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x32, 0xaf, 0x01, 0x0a, 0x09, 0x53, 0x74, 0x72, 0x65, 0x61, - 0x6d, 0x69, 0x6e, 0x67, 0x12, 0x50, 0x0a, 0x0d, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x53, - 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x1c, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, - 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2e, - 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x50, 0x0a, 0x0d, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, - 0x65, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x1c, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, - 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, - 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x42, 0x2b, 0x5a, 0x29, 0x67, 0x69, 0x74, 0x68, - 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x42, 0x72, 0x69, 0x6a, 0x65, 0x73, 0x68, 0x6c, 0x61, - 0x6b, 0x6b, 0x61, 0x64, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, - 0x6e, 0x67, 0x5f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x22, 0x26, 0x0a, 0x0e, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x05, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x22, 0x27, 0x0a, 0x0f, 0x43, 0x6f, 0x6e, 0x73, + 0x75, 0x6d, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x66, + 0x72, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x66, 0x72, 0x61, 0x6d, + 0x65, 0x22, 0x3c, 0x0a, 0x0e, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x05, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x66, 0x72, 0x61, + 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x66, 0x72, 0x61, 0x6d, 0x65, 0x22, + 0x41, 0x0a, 0x0f, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x2e, 0x0a, 0x07, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x18, 0x01, 0x20, + 0x03, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2e, + 0x76, 0x31, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x07, 0x72, 0x65, 0x63, 0x6f, 0x72, + 0x64, 0x73, 0x22, 0x36, 0x0a, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x12, 0x14, 0x0a, 0x05, + 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x70, 0x6f, 0x69, + 0x6e, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x04, 0x52, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x32, 0xaf, 0x01, 0x0a, 0x09, 0x53, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x12, 0x50, 0x0a, 0x0d, 0x43, 0x6f, 0x6e, 0x73, + 0x75, 0x6d, 0x65, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x1c, 0x2e, 0x73, 0x74, 0x72, 0x65, + 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, + 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x50, 0x0a, 0x0d, 0x50, 0x72, + 0x6f, 0x64, 0x75, 0x63, 0x65, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x1c, 0x2e, 0x73, 0x74, + 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x72, 0x6f, 0x64, 0x75, + 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x73, 0x74, 0x72, 0x65, + 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x42, 0x2b, 0x5a, 0x29, + 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x42, 0x72, 0x69, 0x6a, 0x65, + 0x73, 0x68, 0x6c, 0x61, 0x6b, 0x6b, 0x61, 0x64, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x74, 0x72, + 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x5f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, } var ( @@ -281,23 +323,25 @@ func file_api_streaming_v1_streaming_proto_rawDescGZIP() []byte { return file_api_streaming_v1_streaming_proto_rawDescData } -var file_api_streaming_v1_streaming_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_api_streaming_v1_streaming_proto_msgTypes = make([]protoimpl.MessageInfo, 5) var file_api_streaming_v1_streaming_proto_goTypes = []interface{}{ (*ConsumeRequest)(nil), // 0: streaming.v1.ConsumeRequest (*ConsumeResponse)(nil), // 1: streaming.v1.ConsumeResponse (*ProduceRequest)(nil), // 2: streaming.v1.ProduceRequest (*ProduceResponse)(nil), // 3: streaming.v1.ProduceResponse + (*Record)(nil), // 4: streaming.v1.Record } var file_api_streaming_v1_streaming_proto_depIdxs = []int32{ - 0, // 0: streaming.v1.Streaming.ConsumeStream:input_type -> streaming.v1.ConsumeRequest - 2, // 1: streaming.v1.Streaming.ProduceStream:input_type -> streaming.v1.ProduceRequest - 1, // 2: streaming.v1.Streaming.ConsumeStream:output_type -> streaming.v1.ConsumeResponse - 3, // 3: streaming.v1.Streaming.ProduceStream:output_type -> streaming.v1.ProduceResponse - 2, // [2:4] is the sub-list for method output_type - 0, // [0:2] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name + 4, // 0: streaming.v1.ProduceResponse.records:type_name -> streaming.v1.Record + 0, // 1: streaming.v1.Streaming.ConsumeStream:input_type -> streaming.v1.ConsumeRequest + 2, // 2: streaming.v1.Streaming.ProduceStream:input_type -> streaming.v1.ProduceRequest + 1, // 3: streaming.v1.Streaming.ConsumeStream:output_type -> streaming.v1.ConsumeResponse + 3, // 4: streaming.v1.Streaming.ProduceStream:output_type -> streaming.v1.ProduceResponse + 3, // [3:5] is the sub-list for method output_type + 1, // [1:3] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name } func init() { file_api_streaming_v1_streaming_proto_init() } @@ -354,6 +398,18 @@ func file_api_streaming_v1_streaming_proto_init() { return nil } } + file_api_streaming_v1_streaming_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Record); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -361,7 +417,7 @@ func file_api_streaming_v1_streaming_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_api_streaming_v1_streaming_proto_rawDesc, NumEnums: 0, - NumMessages: 4, + NumMessages: 5, NumExtensions: 0, NumServices: 1, }, diff --git a/api/streaming/v1/streaming.proto b/api/streaming/v1/streaming.proto index 538845f..7ad8b2b 100644 --- a/api/streaming/v1/streaming.proto +++ b/api/streaming/v1/streaming.proto @@ -10,8 +10,7 @@ service Streaming { } message ConsumeRequest { - string locus = 1; - string point = 2; + string point = 1; } message ConsumeResponse { @@ -19,11 +18,15 @@ message ConsumeResponse { } message ProduceRequest { - string locus = 1; - string point = 2; + string point = 1; bytes frame = 3; } message ProduceResponse { - uint64 offset = 1; + repeated Record records = 1; +} + +message Record { + string point = 1; + uint64 offset = 2; } \ No newline at end of file diff --git a/go.mod b/go.mod index 0970782..6bef45a 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/Brijeshlakkad/ring v0.0.0-20220814223321-1a57633133d0 github.com/armon/go-metrics v0.4.0 // indirect github.com/casbin/casbin v1.9.1 + github.com/go-redis/redis v6.15.9+incompatible github.com/google/btree v1.1.2 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/hashicorp/errwrap v1.1.0 // indirect @@ -14,6 +15,9 @@ require ( github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-sockaddr v1.0.2 // indirect github.com/miekg/dns v1.1.50 // indirect + github.com/onsi/ginkgo v1.16.5 // indirect + github.com/onsi/gomega v1.20.0 // indirect + github.com/pelletier/go-toml v1.9.5 github.com/soheilhy/cmux v0.1.5 github.com/spf13/cobra v1.5.0 github.com/spf13/viper v1.12.0 @@ -21,7 +25,7 @@ require ( github.com/travisjeffery/go-dynaport v1.0.0 github.com/tysonmote/gommap v0.0.2 go.opencensus.io v0.23.0 - go.uber.org/atomic v1.10.0 // indirect + go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.8.0 // indirect golang.org/x/net v0.0.0-20220812174116-3211cb980234 // indirect golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 diff --git a/go.sum b/go.sum index 88edff3..bf15c2d 100644 --- a/go.sum +++ b/go.sum @@ -127,6 +127,8 @@ github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE= github.com/frankban/quicktest v1.14.3/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI= github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= @@ -139,7 +141,10 @@ github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vb github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= +github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg= +github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= @@ -213,6 +218,7 @@ github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20201218002935-b9804c9f04c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210122040257-d980be63207e/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210601050228-01bbb1931b22/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210609004039-a478d1d731e9/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= @@ -275,6 +281,7 @@ github.com/hashicorp/serf v0.9.6/go.mod h1:TXZNMjZQijwlDvp+r0b63xZ45H7JmCmgg4gpT github.com/hashicorp/serf v0.9.7/go.mod h1:TXZNMjZQijwlDvp+r0b63xZ45H7JmCmgg4gpTwn9UV4= github.com/hashicorp/serf v0.10.0 h1:89qvvpfMQnz6c2y4pv7j2vUUmeT1+5TSZMexuTbtsPs= github.com/hashicorp/serf v0.10.0/go.mod h1:bXN03oZc5xlH46k/K1qTrpXb9ERKyY1/i/N5mxvgrZw= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= @@ -341,6 +348,22 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= +github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= +github.com/onsi/ginkgo/v2 v2.1.3/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c= +github.com/onsi/ginkgo/v2 v2.1.4/go.mod h1:um6tUpWM/cxCK3/FK8BXqEiUMUwRgSM4JXG47RKZmLU= +github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= +github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= +github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro= +github.com/onsi/gomega v1.20.0 h1:8W0cWlwFkflGPLltQvLRB7ZVD5HuP6ng320w2IS245Q= +github.com/onsi/gomega v1.20.0/go.mod h1:DtrZpjmvpn2mPm4YWQa0/ALMDj9v4YxLgojwPeREyVo= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= @@ -429,6 +452,7 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.etcd.io/etcd/api/v3 v3.5.4/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dYz5A= go.etcd.io/etcd/client/pkg/v3 v3.5.4/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= @@ -444,6 +468,7 @@ go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= @@ -503,10 +528,12 @@ golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -530,6 +557,7 @@ golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/net v0.0.0-20200501053045-e0ff5e5a1de5/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200506145744-7e3656a0809f/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200513185701-a91f0712d120/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= @@ -545,8 +573,10 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8= +golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= @@ -595,6 +625,7 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -606,11 +637,14 @@ golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190922100055-0a153f010e69/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -636,6 +670,7 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210104204734-6f8348627aad/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -660,14 +695,17 @@ golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220328115105-d36c6a25d886/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220422013727-9388b58f7150/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220502124256-b6088ccd6cba/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -735,6 +773,7 @@ golang.org/x/tools v0.0.0-20200904185747-39188db58858/go.mod h1:Cj7w3i3Rnn0Xh82u golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= @@ -745,6 +784,7 @@ golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E= golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -939,8 +979,11 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/ini.v1 v1.66.4 h1:SsAcf+mM7mRZo2nJNGt8mZCjG8ZRaNGMURJw7BsIST4= gopkg.in/ini.v1 v1.66.4/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/internal/agent/agent.go b/internal/agent/agent.go index ec2c3b7..2439e45 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -122,8 +122,6 @@ func (a *Agent) setupLoci() error { a.membership.AddListener("locimanager", a.loci) - // TODO: Bootstrap - return err } @@ -134,8 +132,8 @@ func (a *Agent) setupServer() error { ) serverConfig := &server.Config{ StreamingConfig: &server.StreamingConfig{ - LociManager: a.loci, - Authorizer: authorizer, + Locus: a.loci, + Authorizer: authorizer, }, } var opts []grpc.ServerOption @@ -184,7 +182,7 @@ func (a *Agent) Shutdown() error { a.server.GracefulStop() return nil }, - a.loci.CloseAll, + a.loci.Close, } for _, fn := range shutdown { if err := fn(); err != nil { diff --git a/internal/agent/agent_test.go b/internal/agent/agent_test.go index 2e81ac0..106c80e 100644 --- a/internal/agent/agent_test.go +++ b/internal/agent/agent_test.go @@ -45,18 +45,20 @@ func TestAgent(t *testing.T) { bindAddr := fmt.Sprintf("%s:%d", "127.0.0.1", ports[0]) rpcPort := ports[1] - dataDir, err := ioutil.TempDir("", "agent-test-loci") + dataDir, err := ioutil.TempDir("", "agentInstance-test-loci") require.NoError(t, err) var startJoinAddrs []string + bootstrap := true if i != 0 { + bootstrap = false startJoinAddrs = append( startJoinAddrs, agents[0].Config.BindAddr, ) } - agent, err := agent.New(agent.Config{ + agentInstance, err := agent.New(agent.Config{ NodeName: fmt.Sprintf("%d", i), SeedAddresses: startJoinAddrs, BindAddr: bindAddr, @@ -66,17 +68,18 @@ func TestAgent(t *testing.T) { ACLPolicyFile: config.ACLPolicyFile, ServerTLSConfig: serverTLSConfig, PeerTLSConfig: peerTLSConfig, + Bootstrap: bootstrap, }) require.NoError(t, err) - agents = append(agents, agent) + agents = append(agents, agentInstance) } defer func() { - for _, agent := range agents { - err := agent.Shutdown() + for _, agentInstance := range agents { + err := agentInstance.Shutdown() require.NoError(t, err) require.NoError(t, - os.RemoveAll(agent.Config.DataDir), + os.RemoveAll(agentInstance.Config.DataDir), ) } }() @@ -84,7 +87,6 @@ func TestAgent(t *testing.T) { time.Sleep(3 * time.Second) var ( - locusId = "goutube-client" pointId = "sample_file" lines = 10 ) @@ -94,7 +96,7 @@ func TestAgent(t *testing.T) { require.NoError(t, err) for i := 0; i < lines; i++ { - err := stream.Send(&streaming_api.ProduceRequest{Locus: locusId, Point: pointId, Frame: []byte(fmt.Sprintln(i))}) + err := stream.Send(&streaming_api.ProduceRequest{Point: pointId, Frame: []byte(fmt.Sprintln(i))}) require.NoError(t, err) } @@ -106,7 +108,7 @@ func TestAgent(t *testing.T) { // test consume stream followerClient := client(t, agents[1], peerTLSConfig) - resStream, err := followerClient.ConsumeStream(context.Background(), &streaming_api.ConsumeRequest{Locus: locusId, Point: pointId}) + resStream, err := followerClient.ConsumeStream(context.Background(), &streaming_api.ConsumeRequest{Point: pointId}) if err != nil { log.Fatalf("error while calling ConsumeStream RPC: %v", err) } diff --git a/internal/locus/arc.go b/internal/locus/arc.go index 2a4124c..a0fbceb 100644 --- a/internal/locus/arc.go +++ b/internal/locus/arc.go @@ -8,35 +8,47 @@ import ( "github.com/hashicorp/go-hclog" ) -type Arc struct { - transport *Transport - // Used for our logging - logger hclog.Logger +var ( + // ErrArcShutdown is returned when operations are requested against an + // inactive Raft. + ErrArcShutdown = errors.New("arc is already shutdown") + + // ErrEnqueueTimeout is returned when a command fails due to a timeout. + ErrEnqueueTimeout = errors.New("timed out enqueuing operation") + + // ErrStoreNullPointer is returned when the provided ArcConfig has nil Log + ErrStoreNullPointer = errors.New("store cannot be nil") + // ErrFSMNullPointer is returned when the provided ArcConfig has nil FSM + ErrFSMNullPointer = errors.New("FSM cannot be nil") +) + +type Arc struct { State fsm FSM + // Dialer + StreamLayer StreamLayer + logger hclog.Logger + // Timeout is used to apply I/O deadlines. For InstallSnapshot, we multiply + // the timeout by (SnapshotSize / TimeoutScale). + Timeout time.Duration + store Store + MaxChunkSize uint64 + transport *Transport // Shutdown channel to exit, protected to prevent concurrent exits - shutdown bool - shutdownCh chan struct{} - shutdownLock sync.Mutex - - applyCh chan *CommandPromise - + shutdown bool + shutdownCh chan struct{} + shutdownLock sync.Mutex + applyCh chan *RecordPromise rpcCh <-chan RPC replicateStateLock sync.Mutex + Dir string + bundler Bundler + bootStrap bool } -var ( - // ErrArcShutdown is returned when operations are requested against an - // inactive Raft. - ErrArcShutdown = errors.New("arc is already shutdown") - - // ErrEnqueueTimeout is returned when a command fails due to a timeout. - ErrEnqueueTimeout = errors.New("timed out enqueuing operation") -) - type ArcConfig struct { fsm FSM // Dialer @@ -45,9 +57,17 @@ type ArcConfig struct { // Timeout is used to apply I/O deadlines. For InstallSnapshot, we multiply // the timeout by (SnapshotSize / TimeoutScale). Timeout time.Duration + + store Store + + MaxChunkSize uint64 + + Bundler Bundler + + Bootstrap bool } -func NewArc(config ArcConfig) *Arc { +func NewArc(config ArcConfig) (*Arc, error) { if config.Logger == nil { config.Logger = hclog.New(&hclog.LoggerOptions{ Name: "goutube-arc", @@ -55,6 +75,15 @@ func NewArc(config ArcConfig) *Arc { Level: hclog.DefaultLevel, }) } + if config.store == nil { + return nil, ErrStoreNullPointer + } + if config.fsm == nil { + return nil, ErrFSMNullPointer + } + if config.MaxChunkSize == 0 { + config.MaxChunkSize = 512 + } transport := NewTransportWithConfig( &TransportConfig{ Stream: config.StreamLayer, @@ -63,21 +92,27 @@ func NewArc(config ArcConfig) *Arc { }, ) arc := &Arc{ - transport: transport, - rpcCh: transport.Consumer(), - fsm: config.fsm, - shutdownCh: make(chan struct{}), - applyCh: make(chan *CommandPromise), + transport: transport, + rpcCh: transport.Consumer(), + fsm: config.fsm, + StreamLayer: config.StreamLayer, + logger: config.Logger, + Timeout: config.Timeout, + store: config.store, + MaxChunkSize: config.MaxChunkSize, + shutdownCh: make(chan struct{}), + applyCh: make(chan *RecordPromise), State: State{ replicateState: make(map[string]*Follower), }, - logger: config.Logger, + bundler: config.Bundler, + bootStrap: config.Bootstrap, } go arc.runFSM() go arc.runThisPeer() - return arc + return arc, nil } func (arc *Arc) runThisPeer() { @@ -85,32 +120,54 @@ func (arc *Arc) runThisPeer() { select { case rpc := <-arc.rpcCh: arc.processRPC(rpc) + case <-arc.shutdownCh: + return } } } func (arc *Arc) processRPC(rpc RPC) { - rpc.Respond(arc.fsm.Apply(rpc.Command.(*CommandRequest)), nil) + var nextOffset uint64 + switch req := rpc.Command.(type) { + case *RecordEntriesRequest: + if len(req.Entries) > 0 { + for _, entry := range req.Entries { + resp := arc.fsm.Apply(entry) + nextOffset = resp.StoreValue.(uint64) + } + } + rpc.Respond(&RecordEntriesResponse{LastOff: nextOffset}, nil) + case *RecordRequest: + resp := arc.fsm.Apply(req) + nextOffset = resp.StoreValue.(uint64) + rpc.Respond(&RecordResponse{LastOff: nextOffset}, nil) + } } -func (arc *Arc) Apply(data []byte, timeout time.Duration) *CommandPromise { +func (arc *Arc) Apply(data []byte, timeout time.Duration) *RecordPromise { var timer <-chan time.Time if timeout > 0 { timer = time.After(timeout) } // Create a log future, no index or term yet - commandPromise := NewCommandPromise(&CommandRequest{ - Data: data, - }) + recordPromise := &RecordPromise{ + req: &RecordRequest{ + Data: data, + }, + resp: &RecordResponse{}, + } + recordPromise.init() select { case <-timer: - return commandPromise.respondError(ErrEnqueueTimeout) + recordPromise.respondError(ErrEnqueueTimeout) + return recordPromise case <-arc.shutdownCh: - return commandPromise.respondError(ErrArcShutdown) - case arc.applyCh <- commandPromise: - return commandPromise + recordPromise.respondError(ErrArcShutdown) + return recordPromise + case arc.applyCh <- recordPromise: + return recordPromise } } @@ -125,7 +182,9 @@ func (arc *Arc) Join(rpcAddr string, vNodeCount int) error { arc.replicateState[rpcAddr] = s - go arc.replicate(s) + if arc.bootStrap { + go arc.replicate(s) + } return nil } @@ -163,6 +222,12 @@ type State struct { // Tracks running goroutines routinesGroup sync.WaitGroup + + // protects 4 next fields + lastLock sync.Mutex + + // Cache the latest log + lastLogIndex uint64 } // Start a goroutine and properly handle the race between a routine @@ -178,3 +243,16 @@ func (r *State) goFunc(f func()) { func (r *State) waitShutdown() { r.routinesGroup.Wait() } + +func (r *State) setLastLog(index uint64) { + r.lastLock.Lock() + r.lastLogIndex = index + r.lastLock.Unlock() +} + +func (r *State) getLastLog() (index uint64) { + r.lastLock.Lock() + index = r.lastLogIndex + r.lastLock.Unlock() + return +} diff --git a/internal/locus/arc_test.go b/internal/locus/arc_test.go index b052d51..ffee9b8 100644 --- a/internal/locus/arc_test.go +++ b/internal/locus/arc_test.go @@ -8,6 +8,8 @@ import ( "testing" "time" + "github.com/Brijeshlakkad/goutube/internal/locus/pointcron" + streaming_api "github.com/Brijeshlakkad/goutube/api/streaming/v1" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" @@ -21,33 +23,47 @@ func TestArc_FSM(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(dataDir) - lociManager, err := NewLociManager(dataDir, Config{}) + locusDir, err := createDirectory(dataDir, "locus") + require.NoError(t, err) + + logDir, err := createDirectory(dataDir, "log") + require.NoError(t, err) + + logStore, err := NewInMomoryPointStore(logDir) + require.NoError(t, err) + + locus := setupTestLocus(t, locusDir) + fsm := &fsm{ - lociManager, + locus, } - arc := NewArc(ArcConfig{ + arc, err := NewArc(ArcConfig{ StreamLayer: streamLayer, fsm: fsm, + store: logStore, + Bundler: &RequestBundler{}, + Bootstrap: false, }) + require.NoError(t, err) for i := 0; i < 10; i++ { data := []byte(fmt.Sprintf("Test data line %d", i)) - resp, err := apply(arc, AppendRequestType, &streaming_api.ProduceRequest{Locus: locusId, Point: pointId, Frame: data}) + resp, err := apply(arc, AppendRequestType, &streaming_api.ProduceRequest{Point: pointId, Frame: data}) require.NoError(t, err) - _ = resp.(*streaming_api.ProduceResponse).Offset + records := resp.(*streaming_api.ProduceResponse).Records + require.Equal(t, 1, len(records)) } - for i := 0; i < 10; i++ { - var pos uint64 - for i := uint64(0); i < 10; i++ { - data := []byte(fmt.Sprintf("Test data line %d", i)) - read, err := lociManager.Read(locusId, pointId, pos) - require.NoError(t, err) - require.Equal(t, data, read) - pos += uint64(len(data)) + lenWidth - } + var pos uint64 + for i := uint64(0); i < 10; i++ { + data := []byte(fmt.Sprintf("Test data line %d", i)) + nextOffset, read, err := locus.Read(pointId, pos) + require.NoError(t, err) + require.Equal(t, data, read) + pos += uint64(len(data)) + lenWidth + require.Equal(t, pos, nextOffset) } } @@ -56,21 +72,32 @@ func TestArc_Followers(t *testing.T) { streamLayer, err := newTCPStreamLayer("localhost:0", nil) require.NoError(t, err) - dataDir, err := ioutil.TempDir("", "arc-test") + dataDir_Leader, err := ioutil.TempDir("", "arc-test") + require.NoError(t, err) + defer os.RemoveAll(dataDir_Leader) + + locusDir_Leader, err := createDirectory(dataDir_Leader, "locus") + require.NoError(t, err) + + logDir_Leader, err := createDirectory(dataDir_Leader, "log") + require.NoError(t, err) + + logStore_Leader, err := NewInMomoryPointStore(logDir_Leader) require.NoError(t, err) - defer func(dir string) { - _ = os.RemoveAll(dir) - }(dataDir) - lociManager, err := NewLociManager(dataDir, Config{}) + locus_Leader := setupTestLocus(t, locusDir_Leader) fsm_leader := &fsm{ - lociManager, + locus_Leader, } - arc_leader := NewArc(ArcConfig{ + arc_leader, err := NewArc(ArcConfig{ StreamLayer: streamLayer, fsm: fsm_leader, + store: logStore_Leader, + Bundler: &RequestBundler{}, + Bootstrap: true, }) + require.NoError(t, err) // Follower Arc streamLayer_Follower, err := newTCPStreamLayer("localhost:0", nil) @@ -78,19 +105,30 @@ func TestArc_Followers(t *testing.T) { dataDir_Follower, err := ioutil.TempDir("", "arc-test") require.NoError(t, err) - defer func(dir string) { - _ = os.RemoveAll(dir) - }(dataDir) + defer os.RemoveAll(dataDir_Follower) + + locusDir_Follower, err := createDirectory(dataDir_Follower, "locus") + require.NoError(t, err) + + logDir_Follower, err := createDirectory(dataDir_Follower, "log") + require.NoError(t, err) + + logStore_Follower, err := NewInMomoryPointStore(logDir_Follower) + require.NoError(t, err) - lociManager_Follower, err := NewLociManager(dataDir_Follower, Config{}) + locus_Follower := setupTestLocus(t, locusDir_Follower) fsm_Follower := &fsm{ - lociManager_Follower, + locus_Follower, } - _ = NewArc(ArcConfig{ + _, err = NewArc(ArcConfig{ StreamLayer: streamLayer_Follower, fsm: fsm_Follower, + store: logStore_Follower, + Bundler: &RequestBundler{}, + Bootstrap: false, }) + require.NoError(t, err) followerState, err := NewFollower(ServerAddress(streamLayer_Follower.Addr().String())) arc_leader.replicateState[streamLayer_Follower.Addr().String()] = followerState @@ -100,22 +138,24 @@ func TestArc_Followers(t *testing.T) { for i := 0; i < 10; i++ { data := []byte(fmt.Sprintf("Test data line %d", i)) - resp, err := apply(arc_leader, AppendRequestType, &streaming_api.ProduceRequest{Locus: locusId, Point: pointId, Frame: data}) + resp, err := apply(arc_leader, AppendRequestType, &streaming_api.ProduceRequest{Point: pointId, Frame: data}) require.NoError(t, err) - _ = resp.(*streaming_api.ProduceResponse).Offset + records := resp.(*streaming_api.ProduceResponse).Records + require.Equal(t, 1, len(records)) } // Wait for replication to get completed! - time.Sleep(3 * time.Second) + time.Sleep(5 * time.Second) var pos uint64 for i := uint64(0); i < 10; i++ { data := []byte(fmt.Sprintf("Test data line %d", i)) - read, err := lociManager_Follower.Read(locusId, pointId, pos) + nextOffset, read, err := locus_Follower.Read(pointId, pos) require.NoError(t, err) require.Equal(t, data, read) pos += uint64(len(data)) + lenWidth + require.Equal(t, pos, nextOffset) } } @@ -142,6 +182,20 @@ func apply(arc *Arc, reqType RequestType, req proto.Message) ( if err := commandPromise.Error(); err != nil { return nil, err } - res := commandPromise.Response().(*CommandResponse) + res := commandPromise.Response().(*RecordResponse) return res.Response, nil } + +func setupTestLocus(t *testing.T, dataDir string) *Locus { + c := Config{} + pointcronConfig := pointcron.Config{} + pointcronConfig.CloseTimeout = 1 * time.Second + pointcronConfig.TickTime = time.Second + c.Point.pointScheduler = pointcron.NewPointScheduler(pointcronConfig) + c.Point.pointScheduler.StartAsync() + + locus, err := NewLocus(dataDir, c) + require.NoError(t, err) + + return locus +} diff --git a/internal/locus/config.go b/internal/locus/config.go index 4a1363a..336861a 100644 --- a/internal/locus/config.go +++ b/internal/locus/config.go @@ -4,14 +4,17 @@ import ( "time" "github.com/Brijeshlakkad/goutube/internal/locus/pointcron" + "github.com/hashicorp/go-hclog" ) type Config struct { Distributed struct { - LocalID string - Bootstrap bool - BindAdr string - StreamLayer *LocusStreamLayer + LocalID string + Bootstrap bool + BindAdr string + StreamLayer *LocusStreamLayer + StoreAddress string + Logger hclog.Logger } Point struct { TickTime time.Duration diff --git a/internal/locus/distributed.go b/internal/locus/distributed.go index cd5e735..4a1389a 100644 --- a/internal/locus/distributed.go +++ b/internal/locus/distributed.go @@ -5,141 +5,157 @@ import ( "crypto/tls" "fmt" "net" - "os" - "path/filepath" "sync" "time" streaming_api "github.com/Brijeshlakkad/goutube/api/streaming/v1" + "github.com/Brijeshlakkad/goutube/internal/locus/pointcron" + "github.com/hashicorp/go-hclog" "google.golang.org/protobuf/proto" ) type DistributedLoci struct { config Config - loci *LociManager + locus *Locus + store Store + logger hclog.Logger arc *Arc mu sync.Mutex + + bundler *RequestBundler } func NewDistributedLoci(dataDir string, config Config) ( *DistributedLoci, error, ) { + if config.Distributed.Logger == nil { + config.Distributed.Logger = hclog.New(&hclog.LoggerOptions{ + Name: "distributed-loci", + Output: hclog.DefaultOutput, + Level: hclog.DefaultLevel, + }) + } d := &DistributedLoci{ - config: config, + config: config, + bundler: &RequestBundler{}, + } + var err error + + if err = d.setupLociManager(dataDir); err != nil { + return nil, err } - if err := d.setupLociManager(dataDir); err != nil { + if err = d.setupStore(dataDir); err != nil { return nil, err } + arcConfig := ArcConfig{ StreamLayer: config.Distributed.StreamLayer, - fsm: &fsm{d.loci}, + fsm: &fsm{ + locus: d.locus, + }, + store: d.store, + Bundler: d.bundler, + Bootstrap: config.Distributed.Bootstrap, + } + d.arc, err = NewArc(arcConfig) + if err != nil { + return nil, err } - d.arc = NewArc(arcConfig) return d, nil } func (d *DistributedLoci) setupLociManager(dataDir string) error { - lociDir := filepath.Join(dataDir, "loci") - // Create a hierarchy of directories if necessary - if err := os.MkdirAll(lociDir, 0755); err != nil { + lociDir, err := createDirectory(dataDir, "locus") + if err != nil { return err } - var err error - d.loci, err = NewLociManager(lociDir, d.config) + d.config.Point.pointScheduler = pointcron.NewPointScheduler(pointcron.Config{ + CloseTimeout: d.config.Point.CloseTimeout, + TickTime: d.config.Point.TickTime, + }) + d.config.Point.pointScheduler.StartAsync() + + d.locus, err = NewLocus(lociDir, d.config) return err } -func (d *DistributedLoci) GetLoci() []string { - return d.loci.GetLoci() +func (d *DistributedLoci) setupStore(dataDir string) (err error) { + logDir, err := createDirectory(dataDir, "log") + if err != nil { + return err + } + d.store, err = NewInMomoryPointStore(logDir) + return err } -func (d *DistributedLoci) GetPoints(locusId string) []string { - return d.loci.GetPoints(locusId) +func (d *DistributedLoci) GetPoints() []string { + return d.locus.GetPoints() } -func (d *DistributedLoci) Append(locusId string, pointId string, b []byte) (pos uint64, err error) { +func (d *DistributedLoci) Append(record *streaming_api.ProduceRequest) (pos uint64, err error) { d.mu.Lock() defer d.mu.Unlock() - apply, err := d.apply(AppendRequestType, &streaming_api.ProduceRequest{Locus: locusId, Point: pointId, Frame: b}) + apply, err := d.apply(AppendRequestType, record.Point, record.Frame) if err != nil { return 0, err } - return apply.(*streaming_api.ProduceResponse).Offset, nil + records := apply.(*streaming_api.ProduceResponse).Records + if len(records) > 0 { + return apply.(*streaming_api.ProduceResponse).Records[0].Offset, nil + } + return 0, nil } -func (d *DistributedLoci) apply(reqType RequestType, req proto.Message) ( +func (d *DistributedLoci) apply(reqType RequestType, key interface{}, value interface{}) ( interface{}, error, ) { - var buf bytes.Buffer - _, err := buf.Write([]byte{byte(reqType)}) - if err != nil { - return nil, err - } - b, err := proto.Marshal(req) - if err != nil { - return nil, err - } - - _, err = buf.Write(b) + b, err := d.bundler.Build(reqType, key, value) if err != nil { return nil, err } timeout := 10 * time.Second - commandPromise := d.arc.Apply(buf.Bytes(), timeout) + commandPromise := d.arc.Apply(b, timeout) if err := commandPromise.Error(); err != nil { return nil, err } - res := commandPromise.Response().(*CommandResponse) + res := commandPromise.Response().(*RecordResponse) return res.Response, nil } -func (d *DistributedLoci) Read(locusId string, pointId string, pos uint64) ([]byte, error) { - return d.loci.Read(locusId, pointId, pos) -} - -func (d *DistributedLoci) ReadAt(locusId string, pointId string, b []byte, off int64) (int, error) { - return d.loci.ReadAt(locusId, pointId, b, off) -} - -func (d *DistributedLoci) Close(locusId string) error { - d.mu.Lock() - defer d.mu.Unlock() - - return d.loci.Close(locusId) +func (d *DistributedLoci) Read(pointId string, pos uint64) ([]byte, error) { + _, b, err := d.locus.Read(pointId, pos) + return b, err } -func (d *DistributedLoci) ClosePoint(locusId string, pointId string) error { - d.mu.Lock() - defer d.mu.Unlock() - - return d.loci.ClosePoint(locusId, pointId) +func (d *DistributedLoci) ReadAt(pointId string, b []byte, off uint64) (int, error) { + return d.locus.ReadAt(pointId, b, off) } -func (d *DistributedLoci) CloseAll() error { +func (d *DistributedLoci) ClosePoint(pointId string) error { d.mu.Lock() defer d.mu.Unlock() - return d.loci.CloseAll() + return d.locus.Close(pointId) } -func (d *DistributedLoci) Remove(locusId string) error { +func (d *DistributedLoci) Close() error { d.mu.Lock() defer d.mu.Unlock() - return d.loci.Remove(locusId) + return d.locus.CloseAll() } -func (d *DistributedLoci) RemoveAll() error { +func (d *DistributedLoci) Remove() error { d.mu.Lock() defer d.mu.Unlock() - return d.loci.RemoveAll() + return d.locus.RemoveAll() } func (d *DistributedLoci) Join(rpcAddr string, vNodeCount int) error { @@ -217,29 +233,67 @@ func (s *LocusStreamLayer) Addr() net.Addr { var _ FSM = (*fsm)(nil) type fsm struct { - loci *LociManager + locus *Locus } // Apply Invokes this method after committing a log entry. -func (l *fsm) Apply(record *CommandRequest) interface{} { +func (f *fsm) Apply(record *RecordRequest) *FSMRecordResponse { buf := record.Data reqType := RequestType(buf[0]) switch reqType { case AppendRequestType: - return l.applyAppend(buf[1:]) + return f.applyAppend(buf[1:]) } return nil } -func (l *fsm) applyAppend(b []byte) interface{} { +func (f *fsm) applyAppend(b []byte) *FSMRecordResponse { var req streaming_api.ProduceRequest err := proto.Unmarshal(b, &req) if err != nil { - return err + return &FSMRecordResponse{Response: err} } - offset, err := l.loci.Append(req.GetLocus(), req.Point, req.GetFrame()) + nextOffset, offset, err := f.locus.Append(req.GetPoint(), req.GetFrame()) if err != nil { - return err + return &FSMRecordResponse{Response: err} + } + return &FSMRecordResponse{ + StoreKey: req.Point, + StoreValue: nextOffset, + Response: &streaming_api.ProduceResponse{Records: []*streaming_api.Record{ + { + Point: req.Point, + Offset: offset, + }, + }}, + } +} + +func (f *fsm) Read(key string, offset uint64) (uint64, []byte, error) { + return f.locus.Read(key, offset) +} + +type RequestBundler struct { +} + +func (rb *RequestBundler) Build(header interface{}, key interface{}, value interface{}) ( + []byte, + error, +) { + req := &streaming_api.ProduceRequest{Point: key.(string), Frame: value.([]byte)} + var buf bytes.Buffer + _, err := buf.Write([]byte{byte(header.(RequestType))}) + if err != nil { + return nil, err + } + b, err := proto.Marshal(req) + if err != nil { + return nil, err + } + + _, err = buf.Write(b) + if err != nil { + return nil, err } - return &streaming_api.ProduceResponse{Offset: offset} + return buf.Bytes(), nil } diff --git a/internal/locus/distributed_test.go b/internal/locus/distributed_test.go index e92f2a8..5ba3625 100644 --- a/internal/locus/distributed_test.go +++ b/internal/locus/distributed_test.go @@ -8,43 +8,54 @@ import ( "testing" "time" + streaming_v1 "github.com/Brijeshlakkad/goutube/api/streaming/v1" "github.com/Brijeshlakkad/goutube/internal/locus/pointcron" "github.com/stretchr/testify/require" ) +var ( + pointId = "sample_test_file" +) + func TestDistributedLoci_Create_Append_Read(t *testing.T) { - distributedLoci_Leader := testCreatedDistributedLoci(t) - distributedLoci_Follower := testCreatedDistributedLoci(t) + dataDir_Leader, err := ioutil.TempDir("", "distributed-locus-test") + require.NoError(t, err) + defer os.RemoveAll(dataDir_Leader) - err := distributedLoci_Leader.Join(distributedLoci_Follower.config.Distributed.StreamLayer.Addr().String(), 0) + dataDir_Follower, err := ioutil.TempDir("", "distributed-locus-test") + require.NoError(t, err) + defer os.RemoveAll(dataDir_Follower) + + distributedLoci_Leader := setupTestDistributedLoci(t, dataDir_Leader, true, "distributed-locus-0") + distributedLoci_Follower := setupTestDistributedLoci(t, dataDir_Follower, false, "distributed-locus-1") + + err = distributedLoci_Leader.Join(distributedLoci_Follower.config.Distributed.StreamLayer.Addr().String(), 0) require.NoError(t, err) for i := 0; i < 10; i++ { data := []byte(fmt.Sprintf("Test data line %d", i)) - _, err := distributedLoci_Leader.Append(locusId, pointId, data) + record := &streaming_v1.ProduceRequest{ + Point: pointId, + Frame: data, + } + _, err := distributedLoci_Leader.Append(record) require.NoError(t, err) } // Wait for replication to get completed! - time.Sleep(3 * time.Second) + time.Sleep(5 * time.Second) - for i := 0; i < 10; i++ { - var pos uint64 - for i := uint64(0); i < 10; i++ { - data := []byte(fmt.Sprintf("Test data line %d", i)) - read, err := distributedLoci_Follower.Read(locusId, pointId, pos) - require.NoError(t, err) - require.Equal(t, data, read) - pos += uint64(len(data)) + lenWidth - } + var pos uint64 + for i := uint64(0); i < 10; i++ { + data := []byte(fmt.Sprintf("Test data line %d", i)) + read, err := distributedLoci_Follower.Read(pointId, pos) + require.NoError(t, err) + require.Equal(t, data, read) + pos += uint64(len(data)) + lenWidth } } -func testCreatedDistributedLoci(t *testing.T) *DistributedLoci { - dataDir, err := ioutil.TempDir("", "distributed-loci-test") - require.NoError(t, err) - defer os.RemoveAll(dataDir) - +func setupTestDistributedLoci(t *testing.T, dataDir string, bootstrap bool, localId string) *DistributedLoci { listener, err := net.Listen("tcp", "localhost:0") require.NoError(t, err) @@ -54,7 +65,8 @@ func testCreatedDistributedLoci(t *testing.T) *DistributedLoci { nil, nil, } - c.Distributed.LocalID = "distributed-loci-0" + c.Distributed.LocalID = localId + c.Distributed.Bootstrap = bootstrap pointcronConfig := pointcron.Config{} pointcronConfig.CloseTimeout = 3 * time.Second pointcronConfig.TickTime = time.Second diff --git a/internal/locus/fsm.go b/internal/locus/fsm.go index d63c615..f71d168 100644 --- a/internal/locus/fsm.go +++ b/internal/locus/fsm.go @@ -1,40 +1,56 @@ package locus type FSM interface { - Apply(command *CommandRequest) interface{} + Apply(command *RecordRequest) *FSMRecordResponse + Read(key string, offset uint64) (uint64, []byte, error) } func (arc *Arc) runFSM() { - applySingle := func(cp *CommandPromise) { - var resp interface{} + applySingle := func(cp *RecordPromise) interface{} { + var resp *FSMRecordResponse // Make sure we send a response defer func() { // Invoke the promise if given if cp != nil { - cp.respond(resp) + cp.respond(resp.Response) cp.respondError(nil) } }() resp = arc.fsm.Apply(cp.req) + + if err := arc.store.AddPointEvent(resp.StoreKey.(string), resp.StoreValue.(uint64)); err != nil { + arc.logger.Error("failed to add event to the store", "error", err) + } + + return resp.StoreKey } for { - var cp *CommandPromise + var cp *RecordPromise select { case cp = <-arc.applyCh: - applySingle(cp) + key := applySingle(cp) - // Synchronous notifying other replicas. - arc.replicateStateLock.Lock() - // Iterate through all the replicas to notify of the change. - for _, server := range arc.replicateState { - server.triggerCh <- cp.req - } - arc.replicateStateLock.Unlock() + // Async notifying replicas. + arc.notifyFollowers(key) case <-arc.shutdownCh: return } } } + +func (arc *Arc) notifyFollowers(key interface{}) { + // Iterate through all the replicas to notify of the change. + for _, follower := range arc.replicateState { + asyncNotifyCh(follower.triggerCh, key) + } +} + +func asyncNotifyCh(ch chan interface{}, key interface{}) { + select { + case ch <- key: + default: + } +} diff --git a/internal/locus/in_memory_store.go b/internal/locus/in_memory_store.go new file mode 100644 index 0000000..7de0c8c --- /dev/null +++ b/internal/locus/in_memory_store.go @@ -0,0 +1,70 @@ +package locus + +import ( + "errors" + "sync" + + "github.com/Brijeshlakkad/goutube/internal/log" +) + +var ( + ErrPointNotFoundInMemory = errors.New("point not found in the store") +) + +// InMemoryPointStore manages points and its last offset +type InMemoryPointStore struct { + client map[string]*inMemoryPointValue + log *log.Log // Backup to warm up the cache +} + +type inMemoryLogPoint struct { + Point string + Offset uint64 +} + +type inMemoryPointValue struct { + PointOffset uint64 + LogOffset uint64 + mu sync.Mutex +} + +func (store *InMemoryPointStore) AddPointEvent(pointId string, offset uint64) error { + logOffset, err := store.log.Append(&log.Record{Value: &inMemoryLogPoint{Point: pointId, Offset: offset}}) + if err != nil { + return err + } + + if _, ok := store.client[pointId]; !ok { + store.client[pointId] = &inMemoryPointValue{} + } + + store.client[pointId].mu.Lock() + defer store.client[pointId].mu.Unlock() + + // Store both log and point offsets. + store.client[pointId].PointOffset = offset + store.client[pointId].LogOffset = logOffset + + return nil +} + +func (store *InMemoryPointStore) GetPointEvent(pointId string) (uint64, error) { + if _, ok := store.client[pointId]; !ok { + return 0, ErrPointNotFoundInMemory + } + store.client[pointId].mu.Lock() + defer store.client[pointId].mu.Unlock() + return store.client[pointId].PointOffset, nil +} + +func NewInMomoryPointStore(dir string) (*InMemoryPointStore, error) { + logStore, err := log.NewLog(dir, log.Config{}) + if err != nil { + return nil, err + } + + return &InMemoryPointStore{ + client: make(map[string]*inMemoryPointValue), + log: logStore, + }, nil +} diff --git a/internal/locus/loci-manager.go b/internal/locus/loci-manager.go deleted file mode 100644 index 28b7e76..0000000 --- a/internal/locus/loci-manager.go +++ /dev/null @@ -1,189 +0,0 @@ -package locus - -import ( - "io/ioutil" - "os" - "sync" - - streaming_api "github.com/Brijeshlakkad/goutube/api/streaming/v1" - "github.com/Brijeshlakkad/goutube/internal/locus/pointcron" -) - -type LociManager struct { - mu *sync.Mutex - Config Config - - Dir string - loci map[string]*Locus -} - -func NewLociManager(dir string, config Config) (*LociManager, error) { - config.Point.pointScheduler = pointcron.NewPointScheduler(pointcron.Config{ - CloseTimeout: config.Point.CloseTimeout, - TickTime: config.Point.TickTime, - }) - config.Point.pointScheduler.StartAsync() - - l := &LociManager{ - Config: config, - Dir: dir, - mu: new(sync.Mutex), - loci: make(map[string]*Locus), - } - return l, l.setup() -} - -func (l *LociManager) setup() error { - directories, err := ioutil.ReadDir(l.Dir) - if err != nil { - return err - } - for _, directory := range directories { - lo, err := newLocus(l.Dir, directory.Name(), l.Config) - if err != nil { - return err - } - l.loci[lo.locusId] = lo - } - return nil -} - -func (l *LociManager) addLocus(locusId string) (*Locus, error) { - lo, err := newLocus(l.Dir, locusId, l.Config) - if err != nil { - return nil, err - } - - l.loci[lo.locusId] = lo - return lo, nil -} - -func (l *LociManager) GetLoci() []string { - list := make([]string, 0, len(l.loci)) - - for locusId, _ := range l.loci { - list = append(list, locusId) - } - return list -} - -func (l *LociManager) GetPoints(locusId string) []string { - lo, err := l.get(locusId) - if err != nil { - return nil - } - - return lo.GetPoints() -} - -func (l *LociManager) get(locusId string) (*Locus, error) { - p, ok := l.loci[locusId] - if !ok { - return nil, streaming_api.LocusNotFound{LocusId: locusId} - } - return p, nil -} - -func (l *LociManager) Append(locusId string, pointId string, b []byte) (pos uint64, err error) { - l.mu.Lock() - defer l.mu.Unlock() - - lo, err := l.get(locusId) - if _, ok := err.(streaming_api.LocusNotFound); ok { - lo, err = l.addLocus(locusId) - - l.loci[lo.locusId] = lo - } - - _, pos, err = lo.Append(pointId, b) - return pos, err -} - -func (l *LociManager) Read(locusId string, pointId string, pos uint64) ([]byte, error) { - l.mu.Lock() - defer l.mu.Unlock() - - lo, err := l.get(locusId) - if err != nil { - return nil, err - } - - return lo.Read(pointId, pos) -} - -func (l *LociManager) ReadAt(locusId string, pointId string, b []byte, off int64) (int, error) { - l.mu.Lock() - defer l.mu.Unlock() - - lo, err := l.get(locusId) - if err != nil { - return 0, err - } - - return lo.ReadAt(pointId, b, off) -} - -func (l *LociManager) Close(locusId string) error { - l.mu.Lock() - defer l.mu.Unlock() - - lo, err := l.get(locusId) - if err != nil { - return err - } - return lo.CloseAll() -} - -func (l *LociManager) ClosePoint(locusId string, pointId string) error { - l.mu.Lock() - defer l.mu.Unlock() - - lo, err := l.get(locusId) - if err != nil { - return err - } - return lo.Close(pointId) -} - -func (l *LociManager) CloseAll() error { - l.mu.Lock() - defer l.mu.Unlock() - for _, locus := range l.loci { - if err := locus.CloseAll(); err != nil { - return err - } - } - return nil -} - -func (l *LociManager) Remove(locusId string) error { - lo, err := l.get(locusId) - if err != nil { - return err - } - if err := lo.RemoveAll(); err != nil { - return err - } - delete(l.loci, locusId) - return nil -} - -func (l *LociManager) RemoveAll() error { - if err := l.CloseAll(); err != nil { - return err - } - for locusId, locus := range l.loci { - if err := locus.RemoveAll(); err != nil { - return err - } - delete(l.loci, locusId) - } - return os.RemoveAll(l.Dir) -} - -func (l *LociManager) Reset() error { - if err := l.RemoveAll(); err != nil { - return err - } - return l.setup() -} diff --git a/internal/locus/loci-manager_test.go b/internal/locus/loci-manager_test.go deleted file mode 100644 index 78b819b..0000000 --- a/internal/locus/loci-manager_test.go +++ /dev/null @@ -1,108 +0,0 @@ -package locus - -import ( - "fmt" - "io/ioutil" - "os" - "testing" - - streaming_api "github.com/Brijeshlakkad/goutube/api/streaming/v1" - . "github.com/Brijeshlakkad/goutube/internal/test_util" - "github.com/stretchr/testify/require" -) - -var ( - locusId = "goutube-client" - pointId = "sample_test_file" -) - -func TestLocusManager(t *testing.T) { - for scenario, fn := range map[string]func( - t *testing.T, lm *LociManager, - ){ - "create five loci": testCreateLoci, - "append and read a record succeeds": testLocusAppendRead, - "locus should found": testLocusShouldFound, - "locus not found": testLocusNotFoundErr, - "append on non-existing point": testNotExistingLocusAppend, - "remove locus": testRemoveLocus, - } { - t.Run(scenario, func(t *testing.T) { - parentDir, err := ioutil.TempDir("", "locus-manager-test") - require.NoError(t, err) - defer os.RemoveAll(parentDir) - - c := Config{} - loci, err := NewLociManager(parentDir, c) - require.NoError(t, err) - - fn(t, loci) - }) - } -} - -func testCreateLoci(t *testing.T, lm *LociManager) { - locusCount := 5 - for i := 0; i < locusCount; i++ { - newLocusId := fmt.Sprintf("%s-%d", locusId, i) - _, err := lm.addLocus(newLocusId) - require.NoError(t, err) - } - - require.Equal(t, locusCount, len(lm.GetLoci())) -} - -func testLocusAppendRead(t *testing.T, lm *LociManager) { - locus, err := lm.addLocus(locusId) - require.NoError(t, err) - - defer lm.Remove(locusId) - - pos, err := lm.Append(locus.locusId, pointId, testWrite) - require.Equal(t, uint64(0), pos) - require.NoError(t, err) - - b, err := lm.Read(locus.locusId, pointId, 0) - require.Equal(t, b, testWrite) -} - -func testLocusShouldFound(t *testing.T, lm *LociManager) { - locus, err := lm.addLocus(locusId) - require.NoError(t, err) - - defer lm.Remove(locus.locusId) - - _, err = lm.get(locus.locusId) - require.NoError(t, err) -} - -func testLocusNotFoundErr(t *testing.T, lm *LociManager) { - _, err := lm.get(locusId) - apiErr := err.(streaming_api.LocusNotFound) - require.Equal(t, locusId, apiErr.LocusId) -} - -func testNotExistingLocusAppend(t *testing.T, lm *LociManager) { - if err := lm.Remove(locusId); err != nil { - require.Error(t, err) - } - - got := PanicValue(func() { - _, _ = lm.Append(locusId, pointId, testWrite) - }) - _, ok := got.(error) - if ok { - t.Error("Expected No Error") - } -} - -func testRemoveLocus(t *testing.T, lm *LociManager) { - locus, err := lm.addLocus(locusId) - require.NoError(t, err) - - err = lm.Remove(locus.locusId) - - _, err = lm.Read(locus.locusId, pointId, 0) - apiErr := err.(streaming_api.LocusNotFound) - require.Equal(t, locusId, apiErr.LocusId) -} diff --git a/internal/locus/locus.go b/internal/locus/locus.go index 0be4fa3..5d850d8 100644 --- a/internal/locus/locus.go +++ b/internal/locus/locus.go @@ -3,7 +3,6 @@ package locus import ( "io/ioutil" "os" - "path/filepath" "sync" streaming_api "github.com/Brijeshlakkad/goutube/api/streaming/v1" @@ -12,22 +11,14 @@ import ( type Locus struct { mu sync.RWMutex - locusId string locusDir string Config Config points map[string]*Point } -func newLocus(parentDir string, locusId string, config Config) (*Locus, error) { - // Create a hierarchy of directories if necessary - locusDir := filepath.Join(parentDir, locusId) - if err := os.MkdirAll(locusDir, os.ModePerm); err != nil { - return nil, err - } - +func NewLocus(locusDir string, config Config) (*Locus, error) { l := &Locus{ - locusId: locusId, locusDir: locusDir, Config: config, points: make(map[string]*Point), @@ -100,17 +91,17 @@ func (l *Locus) Append(pointId string, b []byte) (n uint64, pos uint64, err erro return point.Append(b) } -func (l *Locus) Read(pointId string, pos uint64) ([]byte, error) { +func (l *Locus) Read(pointId string, pos uint64) (uint64, []byte, error) { point, err := l.get(pointId) if err != nil { - return nil, err + return 0, nil, err } defer l.Config.Point.pointScheduler.Enqueue(point) return point.Read(pos) } -func (l *Locus) ReadAt(pointId string, b []byte, off int64) (int, error) { +func (l *Locus) ReadAt(pointId string, b []byte, off uint64) (int, error) { point, err := l.get(pointId) if err != nil { return 0, err diff --git a/internal/locus/locus_test.go b/internal/locus/locus_test.go index b7ee578..ee342b2 100644 --- a/internal/locus/locus_test.go +++ b/internal/locus/locus_test.go @@ -42,8 +42,8 @@ func TestLocus(t *testing.T) { pointcronConfig.TickTime = time.Second c.Point.pointScheduler = pointcron.NewPointScheduler(pointcronConfig) c.Point.pointScheduler.StartAsync() - require.NoError(t, err) - log, err := newLocus(parentDir, locusClient, c) + + log, err := NewLocus(parentDir, c) require.NoError(t, err) fn(t, log) @@ -73,7 +73,7 @@ func testPointAppendRead(t *testing.T, locus *Locus) { require.Equal(t, uint64(0), pos) require.NoError(t, err) - b, err := locus.Read(point.pointId, 0) + _, b, err := locus.Read(point.pointId, 0) require.Equal(t, b, testWrite) } @@ -121,7 +121,7 @@ func testRemovePointer(t *testing.T, locus *Locus) { err = locus.Remove(pId) - _, err = locus.Read(pId, 0) + _, _, err = locus.Read(pId, 0) apiErr := err.(streaming_api.PointNotFound) require.Equal(t, pId, apiErr.PointId) } diff --git a/internal/locus/point.go b/internal/locus/point.go index 1d92c13..2060abc 100644 --- a/internal/locus/point.go +++ b/internal/locus/point.go @@ -84,34 +84,36 @@ func (p *Point) Append(b []byte) (n uint64, pos uint64, err error) { } w += lenWidth p.size += uint64(w) - return uint64(w), pos, nil + return p.size, pos, nil } -func (p *Point) Read(pos uint64) ([]byte, error) { +func (p *Point) Read(pos uint64) (uint64, []byte, error) { p.readWriteLock.Lock() defer p.readWriteLock.Unlock() if p.closed.Load().(bool) { if err := p.Open(); err != nil { - return nil, err + return 0, nil, err } } + var nextOffset uint64 if err := p.buf.Flush(); err != nil { - return nil, err + return 0, nil, err } size := make([]byte, lenWidth) if _, err := p.File.ReadAt(size, int64(pos)); err != nil { - return nil, err + return 0, nil, err } b := make([]byte, enc.Uint64(size)) if _, err := p.File.ReadAt(b, int64(pos+lenWidth)); err != nil { - return nil, err + return 0, nil, err } - return b, nil + nextOffset = pos + uint64(lenWidth) + uint64(len(b)) + return nextOffset, b, nil } -func (p *Point) ReadAt(b []byte, off int64) (int, error) { +func (p *Point) ReadAt(b []byte, off uint64) (int, error) { p.readWriteLock.Lock() defer p.readWriteLock.Unlock() @@ -124,7 +126,7 @@ func (p *Point) ReadAt(b []byte, off int64) (int, error) { if err := p.buf.Flush(); err != nil { return 0, err } - return p.File.ReadAt(b, off) + return p.File.ReadAt(b, int64(off)) } func (p *Point) Close() error { diff --git a/internal/locus/point_test.go b/internal/locus/point_test.go index adba1c4..aaeb338 100644 --- a/internal/locus/point_test.go +++ b/internal/locus/point_test.go @@ -1,11 +1,12 @@ package locus import ( - "github.com/stretchr/testify/require" "io/ioutil" "os" "path/filepath" "testing" + + "github.com/stretchr/testify/require" ) var ( @@ -34,9 +35,9 @@ func TestPointAppendRead(t *testing.T) { func testAppend(t *testing.T, p *Point) { t.Helper() for i := uint64(1); i < 4; i++ { - n, pos, err := p.Append(write) + n, _, err := p.Append(write) require.NoError(t, err) - require.Equal(t, pos+n, width*i) + require.Equal(t, n, width*i) } } @@ -44,21 +45,22 @@ func testRead(t *testing.T, p *Point) { t.Helper() var pos uint64 for i := uint64(1); i < 4; i++ { - read, err := p.Read(pos) + nextOffset, read, err := p.Read(pos) require.NoError(t, err) require.Equal(t, write, read) pos += width + require.Equal(t, pos, nextOffset) } } func testReadAt(t *testing.T, p *Point) { t.Helper() - for i, off := uint64(1), int64(0); i < 4; i++ { + for i, off := uint64(1), uint64(0); i < 4; i++ { b := make([]byte, lenWidth) n, err := p.ReadAt(b, off) require.NoError(t, err) require.Equal(t, lenWidth, n) - off += int64(n) + off += uint64(n) size := enc.Uint64(b) b = make([]byte, size) @@ -66,7 +68,7 @@ func testReadAt(t *testing.T, p *Point) { require.NoError(t, err) require.Equal(t, write, b) require.Equal(t, int(size), n) - off += int64(n) + off += uint64(n) } } diff --git a/internal/locus/promise.go b/internal/locus/promise.go index 8cde987..621ef6b 100644 --- a/internal/locus/promise.go +++ b/internal/locus/promise.go @@ -1,77 +1,82 @@ package locus -type CommandRequest struct { +type RecordRequest struct { Data []byte } -type CommandResponse struct { +type RecordResponse struct { + LastOff uint64 Response interface{} } +type FSMRecordResponse struct { + StoreKey interface{} + StoreValue interface{} + Response interface{} +} + type Promise interface { + Error() error Response() interface{} } -type CommandPromise struct { - req *CommandRequest - resp *CommandResponse - +type promiseError struct { err error errCh chan error responded bool ShutdownCh chan struct{} } -func NewCommandPromise(req *CommandRequest) *CommandPromise { - return &CommandPromise{ - req: req, - resp: &CommandResponse{}, - errCh: make(chan error, 1), - ShutdownCh: make(chan struct{}), +func (p *promiseError) init() { + p.errCh = make(chan error, 1) +} + +func (p *promiseError) Error() error { + if p.err != nil { + return p.err + } + if p.errCh == nil { + panic("waiting for response on nil channel") } + select { + case p.err = <-p.errCh: + case <-p.ShutdownCh: + p.err = ErrArcShutdown + } + return p.err } -func (c *CommandPromise) init() { - c.errCh = make(chan error, 1) +func (p *promiseError) respondError(err error) { + if p.responded { + return + } + if p.errCh != nil { + p.errCh <- err + close(p.errCh) + } + p.responded = true } -func (c *CommandPromise) Request() *CommandRequest { - return c.req +type RecordPromise struct { + promiseError + req *RecordRequest + resp *RecordResponse } -func (c *CommandPromise) Response() interface{} { - return c.resp +func (c *RecordPromise) init() { + c.promiseError.init() } -func (c *CommandPromise) Error() error { - if c.err != nil { - return c.err - } - if c.errCh == nil { - panic("waiting for response on nil channel") - } - select { - case c.err = <-c.errCh: - case <-c.ShutdownCh: - c.err = ErrArcShutdown - } - return c.err +func (c *RecordPromise) Request() *RecordRequest { + return c.req } -func (c *CommandPromise) respond(resp interface{}) *CommandPromise { - c.resp.Response = resp - return c +func (c *RecordPromise) Response() interface{} { + return c.resp } -func (c *CommandPromise) respondError(err error) *CommandPromise { - if c.responded { - return c - } - if c.errCh != nil { - c.errCh <- err - close(c.errCh) - } - c.responded = true +func (c *RecordPromise) respond(resp interface{}) *RecordPromise { + c.resp.Response = resp return c } @@ -93,12 +98,12 @@ func (r *RPC) Respond(resp interface{}, err error) { } type ReplicateCommandPromise struct { - req *CommandRequest + req *RecordRequest expected interface{} - resp *CommandResponse + resp *RecordResponse } -func NewReplicateCommandPromise(req *CommandRequest, expected interface{}) *ReplicateCommandPromise { +func NewReplicateCommandPromise(req *RecordRequest, expected interface{}) *ReplicateCommandPromise { return &ReplicateCommandPromise{ req: req, expected: expected, @@ -109,10 +114,46 @@ type shutdownPromise struct { arc *Arc } -func (s *shutdownPromise) Response() interface{} { +func (s *shutdownPromise) Error() error { if s.arc == nil { return nil } s.arc.waitShutdown() return nil } + +func (s *shutdownPromise) Response() interface{} { + return nil +} + +type RecordEntriesRequest struct { + Entries []*RecordRequest +} + +type RecordEntriesResponse struct { + LastOff uint64 + Response interface{} +} + +type RecordEntriesPromise struct { + promiseError + req *RecordEntriesRequest + resp *RecordEntriesResponse +} + +func (c *RecordEntriesPromise) init() { + c.promiseError.init() +} + +func (c *RecordEntriesPromise) Request() *RecordEntriesRequest { + return c.req +} + +func (c *RecordEntriesPromise) Response() interface{} { + return c.resp +} + +func (c *RecordEntriesPromise) respond(resp interface{}) *RecordEntriesPromise { + c.resp.Response = resp + return c +} diff --git a/internal/locus/redis_store.go b/internal/locus/redis_store.go new file mode 100644 index 0000000..7b401b0 --- /dev/null +++ b/internal/locus/redis_store.go @@ -0,0 +1,72 @@ +package locus + +import ( + "github.com/Brijeshlakkad/goutube/internal/log" + "github.com/go-redis/redis" + "github.com/pelletier/go-toml" +) + +// RedisPointStore manages points and its last offset +type RedisPointStore struct { + redis *redis.Client + log *log.Log // Backup to warm up the cache +} + +type redisLogPoint struct { + Point string + Offset uint64 +} + +type redisPointValue struct { + PointOffset uint64 + LogOffset uint64 +} + +func (rps *RedisPointStore) AddPointEvent(pointId string, offset uint64) error { + logOffset, err := rps.log.Append(&log.Record{Value: &redisLogPoint{Point: pointId, Offset: offset}}) + + // Store both log and point offsets. + pointValue := &redisPointValue{PointOffset: offset, LogOffset: logOffset} + t, err := toml.Marshal(pointValue) + if err != nil { + return err + } + + return rps.redis.Set(pointId, t, 0).Err() +} + +func (rps *RedisPointStore) GetPointEvent(pointId string) (uint64, error) { + t, err := rps.redis.Get(pointId).Result() + if err != nil { + return 0, err + } + var pointValue redisPointValue + err = toml.Unmarshal([]byte(t), &pointValue) + if err != nil { + return 0, err + } + return pointValue.PointOffset, err +} + +func NewRedisPointStore(address string, dir string) (*RedisPointStore, error) { + client := redis.NewClient(&redis.Options{ + Addr: address, + Password: "", // use empty password for simplicity. should come from a secret in production + DB: 0, // use default DB + }) + + _, err := client.Ping().Result() + if err != nil { + return nil, err + } + + log, err := log.NewLog(dir, log.Config{}) + if err != nil { + return nil, err + } + + return &RedisPointStore{ + redis: client, + log: log, + }, nil +} diff --git a/internal/locus/replicate.go b/internal/locus/replicate.go index 27e40c4..7cd2176 100644 --- a/internal/locus/replicate.go +++ b/internal/locus/replicate.go @@ -10,12 +10,16 @@ type Follower struct { // peerLock protects 'peer' peerLock sync.RWMutex - triggerCh chan *CommandRequest - triggerChLock sync.RWMutex + triggerCh chan interface{} // stopCh is notified/closed when this leader steps down or the follower is // removed from the cluster. stopCh chan uint64 + + // store keeps the track of the state of other servers are for the respective key. + store map[string]uint64 + + triggerChLock sync.Mutex } func NewFollower(ServerAddress ServerAddress) (*Follower, error) { @@ -24,35 +28,87 @@ func NewFollower(ServerAddress ServerAddress) (*Follower, error) { Address: ServerAddress, }, stopCh: make(chan uint64, 1), - triggerCh: make(chan *CommandRequest, 1), + triggerCh: make(chan interface{}, 1), + store: make(map[string]uint64), }, nil } func (arc *Arc) replicate(s *Follower) { + for { + select { + case storeKeyI := <-s.triggerCh: + storeKey := storeKeyI.(string) + nextOffset := s.store[storeKey] + arc.appendEntries(s, storeKey, nextOffset) + case <-s.stopCh: + return + } + } +} + +func (arc *Arc) appendEntries(s *Follower, key string, nextOffset uint64) { s.peerLock.RLock() peer := s.peer s.peerLock.RUnlock() - for { - select { - case req := <-s.triggerCh: - func() { - pipeline, err := arc.transport.PrepareCommandTransport(peer.Address) - // rpcCh := pipeline.Consumer() - if err != nil { - arc.logger.Error("failed to pipeline appendEntries", "peer", s.peer, "error", err) - return - } - // Pipeline the append entries - out := new(CommandResponse) - _, err = pipeline.SendCommand(req, out) - if err != nil { - arc.logger.Error("failed to pipeline commands", "peer", s.peer, "error", err) - return - } - }() - case <-s.stopCh: + s.triggerChLock.Lock() + defer s.triggerChLock.Unlock() + var err error + pipeline, err := arc.transport.PrepareCommandTransport(peer.Address) + if err != nil { + arc.logger.Error("failed to pipeline appendEntries", "peer", s.peer, "error", err) + return + } + // Pipeline the append entries + lastOffset, err := arc.store.GetPointEvent(key) + + if err != nil { + arc.logger.Error("Error while synchronizing the followers", "error", err) + } + var entries []*RecordRequest + for nextOffset <= lastOffset { + var chunk []byte + var tempNextOffset uint64 + tempNextOffset, chunk, err = arc.fsm.Read(key, nextOffset) + if err != nil { + break + } + if len(chunk) > 0 { + nextOffset = tempNextOffset + data, err := arc.bundler.Build(AppendRequestType, key, chunk) + if err != nil { + arc.logger.Error("failed to build request", "peer", s.peer, "error", err) + return + } + entries = append(entries, &RecordRequest{ + Data: data, + }) + } + } + + if len(entries) > 0 { + req := &RecordEntriesRequest{ + Entries: entries, + } + + out := new(RecordEntriesResponse) + _, err = pipeline.SendCommand(req, out) + if err != nil { + arc.logger.Error("failed to pipeline commands", "peer", s.peer, "error", err) return } + + // Wait for response + respCh := pipeline.Consumer() + select { + case promise := <-respCh: + err = promise.Error() + if err != nil { + arc.logger.Error("server couldn't handle the command", "peer", s.peer, "error", err) + return + } + + s.store[key] = promise.Response().(*RecordEntriesResponse).LastOff + } } } diff --git a/internal/locus/transport.go b/internal/locus/transport.go index 08def09..bdf5383 100644 --- a/internal/locus/transport.go +++ b/internal/locus/transport.go @@ -257,7 +257,7 @@ func (transport *Transport) PrepareCommandTransport(target ServerAddress) (Comma } // SendCommand implements the Transport interface. -func (transport *Transport) SendCommand(target ServerAddress, req *CommandRequest, resp *CommandResponse) error { +func (transport *Transport) SendCommand(target ServerAddress, req *RecordEntriesRequest, resp *RecordEntriesResponse) error { return transport.genericRPC(target, req, resp) } @@ -362,7 +362,7 @@ func (transport *Transport) handleConn(connCtx context.Context, conn net.Conn) { // handleCommand is used to decode and dispatch a single command. func (transport *Transport) handleCommand(dec *codec.Decoder, enc *codec.Encoder) error { - var req CommandRequest + var req RecordEntriesRequest if err := dec.Decode(&req); err != nil { return err @@ -432,7 +432,7 @@ type commandPipeline struct { trans *Transport doneCh chan Promise - inprogressCh chan *CommandPromise + inprogressCh chan *RecordEntriesPromise shutdown bool shutdownCh chan struct{} @@ -446,7 +446,7 @@ func newNetPipeline(trans *Transport, conn *netConn) *commandPipeline { conn: conn, trans: trans, doneCh: make(chan Promise, rpcMaxPipeline), - inprogressCh: make(chan *CommandPromise, rpcMaxPipeline), + inprogressCh: make(chan *RecordEntriesPromise, rpcMaxPipeline), shutdownCh: make(chan struct{}), } go n.decodeResponses() @@ -478,12 +478,12 @@ func (cp *commandPipeline) decodeResponses() { } // SendCommand is used to pipeline a new command requests. -func (cp *commandPipeline) SendCommand(req *CommandRequest, resp *CommandResponse) (Promise, error) { - commandPromise := &CommandPromise{ +func (cp *commandPipeline) SendCommand(req *RecordEntriesRequest, resp *RecordEntriesResponse) (Promise, error) { + recordPromise := &RecordEntriesPromise{ req: req, resp: resp, } - commandPromise.init() + recordPromise.init() // Add a "send" timeout if timeout := cp.trans.timeout; timeout > 0 { @@ -491,15 +491,15 @@ func (cp *commandPipeline) SendCommand(req *CommandRequest, resp *CommandRespons } // Send the RPC - if err := sendRPC(cp.conn, commandPromise.req); err != nil { + if err := sendRPC(cp.conn, recordPromise.req); err != nil { return nil, err } // Hand-off for decoding, this can also cause back-pressure // to prevent too many inflight requests select { - case cp.inprogressCh <- commandPromise: - return commandPromise, nil + case cp.inprogressCh <- recordPromise: + return recordPromise, nil case <-cp.shutdownCh: return nil, ErrPipelineShutdown } @@ -569,7 +569,7 @@ type StreamLayer interface { type CommandPipeline interface { // SendCommand is used to add another request to the pipeline. // To send may block which is an effective form of back-pressure. - SendCommand(req *CommandRequest, resp *CommandResponse) (Promise, error) + SendCommand(req *RecordEntriesRequest, resp *RecordEntriesResponse) (Promise, error) // Consumer returns a channel that can be used to consume // response futures when they are ready. diff --git a/internal/locus/transport_test.go b/internal/locus/transport_test.go index 91186b1..49e5b66 100644 --- a/internal/locus/transport_test.go +++ b/internal/locus/transport_test.go @@ -26,11 +26,15 @@ func TestTransport_SendCommand(t *testing.T) { rpcCh := trans1.Consumer() // Make the RPC request - args := CommandRequest{ - Data: DummyData, + args := RecordEntriesRequest{ + Entries: []*RecordRequest{ + { + Data: DummyData, + }, + }, } - resp := CommandResponse{ + resp := RecordEntriesResponse{ Response: int64(1), } @@ -40,7 +44,7 @@ func TestTransport_SendCommand(t *testing.T) { select { case rpc := <-rpcCh: // Verify the command - req := rpc.Command.(*CommandRequest) + req := rpc.Command.(*RecordEntriesRequest) if !reflect.DeepEqual(req, &args) { t.Errorf("command mismatch: %#v %#v", *req, args) return @@ -72,7 +76,7 @@ func TestTransport_SendCommand(t *testing.T) { appendFunc := func() { defer wg.Done() - var out CommandResponse + var out RecordEntriesResponse if err := trans2.SendCommand(trans1.LocalAddr(), &args, &out); err != nil { t.Fatalf("err: %v", err) } @@ -122,11 +126,15 @@ func TestNetworkTransport_SendCommandPipeline(t *testing.T) { rpcCh := trans1.Consumer() // Make the RPC request - args := CommandRequest{ - Data: DummyData, + args := RecordEntriesRequest{ + Entries: []*RecordRequest{ + { + Data: DummyData, + }, + }, } - resp := CommandResponse{ + resp := RecordEntriesResponse{ Response: int64(1), } @@ -136,7 +144,7 @@ func TestNetworkTransport_SendCommandPipeline(t *testing.T) { select { case rpc := <-rpcCh: // Verify the command - req := rpc.Command.(*CommandRequest) + req := rpc.Command.(*RecordEntriesRequest) if !reflect.DeepEqual(req, &args) { t.Errorf("command mismatch: %#v %#v", *req, args) return @@ -166,7 +174,7 @@ func TestNetworkTransport_SendCommandPipeline(t *testing.T) { require.NoError(t, err) //for i := 0; i < 10; i++ { - pipelineResp := new(CommandResponse) + pipelineResp := new(RecordEntriesResponse) _, err = pipeline.SendCommand(&args, pipelineResp) require.NoError(t, err) //} diff --git a/internal/locus/types.go b/internal/locus/types.go index d1260e2..18f9326 100644 --- a/internal/locus/types.go +++ b/internal/locus/types.go @@ -17,3 +17,12 @@ type Server struct { // Address is its network address that a transport can contact. Address ServerAddress } + +type Store interface { + AddPointEvent(pointId string, offset uint64) error + GetPointEvent(pointId string) (uint64, error) +} + +type Bundler interface { + Build(header interface{}, key interface{}, value interface{}) ([]byte, error) +} diff --git a/internal/locus/util.go b/internal/locus/util.go new file mode 100644 index 0000000..f544ba1 --- /dev/null +++ b/internal/locus/util.go @@ -0,0 +1,32 @@ +package locus + +import ( + "os" + "path/filepath" +) + +// min returns the minimum. +func min(a, b uint64) uint64 { + if a <= b { + return a + } + return b +} + +// max returns the maximum. +func max(a, b uint64) uint64 { + if a >= b { + return a + } + return b +} + +// createDirectory creates the directory under parentDir. +func createDirectory(parentDir, dataDir string) (string, error) { + dir := filepath.Join(parentDir, dataDir) + // Create a hierarchy of directories if necessary + if err := os.MkdirAll(dir, 0755); err != nil { + return "", err + } + return dir, nil +} diff --git a/internal/log/log.go b/internal/log/log.go index c0af8c2..502518e 100644 --- a/internal/log/log.go +++ b/internal/log/log.go @@ -13,7 +13,7 @@ import ( ) var ( - ErrOffsetOutOfRange = errors.New("the requested offset is outside the log's range.") + ErrOffsetOutOfRange = errors.New("the requested offset is outside the log's range") ) type Log struct { @@ -76,7 +76,6 @@ func (l *Log) setup() error { func (l *Log) Append(record *Record) (uint64, error) { l.mu.Lock() defer l.mu.Unlock() - // https://github.com/travisjeffery/proglog/issues/6 off, err := l.activeSegment.Append(record) if err != nil { return 0, err @@ -138,7 +137,7 @@ func (l *Log) Reset() error { return l.setup() } -// To know what nodes have the oldest and newest data and what nodes are falling behind and need to replicate. +// LowestOffset To know what nodes have the oldest and newest data and what nodes are falling behind and need to replicate. func (l *Log) LowestOffset() (uint64, error) { l.mu.RLock() defer l.mu.RUnlock() @@ -155,7 +154,7 @@ func (l *Log) HighestOffset() (uint64, error) { return off - 1, nil } -// (In the future, periodically) call Truncate to remove old segments whose data we (hopefully) have processed by then and don’t need anymore. +// Truncate (In the future, periodically) call Truncate to remove old segments whose data we (hopefully) have processed by then and don’t need anymore. func (l *Log) Truncate(lowest uint64) error { l.mu.Lock() defer l.mu.Unlock() @@ -173,6 +172,27 @@ func (l *Log) Truncate(lowest uint64) error { return nil } +func (l *Log) GetLog(index uint64, out *Record) error { + in, err := l.Read(index) + if err != nil { + return err + } + out.Value = in.Value + out.Offset = in.Offset + return nil +} + +func (l *Log) StoreLogs(records []*Record) error { + for _, record := range records { + if _, err := l.Append(&Record{ + Value: record.Value, + }); err != nil { + return err + } + } + return nil +} + func (l *Log) Reader() io.Reader { l.mu.RLock() defer l.mu.RUnlock() @@ -195,7 +215,6 @@ func (o *originReader) Read(p []byte) (int, error) { } type Record struct { - Value []byte + Value interface{} Offset uint64 - Term uint64 } diff --git a/internal/log/log_test.go b/internal/log/log_test.go index 8870ed9..d97f828 100644 --- a/internal/log/log_test.go +++ b/internal/log/log_test.go @@ -38,7 +38,7 @@ func TestLog(t *testing.T) { func testAppendRead(t *testing.T, log *Log) { append := &Record{ - Value: []byte("hello world"), + Value: write, } off, err := log.Append(append) require.NoError(t, err) @@ -57,7 +57,7 @@ func testOutOfRangeErr(t *testing.T, log *Log) { func testInitExisting(t *testing.T, o *Log) { append := &Record{ - Value: []byte("hello world"), + Value: write, } for i := 0; i < 3; i++ { _, err := o.Append(append) @@ -85,7 +85,7 @@ func testInitExisting(t *testing.T, o *Log) { func testReader(t *testing.T, log *Log) { append := &Record{ - Value: []byte("hello world"), + Value: write, } off, err := log.Append(append) require.NoError(t, err) @@ -106,7 +106,7 @@ func testReader(t *testing.T, log *Log) { func testTruncate(t *testing.T, log *Log) { append := &Record{ - Value: []byte("hello world"), + Value: write, } for i := 0; i < 3; i++ { _, err := log.Append(append) diff --git a/internal/log/segment_test.go b/internal/log/segment_test.go index 6902998..42f1c19 100644 --- a/internal/log/segment_test.go +++ b/internal/log/segment_test.go @@ -13,7 +13,9 @@ func TestSegment(t *testing.T) { dir, _ := ioutil.TempDir("", "segment-test") defer os.RemoveAll(dir) - want := &Record{Value: []byte("hello world")} + want := &Record{ + Value: write, + } c := Config{} c.Segment.MaxStoreBytes = 1024 @@ -40,7 +42,7 @@ func TestSegment(t *testing.T) { // maxed index require.True(t, s.IsMaxed()) - c.Segment.MaxStoreBytes = uint64(len(want.Value) * 3) + c.Segment.MaxStoreBytes = uint64(len([]byte(want.Value.(string))) * 3) c.Segment.MaxIndexBytes = 1024 // segment must be closed diff --git a/internal/log/store_test.go b/internal/log/store_test.go index 01a4ae7..065915d 100644 --- a/internal/log/store_test.go +++ b/internal/log/store_test.go @@ -11,7 +11,7 @@ import ( ) var ( - write = []byte("hello world") + write = "hello world" ) func TestStoreAppendRead(t *testing.T) { diff --git a/internal/server/server.go b/internal/server/server.go index 0d4a037..4b39f58 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -44,14 +44,13 @@ func NewServer(config *Config, opts ...grpc.ServerOption) (*grpc.Server, error) return gRPCServer, nil } -type LociManager interface { - GetLoci() []string - GetPoints(string) []string - Append(string, string, []byte) (uint64, error) - Read(string, string, uint64) ([]byte, error) - ReadAt(string, string, []byte, int64) (int, error) - ClosePoint(string, string) error - CloseAll() error +type Locus interface { + GetPoints() []string + Append(record *streaming_api.ProduceRequest) (pos uint64, err error) + Read(string, uint64) ([]byte, error) + ReadAt(string, []byte, uint64) (int, error) + ClosePoint(string) error + Close() error } // Interceptor reading the subject out of the client’s cert and writing it to the RPC’s context. diff --git a/internal/server/server_test.go b/internal/server/server_test.go index ab8c87b..a09e641 100644 --- a/internal/server/server_test.go +++ b/internal/server/server_test.go @@ -21,10 +21,8 @@ import ( ) var ( - locusId = "goutube-client" - pointId = "sample_file" - locusId_2 = "second-goutube-client" - lines = 10 + pointId = "sample_file" + lines = 10 ) func TestServer(t *testing.T) { @@ -74,13 +72,23 @@ func setupTest(t *testing.T, fn func()) ( locusConfig := locus.Config{} locusConfig.Point.CloseTimeout = 10 * time.Second - lociManager, err := locus.NewLociManager(dir, locusConfig) + + lociLn, err := net.Listen("tcp", "localhost:0") + require.NoError(t, err) + + locusConfig.Distributed.StreamLayer = locus.NewStreamLayer( + lociLn, + nil, + nil, + ) + locusConfig.Distributed.BindAdr = "localhost:0" + locusInstance, err := locus.NewDistributedLoci(dir, locusConfig) require.NoError(t, err) cfg := &Config{ StreamingConfig: &StreamingConfig{ - LociManager: lociManager, - Authorizer: authorizer, + Locus: locusInstance, + Authorizer: authorizer, }, } @@ -113,8 +121,7 @@ func setupTest(t *testing.T, fn func()) ( gRPCServer.Stop() conn.Close() l.Close() - err := lociManager.RemoveAll() - fmt.Println(err) + locusInstance.Remove() } } @@ -127,17 +134,19 @@ func testProduceConsumeStream( require.NoError(t, err) for i := 0; i < lines; i++ { - err := stream.Send(&streaming_api.ProduceRequest{Locus: locusId, Point: pointId, Frame: []byte(fmt.Sprintln(i))}) + err := stream.Send(&streaming_api.ProduceRequest{Point: pointId, Frame: []byte(fmt.Sprintln(i))}) require.NoError(t, err) } resp, err := stream.CloseAndRecv() require.NoError(t, err) - require.Equal(t, uint64(90), resp.Offset) + require.Equal(t, 1, len(resp.Records)) + require.Equal(t, pointId, resp.Records[0].Point) + require.Equal(t, uint64(90), resp.Records[0].Offset) // test consume stream - resStream, err := client.ConsumeStream(context.Background(), &streaming_api.ConsumeRequest{Locus: locusId, Point: pointId}) + resStream, err := client.ConsumeStream(context.Background(), &streaming_api.ConsumeRequest{Point: pointId}) if err != nil { log.Fatalf("error while calling ConsumeStream RPC: %v", err) } diff --git a/internal/server/streaming.go b/internal/server/streaming.go index 49b4a0f..b92a71c 100644 --- a/internal/server/streaming.go +++ b/internal/server/streaming.go @@ -22,8 +22,8 @@ type Authorizer interface { } type StreamingConfig struct { - LociManager LociManager - Authorizer Authorizer + Locus Locus + Authorizer Authorizer } func (s *StreamingManager) ProduceStream(stream streaming_api.Streaming_ProduceStreamServer) error { @@ -34,19 +34,21 @@ func (s *StreamingManager) ProduceStream(stream streaming_api.Streaming_ProduceS ); err != nil { return err } - points := make(map[string]map[string]bool) - defer (func() { - for locusId, pointIds := range points { - for pointId, _ := range pointIds { - _ = s.LociManager.ClosePoint(locusId, pointId) - } - } - })() + points := make(map[string]uint64) var lastOffset uint64 for { req, err := stream.Recv() if err == io.EOF { - if err := stream.SendAndClose(&streaming_api.ProduceResponse{Offset: lastOffset}); err != nil { + var resp []*streaming_api.Record + for pointId, pointOffset := range points { + record := &streaming_api.Record{ + Point: pointId, + Offset: pointOffset, + } + _ = s.Locus.ClosePoint(pointId) + resp = append(resp, record) + } + if err := stream.SendAndClose(&streaming_api.ProduceResponse{Records: resp}); err != nil { return err } return nil @@ -54,17 +56,10 @@ func (s *StreamingManager) ProduceStream(stream streaming_api.Streaming_ProduceS if err != nil { return err } - var locusId string = req.GetLocus() - var pointId string = req.GetPoint() - - if _, locusExists := points[locusId]; !locusExists { - points[locusId] = make(map[string]bool) - } - points[locusId][pointId] = true - - if lastOffset, err = s.LociManager.Append(locusId, pointId, req.GetFrame()); err != nil { + if lastOffset, err = s.Locus.Append(req); err != nil { return err } + points[req.GetPoint()] = lastOffset } } @@ -76,9 +71,9 @@ func (s *StreamingManager) ConsumeStream(req *streaming_api.ConsumeRequest, stre ); err != nil { return err } - locusId, pointId := req.GetLocus(), req.GetPoint() - defer s.LociManager.ClosePoint(locusId, pointId) - off := int64(0) + pointId := req.GetPoint() + defer s.Locus.ClosePoint(pointId) + off := uint64(0) lenWidth := 8 for { select { @@ -86,19 +81,19 @@ func (s *StreamingManager) ConsumeStream(req *streaming_api.ConsumeRequest, stre return nil default: buf := make([]byte, lenWidth) - n, err := s.LociManager.ReadAt(locusId, pointId, buf, off) + n, err := s.Locus.ReadAt(pointId, buf, off) if err != nil { return nil } - off += int64(n) + off += uint64(n) size := enc.Uint64(buf) buf = make([]byte, size) - n, err = s.LociManager.ReadAt(locusId, pointId, buf, off) + n, err = s.Locus.ReadAt(pointId, buf, off) if err != nil { return err } - off += int64(n) + off += uint64(n) if err := stream.Send(&streaming_api.ConsumeResponse{Frame: buf}); err != nil { return err