Skip to content

Commit

Permalink
feature: split writer to pipeline (#7)
Browse files Browse the repository at this point in the history
* feature: split writer to pipeline
  • Loading branch information
akurilov committed Aug 1, 2023
1 parent 40784c0 commit 9430a25
Show file tree
Hide file tree
Showing 17 changed files with 276 additions and 202 deletions.
14 changes: 14 additions & 0 deletions .github/workflows/staging.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ jobs:
repository: awakari/conditions-text
token: ${{ secrets.CLONE_PAT }}

- name: clone evaluator
uses: actions/checkout@v3
with:
path: "evaluator"
repository: awakari/evaluator
token: ${{ secrets.CLONE_PAT }}

- name: clone matches
uses: actions/checkout@v3
with:
Expand Down Expand Up @@ -77,6 +84,13 @@ jobs:
repository: awakari/reader
token: ${{ secrets.CLONE_PAT }}

- name: clone resolver
uses: actions/checkout@v3
with:
path: "resolver"
repository: awakari/resolver
token: ${{ secrets.CLONE_PAT }}

- name: clone subscriptions-proxy
uses: actions/checkout@v3
with:
Expand Down
14 changes: 14 additions & 0 deletions .github/workflows/testing.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ jobs:
repository: awakari/conditions-text
token: ${{ secrets.CLONE_PAT }}

- name: clone evaluator
uses: actions/checkout@v3
with:
path: "evaluator"
repository: awakari/evaluator
token: ${{ secrets.CLONE_PAT }}

- name: clone matches
uses: actions/checkout@v3
with:
Expand Down Expand Up @@ -52,6 +59,13 @@ jobs:
repository: awakari/reader
token: ${{ secrets.CLONE_PAT }}

- name: clone resolver
uses: actions/checkout@v3
with:
path: "resolver"
repository: awakari/resolver
token: ${{ secrets.CLONE_PAT }}

- name: clone subscriptions-proxy
uses: actions/checkout@v3
with:
Expand Down
27 changes: 15 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ For a component-specific options see the corresponding sub-chart configuration.
| mongodb.internal | `true` | Defines whether to deploy the MongoDB internally or use external one. |
| queue.backend.nats | `true` | Enables the NATS JetStream queue wrapper service. Exclusive, can not be used together with other queue backends. |
| semaphore.backend.nats | `true` | Enables the NATS-based distributed semaphore service. Exclusive, can not be used together with other semaphore backends. |
| tracing.enabled | `false` | Enables the distributed tracing, internal Jaeger and Cassandra deployment as well to collect the spans. |

# 3. Deployment

Expand Down Expand Up @@ -140,11 +141,11 @@ Refer to [Client SDK Usage](https://github.com/awakari/client-sdk-go#3-usage).
2. Download the necessary proto files and save to the current directory:
1. [Cloud Events](https://awakari.com/proto/cloudevent.proto)
2. [Reader](https://awakari.com/proto/reader.proto)
3. [Writer](https://awakari.com/proto/writer.proto)
3. [Resolver](https://awakari.com/proto/resolver.proto)
3. Port-forward services to local:
1. `core-reader` -> 50051
2. `core-subscriptionsproxy` -> 50052
3. `core-writer` -> 50053
2. `core-resolver` -> 50052
3. `core-subscriptionsproxy` -> 50053

### 4.2.2. Subscriptions

Expand All @@ -154,7 +155,7 @@ grpcurl \
-plaintext \
-H 'X-Awakari-User-Id: john.doe@company1.com' \
-d @ \
localhost:50052 \
localhost:50053 \
awakari.subscriptions.Service/Create
```

Expand Down Expand Up @@ -218,8 +219,8 @@ grpcurl \
-proto writer.proto \
-H 'X-Awakari-User-Id: john.doe@company1.com' \
-d @ \
localhost:50053 \
awakari.writer.Service/SubmitMessages
localhost:50052 \
awakari.resolver.Service/SubmitMessages
```

Specify the events to write in the payload:
Expand Down Expand Up @@ -264,15 +265,17 @@ The core of Awakari consist of:
* [Matches](https://github.com/awakari/matches)
* [Messages](https://github.com/awakari/messages)
* Stateless components
* [Subscriptions-Proxy](https://github.com/awakari/subscriptions-proxy)
* [Writer](https://github.com/awakari/writer)
* [Subscriptions-Proxy](https://github.com/awakari/subscriptions-proxy)
* [Reader](https://github.com/awakari/reader)
* [Evaluator](https://github.com/awakari/evaluator)
* [Writer](https://github.com/awakari/writer)
* [Resolver](https://github.com/awakari/resolver)
* 3-rd part components
* Mongodb (sharded)
* Redis in-memory cache
* NATS message bus

![components](doc/components-core.png)
![components](doc/components-2023-07-27.png)

# 6. Contributing

Expand All @@ -292,7 +295,7 @@ TODO

Build a helm package:
```shell
for i in core conditions-text matches messages queue-nats reader subscriptions-proxy semaphore-nats writer; do git clone git@github.com:awakari/$i.git; done
for i in core conditions-text evaluator matches messages queue-nats reader resolver subscriptions-proxy semaphore-nats writer; do git clone git@github.com:awakari/$i.git; done
cd core
helm dependency update helm/core
helm package helm/core
Expand All @@ -307,8 +310,8 @@ The repo contains core functional end-to-end tests.
To run the tests locally:

1. Port-forward the reader API to local port 50051
2. Port-forward the subscriptions API to local port 50052
3. Port-forward the writer API to local port 50053
2. Port-forward the resolver API to local port 50052
3. Port-forward the subscriptions API to local port 50053
4.
```shell
make test
Expand Down
Binary file added doc/components-2023-07-27.odg
Binary file not shown.
Binary file added doc/components-2023-07-27.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file removed doc/components-core.odg
Binary file not shown.
Binary file removed doc/components-core.png
Binary file not shown.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/awakari/core
go 1.20

require (
github.com/awakari/client-sdk-go v1.0.1
github.com/awakari/client-sdk-go v1.0.2
github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.14.0
github.com/google/uuid v1.3.0
github.com/kelseyhightower/envconfig v1.4.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/awakari/client-sdk-go v1.0.1 h1:4UULsLPSceCpdeDDWTqqKu9peXigoMVJz78aF5ghNtg=
github.com/awakari/client-sdk-go v1.0.1/go.mod h1:QMPZGt4vNYscux4MjeSQUloFY8iYhR81ztG44FuUmzI=
github.com/awakari/client-sdk-go v1.0.2 h1:lrt3fuhlok+Pa+cHNenK8QUdnfnnTbFG1akeGIm3cEk=
github.com/awakari/client-sdk-go v1.0.2/go.mod h1:QMPZGt4vNYscux4MjeSQUloFY8iYhR81ztG44FuUmzI=
github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.14.0 h1:dEopBSOSjB5fM9r76ufM44AVj9Dnz2IOM0Xs6FVxZRM=
github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.14.0/go.mod h1:qDSbb0fgIfFNjZrNTPtS5MOMScAGyQtn1KlSvoOdqYw=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
8 changes: 8 additions & 0 deletions helm/core/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ dependencies:
version: 0.0.0
repository: "file://../../../reader/helm/reader"

- name: evaluator
version: 0.0.0
repository: "file://../../../evaluator/helm/evaluator"

- name: writer
version: 0.0.0
repository: "file://../../../writer/helm/writer"

- name: resolver
version: 0.0.0
repository: "file://../../../resolver/helm/resolver"
12 changes: 12 additions & 0 deletions helm/core/values-tracing.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,20 @@
tracing:
enabled: true

evaluator:
tracing:
enabled: true
collector:
uri: "http://core-jaeger-collector:14268/api/traces"

writer:
tracing:
enabled: true
collector:
uri: "http://core-jaeger-collector:14268/api/traces"

resolver:
tracing:
enabled: true
collector:
uri: "http://core-jaeger-collector:14268/api/traces"
32 changes: 28 additions & 4 deletions helm/core/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,33 @@ reader:
semaphore:
uri: "core-semaphore:50051"

evaluator:
image:
pullPolicy: "IfNotPresent"
api:
matches:
uri: "core-matches:50051"
reader:
uri: "core-reader:50051"
queue:
batchSize: 64
limit: 4096
uri: "core-queue:50051"

writer:
image:
pullPolicy: "IfNotPresent"
api:
messages:
uri: "core-messages:50051"
evaluator:
uri: "core-evaluator:50051"
queue:
batchSize: 64
limit: 4096
uri: "core-queue:50051"

resolver:
image:
pullPolicy: "IfNotPresent"
api:
Expand All @@ -180,10 +206,8 @@ writer:
uri: "core-subscriptionsproxy:50051"
matches:
uri: "core-matches:50051"
messages:
uri: "core-messages:50051"
reader:
uri: "core-reader:50051"
writer:
uri: "core-writer:50051"
queue:
batchSize: 64
limit: 4096
Expand Down
4 changes: 2 additions & 2 deletions test/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import "github.com/kelseyhightower/envconfig"
type Config struct {
Uri struct {
Reader string `envconfig:"URI_READER" default:"localhost:50051" required:"true"`
Subscriptions string `envconfig:"URI_SUBSCRIPTIONS" default:"localhost:50052" required:"true"`
Writer string `envconfig:"URI_WRITER" default:"localhost:50053" required:"true"`
Resolver string `envconfig:"URI_RESOLVER" default:"localhost:50052" required:"true"`
Subscriptions string `envconfig:"URI_SUBSCRIPTIONS" default:"localhost:50053" required:"true"`
}
}

Expand Down
2 changes: 1 addition & 1 deletion test/data/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var Msgs = []*pb.CloudEvent{
Attributes: map[string]*pb.CloudEventAttributeValue{
"summary": {
Attr: &pb.CloudEventAttributeValue_CeString{
CeString: "NBCUniversal's former head of advertising is revealed as the new boss of the social network.",
CeString: "NBCUniversal's former head of advertising is revealed as the new boss of the social network. Propose",
},
},
"time": {
Expand Down
3 changes: 1 addition & 2 deletions test/data/subs.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,7 @@ var Subs = []subscription.Data{
condition.
NewBuilder().
MatchAttrKey("summary").
MatchText("of").
MatchExact().
MatchText("propose").
BuildTextCondition(),
},
).
Expand Down
4 changes: 2 additions & 2 deletions test/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func Test_MessageDelivery(t *testing.T) {
client, err = api.
NewClientBuilder().
SubscriptionsUri(cfg.Uri.Subscriptions).
WriterUri(cfg.Uri.Writer).
WriterUri(cfg.Uri.Resolver).
ReaderUri(cfg.Uri.Reader).
Build()
require.Nil(t, err)
Expand Down Expand Up @@ -97,7 +97,7 @@ func Test_MessageDelivery(t *testing.T) {
for k, c := range cases {
t.Run(k, func(t *testing.T) {
//
ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
ctx, cancel := context.WithTimeout(context.TODO(), 1*time.Minute)
defer cancel()
var msgsReader model.Reader[[]*pb.CloudEvent]
msgsReader, err = client.OpenMessagesReader(ctx, "test-user-0", c.subId, 4)
Expand Down
Loading

0 comments on commit 9430a25

Please sign in to comment.