Skip to content

Commit

Permalink
feat(collect-agent ): UT for collector-agent (#536) (#537)
Browse files Browse the repository at this point in the history
* collector-agent and testcase
* c/cpp changed

close #535
  • Loading branch information
eeliu committed Mar 28, 2024
1 parent c36b4d8 commit dfc5e0a
Show file tree
Hide file tree
Showing 10 changed files with 330 additions and 88 deletions.
32 changes: 31 additions & 1 deletion collector-agent/agent/AgentRouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type TSpan struct {
ServerType int32 `json:"stp,string"`
TransactionId string `json:"tid"`
Uri string `json:"uri"`
UT string `json:"UT,omitempty"`
EndPoint string `json:"server"`
RemoteAddr string `json:"client"`
AcceptorHost string `json:"Ah"`
Expand All @@ -100,6 +101,35 @@ type TSpan struct {
ApacheHeader string `json:"AP,omitempty"`
}

func (span *TSpan) IsFailed() bool {
if span.ErrorInfo != nil || len(span.ExceptionInfo) > 0 {
return true
}
return false
}

//note
// FindHistogramLevel must come with histogramSize
func (span *TSpan) FindHistogramLevel() int {
if span.GetElapsedTime() <= 100 {
return 0
} else if span.GetElapsedTime() <= 300 {
return 1
} else if span.GetElapsedTime() <= 500 {
return 2
} else if span.GetElapsedTime() <= 1000 {
return 3
} else if span.GetElapsedTime() <= 3000 {
return 4
} else if span.GetElapsedTime() <= 5000 {
return 5
} else if span.GetElapsedTime() <= 8000 {
return 6
} else {
return 7
}
}

func (span *TSpan) GetAppServerType() int32 {
if span.AppServerTypeV2 != 0 {
return span.AppServerTypeV2
Expand Down Expand Up @@ -237,7 +267,7 @@ func (manager *AgentRouter) DispatchPacket(packet *RawPacket) error {
log.Debug("Read-lock is release")
}

agent.CheckValid(appname, serverType) // CA just checking the name and ft
agent.CheckValid(span)
agent.SendSpan(span)
return nil

Expand Down
67 changes: 51 additions & 16 deletions collector-agent/agent/GrpcAgent.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type GrpcAgent struct {
spanSender SpanSender
AgentOnLine bool
requestCounter RequestProfiler
utReport *UrlTemplateReport
tasksGroup sync.WaitGroup
tSpanCh chan *TSpan
ExitCh chan bool
Expand Down Expand Up @@ -64,6 +65,9 @@ func (agent *GrpcAgent) Interceptor(_ *TSpan) bool {
if !agent.AgentOnLine {
agent.log.Debugf("span dropped,as agent offline")
}

//note log url templated

return agent.AgentOnLine
}

Expand Down Expand Up @@ -171,6 +175,9 @@ func (agent *GrpcAgent) registerFilter() {
agent.log.Debug("register requestCounter filter")
agent.AddFilter(&agent.requestCounter)

// req UrlTemplateReport
agent.log.Debug("register UrlTemplate Report filter")
agent.AddFilter(agent.utReport)
// send span
agent.log.Debug("register spanSender filter")
agent.AddFilter(&agent.spanSender)
Expand Down Expand Up @@ -206,20 +213,45 @@ func (agent *GrpcAgent) sendStat() {
return
}

for {
msg := CollectPStateMessage(agent.requestCounter.GetMaxAvg, agent.requestCounter.GetReqTimeProfiler)
// todo send agentstat
var wg sync.WaitGroup
wg.Add(1)
go func() {
for {
msg := CollectPStateMessage(agent.requestCounter.GetMaxAvg, agent.requestCounter.GetReqTimeProfiler)

agent.log.Infof("%v", msg)
if err := stream.Send(msg); err != nil {
agent.log.Warn(err)
return
}
//config.StatInterval
if common.WaitChannelEvent(agent.ExitCh, 0) == common.E_AGENT_STOPPING {
return
agent.log.Debugf("%v", msg)
if err := stream.Send(msg); err != nil {
agent.log.Warn(err)
break
}
//config.StatInterval
if common.WaitChannelEvent(agent.ExitCh, 0) == common.E_AGENT_STOPPING {
break
}
}
}
wg.Done()
}()
// wg.Add(1)
// todo send uri templated
wg.Add(1)
go func() {
for {
msg := agent.utReport.MoveUtReprot()

agent.log.Debugf("%v", msg)
if err := stream.Send(msg); err != nil {
agent.log.Warn(err)
break
}
//config.StatInterval
if common.WaitChannelEvent(agent.ExitCh, 30) == common.E_AGENT_STOPPING {
break
}
}
wg.Done()
}()
wg.Wait()
}

func (agent *GrpcAgent) uploadStatInfo() {
Expand Down Expand Up @@ -258,6 +290,8 @@ func (agent *GrpcAgent) Init(id, _name string, _type int32, StartTime string) {
"socketid": pingIdStr,
})

agent.utReport = CreateUrlTemplateReport()

config := common.GetConfig()

agent.tSpanCh = make(chan *TSpan, config.AgentChannelSize)
Expand Down Expand Up @@ -308,15 +342,15 @@ func (agent *GrpcAgent) collectorActiveThreadCount(conn *grpc.ClientConn, respon
res.TimeStamp = time.Now().Unix()
res.HistogramSchemaType = 2

agent.log.Infof("try to send PCmdActiveThreadCountRes:%v", res)
agent.log.Debugf("try to send PCmdActiveThreadCountRes:%v", res)

if err := activeThreadCountClient.Send(&res); err != nil {
agent.log.Infof("collectorActiveThreadCount:responseId:%d end with:%s", responseId, err)
agent.log.Warnf("collectorActiveThreadCount:responseId:%d end with:%s", responseId, err)
break
}

if common.WaitChannelEvent(agent.ExitCh, interval) == common.E_AGENT_STOPPING {
agent.log.Info("catch exit during send collectorActiveThreadCount")
agent.log.Warnf("catch exit during send collectorActiveThreadCount")
break
}
}
Expand Down Expand Up @@ -351,6 +385,7 @@ func (agent *GrpcAgent) handleCommand(conn *grpc.ClientConn, wg *sync.WaitGroup)
//config.AgentReTryTimeout
ctx, _ := common.BuildPinpointCtx(-1, agent.pingMd)

//todo update HandleCommand to HandleCommandV2
commandClient, err := client.HandleCommand(ctx)

if err != nil {
Expand Down Expand Up @@ -427,8 +462,8 @@ func (agent *GrpcAgent) consumeJsonSpan() {
}
}

func (agent *GrpcAgent) CheckValid(name string, ft int32) bool {
if name != agent.agentName || ft != agent.agentType {
func (agent *GrpcAgent) CheckValid(span *TSpan) bool {
if span.GetAppname() != agent.agentName || span.GetAppServerType() != agent.agentType {
agent.log.Warn("name or FT not equal")
return false
} else {
Expand Down
5 changes: 3 additions & 2 deletions collector-agent/agent/Stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import (
type GetMaxAvg func() (max, avg uint32)
type GetReqTimeCounter func() [4]uint16

func CollectPStateMessage(getMacAvr GetMaxAvg, getReqTimeCounter GetReqTimeCounter) *v1.PStatMessage {
func CollectPStateMessage(getMaxAvr GetMaxAvg, getReqTimeCounter GetReqTimeCounter) *v1.PStatMessage {
config := common.GetConfig()
max, avg := getMacAvr()
max, avg := getMaxAvr()
responseTime := v1.PResponseTime{
Max: int64(max),
Avg: int64(avg),
Expand All @@ -35,6 +35,7 @@ func CollectPStateMessage(getMacAvr GetMaxAvg, getReqTimeCounter GetReqTimeCount
JvmGcOldTime: 0,
JvmGcDetailed: &v1.PJvmGcDetailed{},
}
// cpu.Percent calcuate cpu in config.StatInterval
totalPer, _ := cpu.Percent(config.StatInterval*time.Second, false)
totalCpuUsage := totalPer[0] / 100
cpuload := v1.PCpuLoad{
Expand Down
111 changes: 111 additions & 0 deletions collector-agent/agent/UrlTemplate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package agent

import (
"sync"
"time"

v1 "github.com/pinpoint-apm/pinpoint-c-agent/collector-agent/pinpoint-grpc-idl/proto/v1"
)

const bucketVersion = 0
const histogramSize = 8

type uriStatHistogram struct {
Total int64
Max int64
TimestampHistogram [histogramSize]int32
}

func (ust *uriStatHistogram) Update(span *TSpan) {
elapseTime := span.GetElapsedTime()
ust.Total += int64(elapseTime)

if int64(elapseTime) > ust.Max {
ust.Max = int64(elapseTime)
}
ust.TimestampHistogram[span.FindHistogramLevel()] += 1
}

func (ust *uriStatHistogram) ToUriHistogrm() *v1.PUriHistogram {
pbUriHistogram := &v1.PUriHistogram{
Total: ust.Total,
Max: ust.Max,
Histogram: ust.TimestampHistogram[:],
}
return pbUriHistogram
}

type statHisograms struct {
TotalHistogram uriStatHistogram
FailedHistogram uriStatHistogram
}

func (st *statHisograms) Update(span *TSpan) {
st.TotalHistogram.Update(span)
if span.IsFailed() {
st.FailedHistogram.Update(span)
}
}

type UrlTemplateReport struct {
uriMap map[string]*statHisograms
BucketVersion int32
mu sync.Mutex
}

func (utr *UrlTemplateReport) Interceptor(span *TSpan) bool {
if len(span.UT) > 0 {
// found uri templated
utr.updateUriSnapshot(span)
}
return true
}

func (utr *UrlTemplateReport) updateUriSnapshot(span *TSpan) {
utr.mu.Lock()
defer utr.mu.Unlock()
ut := span.UT
var st *statHisograms
var ok bool
if st, ok = utr.uriMap[ut]; !ok {
st = &statHisograms{}
utr.uriMap[ut] = st
}
st.Update(span)
}

func (utr *UrlTemplateReport) MoveUtReprot() *v1.PStatMessage {
utr.mu.Lock()
defer utr.mu.Unlock()

agentUriStat := &v1.PAgentUriStat{
BucketVersion: int32(utr.BucketVersion),
}

for url, st := range utr.uriMap {
eachUriStat := &v1.PEachUriStat{
Uri: url,
TotalHistogram: st.TotalHistogram.ToUriHistogrm(),
FailedHistogram: st.FailedHistogram.ToUriHistogrm(),
Timestamp: time.Now().UnixMilli(),
}
agentUriStat.EachUriStat = append(agentUriStat.EachUriStat, eachUriStat)
}
//note: create a new one
utr.uriMap = make(map[string]*statHisograms)
pbStat := &v1.PStatMessage{
Field: &v1.PStatMessage_AgentUriStat{
AgentUriStat: agentUriStat,
},
}

return pbStat
}

func CreateUrlTemplateReport() *UrlTemplateReport {
ut := &UrlTemplateReport{
uriMap: make(map[string]*statHisograms),
BucketVersion: bucketVersion,
}
return ut
}
62 changes: 62 additions & 0 deletions collector-agent/agent/UrlTemplate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package agent

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestUrlTemplateReport(t *testing.T) {
spans := []TSpan{
{
UT: "/hello",
Uri: "/hello",
ElapsedTime: 32,
},
{
UT: "/hello",
Uri: "/hello",
ElapsedTime: 320,
},
{
UT: "/hello",
Uri: "/hello",
ElapsedTime: 3200,
},
{
UT: "/hello_exp",
Uri: "/hello",
ElapsedTime: 32000,
ExceptionInfo: "exp",
},
}

ut := CreateUrlTemplateReport()
for _, span := range spans {
ut.Interceptor(&span)
}

if len(ut.uriMap) < 2 {
t.Log(len(ut.uriMap))
}

pbStatMessage := ut.MoveUtReprot()
t.Log(pbStatMessage)
assert.NotEqual(t, pbStatMessage.GetAgentUriStat(), nil, "GetAgentUriStat")

pbUriStat := pbStatMessage.GetAgentUriStat()

assert.Equal(t, pbUriStat.GetBucketVersion(), int32(0), "GetBucketVersion")

eachUriStat := pbUriStat.GetEachUriStat()

assert.Equal(t, len(eachUriStat), 2, "len(eachUriStat)")

assert.NotEqual(t, eachUriStat[0].GetFailedHistogram(), nil, "GetFailedHistogram")
assert.NotEqual(t, eachUriStat[0].GetTotalHistogram(), nil, "GetTotalHistogram")
totalHis := eachUriStat[0].GetTotalHistogram()
assert.Equal(t, len(totalHis.GetHistogram()), histogramSize, "len(totalHis.GetHistogram())")

assert.Equal(t, totalHis.Max, int64(3200), "totalHis.Max")

}
1 change: 1 addition & 0 deletions collector-agent/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/shirou/gopsutil v3.21.2+incompatible
github.com/shirou/gopsutil/v3 v3.21.2
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.6.1
github.com/x-cray/logrus-prefixed-formatter v0.5.2
google.golang.org/genproto v0.0.0-20200806141610-86f49bd18e98 // indirect
google.golang.org/grpc v1.36.0
Expand Down
2 changes: 2 additions & 0 deletions collector-agent/start-collector-agent.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ export PP_COLLECTOR_AGENT_ISDOCKER=false
# export PP_LOG_DIR=/tmp/
export PP_Log_Level=DEBUG
export PP_ADDRESS=0.0.0.0@9999
export GO_PATH=~/go
export PATH=$PATH:$GO_PATH/bin
make && ./collector-agent
Loading

0 comments on commit dfc5e0a

Please sign in to comment.