Skip to content

Commit

Permalink
metrics,storage,sqlite: add cache metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
n8maninger committed Jun 19, 2023
1 parent f1a9424 commit 8444c26
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 10 deletions.
3 changes: 3 additions & 0 deletions host/metrics/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ type (

Reads uint64 `json:"reads"`
Writes uint64 `json:"writes"`

SectorCacheHits uint64 `json:"sectorCacheHits"`
SectorCacheMisses uint64 `json:"sectorCacheMisses"`
}

// RevenueMetrics is a collection of metrics related to revenue.
Expand Down
4 changes: 2 additions & 2 deletions host/storage/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ type (
// ExpireTempSectors removes all temporary sectors that expired before
// the given height.
ExpireTempSectors(height uint64) error
// IncrementSectorAccess increments the read and write access counters
IncrementSectorAccess(reads, writes uint64) error
// IncrementSectorStats increments sector stats
IncrementSectorStats(reads, writes, cacheHit, cacheMiss uint64) error
}
)

Expand Down
19 changes: 18 additions & 1 deletion host/storage/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,27 @@ type (
mu sync.Mutex
r uint64
w uint64

cacheHit uint64
cacheMiss uint64
}
)

// Flush persists the number of sectors read and written.
func (sr *sectorAccessRecorder) Flush() {
sr.mu.Lock()
r, w := sr.r, sr.w
cacheHit, cacheMiss := sr.cacheHit, sr.cacheMiss
sr.r, sr.w = 0, 0
sr.cacheHit, sr.cacheMiss = 0, 0
sr.mu.Unlock()

// no need to persist if there is no change
if r == 0 && w == 0 {
return
}

if err := sr.store.IncrementSectorAccess(r, w); err != nil {
if err := sr.store.IncrementSectorStats(r, w, cacheHit, cacheMiss); err != nil {
sr.log.Error("failed to persist sector access", zap.Error(err))
return
}
Expand All @@ -52,6 +57,18 @@ func (sr *sectorAccessRecorder) AddWrite() {
sr.w++
}

func (sr *sectorAccessRecorder) AddCacheHit() {
sr.mu.Lock()
defer sr.mu.Unlock()
sr.cacheHit++
}

func (sr *sectorAccessRecorder) AddCacheMiss() {
sr.mu.Lock()
defer sr.mu.Unlock()
sr.cacheMiss++
}

// Run starts the recorder, flushing data at regular intervals.
func (sr *sectorAccessRecorder) Run(stop <-chan struct{}) {
t := time.NewTicker(flushInterval)
Expand Down
16 changes: 11 additions & 5 deletions host/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ type (
// changedVolumes tracks volumes that need to be fsynced
changedVolumes map[int]bool
cache *lru.Cache[types.Hash256, *[rhpv2.SectorSize]byte] // Added cache
cacheHits uint64 // Cache hit counter
cacheMisses uint64 // Cache miss counter
cacheHits uint64
cacheMisses uint64
}
)

Expand Down Expand Up @@ -847,6 +847,11 @@ func (vm *VolumeManager) LockSector(root types.Hash256) (func() error, error) {
return release, err
}

// CacheStats returns the number of cache hits and misses.
func (vm *VolumeManager) CacheStats() (hits, misses uint64) {
return atomic.LoadUint64(&vm.cacheHits), atomic.LoadUint64(&vm.cacheMisses)
}

// Read reads the sector with the given root
func (vm *VolumeManager) Read(root types.Hash256) (*[rhpv2.SectorSize]byte, error) {
done, err := vm.tg.Add()
Expand All @@ -857,7 +862,8 @@ func (vm *VolumeManager) Read(root types.Hash256) (*[rhpv2.SectorSize]byte, erro

// Check the cache first
if sector, ok := vm.cache.Get(root); ok {
atomic.AddUint64(&vm.cacheHits, 1) // Increment cache hit counter
vm.recorder.AddCacheHit()
atomic.AddUint64(&vm.cacheHits, 1)
return sector, nil
}

Expand All @@ -882,8 +888,8 @@ func (vm *VolumeManager) Read(root types.Hash256) (*[rhpv2.SectorSize]byte, erro

// Add sector to cache
vm.cache.Add(root, sector)
atomic.AddUint64(&vm.cacheMisses, 1) // Increment cache miss counter

vm.recorder.AddCacheMiss()
atomic.AddUint64(&vm.cacheMisses, 1)
vm.recorder.AddRead()
return sector, nil
}
Expand Down
136 changes: 136 additions & 0 deletions host/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,142 @@ func TestVolumeManagerReadWrite(t *testing.T) {
}
}

func TestSectorCache(t *testing.T) {
const sectors = 10
dir := t.TempDir()

// create the database
log := zaptest.NewLogger(t)
db, err := sqlite.OpenDatabase(filepath.Join(dir, "hostd.db"), log.Named("sqlite"))
if err != nil {
t.Fatal(err)
}
defer db.Close()

g, err := gateway.New(":0", false, filepath.Join(dir, "gateway"))
if err != nil {
t.Fatal(err)
}
defer g.Close()

cs, errCh := consensus.New(g, false, filepath.Join(dir, "consensus"))
select {
case err := <-errCh:
if err != nil {
t.Fatal(err)
}
default:
}
cm, err := chain.NewManager(cs)
if err != nil {
t.Fatal(err)
}
defer cm.Close()

// initialize the storage manager
am := alerts.NewManager()
vm, err := storage.NewVolumeManager(db, am, cm, log.Named("volumes"), sectors/2) // cache half the sectors
if err != nil {
t.Fatal(err)
}
defer vm.Close()

result := make(chan error, 1)
volumeFilePath := filepath.Join(t.TempDir(), "hostdata.dat")
vol, err := vm.AddVolume(volumeFilePath, sectors, result)
if err != nil {
t.Fatal(err)
} else if err := <-result; err != nil {
t.Fatal(err)
}

volume, err := vm.Volume(vol.ID)
if err != nil {
t.Fatal(err)
}

if err := checkFileSize(volumeFilePath, int64(sectors*rhpv2.SectorSize)); err != nil {
t.Fatal(err)
} else if volume.TotalSectors != sectors {
t.Fatalf("expected %v total sectors, got %v", sectors, volume.TotalSectors)
} else if volume.UsedSectors != 0 {
t.Fatalf("expected 0 used sectors, got %v", volume.UsedSectors)
}

roots := make([]types.Hash256, 0, sectors)
// fill the volume
for i := 0; i < cap(roots); i++ {
var sector [rhpv2.SectorSize]byte
if _, err := frand.Read(sector[:256]); err != nil {
t.Fatal(err)
}
root := rhpv2.SectorRoot(&sector)
release, err := vm.Write(root, &sector)
if err != nil {
t.Fatal(i, err)
}
defer release()
roots = append(roots, root)

// validate the volume stats are correct
volumes, err := vm.Volumes()
if err != nil {
t.Fatal(err)
}
if volumes[0].UsedSectors != uint64(i+1) {
t.Fatalf("expected %v used sectors, got %v", i+1, volumes[0].UsedSectors)
} else if err := release(); err != nil {
t.Fatal(err)
}
}

// read the last 5 sectors all sectors should be cached
for i, root := range roots[5:] {
_, err := vm.Read(root)
if err != nil {
t.Fatal(err)
}

hits, misses := vm.CacheStats()
if hits != uint64(i+1) {
t.Fatalf("expected %v cache hits, got %v", i+1, hits)
} else if misses != 0 {
t.Fatalf("expected 0 cache misses, got %v", misses)
}
}

// read the first 5 sectors all sectors should be missed
for i, root := range roots[:5] {
_, err := vm.Read(root)
if err != nil {
t.Fatal(err)
}

hits, misses := vm.CacheStats()
if hits != 5 {
t.Fatalf("expected 5 cache hits, got %v", hits) // existing 5 cache hits
} else if misses != uint64(i+1) {
t.Fatalf("expected %v cache misses, got %v", i+1, misses)
}
}

// read the first 5 sectors again all sectors should be cached
for i, root := range roots[:5] {
_, err := vm.Read(root)
if err != nil {
t.Fatal(err)
}

expectedHits := 5 + (uint64(i) + 1) // 5 original hits, plus the new hit
hits, misses := vm.CacheStats()
if hits != expectedHits {
t.Fatalf("expected %d cache hits, got %v", expectedHits, hits)
} else if misses != 5 {
t.Fatalf("expected %v cache misses, got %v", 5, misses) // existing 5 cache misses
}
}
}

func BenchmarkVolumeManagerWrite(b *testing.B) {
dir := b.TempDir()

Expand Down
22 changes: 20 additions & 2 deletions persist/sqlite/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ const (
metricTempSectors = "tempSectors"
metricSectorReads = "sectorReads"
metricSectorWrites = "sectorWrites"
metricSectorCacheHit = "sectorCacheHit"
metricSectorCacheMiss = "sectorCacheMiss"

// registry
metricMaxRegistryEntries = "maxRegistryEntries"
Expand Down Expand Up @@ -230,8 +232,8 @@ func (s *Store) IncrementRHP3DataUsage(ingress, egress uint64) error {
})
}

// IncrementSectorAccess increments the sector read and write metrics.
func (s *Store) IncrementSectorAccess(reads, writes uint64) error {
// IncrementSectorStats increments the sector read, write and cache metrics.
func (s *Store) IncrementSectorStats(reads, writes, cacheHit, cacheMiss uint64) error {
return s.transaction(func(tx txn) error {
if reads > 0 {
if err := incrementNumericStat(tx, metricSectorReads, int(reads), time.Now()); err != nil {
Expand All @@ -243,6 +245,18 @@ func (s *Store) IncrementSectorAccess(reads, writes uint64) error {
return fmt.Errorf("failed to track writes: %w", err)
}
}

if cacheHit > 0 {
if err := incrementNumericStat(tx, metricSectorCacheHit, int(cacheHit), time.Now()); err != nil {
return fmt.Errorf("failed to track cache hits: %w", err)
}
}

if cacheMiss > 0 {
if err := incrementNumericStat(tx, metricSectorCacheMiss, int(cacheMiss), time.Now()); err != nil {
return fmt.Errorf("failed to track cache misses: %w", err)
}
}
return nil
})
}
Expand Down Expand Up @@ -324,6 +338,10 @@ func mustParseMetricValue(stat string, buf []byte, m *metrics.Metrics) {
m.Storage.Reads = mustScanUint64(buf)
case metricSectorWrites:
m.Storage.Writes = mustScanUint64(buf)
case metricSectorCacheHit:
m.Storage.SectorCacheHits = mustScanUint64(buf)
case metricSectorCacheMiss:
m.Storage.SectorCacheMisses = mustScanUint64(buf)
// registry
case metricRegistryEntries:
m.Registry.Entries = mustScanUint64(buf)
Expand Down

0 comments on commit 8444c26

Please sign in to comment.