Skip to content

Commit

Permalink
[metricbeat] migrate Redis to reporterV2 with error (#11792)
Browse files Browse the repository at this point in the history
* migrate redis to reporter V2 with error return
  • Loading branch information
fearful-symmetry committed Apr 15, 2019
1 parent 5836641 commit 2499449
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 48 deletions.
17 changes: 6 additions & 11 deletions metricbeat/module/redis/info/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,11 @@ import (

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
"github.com/elastic/beats/metricbeat/module/redis"
)

var (
debugf = logp.MakeDebug("redis-info")
)

func init() {
mb.Registry.MustAddMetricSet("redis", "info", New,
mb.WithHostParser(parse.PassThruHostParser),
Expand All @@ -54,12 +49,11 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
}

// Fetch fetches metrics from Redis by issuing the INFO command.
func (m *MetricSet) Fetch(r mb.ReporterV2) {
func (m *MetricSet) Fetch(r mb.ReporterV2) error {
// Fetch default INFO.
info, err := redis.FetchRedisInfo("default", m.Connection())
if err != nil {
logp.Err("Failed to fetch redis info: %s", err)
return
return errors.Wrap(err, "failed to fetch redis info")
}

// In 5.0 some fields are renamed, maintain both names, old ones will be deprecated
Expand All @@ -79,11 +73,12 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) {

slowLogLength, err := redis.FetchSlowLogLength(m.Connection())
if err != nil {
logp.Err("Failed to fetch slow log length: %s", err)
return
return errors.Wrap(err, "failed to fetch slow log length")

}
info["slowlog_len"] = strconv.FormatInt(slowLogLength, 10)

debugf("Redis INFO from %s: %+v", m.Host(), info)
m.Logger().Debugf("Redis INFO from %s: %+v", m.Host(), info)
eventMapping(r, info)
return nil
}
8 changes: 4 additions & 4 deletions metricbeat/module/redis/info/info_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ var redisHost = redis.GetRedisEnvHost() + ":" + redis.GetRedisEnvPort()
func TestFetch(t *testing.T) {
compose.EnsureUp(t, "redis")

ms := mbtest.NewReportingMetricSetV2(t, getConfig())
events, err := mbtest.ReportingFetchV2(ms)
ms := mbtest.NewReportingMetricSetV2Error(t, getConfig())
events, err := mbtest.ReportingFetchV2Error(ms)
if err != nil {
t.Fatal("fetch", err)
}
Expand All @@ -56,8 +56,8 @@ func TestFetch(t *testing.T) {
func TestData(t *testing.T) {
compose.EnsureUp(t, "redis")

ms := mbtest.NewReportingMetricSetV2(t, getConfig())
err := mbtest.WriteEventsReporterV2(ms, t, "")
ms := mbtest.NewReportingMetricSetV2Error(t, getConfig())
err := mbtest.WriteEventsReporterV2Error(ms, t, "")
if err != nil {
t.Fatal("write", err)
}
Expand Down
28 changes: 16 additions & 12 deletions metricbeat/module/redis/key/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,15 @@
package key

import (
"fmt"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
"github.com/elastic/beats/metricbeat/module/redis"
)

var (
debugf = logp.MakeDebug("redis-key")
)

func init() {
mb.Registry.MustAddMetricSet("redis", "key", New,
mb.WithHostParser(parse.PassThruHostParser),
Expand Down Expand Up @@ -71,35 +68,42 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
}

// Fetch fetches information from Redis keys
func (m *MetricSet) Fetch(r mb.ReporterV2) {
func (m *MetricSet) Fetch(r mb.ReporterV2) error {
conn := m.Connection()
for _, p := range m.patterns {
if err := redis.Select(conn, p.Keyspace); err != nil {
logp.Err("Failed to select keyspace %d: %s", p.Keyspace, err)
msg := errors.Wrapf(err, "Failed to select keyspace %d", p.Keyspace)
m.Logger().Error(msg)
r.Error(err)
continue
}

keys, err := redis.FetchKeys(conn, p.Pattern, p.Limit)
if err != nil {
logp.Err("Failed to list keys in keyspace %d with pattern '%s': %s", p.Keyspace, p.Pattern, err)
msg := errors.Wrapf(err, "Failed to list keys in keyspace %d with pattern '%s'", p.Keyspace, p.Pattern)
m.Logger().Error(msg)
r.Error(err)
continue
}
if p.Limit > 0 && len(keys) > int(p.Limit) {
debugf("Collecting stats for %d keys, but there are more available for pattern '%s' in keyspace %d", p.Limit)
m.Logger().Debugf("Collecting stats for %d keys, but there are more available for pattern '%s' in keyspace %d", p.Limit)
keys = keys[:p.Limit]
}

for _, key := range keys {
keyInfo, err := redis.FetchKeyInfo(conn, key)
if err != nil {
logp.Err("Failed to fetch key info for key %s in keyspace %d", key, p.Keyspace)
msg := fmt.Errorf("Failed to fetch key info for key %s in keyspace %d", key, p.Keyspace)
m.Logger().Error(msg)
r.Error(err)
continue
}
event := eventMapping(p.Keyspace, keyInfo)
if !r.Event(event) {
debugf("Failed to report event, interrupting Fetch")
return
return errors.New("metricset has closed")
}
}
}

return nil
}
8 changes: 4 additions & 4 deletions metricbeat/module/redis/key/key_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func TestFetch(t *testing.T) {

addEntry(t)

ms := mbtest.NewReportingMetricSetV2(t, getConfig())
events, err := mbtest.ReportingFetchV2(ms)
ms := mbtest.NewReportingMetricSetV2Error(t, getConfig())
events, err := mbtest.ReportingFetchV2Error(ms)
if err != nil {
t.Fatal("fetch", err)
}
Expand All @@ -52,8 +52,8 @@ func TestData(t *testing.T) {

addEntry(t)

ms := mbtest.NewReportingMetricSetV2(t, getConfig())
err := mbtest.WriteEventsReporterV2(ms, t, "")
ms := mbtest.NewReportingMetricSetV2Error(t, getConfig())
err := mbtest.WriteEventsReporterV2Error(ms, t, "")
if err != nil {
t.Fatal("write", err)
}
Expand Down
5 changes: 1 addition & 4 deletions metricbeat/module/redis/keyspace/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@ func eventsMapping(r mb.ReporterV2, info map[string]string) {
event := mb.Event{
MetricSetFields: space,
}
if !r.Event(event) {
debugf("Failed to report event, interrupting Fetch")
return
}
r.Event(event)
}
}

Expand Down
13 changes: 4 additions & 9 deletions metricbeat/module/redis/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,11 @@ package keyspace
import (
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
"github.com/elastic/beats/metricbeat/module/redis"
)

var (
debugf = logp.MakeDebug("redis-keyspace")
)

func init() {
mb.Registry.MustAddMetricSet("redis", "keyspace", New,
mb.WithHostParser(parse.PassThruHostParser),
Expand All @@ -52,14 +47,14 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
}

// Fetch fetches metrics from Redis by issuing the INFO command.
func (m *MetricSet) Fetch(r mb.ReporterV2) {
func (m *MetricSet) Fetch(r mb.ReporterV2) error {
// Fetch default INFO.
info, err := redis.FetchRedisInfo("keyspace", m.Connection())
if err != nil {
logp.Err("Failed to fetch redis info for keyspaces: %s", err)
return
return errors.Wrap(err, "Failed to fetch redis info for keyspaces")
}

debugf("Redis INFO from %s: %+v", m.Host(), info)
m.Logger().Debugf("Redis INFO from %s: %+v", m.Host(), info)
eventsMapping(r, info)
return nil
}
8 changes: 4 additions & 4 deletions metricbeat/module/redis/keyspace/keyspace_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func TestFetch(t *testing.T) {
addEntry(t)

// Fetch data
ms := mbtest.NewReportingMetricSetV2(t, getConfig())
events, err := mbtest.ReportingFetchV2(ms)
ms := mbtest.NewReportingMetricSetV2Error(t, getConfig())
events, err := mbtest.ReportingFetchV2Error(ms)
if err != nil {
t.Fatal("fetch", err)
}
Expand All @@ -63,8 +63,8 @@ func TestData(t *testing.T) {

addEntry(t)

ms := mbtest.NewReportingMetricSetV2(t, getConfig())
err := mbtest.WriteEventsReporterV2(ms, t, "")
ms := mbtest.NewReportingMetricSetV2Error(t, getConfig())
err := mbtest.WriteEventsReporterV2Error(ms, t, "")
if err != nil {
t.Fatal("write", err)
}
Expand Down

0 comments on commit 2499449

Please sign in to comment.