Skip to content

Commit

Permalink
fix: incorrect stats captured at gateway (#2710)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth authored and atzoum committed Nov 17, 2022
1 parent 3376dc9 commit a6c1a16
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 57 deletions.
105 changes: 58 additions & 47 deletions gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,32 +191,32 @@ type HandleT struct {
whProxy http.Handler
}

func (gateway *HandleT) updateSourceStats(sourceStats map[string]int, bucket string, sourceTagMap map[string]string) {
for sourceTag, count := range sourceStats {
func (gateway *HandleT) updateSourceStats(sourceStats map[string]int, bucket string, sourceTagMap map[string]map[string]string) {
for sourceTag := range sourceStats {
tags := map[string]string{
"source": sourceTag,
"writeKey": sourceTagMap[sourceTag],
"reqType": sourceTagMap["reqType"],
"workspaceId": sourceTagMap["workspaceId"],
"sourceID": sourceTagMap["sourceID"],
"writeKey": sourceTagMap[sourceTag]["writeKey"],
"reqType": sourceTagMap[sourceTag]["reqType"],
"workspaceId": sourceTagMap[sourceTag]["workspaceId"],
"sourceID": sourceTagMap[sourceTag]["sourceID"],
}
sourceStatsD := gateway.stats.NewTaggedStat(bucket, stats.CountType, tags)
sourceStatsD.Count(count)
sourceStatsD.Count(sourceStats[sourceTag])
}
}

func (gateway *HandleT) updateFailedSourceStats(sourceStats map[string]int, bucket string, sourceTagMap map[string]string) {
for sourceTag, count := range sourceStats {
func (gateway *HandleT) updateFailedSourceStats(sourceStats map[string]int, bucket string, sourceTagMap map[string]map[string]string) {
for sourceTag := range sourceStats {
tags := map[string]string{
"source": sourceTag,
"writeKey": sourceTagMap[sourceTag],
"reqType": sourceTagMap["reqType"],
"workspaceId": sourceTagMap["workspaceId"],
"reason": sourceTagMap["reason"],
"sourceID": sourceTagMap["sourceID"],
"writeKey": sourceTagMap[sourceTag]["writeKey"],
"reqType": sourceTagMap[sourceTag]["reqType"],
"workspaceId": sourceTagMap[sourceTag]["workspaceId"],
"reason": sourceTagMap[sourceTag]["reason"],
"sourceID": sourceTagMap[sourceTag]["sourceID"],
}
sourceStatsD := gateway.stats.NewTaggedStat(bucket, stats.CountType, tags)
sourceStatsD.Count(count)
sourceStatsD.Count(sourceStats[sourceTag])
}
}

Expand Down Expand Up @@ -458,7 +458,7 @@ func (gateway *HandleT) userWebRequestWorkerProcess(userWebRequestWorker *userWe
sourceFailStats := make(map[string]int)
sourceFailEventStats := make(map[string]int)
workspaceDropRequestStats := make(map[string]int)
sourceTagMap := make(map[string]string)
sourceTagMap := make(map[string]map[string]string)
var preDbStoreCount int
// Saving the event data read from req.request.Body to the splice.
// Using this to send event schema to the config backend.
Expand All @@ -468,25 +468,28 @@ func (gateway *HandleT) userWebRequestWorkerProcess(userWebRequestWorker *userWe
writeKey := req.writeKey
sourceTag := gateway.getSourceTagFromWriteKey(writeKey)
sourceID := gateway.getSourceIDForWriteKey(writeKey)
sourceTagMap[sourceTag] = writeKey
sourceTagMap["sourceID"] = sourceID
sourceTagMap["reqType"] = req.reqType
if _, ok := sourceTagMap[sourceTag]; !ok {
sourceTagMap[sourceTag] = make(map[string]string)
}
sourceTagMap[sourceTag][`writeKey`] = writeKey
sourceTagMap[sourceTag]["sourceID"] = sourceID
sourceTagMap[sourceTag]["reqType"] = req.reqType
userIDHeader := req.userIDHeader
misc.IncrementMapByKey(sourceStats, sourceTag, 1)
// Should be function of body
configSubscriberLock.RLock()
workspaceId := enabledWriteKeyWorkspaceMap[writeKey]
configSubscriberLock.RUnlock()

sourceTagMap["workspaceId"] = workspaceId
sourceTagMap[sourceTag]["workspaceId"] = workspaceId
ipAddr := req.ipAddr

body := req.requestPayload

if !gjson.ValidBytes(body) {
req.done <- response.GetStatus(response.InvalidJSON)
preDbStoreCount++
sourceTagMap["reason"] = "invalidJSON"
sourceTagMap[sourceTag]["reason"] = "invalidJSON"
misc.IncrementMapByKey(sourceFailStats, sourceTag, 1)
continue
}
Expand All @@ -496,15 +499,15 @@ func (gateway *HandleT) userWebRequestWorkerProcess(userWebRequestWorker *userWe
if req.reqType != "batch" {
body, err = sjson.SetBytes(body, "type", req.reqType)
if err != nil {
sourceTagMap["reason"] = "notRudderEvent"
sourceTagMap[sourceTag]["reason"] = "notRudderEvent"
req.done <- response.GetStatus(response.NotRudderEvent)
preDbStoreCount++
misc.IncrementMapByKey(sourceFailStats, sourceTag, 1)
continue
}
body, err = sjson.SetRawBytes(BatchEvent, "batch.0", body)
if err != nil {
sourceTagMap["reason"] = "notRudderEvent"
sourceTagMap[sourceTag]["reason"] = "notRudderEvent"
req.done <- response.GetStatus(response.NotRudderEvent)
preDbStoreCount++
misc.IncrementMapByKey(sourceFailStats, sourceTag, 1)
Expand All @@ -518,7 +521,7 @@ func (gateway *HandleT) userWebRequestWorkerProcess(userWebRequestWorker *userWe
// this prevents not setting sourceID in gw job if disabled before setting it

if !gateway.isValidWriteKey(writeKey) {
sourceTagMap["reason"] = "invalidWriteKey"
sourceTagMap[sourceTag]["reason"] = "invalidWriteKey"
req.done <- response.GetStatus(response.InvalidWriteKey)
preDbStoreCount++
misc.IncrementMapByKey(sourceFailStats, sourceTag, 1)
Expand All @@ -527,7 +530,7 @@ func (gateway *HandleT) userWebRequestWorkerProcess(userWebRequestWorker *userWe
}

if !gateway.isWriteKeyEnabled(writeKey) {
sourceTagMap["reason"] = "sourceDisabled"
sourceTagMap[sourceTag]["reason"] = "sourceDisabled"
req.done <- response.GetStatus(response.SourceDisabled)
preDbStoreCount++
misc.IncrementMapByKey(sourceFailStats, sourceTag, 1)
Expand Down Expand Up @@ -594,7 +597,7 @@ func (gateway *HandleT) userWebRequestWorkerProcess(userWebRequestWorker *userWe
})

if len(body) > maxReqSize && !containsAudienceList {
sourceTagMap["reason"] = "requestBodyTooLarge"
sourceTagMap[sourceTag]["reason"] = "requestBodyTooLarge"
req.done <- response.GetStatus(response.RequestBodyTooLarge)
preDbStoreCount++
misc.IncrementMapByKey(sourceFailStats, sourceTag, 1)
Expand All @@ -605,15 +608,15 @@ func (gateway *HandleT) userWebRequestWorkerProcess(userWebRequestWorker *userWe
body, _ = sjson.SetBytes(body, "batch", out)

if notIdentifiable {
sourceTagMap["reason"] = "nonIdentifiableRequest"
sourceTagMap[sourceTag]["reason"] = "nonIdentifiableRequest"
req.done <- response.GetStatus(response.NonIdentifiableRequest)
preDbStoreCount++
misc.IncrementMapByKey(sourceFailStats, "notIdentifiable", 1)
continue
}

if nonRudderEvent {
sourceTagMap["reason"] = "notRudderEvent"
sourceTagMap[sourceTag]["reason"] = "notRudderEvent"
req.done <- response.GetStatus(response.NotRudderEvent)
preDbStoreCount++
misc.IncrementMapByKey(sourceFailStats, sourceTag, 1)
Expand Down Expand Up @@ -1128,16 +1131,20 @@ func (gateway *HandleT) getPayloadAndWriteKey(_ http.ResponseWriter, r *http.Req
if !ok || writeKey == "" {
err = errors.New(response.NoWriteKeyInBasicAuth)
misc.IncrementMapByKey(sourceFailStats, "noWriteKey", 1)
gateway.updateFailedSourceStats(sourceFailStats, "gateway.write_key_failed_requests", map[string]string{
"noWriteKey": "noWriteKey",
"reqType": reqType,
"reason": "noWriteKeyInBasicAuth",
"sourceID": sourceID,
gateway.updateFailedSourceStats(sourceFailStats, "gateway.write_key_failed_requests", map[string]map[string]string{
"noWriteKey": {
"writeKey": "noWriteKey",
"reqType": reqType,
"reason": "noWriteKeyInBasicAuth",
"sourceID": sourceID,
},
})
gateway.updateSourceStats(sourceFailStats, "gateway.write_key_requests", map[string]string{
"noWriteKey": "noWriteKey",
"reqType": reqType,
"sourceID": sourceID,
gateway.updateSourceStats(sourceFailStats, "gateway.write_key_requests", map[string]map[string]string{
"noWriteKey": {
"writeKey": "noWriteKey",
"reqType": reqType,
"sourceID": sourceID,
},
})

return []byte{}, "", err
Expand All @@ -1146,16 +1153,20 @@ func (gateway *HandleT) getPayloadAndWriteKey(_ http.ResponseWriter, r *http.Req
if err != nil {
sourceTag := gateway.getSourceTagFromWriteKey(writeKey)
misc.IncrementMapByKey(sourceFailStats, sourceTag, 1)
gateway.updateSourceStats(sourceFailStats, "gateway.write_key_failed_requests", map[string]string{
sourceTag: writeKey,
"reqType": reqType,
"reason": "requestBodyReadFailed",
"sourceID": sourceID,
gateway.updateSourceStats(sourceFailStats, "gateway.write_key_failed_requests", map[string]map[string]string{
sourceTag: {
"reqType": reqType,
"reason": "requestBodyReadFailed",
"sourceID": sourceID,
"writeKey": writeKey,
},
})
gateway.updateSourceStats(sourceFailStats, "gateway.write_key_requests", map[string]string{
sourceTag: writeKey,
"reqType": reqType,
"sourceID": sourceID,
gateway.updateSourceStats(sourceFailStats, "gateway.write_key_requests", map[string]map[string]string{
sourceTag: {
"reqType": reqType,
"sourceID": sourceID,
"writeKey": writeKey,
},
})
return []byte{}, writeKey, err
}
Expand Down Expand Up @@ -1591,7 +1602,7 @@ func (gateway *HandleT) IncrementAckCount(count uint64) {
}

// UpdateSourceStats creates a new stat for every writekey and updates it with the corresponding count
func (gateway *HandleT) UpdateSourceStats(sourceStats map[string]int, bucket string, sourceTagMap map[string]string) {
func (gateway *HandleT) UpdateSourceStats(sourceStats map[string]int, bucket string, sourceTagMap map[string]map[string]string) {
gateway.updateSourceStats(sourceStats, bucket, sourceTagMap)
}

Expand Down
4 changes: 2 additions & 2 deletions gateway/webhook/setup.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//go:generate mockgen --build_flags=--mod=mod -destination=../../../mocks/gateway/webhook/mock_webhook.go -package mock_webhook github.com/rudderlabs/rudder-server/gateway/webhook GatewayI
//go:generate mockgen --build_flags=--mod=mod -destination=./../../mocks/gateway/webhook/mock_webhook.go -package mock_webhook github.com/rudderlabs/rudder-server/gateway/webhook GatewayI

package webhook

Expand All @@ -17,7 +17,7 @@ import (
type GatewayI interface {
IncrementRecvCount(count uint64)
IncrementAckCount(count uint64)
UpdateSourceStats(writeKeyStats map[string]int, bucket string, sourceTagMap map[string]string)
UpdateSourceStats(writeKeyStats map[string]int, bucket string, sourceTagMap map[string]map[string]string)
TrackRequestMetrics(errorMessage string)
ProcessWebRequest(writer *http.ResponseWriter, req *http.Request, reqType string, requestPayload []byte, writeKey string) string
GetWebhookSourceDefName(writeKey string) (name string, ok bool)
Expand Down
59 changes: 52 additions & 7 deletions gateway/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,17 @@ func parseWriteKey(req *http.Request) (writeKey string, found bool) {

func (webhook *HandleT) failRequest(w http.ResponseWriter, r *http.Request, reason string, code int, stat string) {
writeKeyFailStats := make(map[string]int)
statTags := map[string]map[string]string{
stat: {
"reqType": "webhook",
"reason": reason,
},
}
misc.IncrementMapByKey(writeKeyFailStats, stat, 1)
webhook.gwHandle.UpdateSourceStats(writeKeyFailStats, "gateway.write_key_failed_requests", map[string]string{stat: stat, "reqType": "webhook"})
webhook.gwHandle.UpdateSourceStats(writeKeyFailStats,
"gateway.write_key_failed_requests",
statTags,
)
statusCode := http.StatusBadRequest
if code != 0 {
statusCode = code
Expand All @@ -119,14 +128,26 @@ func (webhook *HandleT) RequestHandler(w http.ResponseWriter, r *http.Request) {

writeKey, ok := parseWriteKey(r)
if !ok {
webhook.failRequest(w, r, response.GetStatus(response.NoWriteKeyInQueryParams), response.GetErrorStatusCode(response.NoWriteKeyInQueryParams), "noWriteKey")
webhook.failRequest(
w,
r,
response.GetStatus(response.NoWriteKeyInQueryParams),
response.GetErrorStatusCode(response.NoWriteKeyInQueryParams),
"noWriteKey",
)
atomic.AddUint64(&webhook.ackCount, 1)
return
}

sourceDefName, ok := webhook.gwHandle.GetWebhookSourceDefName(writeKey)
if !ok {
webhook.failRequest(w, r, response.GetStatus(response.InvalidWriteKey), response.GetErrorStatusCode(response.InvalidWriteKey), writeKey)
webhook.failRequest(
w,
r,
response.GetStatus(response.InvalidWriteKey),
response.GetErrorStatusCode(response.InvalidWriteKey),
"invalidWriteKey",
)
atomic.AddUint64(&webhook.ackCount, 1)
return
}
Expand All @@ -140,14 +161,26 @@ func (webhook *HandleT) RequestHandler(w http.ResponseWriter, r *http.Request) {
contentType := r.Header.Get("Content-Type")
if strings.Contains(strings.ToLower(contentType), "application/x-www-form-urlencoded") {
if err := r.ParseForm(); err != nil {
webhook.failRequest(w, r, response.GetStatus(response.ErrorInParseForm), response.GetErrorStatusCode(response.ErrorInParseForm), "couldNotParseForm")
webhook.failRequest(
w,
r,
response.GetStatus(response.ErrorInParseForm),
response.GetErrorStatusCode(response.ErrorInParseForm),
"couldNotParseForm",
)
atomic.AddUint64(&webhook.ackCount, 1)
return
}
postFrom = r.PostForm
} else if strings.Contains(strings.ToLower(contentType), "multipart/form-data") {
if err := r.ParseMultipartForm(32 << 20); err != nil {
webhook.failRequest(w, r, response.GetStatus(response.ErrorInParseMultiform), response.GetErrorStatusCode(response.ErrorInParseMultiform), "couldNotParseMultiform")
webhook.failRequest(
w,
r,
response.GetStatus(response.ErrorInParseMultiform),
response.GetErrorStatusCode(response.ErrorInParseMultiform),
"couldNotParseMultiform",
)
atomic.AddUint64(&webhook.ackCount, 1)
return
}
Expand All @@ -160,14 +193,26 @@ func (webhook *HandleT) RequestHandler(w http.ResponseWriter, r *http.Request) {
if r.MultipartForm != nil {
jsonByte, err = json.Marshal(multipartForm)
if err != nil {
webhook.failRequest(w, r, response.GetStatus(response.ErrorInMarshal), response.GetErrorStatusCode(response.ErrorInMarshal), "couldNotMarshal")
webhook.failRequest(
w,
r,
response.GetStatus(response.ErrorInMarshal),
response.GetErrorStatusCode(response.ErrorInMarshal),
"couldNotMarshal",
)
atomic.AddUint64(&webhook.ackCount, 1)
return
}
} else if len(postFrom) != 0 {
jsonByte, err = json.Marshal(postFrom)
if err != nil {
webhook.failRequest(w, r, response.GetStatus(response.ErrorInMarshal), response.GetErrorStatusCode(response.ErrorInMarshal), "couldNotMarshal")
webhook.failRequest(
w,
r,
response.GetStatus(response.ErrorInMarshal),
response.GetErrorStatusCode(response.ErrorInMarshal),
"couldNotMarshal",
)
atomic.AddUint64(&webhook.ackCount, 1)
return
}
Expand Down
2 changes: 1 addition & 1 deletion mocks/gateway/webhook/mock_webhook.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit a6c1a16

Please sign in to comment.