From 9048b081e9d22d7aaf16815ceb68babc579fe92f Mon Sep 17 00:00:00 2001 From: Paulo Janotti Date: Wed, 10 Jul 2019 20:56:52 -0700 Subject: [PATCH] Use better type to handle back pressure setting --- config/configmodels/configmodels.go | 26 ++++++++++++------- observability/observability.go | 6 +++-- observability/observability_test.go | 5 ++-- receiver/zipkinreceiver/config_test.go | 10 +++---- receiver/zipkinreceiver/factory.go | 2 +- receiver/zipkinreceiver/factory_test.go | 9 +++++++ receiver/zipkinreceiver/trace_receiver.go | 20 +++++++------- .../zipkinreceiver/trace_receiver_test.go | 8 +++--- 8 files changed, 53 insertions(+), 33 deletions(-) diff --git a/config/configmodels/configmodels.go b/config/configmodels/configmodels.go index 56c292efc74..6e69c4771b4 100644 --- a/config/configmodels/configmodels.go +++ b/config/configmodels/configmodels.go @@ -126,24 +126,24 @@ type Pipelines map[string]*Pipeline // These are helper structs which you can embed when implementing your specific // receiver/exporter/processor config storage. -// BackPressureState defines if backpressure should be exterted or not. -type BackPressureState bool +// BackPressureSetting defines if back pressure should be exerted or not. +type BackPressureSetting int const ( // EnableBackPressure indicates that backpressure is enabled. - EnableBackPressure BackPressureState = false + EnableBackPressure BackPressureSetting = iota // DisableBackPressure indicates that backpressure is disabled. - DisableBackPressure BackPressureState = true + DisableBackPressure ) // ReceiverSettings defines common settings for a single-protocol receiver configuration. // Specific receivers can embed this struct and extend it with more fields if needed. type ReceiverSettings struct { - TypeVal string `mapstructure:"-"` - NameVal string `mapstructure:"-"` - Enabled bool `mapstructure:"enabled"` - Endpoint string `mapstructure:"endpoint"` - BackPressureState BackPressureState `mapstructure:"disable-backpressure"` + TypeVal string `mapstructure:"-"` + NameVal string `mapstructure:"-"` + Enabled bool `mapstructure:"enabled"` + Endpoint string `mapstructure:"endpoint"` + DisableBackPressure bool `mapstructure:"disable-backpressure"` } // Name gets the receiver name. @@ -166,6 +166,14 @@ func (rs *ReceiverSettings) SetType(typeStr string) { rs.TypeVal = typeStr } +// BackPressureSetting gets the back pressure setting of the configuration. +func (rs *ReceiverSettings) BackPressureSetting() BackPressureSetting { + if rs.DisableBackPressure { + return DisableBackPressure + } + return EnableBackPressure +} + // ExporterSettings defines common settings for an exporter configuration. // Specific exporters can embed this struct and extend it with more fields if needed. type ExporterSettings struct { diff --git a/observability/observability.go b/observability/observability.go index 9af20ec9d2d..ba2285bd565 100644 --- a/observability/observability.go +++ b/observability/observability.go @@ -28,6 +28,8 @@ import ( "go.opencensus.io/stats/view" "go.opencensus.io/tag" "go.opencensus.io/trace" + + "github.com/open-telemetry/opentelemetry-service/config/configmodels" ) var ( @@ -138,8 +140,8 @@ func ContextWithReceiverName(ctx context.Context, receiverName string) context.C // when the host blocks ingestion. If back pressure is disabled the metric for // respective data loss is recorded. // Use it with a context.Context generated using ContextWithReceiverName(). -func RecordIngestionBlockedMetrics(ctxWithTraceReceiverName context.Context, backPressureDisabled bool) { - if backPressureDisabled { +func RecordIngestionBlockedMetrics(ctxWithTraceReceiverName context.Context, backPressureSetting configmodels.BackPressureSetting) { + if backPressureSetting == configmodels.DisableBackPressure { // In this case data loss will happen, record the proper metric. stats.Record(ctxWithTraceReceiverName, mReceiverIngestionBlockedRPCsWithDataLoss.M(1)) } diff --git a/observability/observability_test.go b/observability/observability_test.go index cc980a1b15c..7ec2254f5e2 100644 --- a/observability/observability_test.go +++ b/observability/observability_test.go @@ -20,6 +20,7 @@ import ( "context" "testing" + "github.com/open-telemetry/opentelemetry-service/config/configmodels" "github.com/open-telemetry/opentelemetry-service/observability" "github.com/open-telemetry/opentelemetry-service/observability/observabilitytest" ) @@ -35,8 +36,8 @@ func TestTracePieplineRecordedMetrics(t *testing.T) { receiverCtx := observability.ContextWithReceiverName(context.Background(), receiverName) observability.RecordTraceReceiverMetrics(receiverCtx, 17, 13) - observability.RecordIngestionBlockedMetrics(receiverCtx, false) - observability.RecordIngestionBlockedMetrics(receiverCtx, true) + observability.RecordIngestionBlockedMetrics(receiverCtx, configmodels.EnableBackPressure) + observability.RecordIngestionBlockedMetrics(receiverCtx, configmodels.DisableBackPressure) exporterCtx := observability.ContextWithExporterName(receiverCtx, exporterName) observability.RecordTraceExporterMetrics(exporterCtx, 27, 23) if err := observabilitytest.CheckValueViewReceiverReceivedSpans(receiverName, 17); err != nil { diff --git a/receiver/zipkinreceiver/config_test.go b/receiver/zipkinreceiver/config_test.go index c8b33ab710b..92761e77180 100644 --- a/receiver/zipkinreceiver/config_test.go +++ b/receiver/zipkinreceiver/config_test.go @@ -45,11 +45,11 @@ func TestLoadConfig(t *testing.T) { assert.Equal(t, r1, &Config{ ReceiverSettings: configmodels.ReceiverSettings{ - TypeVal: typeStr, - NameVal: "zipkin/customname", - Endpoint: "127.0.0.1:8765", - Enabled: true, - BackPressureState: configmodels.DisableBackPressure, + TypeVal: typeStr, + NameVal: "zipkin/customname", + Endpoint: "127.0.0.1:8765", + Enabled: true, + DisableBackPressure: true, }, }) } diff --git a/receiver/zipkinreceiver/factory.go b/receiver/zipkinreceiver/factory.go index 81f8823e903..d5bf35766ed 100644 --- a/receiver/zipkinreceiver/factory.go +++ b/receiver/zipkinreceiver/factory.go @@ -70,7 +70,7 @@ func (f *factory) CreateTraceReceiver( ) (receiver.TraceReceiver, error) { rCfg := cfg.(*Config) - return New(rCfg.Endpoint, configmodels.EnableBackPressure, nextConsumer) + return New(rCfg.Endpoint, rCfg.BackPressureSetting(), nextConsumer) } // CreateMetricsReceiver creates a metrics receiver based on provided config. diff --git a/receiver/zipkinreceiver/factory_test.go b/receiver/zipkinreceiver/factory_test.go index 1ddd161be0b..bbdb4daff60 100644 --- a/receiver/zipkinreceiver/factory_test.go +++ b/receiver/zipkinreceiver/factory_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/open-telemetry/opentelemetry-service/config/configerror" + "github.com/open-telemetry/opentelemetry-service/config/configmodels" "github.com/open-telemetry/opentelemetry-service/consumer/consumerdata" "github.com/open-telemetry/opentelemetry-service/receiver" ) @@ -47,6 +48,14 @@ func TestCreateReceiver(t *testing.T) { tReceiver, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, &mockTraceConsumer{}) assert.Nil(t, err, "receiver creation failed") assert.NotNil(t, tReceiver, "receiver creation failed") + assert.Equal(t, configmodels.EnableBackPressure, tReceiver.(*ZipkinReceiver).backPressureSetting) + + rCfg := cfg.(*Config) + rCfg.DisableBackPressure = true + tReceiver, err = factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, &mockTraceConsumer{}) + assert.Nil(t, err, "receiver creation failed") + assert.NotNil(t, tReceiver, "receiver creation failed") + assert.Equal(t, configmodels.DisableBackPressure, tReceiver.(*ZipkinReceiver).backPressureSetting) mReceiver, err := factory.CreateMetricsReceiver(zap.NewNop(), cfg, nil) assert.Equal(t, err, configerror.ErrDataTypeIsNotSupported) diff --git a/receiver/zipkinreceiver/trace_receiver.go b/receiver/zipkinreceiver/trace_receiver.go index a66c8e93bd6..942cfb1841e 100644 --- a/receiver/zipkinreceiver/trace_receiver.go +++ b/receiver/zipkinreceiver/trace_receiver.go @@ -58,10 +58,10 @@ type ZipkinReceiver struct { mu sync.Mutex // addr is the address onto which the HTTP server will be bound - addr string - host receiver.Host - backPressureState configmodels.BackPressureState - nextConsumer consumer.TraceConsumer + addr string + host receiver.Host + backPressureSetting configmodels.BackPressureSetting + nextConsumer consumer.TraceConsumer startOnce sync.Once stopOnce sync.Once @@ -72,15 +72,15 @@ var _ receiver.TraceReceiver = (*ZipkinReceiver)(nil) var _ http.Handler = (*ZipkinReceiver)(nil) // New creates a new zipkinreceiver.ZipkinReceiver reference. -func New(address string, backPressureState configmodels.BackPressureState, nextConsumer consumer.TraceConsumer) (*ZipkinReceiver, error) { +func New(address string, backPressureSetting configmodels.BackPressureSetting, nextConsumer consumer.TraceConsumer) (*ZipkinReceiver, error) { if nextConsumer == nil { return nil, errNilNextConsumer } zr := &ZipkinReceiver{ - addr: address, - backPressureState: backPressureState, - nextConsumer: nextConsumer, + addr: address, + backPressureSetting: backPressureSetting, + nextConsumer: nextConsumer, } return zr, nil } @@ -316,7 +316,7 @@ func (zr *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !zr.host.OkToIngest() { var responseStatusCode int var zPageMessage string - if zr.backPressureState == configmodels.EnableBackPressure { + if zr.backPressureSetting == configmodels.EnableBackPressure { responseStatusCode = http.StatusServiceUnavailable zPageMessage = "Host blocked ingestion. Back pressure is ON." } else { @@ -332,7 +332,7 @@ func (zr *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) { observability.RecordIngestionBlockedMetrics( ctxWithReceiverName, - zr.backPressureState == configmodels.DisableBackPressure) + zr.backPressureSetting) w.WriteHeader(responseStatusCode) return } diff --git a/receiver/zipkinreceiver/trace_receiver_test.go b/receiver/zipkinreceiver/trace_receiver_test.go index 828c28a424d..0276791d1dc 100644 --- a/receiver/zipkinreceiver/trace_receiver_test.go +++ b/receiver/zipkinreceiver/trace_receiver_test.go @@ -580,7 +580,7 @@ func TestZipkinExporter_HostIngestionStatusChanges(t *testing.T) { } tests := []struct { name string - backPressureState configmodels.BackPressureState + backPressureSetting configmodels.BackPressureSetting expectedReceivedBatches int expectedIngestionBlockedRPCs int expectedIngestionBlockedRPCsNoBackPressure int @@ -588,7 +588,7 @@ func TestZipkinExporter_HostIngestionStatusChanges(t *testing.T) { }{ { name: "EnableBackPressure", - backPressureState: configmodels.EnableBackPressure, + backPressureSetting: configmodels.EnableBackPressure, expectedReceivedBatches: 2, expectedIngestionBlockedRPCs: 1, expectedIngestionBlockedRPCsNoBackPressure: 0, @@ -609,7 +609,7 @@ func TestZipkinExporter_HostIngestionStatusChanges(t *testing.T) { }, { name: "DisableBackPressure", - backPressureState: configmodels.DisableBackPressure, + backPressureSetting: configmodels.DisableBackPressure, expectedReceivedBatches: 2, expectedIngestionBlockedRPCs: 1, expectedIngestionBlockedRPCsNoBackPressure: 1, @@ -636,7 +636,7 @@ func TestZipkinExporter_HostIngestionStatusChanges(t *testing.T) { defer doneFn() sink := new(exportertest.SinkTraceExporter) - zr, err := New("127.0.0.1:0", tt.backPressureState, sink) + zr, err := New("127.0.0.1:0", tt.backPressureSetting, sink) require.Nil(t, err) require.NotNil(t, zr) defer zr.StopTraceReception()