Skip to content

Commit

Permalink
Merge pull request #3189 from ipfs/feat/metrics/interface
Browse files Browse the repository at this point in the history
metrics: introduce go-metrics-interface
  • Loading branch information
whyrusleeping committed Sep 20, 2016
2 parents 3857ded + 9cbeffa commit f23cd5c
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 26 deletions.
16 changes: 13 additions & 3 deletions blocks/blockstore/arc_cache.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package blockstore

import (
"github.com/ipfs/go-ipfs/blocks"
key "gx/ipfs/Qmce4Y4zg3sYr7xKM5UueS67vhNni6EeWgCRnb7MbLJMew/go-key"

"github.com/ipfs/go-ipfs/blocks"

"gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore"
Expand All @@ -12,15 +14,21 @@ import (
type arccache struct {
arc *lru.ARCCache
blockstore Blockstore

hits metrics.Counter
total metrics.Counter
}

func arcCached(bs Blockstore, lruSize int) (*arccache, error) {
func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache, error) {
arc, err := lru.NewARC(lruSize)
if err != nil {
return nil, err
}
c := &arccache{arc: arc, blockstore: bs}
c.hits = metrics.NewCtx(ctx, "arc.hits_total", "Number of ARC cache hits").Counter()
c.total = metrics.NewCtx(ctx, "arc_total", "Total number of ARC cache requests").Counter()

return &arccache{arc: arc, blockstore: bs}, nil
return c, nil
}

func (b *arccache) DeleteBlock(k key.Key) error {
Expand All @@ -42,6 +50,7 @@ func (b *arccache) DeleteBlock(k key.Key) error {
// if ok == false has is inconclusive
// if ok == true then has respons to question: is it contained
func (b *arccache) hasCached(k key.Key) (has bool, ok bool) {
b.total.Inc()
if k == "" {
// Return cache invalid so the call to blockstore happens
// in case of invalid key and correct error is created.
Expand All @@ -50,6 +59,7 @@ func (b *arccache) hasCached(k key.Key) (has bool, ok bool) {

h, ok := b.arc.Get(k)
if ok {
b.hits.Inc()
return h.(bool), true
}
return false, false
Expand Down
2 changes: 1 addition & 1 deletion blocks/blockstore/arc_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func TestGetAndDeleteFalseShortCircuit(t *testing.T) {
}

func TestArcCreationFailure(t *testing.T) {
if arc, err := arcCached(nil, -1); arc != nil || err == nil {
if arc, err := newARCCachedBS(context.TODO(), nil, -1); arc != nil || err == nil {
t.Fatal("expected error and no cache")
}
}
Expand Down
34 changes: 30 additions & 4 deletions blocks/blockstore/bloom_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package blockstore

import (
"sync/atomic"
"time"

"github.com/ipfs/go-ipfs/blocks"
key "gx/ipfs/Qmce4Y4zg3sYr7xKM5UueS67vhNni6EeWgCRnb7MbLJMew/go-key"

bloom "gx/ipfs/QmWQ2SJisXwcCLsUXLwYCKSfyExXjFRW2WbBH5sqCUnwX5/bbloom"
"gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
bloom "gx/ipfs/QmeiMCBkYHxkDkDfnDadzz4YxY5ruL5Pj499essE4vRsGM/bbloom"
)

// bloomCached returns Blockstore that caches Has requests using Bloom filter
Expand All @@ -18,9 +20,31 @@ func bloomCached(bs Blockstore, ctx context.Context, bloomSize, hashCount int) (
return nil, err
}
bc := &bloomcache{blockstore: bs, bloom: bl}
bc.hits = metrics.NewCtx(ctx, "bloom.hits_total",
"Number of cache hits in bloom cache").Counter()
bc.total = metrics.NewCtx(ctx, "bloom_total",
"Total number of requests to bloom cache").Counter()

bc.Invalidate()
go bc.Rebuild(ctx)

if metrics.Active() {
go func() {
fill := metrics.NewCtx(ctx, "bloom_fill_ratio",
"Ratio of bloom filter fullnes, (updated once a minute)").Gauge()

<-bc.rebuildChan
t := time.NewTicker(1 * time.Minute)
for {
select {
case <-ctx.Done():
t.Stop()
return
case <-t.C:
fill.Set(bc.bloom.FillRatio())
}
}
}()
}
return bc, nil
}

Expand All @@ -33,8 +57,8 @@ type bloomcache struct {
blockstore Blockstore

// Statistics
hits uint64
misses uint64
hits metrics.Counter
total metrics.Counter
}

func (b *bloomcache) Invalidate() {
Expand Down Expand Up @@ -84,6 +108,7 @@ func (b *bloomcache) DeleteBlock(k key.Key) error {
// if ok == false has is inconclusive
// if ok == true then has respons to question: is it contained
func (b *bloomcache) hasCached(k key.Key) (has bool, ok bool) {
b.total.Inc()
if k == "" {
// Return cache invalid so call to blockstore
// in case of invalid key is forwarded deeper
Expand All @@ -92,6 +117,7 @@ func (b *bloomcache) hasCached(k key.Key) (has bool, ok bool) {
if b.BloomActive() {
blr := b.bloom.HasTS([]byte(k))
if blr == false { // not contained in bloom is only conclusive answer bloom gives
b.hits.Inc()
return false, true
}
}
Expand Down
10 changes: 7 additions & 3 deletions blocks/blockstore/caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package blockstore
import (
"errors"

"gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
)

Expand Down Expand Up @@ -33,12 +34,15 @@ func CachedBlockstore(bs GCBlockstore,
if opts.HasBloomFilterSize != 0 && opts.HasBloomFilterHashes == 0 {
return nil, errors.New("bloom filter hash count can't be 0 when there is size set")
}

ctx = metrics.CtxSubScope(ctx, "bs.cache")

if opts.HasARCCacheSize > 0 {
cbs, err = newARCCachedBS(ctx, cbs, opts.HasARCCacheSize)
}
if opts.HasBloomFilterSize != 0 {
cbs, err = bloomCached(cbs, ctx, opts.HasBloomFilterSize, opts.HasBloomFilterHashes)
}
if opts.HasARCCacheSize > 0 {
cbs, err = arcCached(cbs, opts.HasARCCacheSize)
}

return cbs, err
}
21 changes: 13 additions & 8 deletions cmd/ipfs/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,6 @@ import (
"sort"
"sync"

"gx/ipfs/QmPpRcbNUXauP3zWZ1NJMLWpe4QnmEHrd2ba2D3yqWznw7/go-multiaddr-net"
_ "gx/ipfs/QmV3NSS3A1kX5s28r7yLczhDsXzkgo65cqRgKFXYunWZmD/metrics/runtime"

ma "gx/ipfs/QmYzDkkgAEmrcNzFCiYo6L1dTX4EAG1gZkbtdbd9trL4vd/go-multiaddr"

cmds "github.com/ipfs/go-ipfs/commands"
"github.com/ipfs/go-ipfs/core"
commands "github.com/ipfs/go-ipfs/core/commands"
Expand All @@ -25,10 +20,17 @@ import (
nodeMount "github.com/ipfs/go-ipfs/fuse/node"
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
migrate "github.com/ipfs/go-ipfs/repo/fsrepo/migrations"

"gx/ipfs/QmPpRcbNUXauP3zWZ1NJMLWpe4QnmEHrd2ba2D3yqWznw7/go-multiaddr-net"
"gx/ipfs/QmR3KwhXCRLTNZB59vELb2HhEWrGy9nuychepxFtj3wWYa/client_golang/prometheus"

conn "gx/ipfs/QmUuwQUJmtvC6ReYcu7xaYKEUM3pD46H18dFn3LBhVt2Di/go-libp2p/p2p/net/conn"
mprome "gx/ipfs/QmXWro6iddJRbGWUoZDpTu6tjo5EXX4xJHHR9VczeoGZbw/go-metrics-prometheus"
ma "gx/ipfs/QmYzDkkgAEmrcNzFCiYo6L1dTX4EAG1gZkbtdbd9trL4vd/go-multiaddr"
util "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util"
pstore "gx/ipfs/QmdMfSLMDBDYhtc4oF3NYGCZr5dy4wQb6Ji26N4D4mdxa2/go-libp2p-peerstore"
prometheus "gx/ipfs/QmdhsRK1EK2fvAz2i2SH5DEfkL6seDuyMYEsxKa9Braim3/client_golang/prometheus"

_ "gx/ipfs/QmV3NSS3A1kX5s28r7yLczhDsXzkgo65cqRgKFXYunWZmD/metrics/runtime"
)

const (
Expand Down Expand Up @@ -359,8 +361,11 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
}

// initialize metrics collector
prometheus.MustRegisterOrGet(&corehttp.IpfsNodeCollector{Node: node})
prometheus.EnableCollectChecks(true)
err = mprome.Inject()
if err != nil {
log.Warningf("Injecting prometheus handler for metrics failed with message: %s\n", err.Error())
}
prometheus.MustRegister(&corehttp.IpfsNodeCollector{Node: node})

fmt.Printf("Daemon is ready\n")
// collect long-running errors and block for shutdown
Expand Down
4 changes: 3 additions & 1 deletion core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ import (
pin "github.com/ipfs/go-ipfs/pin"
repo "github.com/ipfs/go-ipfs/repo"
cfg "github.com/ipfs/go-ipfs/repo/config"
key "gx/ipfs/Qmce4Y4zg3sYr7xKM5UueS67vhNni6EeWgCRnb7MbLJMew/go-key"

retry "gx/ipfs/QmPF5kxTYFkzhaY5LmkExood7aTTZBHWQC6cjdDQBuGrjp/retry-datastore"
metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
goprocessctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
ci "gx/ipfs/QmVoi5es8D5fNHZDqoW6DgDAEPEV5hQp8GBz161vZXiwpQ/go-libp2p-crypto"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore"
dsync "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/sync"
key "gx/ipfs/Qmce4Y4zg3sYr7xKM5UueS67vhNni6EeWgCRnb7MbLJMew/go-key"
pstore "gx/ipfs/QmdMfSLMDBDYhtc4oF3NYGCZr5dy4wQb6Ji26N4D4mdxa2/go-libp2p-peerstore"
)

Expand Down Expand Up @@ -109,6 +110,7 @@ func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) {
if err != nil {
return nil, err
}
ctx = metrics.CtxScope(ctx, "ipfs")

n := &IpfsNode{
mode: offlineMode,
Expand Down
7 changes: 5 additions & 2 deletions core/corehttp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"net"
"net/http"

prometheus "gx/ipfs/QmdhsRK1EK2fvAz2i2SH5DEfkL6seDuyMYEsxKa9Braim3/client_golang/prometheus"

core "github.com/ipfs/go-ipfs/core"

prometheus "gx/ipfs/QmR3KwhXCRLTNZB59vELb2HhEWrGy9nuychepxFtj3wWYa/client_golang/prometheus"
)

// This adds the scraping endpoint which Prometheus uses to fetch metrics.
Expand Down Expand Up @@ -53,6 +53,9 @@ func (c IpfsNodeCollector) Collect(ch chan<- prometheus.Metric) {

func (c IpfsNodeCollector) PeersTotalValues() map[string]float64 {
vals := make(map[string]float64)
if c.Node.PeerHost == nil {
return vals
}
for _, conn := range c.Node.PeerHost.Network().Conns() {
tr := ""
for _, proto := range conn.RemoteMultiaddr().Protocols() {
Expand Down
20 changes: 16 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@
"version": "0.0.0"
},
{
"hash": "QmdhsRK1EK2fvAz2i2SH5DEfkL6seDuyMYEsxKa9Braim3",
"hash": "QmR3KwhXCRLTNZB59vELb2HhEWrGy9nuychepxFtj3wWYa",
"name": "client_golang",
"version": "0.0.0"
"version": "0.1.0"
},
{
"hash": "Qma1FrGRasghpuETfCtsKdFtXKQffpNnakv3wG3QaMwCVi",
Expand Down Expand Up @@ -170,9 +170,9 @@
},
{
"author": "kubuxu",
"hash": "QmWQ2SJisXwcCLsUXLwYCKSfyExXjFRW2WbBH5sqCUnwX5",
"hash": "QmeiMCBkYHxkDkDfnDadzz4YxY5ruL5Pj499essE4vRsGM",
"name": "bbloom",
"version": "0.0.2"
"version": "0.1.0"
},
{
"author": "whyrusleeping",
Expand Down Expand Up @@ -239,6 +239,18 @@
"hash": "QmYrv4LgCC8FhG2Ab4bwuq5DqBdwMtx3hMb3KKJDZcr2d7",
"name": "go-libp2p-loggables",
"version": "1.0.11"
},
{
"author": "ipfs",
"hash": "QmXWro6iddJRbGWUoZDpTu6tjo5EXX4xJHHR9VczeoGZbw",
"name": "go-metrics-prometheus",
"version": "0.3.0"
},
{
"author": "ipfs",
"hash": "QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5",
"name": "go-metrics-interface",
"version": "0.1.2"
}
],
"gxVersion": "0.4.0",
Expand Down

0 comments on commit f23cd5c

Please sign in to comment.