diff --git a/collector-agent/agent/AgentRouter.go b/collector-agent/agent/AgentRouter.go index 16cd93dc2..04afd4f2d 100644 --- a/collector-agent/agent/AgentRouter.go +++ b/collector-agent/agent/AgentRouter.go @@ -91,6 +91,7 @@ type TSpan struct { ServerType int32 `json:"stp,string"` TransactionId string `json:"tid"` Uri string `json:"uri"` + UT string `json:"UT,omitempty"` EndPoint string `json:"server"` RemoteAddr string `json:"client"` AcceptorHost string `json:"Ah"` @@ -100,6 +101,35 @@ type TSpan struct { ApacheHeader string `json:"AP,omitempty"` } +func (span *TSpan) IsFailed() bool { + if span.ErrorInfo != nil || len(span.ExceptionInfo) > 0 { + return true + } + return false +} + +//note +// FindHistogramLevel must come with histogramSize +func (span *TSpan) FindHistogramLevel() int { + if span.GetElapsedTime() <= 100 { + return 0 + } else if span.GetElapsedTime() <= 300 { + return 1 + } else if span.GetElapsedTime() <= 500 { + return 2 + } else if span.GetElapsedTime() <= 1000 { + return 3 + } else if span.GetElapsedTime() <= 3000 { + return 4 + } else if span.GetElapsedTime() <= 5000 { + return 5 + } else if span.GetElapsedTime() <= 8000 { + return 6 + } else { + return 7 + } +} + func (span *TSpan) GetAppServerType() int32 { if span.AppServerTypeV2 != 0 { return span.AppServerTypeV2 @@ -237,7 +267,7 @@ func (manager *AgentRouter) DispatchPacket(packet *RawPacket) error { log.Debug("Read-lock is release") } - agent.CheckValid(appname, serverType) // CA just checking the name and ft + agent.CheckValid(span) agent.SendSpan(span) return nil diff --git a/collector-agent/agent/GrpcAgent.go b/collector-agent/agent/GrpcAgent.go index 8c310197d..7cdf85d46 100644 --- a/collector-agent/agent/GrpcAgent.go +++ b/collector-agent/agent/GrpcAgent.go @@ -29,6 +29,7 @@ type GrpcAgent struct { spanSender SpanSender AgentOnLine bool requestCounter RequestProfiler + utReport *UrlTemplateReport tasksGroup sync.WaitGroup tSpanCh chan *TSpan ExitCh chan bool @@ -64,6 +65,9 @@ func (agent *GrpcAgent) Interceptor(_ *TSpan) bool { if !agent.AgentOnLine { agent.log.Debugf("span dropped,as agent offline") } + + //note log url templated + return agent.AgentOnLine } @@ -171,6 +175,9 @@ func (agent *GrpcAgent) registerFilter() { agent.log.Debug("register requestCounter filter") agent.AddFilter(&agent.requestCounter) + // req UrlTemplateReport + agent.log.Debug("register UrlTemplate Report filter") + agent.AddFilter(agent.utReport) // send span agent.log.Debug("register spanSender filter") agent.AddFilter(&agent.spanSender) @@ -206,20 +213,45 @@ func (agent *GrpcAgent) sendStat() { return } - for { - msg := CollectPStateMessage(agent.requestCounter.GetMaxAvg, agent.requestCounter.GetReqTimeProfiler) + // todo send agentstat + var wg sync.WaitGroup + wg.Add(1) + go func() { + for { + msg := CollectPStateMessage(agent.requestCounter.GetMaxAvg, agent.requestCounter.GetReqTimeProfiler) - agent.log.Infof("%v", msg) - if err := stream.Send(msg); err != nil { - agent.log.Warn(err) - return - } - //config.StatInterval - if common.WaitChannelEvent(agent.ExitCh, 0) == common.E_AGENT_STOPPING { - return + agent.log.Debugf("%v", msg) + if err := stream.Send(msg); err != nil { + agent.log.Warn(err) + break + } + //config.StatInterval + if common.WaitChannelEvent(agent.ExitCh, 0) == common.E_AGENT_STOPPING { + break + } } - } + wg.Done() + }() + // wg.Add(1) + // todo send uri templated + wg.Add(1) + go func() { + for { + msg := agent.utReport.MoveUtReprot() + agent.log.Debugf("%v", msg) + if err := stream.Send(msg); err != nil { + agent.log.Warn(err) + break + } + //config.StatInterval + if common.WaitChannelEvent(agent.ExitCh, 30) == common.E_AGENT_STOPPING { + break + } + } + wg.Done() + }() + wg.Wait() } func (agent *GrpcAgent) uploadStatInfo() { @@ -258,6 +290,8 @@ func (agent *GrpcAgent) Init(id, _name string, _type int32, StartTime string) { "socketid": pingIdStr, }) + agent.utReport = CreateUrlTemplateReport() + config := common.GetConfig() agent.tSpanCh = make(chan *TSpan, config.AgentChannelSize) @@ -308,15 +342,15 @@ func (agent *GrpcAgent) collectorActiveThreadCount(conn *grpc.ClientConn, respon res.TimeStamp = time.Now().Unix() res.HistogramSchemaType = 2 - agent.log.Infof("try to send PCmdActiveThreadCountRes:%v", res) + agent.log.Debugf("try to send PCmdActiveThreadCountRes:%v", res) if err := activeThreadCountClient.Send(&res); err != nil { - agent.log.Infof("collectorActiveThreadCount:responseId:%d end with:%s", responseId, err) + agent.log.Warnf("collectorActiveThreadCount:responseId:%d end with:%s", responseId, err) break } if common.WaitChannelEvent(agent.ExitCh, interval) == common.E_AGENT_STOPPING { - agent.log.Info("catch exit during send collectorActiveThreadCount") + agent.log.Warnf("catch exit during send collectorActiveThreadCount") break } } @@ -351,6 +385,7 @@ func (agent *GrpcAgent) handleCommand(conn *grpc.ClientConn, wg *sync.WaitGroup) //config.AgentReTryTimeout ctx, _ := common.BuildPinpointCtx(-1, agent.pingMd) + //todo update HandleCommand to HandleCommandV2 commandClient, err := client.HandleCommand(ctx) if err != nil { @@ -427,8 +462,8 @@ func (agent *GrpcAgent) consumeJsonSpan() { } } -func (agent *GrpcAgent) CheckValid(name string, ft int32) bool { - if name != agent.agentName || ft != agent.agentType { +func (agent *GrpcAgent) CheckValid(span *TSpan) bool { + if span.GetAppname() != agent.agentName || span.GetAppServerType() != agent.agentType { agent.log.Warn("name or FT not equal") return false } else { diff --git a/collector-agent/agent/Stat.go b/collector-agent/agent/Stat.go index f9f690d0c..8105b2382 100644 --- a/collector-agent/agent/Stat.go +++ b/collector-agent/agent/Stat.go @@ -15,9 +15,9 @@ import ( type GetMaxAvg func() (max, avg uint32) type GetReqTimeCounter func() [4]uint16 -func CollectPStateMessage(getMacAvr GetMaxAvg, getReqTimeCounter GetReqTimeCounter) *v1.PStatMessage { +func CollectPStateMessage(getMaxAvr GetMaxAvg, getReqTimeCounter GetReqTimeCounter) *v1.PStatMessage { config := common.GetConfig() - max, avg := getMacAvr() + max, avg := getMaxAvr() responseTime := v1.PResponseTime{ Max: int64(max), Avg: int64(avg), @@ -35,6 +35,7 @@ func CollectPStateMessage(getMacAvr GetMaxAvg, getReqTimeCounter GetReqTimeCount JvmGcOldTime: 0, JvmGcDetailed: &v1.PJvmGcDetailed{}, } + // cpu.Percent calcuate cpu in config.StatInterval totalPer, _ := cpu.Percent(config.StatInterval*time.Second, false) totalCpuUsage := totalPer[0] / 100 cpuload := v1.PCpuLoad{ diff --git a/collector-agent/agent/UrlTemplate.go b/collector-agent/agent/UrlTemplate.go new file mode 100644 index 000000000..0748d126d --- /dev/null +++ b/collector-agent/agent/UrlTemplate.go @@ -0,0 +1,111 @@ +package agent + +import ( + "sync" + "time" + + v1 "github.com/pinpoint-apm/pinpoint-c-agent/collector-agent/pinpoint-grpc-idl/proto/v1" +) + +const bucketVersion = 0 +const histogramSize = 8 + +type uriStatHistogram struct { + Total int64 + Max int64 + TimestampHistogram [histogramSize]int32 +} + +func (ust *uriStatHistogram) Update(span *TSpan) { + elapseTime := span.GetElapsedTime() + ust.Total += int64(elapseTime) + + if int64(elapseTime) > ust.Max { + ust.Max = int64(elapseTime) + } + ust.TimestampHistogram[span.FindHistogramLevel()] += 1 +} + +func (ust *uriStatHistogram) ToUriHistogrm() *v1.PUriHistogram { + pbUriHistogram := &v1.PUriHistogram{ + Total: ust.Total, + Max: ust.Max, + Histogram: ust.TimestampHistogram[:], + } + return pbUriHistogram +} + +type statHisograms struct { + TotalHistogram uriStatHistogram + FailedHistogram uriStatHistogram +} + +func (st *statHisograms) Update(span *TSpan) { + st.TotalHistogram.Update(span) + if span.IsFailed() { + st.FailedHistogram.Update(span) + } +} + +type UrlTemplateReport struct { + uriMap map[string]*statHisograms + BucketVersion int32 + mu sync.Mutex +} + +func (utr *UrlTemplateReport) Interceptor(span *TSpan) bool { + if len(span.UT) > 0 { + // found uri templated + utr.updateUriSnapshot(span) + } + return true +} + +func (utr *UrlTemplateReport) updateUriSnapshot(span *TSpan) { + utr.mu.Lock() + defer utr.mu.Unlock() + ut := span.UT + var st *statHisograms + var ok bool + if st, ok = utr.uriMap[ut]; !ok { + st = &statHisograms{} + utr.uriMap[ut] = st + } + st.Update(span) +} + +func (utr *UrlTemplateReport) MoveUtReprot() *v1.PStatMessage { + utr.mu.Lock() + defer utr.mu.Unlock() + + agentUriStat := &v1.PAgentUriStat{ + BucketVersion: int32(utr.BucketVersion), + } + + for url, st := range utr.uriMap { + eachUriStat := &v1.PEachUriStat{ + Uri: url, + TotalHistogram: st.TotalHistogram.ToUriHistogrm(), + FailedHistogram: st.FailedHistogram.ToUriHistogrm(), + Timestamp: time.Now().UnixMilli(), + } + agentUriStat.EachUriStat = append(agentUriStat.EachUriStat, eachUriStat) + } + //note: create a new one + utr.uriMap = make(map[string]*statHisograms) + pbStat := &v1.PStatMessage{ + Field: &v1.PStatMessage_AgentUriStat{ + AgentUriStat: agentUriStat, + }, + } + + return pbStat +} + +func CreateUrlTemplateReport() *UrlTemplateReport { + ut := &UrlTemplateReport{ + uriMap: make(map[string]*statHisograms), + BucketVersion: bucketVersion, + } + return ut +} diff --git a/collector-agent/agent/UrlTemplate_test.go b/collector-agent/agent/UrlTemplate_test.go new file mode 100644 index 000000000..a483624cc --- /dev/null +++ b/collector-agent/agent/UrlTemplate_test.go @@ -0,0 +1,62 @@ +package agent + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestUrlTemplateReport(t *testing.T) { + spans := []TSpan{ + { + UT: "/hello", + Uri: "/hello", + ElapsedTime: 32, + }, + { + UT: "/hello", + Uri: "/hello", + ElapsedTime: 320, + }, + { + UT: "/hello", + Uri: "/hello", + ElapsedTime: 3200, + }, + { + UT: "/hello_exp", + Uri: "/hello", + ElapsedTime: 32000, + ExceptionInfo: "exp", + }, + } + + ut := CreateUrlTemplateReport() + for _, span := range spans { + ut.Interceptor(&span) + } + + if len(ut.uriMap) < 2 { + t.Log(len(ut.uriMap)) + } + + pbStatMessage := ut.MoveUtReprot() + t.Log(pbStatMessage) + assert.NotEqual(t, pbStatMessage.GetAgentUriStat(), nil, "GetAgentUriStat") + + pbUriStat := pbStatMessage.GetAgentUriStat() + + assert.Equal(t, pbUriStat.GetBucketVersion(), int32(0), "GetBucketVersion") + + eachUriStat := pbUriStat.GetEachUriStat() + + assert.Equal(t, len(eachUriStat), 2, "len(eachUriStat)") + + assert.NotEqual(t, eachUriStat[0].GetFailedHistogram(), nil, "GetFailedHistogram") + assert.NotEqual(t, eachUriStat[0].GetTotalHistogram(), nil, "GetTotalHistogram") + totalHis := eachUriStat[0].GetTotalHistogram() + assert.Equal(t, len(totalHis.GetHistogram()), histogramSize, "len(totalHis.GetHistogram())") + + assert.Equal(t, totalHis.Max, int64(3200), "totalHis.Max") + +} diff --git a/collector-agent/go.mod b/collector-agent/go.mod index 14bc67460..a5c339283 100644 --- a/collector-agent/go.mod +++ b/collector-agent/go.mod @@ -10,6 +10,7 @@ require ( github.com/shirou/gopsutil v3.21.2+incompatible github.com/shirou/gopsutil/v3 v3.21.2 github.com/sirupsen/logrus v1.8.1 + github.com/stretchr/testify v1.6.1 github.com/x-cray/logrus-prefixed-formatter v0.5.2 google.golang.org/genproto v0.0.0-20200806141610-86f49bd18e98 // indirect google.golang.org/grpc v1.36.0 diff --git a/collector-agent/start-collector-agent.sh b/collector-agent/start-collector-agent.sh index 1473b1326..c9597e6ce 100644 --- a/collector-agent/start-collector-agent.sh +++ b/collector-agent/start-collector-agent.sh @@ -9,4 +9,6 @@ export PP_COLLECTOR_AGENT_ISDOCKER=false # export PP_LOG_DIR=/tmp/ export PP_Log_Level=DEBUG export PP_ADDRESS=0.0.0.0@9999 +export GO_PATH=~/go +export PATH=$PATH:$GO_PATH/bin make && ./collector-agent \ No newline at end of file diff --git a/src/CPP/pinpoint_define.h b/src/CPP/pinpoint_define.h index 0f1644576..1d0258cfd 100644 --- a/src/CPP/pinpoint_define.h +++ b/src/CPP/pinpoint_define.h @@ -1,12 +1,12 @@ //////////////////////////////////////////////////////////////////////////////// // Copyright 2020 NAVER Corp -// +// // Licensed under the Apache License, Version 2.0 (the "License"); you may not // use this file except in compliance with the License. You may obtain a copy // of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the @@ -14,64 +14,65 @@ // the License. //////////////////////////////////////////////////////////////////////////////// -const static char* PP_PHP_ARGS ="-1"; +const static char* PP_PHP_ARGS = "-1"; const static int PP_PHP_RETURN = 14; -const static int PP_PROXY_HTTP_HEADER= 300; -const static int PP_SQL_ID=20; -const static int PP_SQL= 21; -const static int PP_SQL_METADATA= 22; -const static int PP_SQL_PARAM= 23; -const static int PP_SQL_BINDVALUE= 24; -const static int PP_STRING_ID= 30; -const static char* PP_HTTP_URL= "40"; -const static int PP_HTTP_PARAM= 41; -const static int PP_HTTP_PARAM_ENTITY= 42; -const static int PP_HTTP_COOKIE= 45; -const static char* PP_HTTP_STATUS_CODE= "46"; -const static int PP_HTTP_INTERNAL_DISPLAY= 48; -const static int PP_HTTP_IO= 49; -const static int PP_MESSAGE_QUEUE_URI =100; -const static int PP_KAFKA_TOPIC=140; +const static int PP_PROXY_HTTP_HEADER = 300; +const static int PP_SQL_ID = 20; +const static int PP_SQL = 21; +const static int PP_SQL_METADATA = 22; +const static int PP_SQL_PARAM = 23; +const static int PP_SQL_BINDVALUE = 24; +const static int PP_STRING_ID = 30; +const static char* PP_HTTP_URL = "40"; +const static int PP_HTTP_PARAM = 41; +const static int PP_HTTP_PARAM_ENTITY = 42; +const static int PP_HTTP_COOKIE = 45; +const static char* PP_HTTP_STATUS_CODE = "46"; +const static int PP_HTTP_INTERNAL_DISPLAY = 48; +const static int PP_HTTP_IO = 49; +const static int PP_MESSAGE_QUEUE_URI = 100; +const static int PP_KAFKA_TOPIC = 140; -const static char* PP_SQL_FORMAT="SQL"; -const static char* PP_C_CPP="1300"; -const static char* PP_C_CPP_METHOD="1301"; -const static char* PP_MYSQL="2101"; -const static char* PP_KAFKA="8660"; -const static char* PP_REDIS="8200"; -const static char* PP_REDIS_REDISSON="8203"; -const static char* PP_REDIS_REDISSON_INTERNAL="8204"; -const static char* PP_MEMCACHED="8050"; -const static char* PP_C_CPP_REMOTE_METHOD="9800"; +const static char* PP_SQL_FORMAT = "SQL"; +const static char* PP_C_CPP = "1300"; +const static char* PP_C_CPP_METHOD = "1301"; +const static char* PP_MYSQL = "2101"; +const static char* PP_KAFKA = "8660"; +const static char* PP_REDIS = "8200"; +const static char* PP_REDIS_REDISSON = "8203"; +const static char* PP_REDIS_REDISSON_INTERNAL = "8204"; +const static char* PP_MEMCACHED = "8050"; +const static char* PP_C_CPP_REMOTE_METHOD = "9800"; -const static char* PP_HEADER_NGINX_PROXY="Pinpoint-ProxyNginx"; -const static char* PP_HEADER_APACHE_PROXY="HTTP_PINPOINT_PROXYAPACHE"; -const static char* PP_HEADER_SAMPLED="HTTP_PINPOINT_SAMPLED"; -const static char* PP_HEADER_PINPOINT_HOST="HTTP_PINPOINT_HOST"; -const static char* PP_HEADER_PAPPTYPE="HTTP_PINPOINT_PAPPTYPE"; -const static char* PP_HEADER_PAPPNAME="HTTP_PINPOINT_PAPPNAME"; -const static char* PP_HEADER_TRACEID="HTTP_PINPOINT_TRACEID"; -const static char* PP_HEADER_SPANID="HTTP_PINPOINT_SPANID"; -const static char* PP_HEADER_PSPANID="HTTP_PINPOINT_PSPANID"; +const static char* PP_HEADER_NGINX_PROXY = "Pinpoint-ProxyNginx"; +const static char* PP_HEADER_APACHE_PROXY = "HTTP_PINPOINT_PROXYAPACHE"; +const static char* PP_HEADER_SAMPLED = "HTTP_PINPOINT_SAMPLED"; +const static char* PP_HEADER_PINPOINT_HOST = "HTTP_PINPOINT_HOST"; +const static char* PP_HEADER_PAPPTYPE = "HTTP_PINPOINT_PAPPTYPE"; +const static char* PP_HEADER_PAPPNAME = "HTTP_PINPOINT_PAPPNAME"; +const static char* PP_HEADER_TRACEID = "HTTP_PINPOINT_TRACEID"; +const static char* PP_HEADER_SPANID = "HTTP_PINPOINT_SPANID"; +const static char* PP_HEADER_PSPANID = "HTTP_PINPOINT_PSPANID"; -const static char* PP_SERVER_TYPE="stp"; -const static char* PP_NEXT_SPAN_ID="nsid"; -const static char* PP_DESTINATION="dst"; -const static char* PP_INTERCEPTOR_NAME="name"; -const static char* PP_ADD_EXCEPTION="EXP"; -const static int PP_ROOT_LOC=1; -const static char* PP_REQ_URI="uri"; -const static char* PP_REQ_CLIENT="client"; -const static char* PP_REQ_SERVER="server"; -const static char* PP_APP_NAME="appname"; -const static char* PP_APP_ID="appid"; -const static char* PP_PARENT_SPAN_ID="psid"; -const static char* PP_PARENT_NAME="pname"; -const static char* PP_PARENT_TYPE="ptype"; -const static char* PP_PARENT_HOST="Ah"; -const static char* PP_NGINX_PROXY="NP"; -const static char* PP_APACHE_PROXY="AP"; -const static char* PP_TRANSCATION_ID="tid"; -const static char* PP_SPAN_ID="sid"; -const static char* PP_NOT_SAMPLED="s0"; +const static char* PP_SERVER_TYPE = "stp"; +const static char* PP_NEXT_SPAN_ID = "nsid"; +const static char* PP_DESTINATION = "dst"; +const static char* PP_INTERCEPTOR_NAME = "name"; +const static char* PP_ADD_EXCEPTION = "EXP"; +const static int PP_ROOT_LOC = 1; +const static char* PP_REQ_URI = "uri"; +const static char* PP_REQ_CLIENT = "client"; +const static char* PP_REQ_SERVER = "server"; +const static char* PP_APP_NAME = "appname"; +const static char* PP_APP_ID = "appid"; +const static char* PP_UT = "UT"; +const static char* PP_PARENT_SPAN_ID = "psid"; +const static char* PP_PARENT_NAME = "pname"; +const static char* PP_PARENT_TYPE = "ptype"; +const static char* PP_PARENT_HOST = "Ah"; +const static char* PP_NGINX_PROXY = "NP"; +const static char* PP_APACHE_PROXY = "AP"; +const static char* PP_TRANSCATION_ID = "tid"; +const static char* PP_SPAN_ID = "sid"; +const static char* PP_NOT_SAMPLED = "s0"; const static char* PP_SAMPLED = "s1"; diff --git a/src/CPP/test_pinpoint.c b/src/CPP/test_pinpoint.c index d568a119f..3b9df7140 100644 --- a/src/CPP/test_pinpoint.c +++ b/src/CPP/test_pinpoint.c @@ -39,9 +39,8 @@ char* get_tid() { } void random_sleep() { - // int32_t delay = rand()%10; - // usleep(delay* 1000); - // sleep(3); + int32_t delay = rand() % 10; + usleep(delay * 10000); } void test_httpclient() { @@ -89,7 +88,9 @@ void test_req() { pinpoint_add_clue(id, PP_INTERCEPTOR_NAME, "C_CPP Request", E_LOC_CURRENT); pinpoint_add_clue(id, PP_APP_NAME, app_name, E_LOC_CURRENT); pinpoint_add_clue(id, PP_APP_ID, app_id, E_LOC_CURRENT); - + char ut[64] = {0}; + snprintf(ut, 64, "/user/?/add/%d", rand() % 10); + pinpoint_add_clue(id, PP_UT, ut, E_LOC_CURRENT); random_sleep(); test_func(); @@ -107,7 +108,7 @@ void test_req() { } int main(int argc, char const* argv[]) { - PPAgentT agent_info = {"tcp:127.0.0.1:9999", 1000, -1, 1700, 1, NULL, NULL, NULL}; + PPAgentT agent_info = {"tcp:127.0.0.1:9999", 1000, -1, 1300, 1, NULL, NULL, NULL}; global_agent_info = agent_info; char appid[] = "cd.dev.test"; char appname[] = "cd.dev.test"; @@ -121,7 +122,6 @@ int main(int argc, char const* argv[]) { int i = 0; for (; i < 10; i++) { test_req(); - sleep(1); } return 0; } diff --git a/src/CPP/test_pinpoint.cpp b/src/CPP/test_pinpoint.cpp index 5d0bff812..95aba3b49 100644 --- a/src/CPP/test_pinpoint.cpp +++ b/src/CPP/test_pinpoint.cpp @@ -35,8 +35,8 @@ std::string get_tid() { } void random_sleep() { - int32_t delay = rand() % 10; - usleep(delay * 1000); + int32_t delay = rand() % 100; + usleep(delay * 10000); } void test_httpclient() { @@ -106,13 +106,12 @@ void test_req() { } int main(int argc, char const* argv[]) { - PPAgentT agent_info = {"tcp:127.0.0.1:9999", 1000, -1, 1700, 0, NULL, NULL, NULL}; + PPAgentT agent_info = {"tcp:127.0.0.1:9999", 1000, -1, 1300, 0, NULL, NULL, NULL}; global_agent_info = agent_info; srand(time(nullptr)); int i = 0; for (; i < 3; i++) { test_req(); - sleep(1); } return 0; }