From 20f464a071a230a1891391ff942759661ca0731c Mon Sep 17 00:00:00 2001 From: Andrei Kurilov <18027129+akurilov@users.noreply.github.com> Date: Fri, 15 Mar 2024 16:40:59 +0200 Subject: [PATCH 1/4] fix: evt type attr --- config/config.go | 5 ----- service/messages/channel.go | 2 +- service/messages/publish.go | 2 +- 3 files changed, 2 insertions(+), 7 deletions(-) diff --git a/config/config.go b/config/config.go index 222391f..e1da897 100644 --- a/config/config.go +++ b/config/config.go @@ -99,11 +99,6 @@ type FeedsConfig struct { Uri string `envconfig:"API_SOURCE_FEEDS_URI" default:"source-feeds:50051" required:"true"` } -type TelegramConfig struct { - GroupId string `envconfig:"API_SOURCE_TELEGRAM_GROUP_ID" default:"com.github.awakari.source-telegram"` - Uri string `envconfig:"API_SOURCE_TELEGRAM_URI" default:"source-telegram:50051" required:"true"` -} - type SitesConfig struct { Uri string `envconfig:"API_SOURCE_SITES_URI" default:"source-sites:50051" required:"true"` } diff --git a/service/messages/channel.go b/service/messages/channel.go index 794acdb..4c24fde 100644 --- a/service/messages/channel.go +++ b/service/messages/channel.go @@ -52,7 +52,7 @@ func (cp ChanPostHandler) Publish(tgCtx telebot.Context) (err error) { Id: uuid.NewString(), Source: fmt.Sprintf("https://t.me/%s", chanUserName), SpecVersion: attrValSpecVersion, - Type: "com.github.awakari.bot-telegram.v1", + Type: "com.awakari.bot-telegram.v1", } if err == nil { err = toCloudEvent(tgCtx.Message(), tgCtx.Text(), &evt) diff --git a/service/messages/publish.go b/service/messages/publish.go index 4b82129..91a171c 100644 --- a/service/messages/publish.go +++ b/service/messages/publish.go @@ -79,7 +79,7 @@ func PublishBasicReplyHandlerFunc( Id: uuid.NewString(), Source: "https://t.me/" + tgCtx.Chat().Username, SpecVersion: attrValSpecVersion, - Type: "com.github.awakari.bot-telegram.v1", + Type: "com.awakari.bot-telegram.v1", } if err == nil { defer w.Close() From b59d36f6a5ab4107736d5a0eb86d36e758d8a9fd Mon Sep 17 00:00:00 2001 From: Andrei Kurilov <18027129+akurilov@users.noreply.github.com> Date: Sat, 16 Mar 2024 21:06:54 +0200 Subject: [PATCH 2/4] fix: evt attrs naming --- service/messages/format.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/service/messages/format.go b/service/messages/format.go index 7a75e09..12abc15 100644 --- a/service/messages/format.go +++ b/service/messages/format.go @@ -158,11 +158,11 @@ func (f Format) convertExtraAttrs(evt *pb.CloudEvent, mode FormatMode, trunc boo case "awakariuserid": // do not expose case "awkhash": // internal, useless case "awkinternal": // internal - case "feedcategories": - case "feeddescription": - case "feedimagetitle": - case "feedimageurl": - case "feedtitle": + case "srccategories": + case "srcdescription": + case "srcimagetitle": + case "srcimageurl": + case "srctitle": default: switch vt := attrVal.Attr.(type) { case *pb.CloudEventAttributeValue_CeBoolean: From 573a2608437c705694a5985afcdd7f659150e517 Mon Sep 17 00:00:00 2001 From: Andrei Kurilov <18027129+akurilov@users.noreply.github.com> Date: Wed, 20 Mar 2024 11:11:39 +0200 Subject: [PATCH 3/4] feat: grpc methods to link/unlink the subscription --- api/grpc/tgbot/controller.go | 77 ++++++++++++++++++++++++++++++- api/grpc/tgbot/controller_test.go | 10 +++- api/grpc/tgbot/service.proto | 18 ++++++++ main.go | 28 +++++++---- scripts/cover.sh | 2 +- service/common.go | 1 + 6 files changed, 123 insertions(+), 13 deletions(-) diff --git a/api/grpc/tgbot/controller.go b/api/grpc/tgbot/controller.go index 0ddcb95..d16893e 100644 --- a/api/grpc/tgbot/controller.go +++ b/api/grpc/tgbot/controller.go @@ -3,11 +3,20 @@ package tgbot import ( "context" "encoding/json" + "fmt" + "github.com/awakari/bot-telegram/service" + "github.com/awakari/bot-telegram/service/chats" "github.com/awakari/bot-telegram/service/messages" + "github.com/awakari/client-sdk-go/api" tgverifier "github.com/electrofocus/telegram-auth-verifier" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/timestamppb" + "gopkg.in/telebot.v3" + "log/slog" + "strconv" + "strings" + "time" ) type Controller interface { @@ -17,12 +26,32 @@ type Controller interface { type controller struct { secretToken []byte cp messages.ChanPostHandler + chatStor chats.Storage + log *slog.Logger + clientAwk api.Client + tgBot *telebot.Bot + msgFmt messages.Format } -func NewController(secretToken []byte, cp messages.ChanPostHandler) Controller { +const intervalDefault = 1 * time.Minute + +func NewController( + secretToken []byte, + cp messages.ChanPostHandler, + chatStor chats.Storage, + log *slog.Logger, + clientAwk api.Client, + tgBot *telebot.Bot, + msgFmt messages.Format, +) Controller { return controller{ secretToken: secretToken, cp: cp, + chatStor: chatStor, + log: log, + clientAwk: clientAwk, + tgBot: tgBot, + msgFmt: msgFmt, } } @@ -71,6 +100,52 @@ func (c controller) ListChannels(ctx context.Context, req *ListChannelsRequest) return } +func (c controller) Subscribe(ctx context.Context, req *SubscribeRequest) (resp *SubscribeResponse, err error) { + subId := req.SubId + groupId := req.GroupId + userId := req.UserId + if !strings.HasPrefix(userId, service.PrefixUserId) { + err = status.Error(codes.InvalidArgument, fmt.Sprintf("User id should have prefix: %s, got: %s", service.PrefixUserId, userId)) + } + var chatId int64 + if err == nil { + chatId, err = strconv.ParseInt(userId[:len(service.PrefixUserId)], 10, 64) + if err != nil { + err = status.Error(codes.InvalidArgument, fmt.Sprintf("User id should end with numeric id: %s, %s", userId, err)) + } + } + if err == nil { + chat := chats.Chat{ + Id: chatId, + SubId: subId, + GroupId: groupId, + UserId: userId, + MinInterval: intervalDefault, + } + err = c.chatStor.LinkSubscription(ctx, chat) + err = encodeError(err) + } + if err == nil { + u := telebot.Update{ + Message: &telebot.Message{ + Chat: &telebot.Chat{ + ID: chatId, + }, + }, + } + r := chats.NewReader(c.tgBot.NewContext(u), c.clientAwk, c.chatStor, chatId, subId, groupId, userId, c.msgFmt, intervalDefault) + go r.Run(context.Background(), c.log) + } + return +} + +func (c controller) Unsubscribe(ctx context.Context, req *UnsubscribeRequest) (resp *UnsubscribeResponse, err error) { + chats.StopChatReader(req.SubId) + err = c.chatStor.UnlinkSubscription(ctx, req.SubId) + err = encodeError(err) + return +} + func encodeError(src error) (dst error) { switch { case src == nil: diff --git a/api/grpc/tgbot/controller_test.go b/api/grpc/tgbot/controller_test.go index 50beb2f..641a42c 100644 --- a/api/grpc/tgbot/controller_test.go +++ b/api/grpc/tgbot/controller_test.go @@ -26,7 +26,15 @@ var log = slog.Default() func TestMain(m *testing.M) { go func() { srv := grpc.NewServer() - c := NewController([]byte("6668123457:ZAJALGCBOGw8q9k2yBidb6kepmrBVGOrBLb"), messages.ChanPostHandler{}) + c := NewController( + []byte("6668123457:ZAJALGCBOGw8q9k2yBidb6kepmrBVGOrBLb"), + messages.ChanPostHandler{}, + nil, + slog.Default(), + nil, + nil, + messages.Format{}, + ) RegisterServiceServer(srv, c) reflection.Register(srv) grpc_health_v1.RegisterHealthServer(srv, health.NewServer()) diff --git a/api/grpc/tgbot/service.proto b/api/grpc/tgbot/service.proto index 71f21c7..bc9fd2a 100644 --- a/api/grpc/tgbot/service.proto +++ b/api/grpc/tgbot/service.proto @@ -9,6 +9,8 @@ import "google/protobuf/timestamp.proto"; service Service { rpc Authenticate(AuthenticateRequest) returns (AuthenticateResponse); rpc ListChannels(ListChannelsRequest) returns (ListChannelsResponse); + rpc Subscribe(SubscribeRequest) returns (SubscribeResponse); + rpc Unsubscribe(UnsubscribeRequest) returns (UnsubscribeResponse); } message AuthenticateRequest { @@ -43,3 +45,19 @@ message Channel { message Filter { string pattern = 1; } + +message SubscribeRequest { + string subId = 1; + string groupId = 2; + string userId = 3; +} + +message SubscribeResponse { +} + +message UnsubscribeRequest { + string subId = 1; +} + +message UnsubscribeResponse { +} diff --git a/main.go b/main.go index 1df8964..32ea20e 100644 --- a/main.go +++ b/main.go @@ -174,16 +174,6 @@ func main() { } defer chanPostHandler.Close() - // init the Telegram Bot grpc service - controllerGrpc := grpcApiTgBot.NewController([]byte(cfg.Api.Telegram.Token), chanPostHandler) - go func() { - log.Info(fmt.Sprintf("starting to listen the grpc API @ port #%d...", cfg.Api.Telegram.Bot.Port)) - err = grpcApi.Serve(cfg.Api.Telegram.Bot.Port, controllerGrpc) - if err != nil { - panic(err) - } - }() - callbackHandlers := map[string]service.ArgHandlerFunc{ subscriptions.CmdDescription: subscriptions.DescriptionHandlerFunc(clientAwk, groupId), subscriptions.CmdExtend: subExtHandler.RequestExtensionDaysCount, @@ -284,6 +274,24 @@ func main() { panic(err) } + // init the Telegram Bot grpc service + controllerGrpc := grpcApiTgBot.NewController( + []byte(cfg.Api.Telegram.Token), + chanPostHandler, + chatStor, + log, + clientAwk, + b, + msgFmt, + ) + go func() { + log.Info(fmt.Sprintf("starting to listen the grpc API @ port #%d...", cfg.Api.Telegram.Bot.Port)) + err = grpcApi.Serve(cfg.Api.Telegram.Bot.Port, controllerGrpc) + if err != nil { + panic(err) + } + }() + // resolve the donation invoice - should be pinned in the dedicated private channel var dCh *telebot.Chat dCh, err = b.ChatByID(cfg.Payment.DonationChatId) diff --git a/scripts/cover.sh b/scripts/cover.sh index d84780f..347865a 100755 --- a/scripts/cover.sh +++ b/scripts/cover.sh @@ -1,7 +1,7 @@ #!/bin/bash COVERAGE=$(cat cover.tmp) -THRESHOLD=9 +THRESHOLD=8 if [[ ${COVERAGE} -lt ${THRESHOLD} ]]; \ then \ echo "FAILED: test coverage ${COVERAGE}% < ${THRESHOLD}%"; \ diff --git a/service/common.go b/service/common.go index c5ebf0d..7120d9e 100644 --- a/service/common.go +++ b/service/common.go @@ -3,4 +3,5 @@ package service const PageLimit = 10 const CmdLimit = 64 const KeyGroupId = "X-Awakari-Group-Id" +const PrefixUserId = "tg://user?id=" const FmtUserId = "tg://user?id=%d" From a2303b94efa9d78d68dcef895c07fa8b9cbcb6f6 Mon Sep 17 00:00:00 2001 From: Andrei Kurilov <18027129+akurilov@users.noreply.github.com> Date: Wed, 20 Mar 2024 12:29:51 +0200 Subject: [PATCH 4/4] feat: grpc methods to link/unlink the subscription [skip ci] --- api/grpc/tgbot/controller.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/api/grpc/tgbot/controller.go b/api/grpc/tgbot/controller.go index d16893e..d5385a5 100644 --- a/api/grpc/tgbot/controller.go +++ b/api/grpc/tgbot/controller.go @@ -8,8 +8,10 @@ import ( "github.com/awakari/bot-telegram/service/chats" "github.com/awakari/bot-telegram/service/messages" "github.com/awakari/client-sdk-go/api" + "github.com/awakari/client-sdk-go/model/subscription" tgverifier "github.com/electrofocus/telegram-auth-verifier" "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/timestamppb" "gopkg.in/telebot.v3" @@ -109,7 +111,7 @@ func (c controller) Subscribe(ctx context.Context, req *SubscribeRequest) (resp } var chatId int64 if err == nil { - chatId, err = strconv.ParseInt(userId[:len(service.PrefixUserId)], 10, 64) + chatId, err = strconv.ParseInt(userId[len(service.PrefixUserId):], 10, 64) if err != nil { err = status.Error(codes.InvalidArgument, fmt.Sprintf("User id should end with numeric id: %s, %s", userId, err)) } @@ -133,8 +135,15 @@ func (c controller) Subscribe(ctx context.Context, req *SubscribeRequest) (resp }, }, } - r := chats.NewReader(c.tgBot.NewContext(u), c.clientAwk, c.chatStor, chatId, subId, groupId, userId, c.msgFmt, intervalDefault) + tgCtx := c.tgBot.NewContext(u) + r := chats.NewReader(tgCtx, c.clientAwk, c.chatStor, chatId, subId, groupId, userId, c.msgFmt, intervalDefault) go r.Run(context.Background(), c.log) + groupIdCtx := metadata.AppendToOutgoingContext(ctx, service.KeyGroupId, groupId) + var d subscription.Data + d, err = c.clientAwk.ReadSubscription(groupIdCtx, userId, subId) + if err == nil { + _ = tgCtx.Send(fmt.Sprintf("New subscription \"%s\" is linked to this chat", d.Description)) + } } return }