diff --git a/.github/workflows/staging.yaml b/.github/workflows/staging.yaml index 6c95b71..a126d88 100644 --- a/.github/workflows/staging.yaml +++ b/.github/workflows/staging.yaml @@ -49,6 +49,13 @@ jobs: repository: awakari/conditions-text token: ${{ secrets.CLONE_PAT }} + - name: clone evaluator + uses: actions/checkout@v3 + with: + path: "evaluator" + repository: awakari/evaluator + token: ${{ secrets.CLONE_PAT }} + - name: clone matches uses: actions/checkout@v3 with: @@ -77,6 +84,13 @@ jobs: repository: awakari/reader token: ${{ secrets.CLONE_PAT }} + - name: clone resolver + uses: actions/checkout@v3 + with: + path: "resolver" + repository: awakari/resolver + token: ${{ secrets.CLONE_PAT }} + - name: clone subscriptions-proxy uses: actions/checkout@v3 with: diff --git a/.github/workflows/testing.yaml b/.github/workflows/testing.yaml index 832012d..0f60737 100644 --- a/.github/workflows/testing.yaml +++ b/.github/workflows/testing.yaml @@ -24,6 +24,13 @@ jobs: repository: awakari/conditions-text token: ${{ secrets.CLONE_PAT }} + - name: clone evaluator + uses: actions/checkout@v3 + with: + path: "evaluator" + repository: awakari/evaluator + token: ${{ secrets.CLONE_PAT }} + - name: clone matches uses: actions/checkout@v3 with: @@ -52,6 +59,13 @@ jobs: repository: awakari/reader token: ${{ secrets.CLONE_PAT }} + - name: clone resolver + uses: actions/checkout@v3 + with: + path: "resolver" + repository: awakari/resolver + token: ${{ secrets.CLONE_PAT }} + - name: clone subscriptions-proxy uses: actions/checkout@v3 with: diff --git a/README.md b/README.md index 7af9859..91dc064 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,7 @@ For a component-specific options see the corresponding sub-chart configuration. | mongodb.internal | `true` | Defines whether to deploy the MongoDB internally or use external one. | | queue.backend.nats | `true` | Enables the NATS JetStream queue wrapper service. Exclusive, can not be used together with other queue backends. | | semaphore.backend.nats | `true` | Enables the NATS-based distributed semaphore service. Exclusive, can not be used together with other semaphore backends. | +| tracing.enabled | `false` | Enables the distributed tracing, internal Jaeger and Cassandra deployment as well to collect the spans. | # 3. Deployment @@ -140,11 +141,11 @@ Refer to [Client SDK Usage](https://github.com/awakari/client-sdk-go#3-usage). 2. Download the necessary proto files and save to the current directory: 1. [Cloud Events](https://awakari.com/proto/cloudevent.proto) 2. [Reader](https://awakari.com/proto/reader.proto) - 3. [Writer](https://awakari.com/proto/writer.proto) + 3. [Resolver](https://awakari.com/proto/resolver.proto) 3. Port-forward services to local: 1. `core-reader` -> 50051 - 2. `core-subscriptionsproxy` -> 50052 - 3. `core-writer` -> 50053 + 2. `core-resolver` -> 50052 + 3. `core-subscriptionsproxy` -> 50053 ### 4.2.2. Subscriptions @@ -154,7 +155,7 @@ grpcurl \ -plaintext \ -H 'X-Awakari-User-Id: john.doe@company1.com' \ -d @ \ - localhost:50052 \ + localhost:50053 \ awakari.subscriptions.Service/Create ``` @@ -218,8 +219,8 @@ grpcurl \ -proto writer.proto \ -H 'X-Awakari-User-Id: john.doe@company1.com' \ -d @ \ - localhost:50053 \ - awakari.writer.Service/SubmitMessages + localhost:50052 \ + awakari.resolver.Service/SubmitMessages ``` Specify the events to write in the payload: @@ -264,15 +265,17 @@ The core of Awakari consist of: * [Matches](https://github.com/awakari/matches) * [Messages](https://github.com/awakari/messages) * Stateless components - * [Subscriptions-Proxy](https://github.com/awakari/subscriptions-proxy) - * [Writer](https://github.com/awakari/writer) + * [Subscriptions-Proxy](https://github.com/awakari/subscriptions-proxy) * [Reader](https://github.com/awakari/reader) + * [Evaluator](https://github.com/awakari/evaluator) + * [Writer](https://github.com/awakari/writer) + * [Resolver](https://github.com/awakari/resolver) * 3-rd part components * Mongodb (sharded) * Redis in-memory cache * NATS message bus -![components](doc/components-core.png) +![components](doc/components-2023-07-27.png) # 6. Contributing @@ -292,7 +295,7 @@ TODO Build a helm package: ```shell -for i in core conditions-text matches messages queue-nats reader subscriptions-proxy semaphore-nats writer; do git clone git@github.com:awakari/$i.git; done +for i in core conditions-text evaluator matches messages queue-nats reader resolver subscriptions-proxy semaphore-nats writer; do git clone git@github.com:awakari/$i.git; done cd core helm dependency update helm/core helm package helm/core @@ -307,8 +310,8 @@ The repo contains core functional end-to-end tests. To run the tests locally: 1. Port-forward the reader API to local port 50051 -2. Port-forward the subscriptions API to local port 50052 -3. Port-forward the writer API to local port 50053 +2. Port-forward the resolver API to local port 50052 +3. Port-forward the subscriptions API to local port 50053 4. ```shell make test diff --git a/doc/components-2023-07-27.odg b/doc/components-2023-07-27.odg new file mode 100644 index 0000000..5afc968 Binary files /dev/null and b/doc/components-2023-07-27.odg differ diff --git a/doc/components-2023-07-27.png b/doc/components-2023-07-27.png new file mode 100644 index 0000000..4c6c1b2 Binary files /dev/null and b/doc/components-2023-07-27.png differ diff --git a/doc/components-core.odg b/doc/components-core.odg deleted file mode 100644 index 0495893..0000000 Binary files a/doc/components-core.odg and /dev/null differ diff --git a/doc/components-core.png b/doc/components-core.png deleted file mode 100644 index a6814fc..0000000 Binary files a/doc/components-core.png and /dev/null differ diff --git a/go.mod b/go.mod index b346bf3..086c61a 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/awakari/core go 1.20 require ( - github.com/awakari/client-sdk-go v1.0.1 + github.com/awakari/client-sdk-go v1.0.2 github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.14.0 github.com/google/uuid v1.3.0 github.com/kelseyhightower/envconfig v1.4.0 diff --git a/go.sum b/go.sum index 864b8e6..2d9fcfe 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/awakari/client-sdk-go v1.0.1 h1:4UULsLPSceCpdeDDWTqqKu9peXigoMVJz78aF5ghNtg= -github.com/awakari/client-sdk-go v1.0.1/go.mod h1:QMPZGt4vNYscux4MjeSQUloFY8iYhR81ztG44FuUmzI= +github.com/awakari/client-sdk-go v1.0.2 h1:lrt3fuhlok+Pa+cHNenK8QUdnfnnTbFG1akeGIm3cEk= +github.com/awakari/client-sdk-go v1.0.2/go.mod h1:QMPZGt4vNYscux4MjeSQUloFY8iYhR81ztG44FuUmzI= github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.14.0 h1:dEopBSOSjB5fM9r76ufM44AVj9Dnz2IOM0Xs6FVxZRM= github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.14.0/go.mod h1:qDSbb0fgIfFNjZrNTPtS5MOMScAGyQtn1KlSvoOdqYw= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/helm/core/Chart.yaml b/helm/core/Chart.yaml index 4fbd5c3..488e617 100644 --- a/helm/core/Chart.yaml +++ b/helm/core/Chart.yaml @@ -81,6 +81,14 @@ dependencies: version: 0.0.0 repository: "file://../../../reader/helm/reader" + - name: evaluator + version: 0.0.0 + repository: "file://../../../evaluator/helm/evaluator" + - name: writer version: 0.0.0 repository: "file://../../../writer/helm/writer" + + - name: resolver + version: 0.0.0 + repository: "file://../../../resolver/helm/resolver" diff --git a/helm/core/values-tracing.yaml b/helm/core/values-tracing.yaml index 88b91f6..5713d82 100644 --- a/helm/core/values-tracing.yaml +++ b/helm/core/values-tracing.yaml @@ -1,8 +1,20 @@ tracing: enabled: true +evaluator: + tracing: + enabled: true + collector: + uri: "http://core-jaeger-collector:14268/api/traces" + writer: tracing: enabled: true collector: uri: "http://core-jaeger-collector:14268/api/traces" + +resolver: + tracing: + enabled: true + collector: + uri: "http://core-jaeger-collector:14268/api/traces" diff --git a/helm/core/values.yaml b/helm/core/values.yaml index 2134bd4..225defa 100644 --- a/helm/core/values.yaml +++ b/helm/core/values.yaml @@ -169,7 +169,33 @@ reader: semaphore: uri: "core-semaphore:50051" +evaluator: + image: + pullPolicy: "IfNotPresent" + api: + matches: + uri: "core-matches:50051" + reader: + uri: "core-reader:50051" + queue: + batchSize: 64 + limit: 4096 + uri: "core-queue:50051" + writer: + image: + pullPolicy: "IfNotPresent" + api: + messages: + uri: "core-messages:50051" + evaluator: + uri: "core-evaluator:50051" + queue: + batchSize: 64 + limit: 4096 + uri: "core-queue:50051" + +resolver: image: pullPolicy: "IfNotPresent" api: @@ -180,10 +206,8 @@ writer: uri: "core-subscriptionsproxy:50051" matches: uri: "core-matches:50051" - messages: - uri: "core-messages:50051" - reader: - uri: "core-reader:50051" + writer: + uri: "core-writer:50051" queue: batchSize: 64 limit: 4096 diff --git a/test/config/config.go b/test/config/config.go index 383ec89..fb75f3e 100644 --- a/test/config/config.go +++ b/test/config/config.go @@ -5,8 +5,8 @@ import "github.com/kelseyhightower/envconfig" type Config struct { Uri struct { Reader string `envconfig:"URI_READER" default:"localhost:50051" required:"true"` - Subscriptions string `envconfig:"URI_SUBSCRIPTIONS" default:"localhost:50052" required:"true"` - Writer string `envconfig:"URI_WRITER" default:"localhost:50053" required:"true"` + Resolver string `envconfig:"URI_RESOLVER" default:"localhost:50052" required:"true"` + Subscriptions string `envconfig:"URI_SUBSCRIPTIONS" default:"localhost:50053" required:"true"` } } diff --git a/test/data/msgs.go b/test/data/msgs.go index 924a380..74fdc12 100644 --- a/test/data/msgs.go +++ b/test/data/msgs.go @@ -42,7 +42,7 @@ var Msgs = []*pb.CloudEvent{ Attributes: map[string]*pb.CloudEventAttributeValue{ "summary": { Attr: &pb.CloudEventAttributeValue_CeString{ - CeString: "NBCUniversal's former head of advertising is revealed as the new boss of the social network.", + CeString: "NBCUniversal's former head of advertising is revealed as the new boss of the social network. Propose", }, }, "time": { diff --git a/test/data/subs.go b/test/data/subs.go index 0c3842e..53814de 100644 --- a/test/data/subs.go +++ b/test/data/subs.go @@ -94,8 +94,7 @@ var Subs = []subscription.Data{ condition. NewBuilder(). MatchAttrKey("summary"). - MatchText("of"). - MatchExact(). + MatchText("propose"). BuildTextCondition(), }, ). diff --git a/test/e2e_test.go b/test/e2e_test.go index 9ebe329..2e331dc 100644 --- a/test/e2e_test.go +++ b/test/e2e_test.go @@ -23,7 +23,7 @@ func Test_MessageDelivery(t *testing.T) { client, err = api. NewClientBuilder(). SubscriptionsUri(cfg.Uri.Subscriptions). - WriterUri(cfg.Uri.Writer). + WriterUri(cfg.Uri.Resolver). ReaderUri(cfg.Uri.Reader). Build() require.Nil(t, err) @@ -97,7 +97,7 @@ func Test_MessageDelivery(t *testing.T) { for k, c := range cases { t.Run(k, func(t *testing.T) { // - ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second) + ctx, cancel := context.WithTimeout(context.TODO(), 1*time.Minute) defer cancel() var msgsReader model.Reader[[]*pb.CloudEvent] msgsReader, err = client.OpenMessagesReader(ctx, "test-user-0", c.subId, 4) diff --git a/test/perf_e2e_test.go b/test/perf_e2e_test.go index c2b9fd6..ccb3ef3 100644 --- a/test/perf_e2e_test.go +++ b/test/perf_e2e_test.go @@ -31,7 +31,7 @@ func Test_Perf_EndToEnd(t *testing.T) { client, err = api. NewClientBuilder(). SubscriptionsUri(cfg.Uri.Subscriptions). - WriterUri(cfg.Uri.Writer). + WriterUri(cfg.Uri.Resolver). ReaderUri(cfg.Uri.Reader). Build() require.Nil(t, err) @@ -52,195 +52,195 @@ func Test_Perf_EndToEnd(t *testing.T) { batchSize: 1, duration: 300 * time.Second, }, - "subCount = 1, writeRate = 5": { - subCount: 1, - writeRate: 5, - batchSize: 1, - duration: 200 * time.Second, - }, - "subCount = 1, writeRate = 10": { - subCount: 1, - writeRate: 10, - batchSize: 2, - duration: 200 * time.Second, - }, + //"subCount = 1, writeRate = 5": { + // subCount: 1, + // writeRate: 5, + // batchSize: 1, + // duration: 200 * time.Second, + //}, + //"subCount = 1, writeRate = 10": { + // subCount: 1, + // writeRate: 10, + // batchSize: 2, + // duration: 200 * time.Second, + //}, "subCount = 1, writeRate = 20": { subCount: 1, writeRate: 20, batchSize: 16, duration: 200 * time.Second, }, - "subCount = 1, writeRate = 50": { - subCount: 1, - writeRate: 50, - batchSize: 16, - duration: 200 * time.Second, - }, - "subCount = 1, writeRate = 100": { - subCount: 1, - writeRate: 100, - batchSize: 16, - duration: 200 * time.Second, - }, + //"subCount = 1, writeRate = 50": { + // subCount: 1, + // writeRate: 50, + // batchSize: 16, + // duration: 200 * time.Second, + //}, + //"subCount = 1, writeRate = 100": { + // subCount: 1, + // writeRate: 100, + // batchSize: 16, + // duration: 200 * time.Second, + //}, "subCount = 1, writeRate = 200": { subCount: 1, writeRate: 200, batchSize: 16, duration: 200 * time.Second, }, - "subCount = 1, writeRate = 500": { - subCount: 1, - writeRate: 500, - batchSize: 16, - duration: 200 * time.Second, - }, + //"subCount = 1, writeRate = 500": { + // subCount: 1, + // writeRate: 500, + // batchSize: 16, + // duration: 200 * time.Second, + //}, // subCount = 10 - "subCount = 10, writeRate = 2": { - subCount: 10, - writeRate: 2, - batchSize: 1, - duration: 300 * time.Second, - }, - "subCount = 10, writeRate = 5": { - subCount: 10, - writeRate: 5, - batchSize: 1, - duration: 200 * time.Second, - }, - "subCount = 10, writeRate = 10": { - subCount: 10, - writeRate: 10, - batchSize: 2, - duration: 200 * time.Second, - }, - "subCount = 10, writeRate = 20": { - subCount: 10, - writeRate: 20, - batchSize: 16, - duration: 200 * time.Second, - }, - "subCount = 10, writeRate = 50": { - subCount: 10, - writeRate: 50, - batchSize: 16, - duration: 200 * time.Second, - }, - "subCount = 10, writeRate = 100": { - subCount: 10, - writeRate: 100, - batchSize: 16, - duration: 200 * time.Second, - }, - "subCount = 10, writeRate = 200": { - subCount: 10, - writeRate: 200, - batchSize: 16, - duration: 200 * time.Second, - }, - "subCount = 10, writeRate = 500": { - subCount: 10, - writeRate: 500, - batchSize: 16, - duration: 200 * time.Second, - }, + //"subCount = 10, writeRate = 2": { + // subCount: 10, + // writeRate: 2, + // batchSize: 1, + // duration: 300 * time.Second, + //}, + //"subCount = 10, writeRate = 5": { + // subCount: 10, + // writeRate: 5, + // batchSize: 1, + // duration: 200 * time.Second, + //}, + //"subCount = 10, writeRate = 10": { + // subCount: 10, + // writeRate: 10, + // batchSize: 2, + // duration: 200 * time.Second, + //}, + //"subCount = 10, writeRate = 20": { + // subCount: 10, + // writeRate: 20, + // batchSize: 16, + // duration: 200 * time.Second, + //}, + //"subCount = 10, writeRate = 50": { + // subCount: 10, + // writeRate: 50, + // batchSize: 16, + // duration: 200 * time.Second, + //}, + //"subCount = 10, writeRate = 100": { + // subCount: 10, + // writeRate: 100, + // batchSize: 16, + // duration: 200 * time.Second, + //}, + //"subCount = 10, writeRate = 200": { + // subCount: 10, + // writeRate: 200, + // batchSize: 16, + // duration: 200 * time.Second, + //}, + //"subCount = 10, writeRate = 500": { + // subCount: 10, + // writeRate: 500, + // batchSize: 16, + // duration: 200 * time.Second, + //}, // subCount = 100 - "subCount = 100, writeRate = 2": { - subCount: 100, - writeRate: 2, - batchSize: 1, - duration: 300 * time.Second, - }, - "subCount = 100, writeRate = 5": { - subCount: 100, - writeRate: 5, - batchSize: 1, - duration: 200 * time.Second, - }, - "subCount = 100, writeRate = 10": { - subCount: 100, - writeRate: 10, - batchSize: 2, - duration: 200 * time.Second, - }, - "subCount = 100, writeRate = 20": { - subCount: 100, - writeRate: 20, - batchSize: 16, - duration: 200 * time.Second, - }, - "subCount = 100, writeRate = 50": { - subCount: 100, - writeRate: 50, - batchSize: 16, - duration: 200 * time.Second, - }, - "subCount = 100, writeRate = 100": { - subCount: 100, - writeRate: 100, - batchSize: 16, - duration: 200 * time.Second, - }, - "subCount = 100, writeRate = 200": { - subCount: 100, - writeRate: 200, - batchSize: 16, - duration: 200 * time.Second, - }, - "subCount = 100, writeRate = 500": { - subCount: 100, - writeRate: 500, - batchSize: 16, - duration: 200 * time.Second, - }, + //"subCount = 100, writeRate = 2": { + // subCount: 100, + // writeRate: 2, + // batchSize: 1, + // duration: 300 * time.Second, + //}, + //"subCount = 100, writeRate = 5": { + // subCount: 100, + // writeRate: 5, + // batchSize: 1, + // duration: 200 * time.Second, + //}, + //"subCount = 100, writeRate = 10": { + // subCount: 100, + // writeRate: 10, + // batchSize: 2, + // duration: 200 * time.Second, + //}, + //"subCount = 100, writeRate = 20": { + // subCount: 100, + // writeRate: 20, + // batchSize: 16, + // duration: 200 * time.Second, + //}, + //"subCount = 100, writeRate = 50": { + // subCount: 100, + // writeRate: 50, + // batchSize: 16, + // duration: 200 * time.Second, + //}, + //"subCount = 100, writeRate = 100": { + // subCount: 100, + // writeRate: 100, + // batchSize: 16, + // duration: 200 * time.Second, + //}, + //"subCount = 100, writeRate = 200": { + // subCount: 100, + // writeRate: 200, + // batchSize: 16, + // duration: 200 * time.Second, + //}, + //"subCount = 100, writeRate = 500": { + // subCount: 100, + // writeRate: 500, + // batchSize: 16, + // duration: 200 * time.Second, + //}, // subCount = 1000 - "subCount = 1000, writeRate = 2": { - subCount: 1000, - writeRate: 2, - batchSize: 1, - duration: 300 * time.Second, - }, - "subCount = 1000, writeRate = 5": { - subCount: 1000, - writeRate: 5, - batchSize: 1, - duration: 200 * time.Second, - }, - "subCount = 1000, writeRate = 10": { - subCount: 1000, - writeRate: 10, - batchSize: 2, - duration: 200 * time.Second, - }, - "subCount = 1000, writeRate = 20": { - subCount: 1000, - writeRate: 20, - batchSize: 16, - duration: 200 * time.Second, - }, - "subCount = 1000, writeRate = 50": { - subCount: 1000, - writeRate: 50, - batchSize: 16, - duration: 200 * time.Second, - }, - "subCount = 1000, writeRate = 100": { - subCount: 1000, - writeRate: 100, - batchSize: 16, - duration: 200 * time.Second, - }, - "subCount = 1000, writeRate = 200": { - subCount: 1000, - writeRate: 200, - batchSize: 16, - duration: 200 * time.Second, - }, - "subCount = 1000, writeRate = 500": { - subCount: 1000, - writeRate: 500, - batchSize: 16, - duration: 200 * time.Second, - }, + //"subCount = 1000, writeRate = 2": { + // subCount: 1000, + // writeRate: 2, + // batchSize: 1, + // duration: 300 * time.Second, + //}, + //"subCount = 1000, writeRate = 5": { + // subCount: 1000, + // writeRate: 5, + // batchSize: 1, + // duration: 200 * time.Second, + //}, + //"subCount = 1000, writeRate = 10": { + // subCount: 1000, + // writeRate: 10, + // batchSize: 2, + // duration: 200 * time.Second, + //}, + //"subCount = 1000, writeRate = 20": { + // subCount: 1000, + // writeRate: 20, + // batchSize: 16, + // duration: 200 * time.Second, + //}, + //"subCount = 1000, writeRate = 50": { + // subCount: 1000, + // writeRate: 50, + // batchSize: 16, + // duration: 200 * time.Second, + //}, + //"subCount = 1000, writeRate = 100": { + // subCount: 1000, + // writeRate: 100, + // batchSize: 16, + // duration: 200 * time.Second, + //}, + //"subCount = 1000, writeRate = 200": { + // subCount: 1000, + // writeRate: 200, + // batchSize: 16, + // duration: 200 * time.Second, + //}, + //"subCount = 1000, writeRate = 500": { + // subCount: 1000, + // writeRate: 500, + // batchSize: 16, + // duration: 200 * time.Second, + //}, } // for k, c := range cases { @@ -392,7 +392,7 @@ func Test_Perf_MaxRate_WriteRead(t *testing.T) { client, err = api. NewClientBuilder(). SubscriptionsUri(cfg.Uri.Subscriptions). - WriterUri(cfg.Uri.Writer). + WriterUri(cfg.Uri.Resolver). ReaderUri(cfg.Uri.Reader). Build() require.Nil(t, err)