-
Notifications
You must be signed in to change notification settings - Fork 0
/
streaming.go
116 lines (104 loc) · 2.61 KB
/
streaming.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package goutube
import (
"context"
"io"
streaming_api "github.com/Brijeshlakkad/goutube/api/streaming/v1"
)
type StreamingManager struct {
streaming_api.UnimplementedStreamingServer
*StreamingConfig
}
const (
objectWildCard = "*"
produceAction = "produce"
consumeAction = "consume"
)
type StreamingConfig struct {
Locus LocusHelper
Authorizer *authorizer
}
type LocusHelper interface {
Append(*streaming_api.ProduceRequest) (uint64, error)
GetMetadata(string) (PointMetadata, error)
ReadWithLimit(string, uint64, uint64, uint64) (uint64, []byte, error)
ClosePoint(string) error
}
func (s *StreamingManager) ProduceStream(stream streaming_api.Streaming_ProduceStreamServer) error {
if err := s.Authorizer.Authorize(
subject(stream.Context()),
objectWildCard,
produceAction,
); err != nil {
return err
}
points := make(map[string]uint64)
var lastOffset uint64
for {
req, err := stream.Recv()
if err == io.EOF {
var resp []*streaming_api.Record
for pointId, pointOffset := range points {
record := &streaming_api.Record{
Point: pointId,
Offset: pointOffset,
}
_ = s.Locus.ClosePoint(pointId)
resp = append(resp, record)
}
if err := stream.SendAndClose(&streaming_api.ProduceResponse{Records: resp}); err != nil {
return err
}
return nil
}
if err != nil {
return err
}
if lastOffset, err = s.Locus.Append(req); err != nil {
return err
}
points[req.GetPoint()] = lastOffset
}
}
func (s *StreamingManager) ConsumeStream(req *streaming_api.ConsumeRequest, stream streaming_api.Streaming_ConsumeStreamServer) error {
if err := s.Authorizer.Authorize(
subject(stream.Context()),
objectWildCard,
consumeAction,
); err != nil {
return err
}
pointId := req.GetPoint()
defer s.Locus.ClosePoint(pointId)
off := req.GetOffset()
limit := req.GetLimit()
chunkSize := req.GetChunkSize()
for {
select {
case <-stream.Context().Done():
return nil
default:
nextOff, buf, err := s.Locus.ReadWithLimit(pointId, off, chunkSize, limit)
if err != nil {
return nil
}
off = nextOff
if err := stream.Send(&streaming_api.ConsumeResponse{Frame: buf}); err != nil {
return err
}
}
}
}
func (s *StreamingManager) GetMetadata(ctx context.Context, req *streaming_api.MetadataRequest) (*streaming_api.MetadataResponse, error) {
metadata, err := s.Locus.GetMetadata(req.GetPoint())
if err != nil {
return nil, err
}
return &streaming_api.MetadataResponse{
Size: metadata.size,
}, nil
}
func NewStreamingServer(config *StreamingConfig) (*StreamingManager, error) {
return &StreamingManager{
StreamingConfig: config,
}, nil
}