Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
akurilov committed Mar 27, 2024
2 parents 7ce679d + a2303b9 commit 07216db
Show file tree
Hide file tree
Showing 10 changed files with 139 additions and 25 deletions.
86 changes: 85 additions & 1 deletion api/grpc/tgbot/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,22 @@ package tgbot
import (
"context"
"encoding/json"
"fmt"
"github.com/awakari/bot-telegram/service"
"github.com/awakari/bot-telegram/service/chats"
"github.com/awakari/bot-telegram/service/messages"
"github.com/awakari/client-sdk-go/api"
"github.com/awakari/client-sdk-go/model/subscription"
tgverifier "github.com/electrofocus/telegram-auth-verifier"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
"gopkg.in/telebot.v3"
"log/slog"
"strconv"
"strings"
"time"
)

type Controller interface {
Expand All @@ -17,12 +28,32 @@ type Controller interface {
type controller struct {
secretToken []byte
cp messages.ChanPostHandler
chatStor chats.Storage
log *slog.Logger
clientAwk api.Client
tgBot *telebot.Bot
msgFmt messages.Format
}

func NewController(secretToken []byte, cp messages.ChanPostHandler) Controller {
const intervalDefault = 1 * time.Minute

func NewController(
secretToken []byte,
cp messages.ChanPostHandler,
chatStor chats.Storage,
log *slog.Logger,
clientAwk api.Client,
tgBot *telebot.Bot,
msgFmt messages.Format,
) Controller {
return controller{
secretToken: secretToken,
cp: cp,
chatStor: chatStor,
log: log,
clientAwk: clientAwk,
tgBot: tgBot,
msgFmt: msgFmt,
}
}

Expand Down Expand Up @@ -71,6 +102,59 @@ func (c controller) ListChannels(ctx context.Context, req *ListChannelsRequest)
return
}

func (c controller) Subscribe(ctx context.Context, req *SubscribeRequest) (resp *SubscribeResponse, err error) {
subId := req.SubId
groupId := req.GroupId
userId := req.UserId
if !strings.HasPrefix(userId, service.PrefixUserId) {
err = status.Error(codes.InvalidArgument, fmt.Sprintf("User id should have prefix: %s, got: %s", service.PrefixUserId, userId))
}
var chatId int64
if err == nil {
chatId, err = strconv.ParseInt(userId[len(service.PrefixUserId):], 10, 64)
if err != nil {
err = status.Error(codes.InvalidArgument, fmt.Sprintf("User id should end with numeric id: %s, %s", userId, err))
}
}
if err == nil {
chat := chats.Chat{
Id: chatId,
SubId: subId,
GroupId: groupId,
UserId: userId,
MinInterval: intervalDefault,
}
err = c.chatStor.LinkSubscription(ctx, chat)
err = encodeError(err)
}
if err == nil {
u := telebot.Update{
Message: &telebot.Message{
Chat: &telebot.Chat{
ID: chatId,
},
},
}
tgCtx := c.tgBot.NewContext(u)
r := chats.NewReader(tgCtx, c.clientAwk, c.chatStor, chatId, subId, groupId, userId, c.msgFmt, intervalDefault)
go r.Run(context.Background(), c.log)
groupIdCtx := metadata.AppendToOutgoingContext(ctx, service.KeyGroupId, groupId)
var d subscription.Data
d, err = c.clientAwk.ReadSubscription(groupIdCtx, userId, subId)
if err == nil {
_ = tgCtx.Send(fmt.Sprintf("New subscription \"%s\" is linked to this chat", d.Description))
}
}
return
}

func (c controller) Unsubscribe(ctx context.Context, req *UnsubscribeRequest) (resp *UnsubscribeResponse, err error) {
chats.StopChatReader(req.SubId)
err = c.chatStor.UnlinkSubscription(ctx, req.SubId)
err = encodeError(err)
return
}

func encodeError(src error) (dst error) {
switch {
case src == nil:
Expand Down
10 changes: 9 additions & 1 deletion api/grpc/tgbot/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,15 @@ var log = slog.Default()
func TestMain(m *testing.M) {
go func() {
srv := grpc.NewServer()
c := NewController([]byte("6668123457:ZAJALGCBOGw8q9k2yBidb6kepmrBVGOrBLb"), messages.ChanPostHandler{})
c := NewController(
[]byte("6668123457:ZAJALGCBOGw8q9k2yBidb6kepmrBVGOrBLb"),
messages.ChanPostHandler{},
nil,
slog.Default(),
nil,
nil,
messages.Format{},
)
RegisterServiceServer(srv, c)
reflection.Register(srv)
grpc_health_v1.RegisterHealthServer(srv, health.NewServer())
Expand Down
18 changes: 18 additions & 0 deletions api/grpc/tgbot/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import "google/protobuf/timestamp.proto";
service Service {
rpc Authenticate(AuthenticateRequest) returns (AuthenticateResponse);
rpc ListChannels(ListChannelsRequest) returns (ListChannelsResponse);
rpc Subscribe(SubscribeRequest) returns (SubscribeResponse);
rpc Unsubscribe(UnsubscribeRequest) returns (UnsubscribeResponse);
}

message AuthenticateRequest {
Expand Down Expand Up @@ -43,3 +45,19 @@ message Channel {
message Filter {
string pattern = 1;
}

message SubscribeRequest {
string subId = 1;
string groupId = 2;
string userId = 3;
}

message SubscribeResponse {
}

message UnsubscribeRequest {
string subId = 1;
}

message UnsubscribeResponse {
}
5 changes: 0 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,6 @@ type FeedsConfig struct {
Uri string `envconfig:"API_SOURCE_FEEDS_URI" default:"source-feeds:50051" required:"true"`
}

type TelegramConfig struct {
GroupId string `envconfig:"API_SOURCE_TELEGRAM_GROUP_ID" default:"com.github.awakari.source-telegram"`
Uri string `envconfig:"API_SOURCE_TELEGRAM_URI" default:"source-telegram:50051" required:"true"`
}

type SitesConfig struct {
Uri string `envconfig:"API_SOURCE_SITES_URI" default:"source-sites:50051" required:"true"`
}
Expand Down
28 changes: 18 additions & 10 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,16 +174,6 @@ func main() {
}
defer chanPostHandler.Close()

// init the Telegram Bot grpc service
controllerGrpc := grpcApiTgBot.NewController([]byte(cfg.Api.Telegram.Token), chanPostHandler)
go func() {
log.Info(fmt.Sprintf("starting to listen the grpc API @ port #%d...", cfg.Api.Telegram.Bot.Port))
err = grpcApi.Serve(cfg.Api.Telegram.Bot.Port, controllerGrpc)
if err != nil {
panic(err)
}
}()

callbackHandlers := map[string]service.ArgHandlerFunc{
subscriptions.CmdDescription: subscriptions.DescriptionHandlerFunc(clientAwk, groupId),
subscriptions.CmdExtend: subExtHandler.RequestExtensionDaysCount,
Expand Down Expand Up @@ -284,6 +274,24 @@ func main() {
panic(err)
}

// init the Telegram Bot grpc service
controllerGrpc := grpcApiTgBot.NewController(
[]byte(cfg.Api.Telegram.Token),
chanPostHandler,
chatStor,
log,
clientAwk,
b,
msgFmt,
)
go func() {
log.Info(fmt.Sprintf("starting to listen the grpc API @ port #%d...", cfg.Api.Telegram.Bot.Port))
err = grpcApi.Serve(cfg.Api.Telegram.Bot.Port, controllerGrpc)
if err != nil {
panic(err)
}
}()

// resolve the donation invoice - should be pinned in the dedicated private channel
var dCh *telebot.Chat
dCh, err = b.ChatByID(cfg.Payment.DonationChatId)
Expand Down
2 changes: 1 addition & 1 deletion scripts/cover.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/bash

COVERAGE=$(cat cover.tmp)
THRESHOLD=9
THRESHOLD=8
if [[ ${COVERAGE} -lt ${THRESHOLD} ]]; \
then \
echo "FAILED: test coverage ${COVERAGE}% < ${THRESHOLD}%"; \
Expand Down
1 change: 1 addition & 0 deletions service/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ package service
const PageLimit = 10
const CmdLimit = 64
const KeyGroupId = "X-Awakari-Group-Id"
const PrefixUserId = "tg://user?id="
const FmtUserId = "tg://user?id=%d"
2 changes: 1 addition & 1 deletion service/messages/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (cp ChanPostHandler) Publish(tgCtx telebot.Context) (err error) {
Id: uuid.NewString(),
Source: fmt.Sprintf("https://t.me/%s", chanUserName),
SpecVersion: attrValSpecVersion,
Type: "com.github.awakari.bot-telegram.v1",
Type: "com.awakari.bot-telegram.v1",
}
if err == nil {
err = toCloudEvent(tgCtx.Message(), tgCtx.Text(), &evt)
Expand Down
10 changes: 5 additions & 5 deletions service/messages/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,11 @@ func (f Format) convertExtraAttrs(evt *pb.CloudEvent, mode FormatMode, trunc boo
case "awakariuserid": // do not expose
case "awkhash": // internal, useless
case "awkinternal": // internal
case "feedcategories":
case "feeddescription":
case "feedimagetitle":
case "feedimageurl":
case "feedtitle":
case "srccategories":
case "srcdescription":
case "srcimagetitle":
case "srcimageurl":
case "srctitle":
default:
switch vt := attrVal.Attr.(type) {
case *pb.CloudEventAttributeValue_CeBoolean:
Expand Down
2 changes: 1 addition & 1 deletion service/messages/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func PublishBasicReplyHandlerFunc(
Id: uuid.NewString(),
Source: "https://t.me/" + tgCtx.Chat().Username,
SpecVersion: attrValSpecVersion,
Type: "com.github.awakari.bot-telegram.v1",
Type: "com.awakari.bot-telegram.v1",
}
if err == nil {
defer w.Close()
Expand Down

0 comments on commit 07216db

Please sign in to comment.