Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
Protect all non-write locked atomic memory accesses
Browse files Browse the repository at this point in the history
  • Loading branch information
shanson7 authored and Dieterbe committed Aug 27, 2018
1 parent ff9c8e1 commit dbd7440
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 17 deletions.
39 changes: 23 additions & 16 deletions idx/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/grafana/metrictank/errors"
Expand Down Expand Up @@ -213,8 +214,8 @@ func (m *MemoryIdx) Stop() {
func (m *MemoryIdx) Update(point schema.MetricPoint, partition int32) (idx.Archive, int32, bool) {
pre := time.Now()

m.Lock()
defer m.Unlock()
m.RLock()
defer m.RUnlock()

existing, ok := m.defById[point.MKey]
if ok {
Expand All @@ -223,10 +224,10 @@ func (m *MemoryIdx) Update(point schema.MetricPoint, partition int32) (idx.Archi
log.Debug("memory-idx: metricDef with id %v already in index", point.MKey)
}

if existing.LastUpdate < int64(point.Time) {
existing.LastUpdate = int64(point.Time)
if atomic.LoadInt64(existing.LastUpdate) < int64(point.Time) {
atomic.SwapInt64(&existing.LastUpdate, int64(point.Time))
}
existing.Partition = partition
oldPart := atomic.SwapInt32(&existing.Partition, partition)
statUpdate.Inc()
statUpdateDuration.Value(time.Since(pre))
return *existing, oldPart, true
Expand All @@ -240,22 +241,28 @@ func (m *MemoryIdx) Update(point schema.MetricPoint, partition int32) (idx.Archi
// if was new -> adds new MetricDefinition to index
func (m *MemoryIdx) AddOrUpdate(mkey schema.MKey, data *schema.MetricData, partition int32) (idx.Archive, int32, bool) {
pre := time.Now()
m.Lock()
defer m.Unlock()

// Optimistically read lock
m.RLock()

existing, ok := m.defById[mkey]
if ok {
oldPart := existing.Partition
log.Debug("memory-idx: metricDef with id %s already in index.", mkey)
if existing.LastUpdate < int64(data.Time) {
existing.LastUpdate = int64(data.Time)
if atomic.LoadInt64(&existing.LastUpdate) < int64(data.Time) {
atomic.SwapInt64(&existing.LastUpdate, int64(data.Time))
}
existing.Partition = partition
statUpdate.Inc()
statUpdateDuration.Value(time.Since(pre))
m.RUnlock()
return *existing, oldPart, ok
}

m.RUnlock()
m.Lock()
defer m.Unlock()

def := schema.MetricDefinitionFromMetricData(data)
def.Partition = partition
archive := m.add(def)
Expand Down Expand Up @@ -541,7 +548,7 @@ func (m *MemoryIdx) TagDetails(orgId uint32, key, filter string, from int64) (ma
continue
}

if def.LastUpdate < from {
if atomic.LoadInt64(&def.LastUpdate) < from {
continue
}

Expand Down Expand Up @@ -814,7 +821,7 @@ func (m *MemoryIdx) hasOneMetricFrom(tags TagIndex, tag string, from int64) bool

// as soon as we found one metric definition with LastUpdate >= from
// we can return true
if def.LastUpdate >= from {
if atomic.LoadInt64(&def.LastUpdate) >= from {
return true
}
}
Expand Down Expand Up @@ -910,9 +917,9 @@ func (m *MemoryIdx) Find(orgId uint32, pattern string, from int64) ([]idx.Node,
idxNode.Defs = make([]idx.Archive, 0, len(n.Defs))
for _, id := range n.Defs {
def := m.defById[id]
if from != 0 && def.LastUpdate < from {
if from != 0 && atomic.LoadInt64(&def.LastUpdate) < from {
statFiltered.Inc()
log.Debug("memory-idx: from is %d, so skipping %s which has LastUpdate %d", from, def.Id, def.LastUpdate)
log.Debug("memory-idx: from is %d, so skipping %s which has LastUpdate %d", from, def.Id, atomic.LoadInt64(&def.LastUpdate))
continue
}
log.Debug("memory-idx Find: adding to path %s archive id=%s name=%s int=%d schemaId=%d aggId=%d lastSave=%d", n.Path, def.Id, def.Name, def.Interval, def.SchemaId, def.AggId, def.LastSave)
Expand Down Expand Up @@ -1253,7 +1260,7 @@ func (m *MemoryIdx) Prune(oldest time.Time) ([]idx.Archive, error) {
m.RLock()
DEFS:
for _, def := range m.defById {
if def.LastUpdate >= oldestUnix {
if atomic.LoadInt64(&def.LastUpdate) >= oldestUnix {
continue DEFS
}

Expand All @@ -1269,7 +1276,7 @@ DEFS:
}

for _, id := range n.Defs {
if m.defById[id].LastUpdate >= oldestUnix {
if atomic.LoadInt64(&m.defById[id].LastUpdate) >= oldestUnix {
continue DEFS
}
}
Expand All @@ -1280,7 +1287,7 @@ DEFS:
// if any other MetricDef with the same tag set is not expired yet,
// then we do not want to prune any of them
for def := range defs {
if def.LastUpdate >= oldestUnix {
if atomic.LoadInt64(&def.LastUpdate) >= oldestUnix {
continue DEFS
}
}
Expand Down
2 changes: 1 addition & 1 deletion idx/memory/tag_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ func (q *TagQuery) testByTagMatch(def *idx.Archive) bool {

// testByFrom filters a given metric by its LastUpdate time
func (q *TagQuery) testByFrom(def *idx.Archive) bool {
return q.from <= def.LastUpdate
return q.from <= atomic.LoadInt64(&def.LastUpdate)
}

// testByPrefix filters a given metric by matching prefixes against the values
Expand Down

0 comments on commit dbd7440

Please sign in to comment.