From 9f9c213cd978deddae7db564740e745906b37fc7 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Fri, 12 Nov 2021 00:37:07 +0530 Subject: [PATCH] feat(telemetry): send txpool.import telemetry msg (#1966) From this commit, a telemetry message of type `txpool.import` will be sent when a new transaction is imported in the transaction pool. Added tests for `txpool.import` and improved TestHandler_SendMulti test. --- dot/state/block_finalisation.go | 2 +- dot/state/transaction.go | 13 ++- dot/telemetry/prepared_block_for_proposing.go | 5 +- dot/telemetry/telemetry.go | 12 +- dot/telemetry/telemetry_test.go | 103 ++++++++---------- dot/telemetry/txpool_import.go | 36 ++++++ 6 files changed, 105 insertions(+), 66 deletions(-) create mode 100644 dot/telemetry/txpool_import.go diff --git a/dot/state/block_finalisation.go b/dot/state/block_finalisation.go index e4e948e327..0db98dfc6e 100644 --- a/dot/state/block_finalisation.go +++ b/dot/state/block_finalisation.go @@ -191,7 +191,7 @@ func (bs *BlockState) SetFinalisedHash(hash common.Hash, round, setID uint64) er ), ) if err != nil { - return fmt.Errorf("could not send 'notify.finalized' telemetry message, error: %s", err) + logger.Debugf("could not send 'notify.finalized' telemetry message, error: %s", err) } bs.lastFinalised = hash diff --git a/dot/state/transaction.go b/dot/state/transaction.go index 73e0d6e0c0..6d2589cd6e 100644 --- a/dot/state/transaction.go +++ b/dot/state/transaction.go @@ -3,6 +3,8 @@ package state import ( "sync" + "github.com/ChainSafe/gossamer/dot/telemetry" + "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/transaction" @@ -68,7 +70,16 @@ func (s *TransactionState) RemoveExtrinsicFromPool(ext types.Extrinsic) { // AddToPool adds a transaction to the pool func (s *TransactionState) AddToPool(vt *transaction.ValidTransaction) common.Hash { s.notifyStatus(vt.Extrinsic, transaction.Future) - return s.pool.Insert(vt) + + hash := s.pool.Insert(vt) + + if err := telemetry.GetInstance().SendMessage( + telemetry.NewTxpoolImportTM(uint(s.queue.Len()), uint(s.pool.Len())), + ); err != nil { + logger.Debugf("problem sending txpool.import telemetry message, error: %s", err) + } + + return hash } // GetStatusNotifierChannel creates and returns a status notifier channel. diff --git a/dot/telemetry/prepared_block_for_proposing.go b/dot/telemetry/prepared_block_for_proposing.go index c74cd00932..b1a265124c 100644 --- a/dot/telemetry/prepared_block_for_proposing.go +++ b/dot/telemetry/prepared_block_for_proposing.go @@ -34,10 +34,9 @@ func NewPreparedBlockForProposingTM(hash common.Hash, number string) Message { return &preparedBlockForProposingTM{ Hash: hash, Number: number, - Msg: "prepared_block_for_proposing", } } -func (tm *preparedBlockForProposingTM) messageType() string { - return tm.Msg +func (preparedBlockForProposingTM) messageType() string { + return preparedBlockForProposingMsg } diff --git a/dot/telemetry/telemetry.go b/dot/telemetry/telemetry.go index 1d52e09d2b..67dfed4074 100644 --- a/dot/telemetry/telemetry.go +++ b/dot/telemetry/telemetry.go @@ -29,11 +29,13 @@ import ( // telemetry message types const ( - notifyFinalizedMsg = "notify.finalized" - blockImportMsg = "block.import" - systemNetworkStateMsg = "system.network_state" - systemConnectedMsg = "system.connected" - systemIntervalMsg = "system.interval" + systemConnectedMsg = "system.connected" + systemIntervalMsg = "system.interval" + systemNetworkStateMsg = "system.network_state" + blockImportMsg = "block.import" + notifyFinalizedMsg = "notify.finalized" + txPoolImportMsg = "txpool.import" + preparedBlockForProposingMsg = "prepared_block_for_proposing" ) type telemetryConnection struct { diff --git a/dot/telemetry/telemetry_test.go b/dot/telemetry/telemetry_test.go index 898f7127c5..2ab312e652 100644 --- a/dot/telemetry/telemetry_test.go +++ b/dot/telemetry/telemetry_test.go @@ -42,75 +42,66 @@ func TestMain(m *testing.M) { } func TestHandler_SendMulti(t *testing.T) { - var wg sync.WaitGroup - wg.Add(6) - - resultCh = make(chan []byte) - - go func() { - genesisHash := common.MustHexToHash("0x91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3") - - GetInstance().SendMessage(NewSystemConnectedTM(false, "chain", &genesisHash, - "systemName", "nodeName", "netID", "startTime", "0.1")) - - wg.Done() - }() - - go func() { - bh := common.MustHexToHash("0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6") - GetInstance().SendMessage(NewBlockImportTM(&bh, big.NewInt(2), "NetworkInitialSync")) - - wg.Done() - }() - - go func() { - GetInstance().SendMessage(NewBandwidthTM(2, 3, 1)) - - wg.Done() - }() - - go func() { - bestHash := common.MustHexToHash("0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6") - finalisedHash := common.MustHexToHash("0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2") - GetInstance().SendMessage(NewBlockIntervalTM(&bestHash, big.NewInt(32375), &finalisedHash, - big.NewInt(32256), big.NewInt(0), big.NewInt(1234))) - - wg.Done() - }() - - go func() { - bestHash := common.MustHexToHash("0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6") - GetInstance().SendMessage(NewNotifyFinalizedTM(bestHash, "32375")) + expected := [][]byte{ + []byte(`{"authority":false,"chain":"chain","genesis_hash":"0x91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3","implementation":"systemName","msg":"system.connected","name":"nodeName","network_id":"netID","startup_time":"startTime","ts":`), + []byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","height":2,"msg":"block.import","origin":"NetworkInitialSync","ts":`), + []byte(`{"bandwidth_download":2,"bandwidth_upload":3,"msg":"system.interval","peers":1,"ts":`), + []byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","finalized_hash":"0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2","finalized_height":32256,"height":32375,"msg":"system.interval","ts":`), //nolint + []byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","height":"32375","msg":"notify.finalized","ts":`), + []byte(`{"hash":"0x5814aec3e28527f81f65841e034872f3a30337cf6c33b2d258bba6071e37e27c","msg":"prepared_block_for_proposing","number":"1","ts":`), + []byte(`{"future":2,"msg":"txpool.import","ready":1,"ts":`), + } - wg.Done() - }() + messages := []Message{ + NewBandwidthTM(2, 3, 1), + NewTxpoolImportTM(1, 2), + + func(genesisHash common.Hash) Message { + return NewSystemConnectedTM(false, "chain", &genesisHash, + "systemName", "nodeName", "netID", "startTime", "0.1") + }(common.MustHexToHash("0x91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3")), + + func(bh common.Hash) Message { + return NewBlockImportTM(&bh, big.NewInt(2), "NetworkInitialSync") + }(common.MustHexToHash("0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6")), + + func(bestHash, finalisedHash common.Hash) Message { + return NewBlockIntervalTM(&bestHash, big.NewInt(32375), &finalisedHash, + big.NewInt(32256), big.NewInt(0), big.NewInt(1234)) + }( + common.MustHexToHash("0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6"), + common.MustHexToHash("0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2"), + ), + + NewNotifyFinalizedTM(common.MustHexToHash("0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6"), "32375"), + NewPreparedBlockForProposingTM(common.MustHexToHash("0x5814aec3e28527f81f65841e034872f3a30337cf6c33b2d258bba6071e37e27c"), "1"), + } - go func() { - bestHash := common.MustHexToHash("0x5814aec3e28527f81f65841e034872f3a30337cf6c33b2d258bba6071e37e27c") - GetInstance().SendMessage(NewPreparedBlockForProposingTM(bestHash, "1")) + resultCh = make(chan []byte) - wg.Done() - }() + var wg sync.WaitGroup + for _, message := range messages { + wg.Add(1) + go func(msg Message) { + GetInstance().SendMessage(msg) + wg.Done() + }(message) + } wg.Wait() - expected1 := []byte(`{"authority":false,"chain":"chain","genesis_hash":"0x91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3","implementation":"systemName","msg":"system.connected","name":"nodeName","network_id":"netID","startup_time":"startTime","ts":`) - expected2 := []byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","height":2,"msg":"block.import","origin":"NetworkInitialSync","ts":`) - expected3 := []byte(`{"bandwidth_download":2,"bandwidth_upload":3,"msg":"system.interval","peers":1,"ts":`) - expected4 := []byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","finalized_hash":"0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2","finalized_height":32256,"height":32375,"msg":"system.interval","ts":`) // nolint - expected5 := []byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","height":"32375","msg":"notify.finalized","ts":`) - expected6 := []byte(`{"hash":"0x5814aec3e28527f81f65841e034872f3a30337cf6c33b2d258bba6071e37e27c","msg":"prepared_block_for_proposing","number":"1","ts":`) - - expected := [][]byte{expected1, expected3, expected4, expected5, expected2, expected6} - var actual [][]byte for data := range resultCh { actual = append(actual, data) - if len(actual) == 6 { + if len(actual) == len(expected) { break } } + sort.Slice(expected, func(i, j int) bool { + return bytes.Compare(expected[i], expected[j]) < 0 + }) + sort.Slice(actual, func(i, j int) bool { return bytes.Compare(actual[i], actual[j]) < 0 }) diff --git a/dot/telemetry/txpool_import.go b/dot/telemetry/txpool_import.go new file mode 100644 index 0000000000..eb5abec889 --- /dev/null +++ b/dot/telemetry/txpool_import.go @@ -0,0 +1,36 @@ +// Copyright 2021 ChainSafe Systems (ON) Corp. +// This file is part of gossamer. +// +// The gossamer library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The gossamer library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the gossamer library. If not, see . + +package telemetry + +// txpoolImportTM holds `txpool.import` telemetry message, which is supposed to be +// sent when a new transaction gets imported in the transaction pool. +type txpoolImportTM struct { + Ready uint `json:"ready"` + Future uint `json:"future"` +} + +// NewTxpoolImportTM creates a new txpoolImportTM struct +func NewTxpoolImportTM(ready, future uint) Message { + return &txpoolImportTM{ + Ready: ready, + Future: future, + } +} + +func (txpoolImportTM) messageType() string { + return txPoolImportMsg +}