Skip to content

Commit

Permalink
Use better type to handle back pressure setting
Browse files Browse the repository at this point in the history
  • Loading branch information
Paulo Janotti committed Jul 11, 2019
1 parent 53789c0 commit 47e0826
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 33 deletions.
26 changes: 17 additions & 9 deletions config/configmodels/configmodels.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,24 +126,24 @@ type Pipelines map[string]*Pipeline
// These are helper structs which you can embed when implementing your specific
// receiver/exporter/processor config storage.

// BackPressureState defines if backpressure should be exterted or not.
type BackPressureState bool
// BackPressureSetting defines if back pressure should be exerted or not.
type BackPressureSetting int

const (
// EnableBackPressure indicates that backpressure is enabled.
EnableBackPressure BackPressureState = false
EnableBackPressure BackPressureSetting = iota
// DisableBackPressure indicates that backpressure is disabled.
DisableBackPressure BackPressureState = true
DisableBackPressure
)

// ReceiverSettings defines common settings for a single-protocol receiver configuration.
// Specific receivers can embed this struct and extend it with more fields if needed.
type ReceiverSettings struct {
TypeVal string `mapstructure:"-"`
NameVal string `mapstructure:"-"`
Enabled bool `mapstructure:"enabled"`
Endpoint string `mapstructure:"endpoint"`
BackPressureState BackPressureState `mapstructure:"disable-backpressure"`
TypeVal string `mapstructure:"-"`
NameVal string `mapstructure:"-"`
Enabled bool `mapstructure:"enabled"`
Endpoint string `mapstructure:"endpoint"`
DisableBackPressure bool `mapstructure:"disable-backpressure"`
}

// Name gets the receiver name.
Expand All @@ -166,6 +166,14 @@ func (rs *ReceiverSettings) SetType(typeStr string) {
rs.TypeVal = typeStr
}

// BackPressureSetting gets the back pressure setting of the configuration.
func (rs *ReceiverSettings) BackPressureSetting() BackPressureSetting {
if rs.DisableBackPressure {
return DisableBackPressure
}
return EnableBackPressure
}

// ExporterSettings defines common settings for an exporter configuration.
// Specific exporters can embed this struct and extend it with more fields if needed.
type ExporterSettings struct {
Expand Down
6 changes: 4 additions & 2 deletions observability/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opencensus.io/trace"

"github.com/open-telemetry/opentelemetry-service/config/configmodels"
)

var (
Expand Down Expand Up @@ -138,8 +140,8 @@ func ContextWithReceiverName(ctx context.Context, receiverName string) context.C
// when the host blocks ingestion. If back pressure is disabled the metric for
// respective data loss is recorded.
// Use it with a context.Context generated using ContextWithReceiverName().
func RecordIngestionBlockedMetrics(ctxWithTraceReceiverName context.Context, backPressureDisabled bool) {
if backPressureDisabled {
func RecordIngestionBlockedMetrics(ctxWithTraceReceiverName context.Context, backPressureSetting configmodels.BackPressureSetting) {
if backPressureSetting == configmodels.DisableBackPressure {
// In this case data loss will happen, record the proper metric.
stats.Record(ctxWithTraceReceiverName, mReceiverIngestionBlockedRPCsWithDataLoss.M(1))
}
Expand Down
5 changes: 3 additions & 2 deletions observability/observability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"testing"

"github.com/open-telemetry/opentelemetry-service/config/configmodels"
"github.com/open-telemetry/opentelemetry-service/observability"
"github.com/open-telemetry/opentelemetry-service/observability/observabilitytest"
)
Expand All @@ -35,8 +36,8 @@ func TestTracePieplineRecordedMetrics(t *testing.T) {

receiverCtx := observability.ContextWithReceiverName(context.Background(), receiverName)
observability.RecordTraceReceiverMetrics(receiverCtx, 17, 13)
observability.RecordIngestionBlockedMetrics(receiverCtx, false)
observability.RecordIngestionBlockedMetrics(receiverCtx, true)
observability.RecordIngestionBlockedMetrics(receiverCtx, configmodels.EnableBackPressure)
observability.RecordIngestionBlockedMetrics(receiverCtx, configmodels.DisableBackPressure)
exporterCtx := observability.ContextWithExporterName(receiverCtx, exporterName)
observability.RecordTraceExporterMetrics(exporterCtx, 27, 23)
if err := observabilitytest.CheckValueViewReceiverReceivedSpans(receiverName, 17); err != nil {
Expand Down
10 changes: 5 additions & 5 deletions receiver/zipkinreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ func TestLoadConfig(t *testing.T) {
assert.Equal(t, r1,
&Config{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: typeStr,
NameVal: "zipkin/customname",
Endpoint: "127.0.0.1:8765",
Enabled: true,
BackPressureState: configmodels.DisableBackPressure,
TypeVal: typeStr,
NameVal: "zipkin/customname",
Endpoint: "127.0.0.1:8765",
Enabled: true,
DisableBackPressure: true,
},
})
}
2 changes: 1 addition & 1 deletion receiver/zipkinreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (f *factory) CreateTraceReceiver(
) (receiver.TraceReceiver, error) {

rCfg := cfg.(*Config)
return New(rCfg.Endpoint, configmodels.EnableBackPressure, nextConsumer)
return New(rCfg.Endpoint, rCfg.BackPressureSetting(), nextConsumer)
}

// CreateMetricsReceiver creates a metrics receiver based on provided config.
Expand Down
9 changes: 9 additions & 0 deletions receiver/zipkinreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/open-telemetry/opentelemetry-service/config/configerror"
"github.com/open-telemetry/opentelemetry-service/config/configmodels"
"github.com/open-telemetry/opentelemetry-service/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-service/receiver"
)
Expand All @@ -47,6 +48,14 @@ func TestCreateReceiver(t *testing.T) {
tReceiver, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, &mockTraceConsumer{})
assert.Nil(t, err, "receiver creation failed")
assert.NotNil(t, tReceiver, "receiver creation failed")
assert.Equal(t, configmodels.EnableBackPressure, tReceiver.(*ZipkinReceiver).backPressureSetting)

rCfg := cfg.(*Config)
rCfg.DisableBackPressure = true
tReceiver, err = factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, &mockTraceConsumer{})
assert.Nil(t, err, "receiver creation failed")
assert.NotNil(t, tReceiver, "receiver creation failed")
assert.Equal(t, configmodels.DisableBackPressure, tReceiver.(*ZipkinReceiver).backPressureSetting)

mReceiver, err := factory.CreateMetricsReceiver(zap.NewNop(), cfg, nil)
assert.Equal(t, err, configerror.ErrDataTypeIsNotSupported)
Expand Down
20 changes: 10 additions & 10 deletions receiver/zipkinreceiver/trace_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ type ZipkinReceiver struct {
mu sync.Mutex

// addr is the address onto which the HTTP server will be bound
addr string
host receiver.Host
backPressureState configmodels.BackPressureState
nextConsumer consumer.TraceConsumer
addr string
host receiver.Host
backPressureSetting configmodels.BackPressureSetting
nextConsumer consumer.TraceConsumer

startOnce sync.Once
stopOnce sync.Once
Expand All @@ -72,15 +72,15 @@ var _ receiver.TraceReceiver = (*ZipkinReceiver)(nil)
var _ http.Handler = (*ZipkinReceiver)(nil)

// New creates a new zipkinreceiver.ZipkinReceiver reference.
func New(address string, backPressureState configmodels.BackPressureState, nextConsumer consumer.TraceConsumer) (*ZipkinReceiver, error) {
func New(address string, backPressureSetting configmodels.BackPressureSetting, nextConsumer consumer.TraceConsumer) (*ZipkinReceiver, error) {
if nextConsumer == nil {
return nil, errNilNextConsumer
}

zr := &ZipkinReceiver{
addr: address,
backPressureState: backPressureState,
nextConsumer: nextConsumer,
addr: address,
backPressureSetting: backPressureSetting,
nextConsumer: nextConsumer,
}
return zr, nil
}
Expand Down Expand Up @@ -316,7 +316,7 @@ func (zr *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !zr.host.OkToIngest() {
var responseStatusCode int
var zPageMessage string
if zr.backPressureState == configmodels.EnableBackPressure {
if zr.backPressureSetting == configmodels.EnableBackPressure {
responseStatusCode = http.StatusServiceUnavailable
zPageMessage = "Host blocked ingestion. Back pressure is ON."
} else {
Expand All @@ -332,7 +332,7 @@ func (zr *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) {

observability.RecordIngestionBlockedMetrics(
ctxWithReceiverName,
zr.backPressureState == configmodels.DisableBackPressure)
zr.backPressureSetting)
w.WriteHeader(responseStatusCode)
return
}
Expand Down
8 changes: 4 additions & 4 deletions receiver/zipkinreceiver/trace_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,15 +580,15 @@ func TestZipkinExporter_HostIngestionStatusChanges(t *testing.T) {
}
tests := []struct {
name string
backPressureState configmodels.BackPressureState
backPressureSetting configmodels.BackPressureSetting
expectedReceivedBatches int
expectedIngestionBlockedRPCs int
expectedIngestionBlockedRPCsNoBackPressure int
ingestionStates []ingestionStateTest
}{
{
name: "EnableBackPressure",
backPressureState: configmodels.EnableBackPressure,
backPressureSetting: configmodels.EnableBackPressure,
expectedReceivedBatches: 2,
expectedIngestionBlockedRPCs: 1,
expectedIngestionBlockedRPCsNoBackPressure: 0,
Expand All @@ -609,7 +609,7 @@ func TestZipkinExporter_HostIngestionStatusChanges(t *testing.T) {
},
{
name: "DisableBackPressure",
backPressureState: configmodels.DisableBackPressure,
backPressureSetting: configmodels.DisableBackPressure,
expectedReceivedBatches: 2,
expectedIngestionBlockedRPCs: 1,
expectedIngestionBlockedRPCsNoBackPressure: 1,
Expand All @@ -636,7 +636,7 @@ func TestZipkinExporter_HostIngestionStatusChanges(t *testing.T) {
defer doneFn()

sink := new(exportertest.SinkTraceExporter)
zr, err := New("127.0.0.1:0", tt.backPressureState, sink)
zr, err := New("127.0.0.1:0", tt.backPressureSetting, sink)
require.Nil(t, err)
require.NotNil(t, zr)
defer zr.StopTraceReception()
Expand Down

0 comments on commit 47e0826

Please sign in to comment.