Skip to content

Commit

Permalink
feat: webhooks
Browse files Browse the repository at this point in the history
  • Loading branch information
akurilov committed May 17, 2024
1 parent fa0b7d6 commit 43b9406
Show file tree
Hide file tree
Showing 27 changed files with 622 additions and 1,418 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/staging.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,4 @@ jobs:
run: |
helm upgrade --install ${COMPONENT} ${COMPONENT}-0.0.0.tgz \
--values helm/bot-telegram/values-gke-cluster-0.yaml \
--set podAnnotations.commit=$(git rev-parse --short HEAD),api.telegram.support.chat.id=${{ secrets.TG_BOT_SUPPORT_CHAT_ID }},api.telegram.webhook.token=${{ secrets.API_TELEGRAM_WEBHOOK_TOKEN }},chats.db.password.secret.enabled=false,chats.db.protocol=mongodb+srv,chats.db.tls.enabled=true,chats.db.tls.insecure=true,chats.db.hostname=${{ secrets.DB_HOST_DEMO }},chats.db.username=${{ secrets.DB_USERNAME_DEMO }},chats.db.password.raw=${{ secrets.DB_PASSWORD_DEMO }}
--set podAnnotations.commit=$(git rev-parse --short HEAD),api.telegram.support.chat.id=${{ secrets.TG_BOT_SUPPORT_CHAT_ID }},api.telegram.webhook.token=${{ secrets.API_TELEGRAM_WEBHOOK_TOKEN }}
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.22.1-alpine3.19 AS builder
FROM golang:1.22.3-alpine3.19 AS builder
WORKDIR /go/src/bot-telegram
COPY . .
RUN \
Expand Down
76 changes: 25 additions & 51 deletions api/grpc/tgbot/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,57 +4,54 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/awakari/bot-telegram/api/http/reader"
"github.com/awakari/bot-telegram/service"
"github.com/awakari/bot-telegram/service/chats"
"github.com/awakari/bot-telegram/service/messages"
"github.com/awakari/bot-telegram/service/subscriptions"
"github.com/awakari/client-sdk-go/api"
"github.com/awakari/client-sdk-go/model/subscription"
tgverifier "github.com/electrofocus/telegram-auth-verifier"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
"gopkg.in/telebot.v3"
"log/slog"
"strconv"
"strings"
"time"
)

type Controller interface {
ServiceServer
}

type controller struct {
secretToken []byte
cp messages.ChanPostHandler
chatStor chats.Storage
log *slog.Logger
clientAwk api.Client
tgBot *telebot.Bot
msgFmt messages.Format
secretToken []byte
cp messages.ChanPostHandler
svcReader reader.Service
urlCallbackBase string
log *slog.Logger
clientAwk api.Client
tgBot *telebot.Bot
msgFmt messages.Format
}

const intervalDefault = 1 * time.Minute

func NewController(
secretToken []byte,
cp messages.ChanPostHandler,
chatStor chats.Storage,
svcReader reader.Service,
urlCallbackBase string,
log *slog.Logger,
clientAwk api.Client,
tgBot *telebot.Bot,
msgFmt messages.Format,
) Controller {
return controller{
secretToken: secretToken,
cp: cp,
chatStor: chatStor,
log: log,
clientAwk: clientAwk,
tgBot: tgBot,
msgFmt: msgFmt,
secretToken: secretToken,
cp: cp,
svcReader: svcReader,
urlCallbackBase: urlCallbackBase,
log: log,
clientAwk: clientAwk,
tgBot: tgBot,
msgFmt: msgFmt,
}
}

Expand Down Expand Up @@ -105,7 +102,6 @@ func (c controller) ListChannels(ctx context.Context, req *ListChannelsRequest)

func (c controller) Subscribe(ctx context.Context, req *SubscribeRequest) (resp *SubscribeResponse, err error) {
subId := req.SubId
groupId := req.GroupId
userId := req.UserId
if !strings.HasPrefix(userId, service.PrefixUserId) {
err = status.Error(codes.InvalidArgument, fmt.Sprintf("User id should have prefix: %s, got: %s", service.PrefixUserId, userId))
Expand All @@ -118,40 +114,18 @@ func (c controller) Subscribe(ctx context.Context, req *SubscribeRequest) (resp
}
}
if err == nil {
chat := chats.Chat{
Id: chatId,
SubId: subId,
GroupId: groupId,
UserId: userId,
MinInterval: intervalDefault,
}
err = c.chatStor.LinkSubscription(ctx, chat)
err = c.svcReader.CreateCallback(ctx, subId, reader.MakeCallbackUrl(c.urlCallbackBase, chatId))
err = encodeError(err)
}
if err == nil {
u := telebot.Update{
Message: &telebot.Message{
Chat: &telebot.Chat{
ID: chatId,
},
},
}
tgCtx := c.tgBot.NewContext(u)
r := chats.NewReader(tgCtx, c.clientAwk, c.chatStor, chatId, subId, groupId, userId, c.msgFmt, intervalDefault)
go r.Run(context.Background(), c.log)
groupIdCtx := metadata.AppendToOutgoingContext(ctx, service.KeyGroupId, groupId)
var d subscription.Data
d, err = c.clientAwk.ReadSubscription(groupIdCtx, userId, subId)
if err == nil {
_ = tgCtx.Send(fmt.Sprintf(subscriptions.MsgFmtChatLinked, d.Description, intervalDefault), telebot.ModeHTML, telebot.NoPreview)
}
}
return
}

func (c controller) Unsubscribe(ctx context.Context, req *UnsubscribeRequest) (resp *UnsubscribeResponse, err error) {
chats.StopChatReader(req.SubId)
err = c.chatStor.UnlinkSubscription(ctx, req.SubId)
var cb reader.Callback
cb, err = c.svcReader.GetCallback(ctx, req.SubId)
if err == nil {
err = c.svcReader.DeleteCallback(ctx, req.SubId, cb.Url)
}
err = encodeError(err)
return
}
Expand Down
1 change: 1 addition & 0 deletions api/grpc/tgbot/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func TestMain(m *testing.M) {
[]byte("6668123457:ZAJALGCBOGw8q9k2yBidb6kepmrBVGOrBLb"),
messages.ChanPostHandler{},
nil,
"",
slog.Default(),
nil,
nil,
Expand Down
29 changes: 29 additions & 0 deletions api/http/reader/callback.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package reader

import (
"fmt"
"strconv"
"strings"
)

type Callback struct {
Url string `json:"url"`
Format string `json:"fmt"`
}

func MakeCallbackUrl(urlBase string, chatId int64) string {
return urlBase + "/" + strconv.FormatInt(chatId, 10)
}

func GetCallbackUrlChatId(cbUrl string) (chatId int64, err error) {
cbUrlParts := strings.Split(cbUrl, "/")
cbUrlPartsLen := len(cbUrlParts)
if cbUrlPartsLen < 1 {
err = fmt.Errorf("invalid callback url")
}
if err == nil {
chatIdStr := cbUrlParts[cbUrlPartsLen-1]
chatId, err = strconv.ParseInt(chatIdStr, 10, 64)
}
return
}
50 changes: 50 additions & 0 deletions api/http/reader/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package reader

import (
"context"
"fmt"
"log/slog"
)

type serviceLogging struct {
svc Service
log *slog.Logger
}

func NewServiceLogging(svc Service, log *slog.Logger) Service {
return serviceLogging{
svc: svc,
log: log,
}
}

func (sl serviceLogging) CreateCallback(ctx context.Context, subId, url string) (err error) {
err = sl.svc.CreateCallback(ctx, subId, url)
ll := sl.logLevel(err)
sl.log.Log(ctx, ll, fmt.Sprintf("reader.CreateCallback(%s, %s): err=%s", subId, url, err))
return
}

func (sl serviceLogging) GetCallback(ctx context.Context, subId string) (cb Callback, err error) {
cb, err = sl.svc.GetCallback(ctx, subId)
ll := sl.logLevel(err)
sl.log.Log(ctx, ll, fmt.Sprintf("reader.GetCallback(%s): %+v, err=%s", subId, cb, err))
return
}

func (sl serviceLogging) DeleteCallback(ctx context.Context, subId, url string) (err error) {
err = sl.svc.DeleteCallback(ctx, subId, url)
ll := sl.logLevel(err)
sl.log.Log(ctx, ll, fmt.Sprintf("reader.DeleteCallback(%s, %s): err=%s", subId, url, err))
return
}

func (sl serviceLogging) logLevel(err error) (lvl slog.Level) {
switch err {
case nil:
lvl = slog.LevelInfo
default:
lvl = slog.LevelError
}
return
}
105 changes: 105 additions & 0 deletions api/http/reader/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package reader

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
)

type Service interface {
CreateCallback(ctx context.Context, subId, url string) (err error)
GetCallback(ctx context.Context, subId string) (cb Callback, err error)
DeleteCallback(ctx context.Context, subId, url string) (err error)
}

type service struct {
clientHttp *http.Client
uriBase string
}

const keyHubCallback = "hub.callback"
const KeyHubMode = "hub.mode"
const KeyHubTopic = "hub.topic"
const modeSubscribe = "subscribe"
const modeUnsubscribe = "unsubscribe"
const fmtTopicUri = "%s/sub/%s/%s"
const FmtJson = "json"

var ErrInternal = errors.New("internal failure")
var ErrConflict = errors.New("conflict")
var ErrNotFound = errors.New("not found")

func NewService(clientHttp *http.Client, uriBase string) Service {
return service{
clientHttp: clientHttp,
uriBase: uriBase,
}
}

func (svc service) CreateCallback(ctx context.Context, subId, callbackUrl string) (err error) {
err = svc.updateCallback(ctx, subId, callbackUrl, modeSubscribe)
return
}

func (svc service) GetCallback(ctx context.Context, subId string) (cb Callback, err error) {
var resp *http.Response
resp, err = svc.clientHttp.Get(fmt.Sprintf("/callbacks/%s/%s", svc.uriBase, subId))
switch err {
case nil:
defer resp.Body.Close()
switch resp.StatusCode {
case http.StatusOK:
err = json.NewDecoder(resp.Body).Decode(&cb)
if err != nil {
err = fmt.Errorf("%w: %s", ErrInternal, err)
}
case http.StatusNotFound:
err = ErrNotFound
default:
err = fmt.Errorf("%w: response status %d", ErrInternal, resp.StatusCode)
}
err = json.NewDecoder(resp.Body).Decode(&cb)
default:
err = fmt.Errorf("%w: %s", ErrInternal, err)
}
return
}

func (svc service) DeleteCallback(ctx context.Context, subId, callbackUrl string) (err error) {
err = svc.updateCallback(ctx, subId, callbackUrl, modeUnsubscribe)
return
}

func (svc service) updateCallback(_ context.Context, subId, url, mode string) (err error) {
topicUri := fmt.Sprintf(fmtTopicUri, svc.uriBase, FmtJson, subId)
data := map[string][]string{
keyHubCallback: {
url,
},
KeyHubMode: {
mode,
},
KeyHubTopic: {
topicUri,
},
}
var resp *http.Response
resp, err = svc.clientHttp.PostForm(topicUri, data)
switch err {
case nil:
switch resp.StatusCode {
case http.StatusAccepted, http.StatusNoContent:
case http.StatusNotFound:
err = fmt.Errorf("%w: callback not found for the subscription %s", ErrConflict, subId)
case http.StatusConflict:
err = fmt.Errorf("%w: callback already registered for the subscription %s", ErrConflict, subId)
default:
err = fmt.Errorf("%w: unexpected create callback response %d", ErrInternal, resp.StatusCode)
}
default:
err = fmt.Errorf("%w: %s", ErrInternal, err)
}
return
}
Loading

0 comments on commit 43b9406

Please sign in to comment.