diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 3c3b4da214d..3212eece712 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -128,11 +128,6 @@ func runReceive( level.Info(logger).Log("mode", receiveMode, "msg", "running receive") - rwTLSConfig, err := tls.NewServerConfig(log.With(logger, "protocol", "HTTP"), conf.rwServerCert, conf.rwServerKey, conf.rwServerClientCA) - if err != nil { - return err - } - dialOpts, err := extgrpc.StoreClientGRPCOpts( logger, reg, @@ -151,36 +146,11 @@ func runReceive( dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(conf.compression))) } - var bkt objstore.Bucket confContentYaml, err := conf.objStoreConfig.Content() if err != nil { return err } - // Has this thanos receive instance been configured to ingest metrics into a local TSDB? - enableIngestion := receiveMode == receive.IngestorOnly || receiveMode == receive.RouterIngestor - - upload := len(confContentYaml) > 0 - if enableIngestion { - if upload { - if tsdbOpts.MinBlockDuration != tsdbOpts.MaxBlockDuration { - if !conf.ignoreBlockSize { - return errors.Errorf("found that TSDB Max time is %d and Min time is %d. "+ - "Compaction needs to be disabled (tsdb.min-block-duration = tsdb.max-block-duration)", tsdbOpts.MaxBlockDuration, tsdbOpts.MinBlockDuration) - } - level.Warn(logger).Log("msg", "flag to ignore min/max block duration flags differing is being used. If the upload of a 2h block fails and a tsdb compaction happens that block may be missing from your Thanos bucket storage.") - } - // The background shipper continuously scans the data directory and uploads - // new blocks to object storage service. 
- bkt, err = client.NewBucket(logger, confContentYaml, reg, comp.String()) - if err != nil { - return err - } - } else { - level.Info(logger).Log("msg", "no supported bucket was configured, uploads will be disabled") - } - } - // TODO(brancz): remove after a couple of versions // Migrate non-multi-tsdb capable storage to multi-tsdb disk layout. if err := migrateLegacyStorage(logger, conf.dataDir, conf.defaultTenantID); err != nil { @@ -191,25 +161,20 @@ func runReceive( if err != nil { return errors.Wrap(err, "get content of relabel configuration") } - var relabelConfig []*relabel.Config + + var ( + bkt objstore.Bucket + relabelConfig []*relabel.Config + limitsConfig *receive.RootLimitsConfig + // hashringChangedChan signals when TSDB needs to be flushed and updated due to hashring config change. + hashringChangedChan = make(chan struct{}, 1) + dbs *receive.MultiTSDB + ) + if err := yaml.Unmarshal(relabelContentYaml, &relabelConfig); err != nil { return errors.Wrap(err, "parse relabel configuration") } - dbs := receive.NewMultiTSDB( - conf.dataDir, - logger, - reg, - tsdbOpts, - lset, - conf.tenantLabelName, - bkt, - conf.allowOutOfOrderUpload, - hashFunc, - ) - writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs, conf.writerInterning) - - var limitsConfig *receive.RootLimitsConfig if conf.writeLimitsConfig != nil { limitsContentYaml, err := conf.writeLimitsConfig.Content() if err != nil { @@ -220,31 +185,12 @@ func runReceive( return errors.Wrap(err, "parse limit configuration") } } + limiter, err := receive.NewLimiter(conf.writeLimitsConfig, reg, receiveMode, log.With(logger, "component", "receive-limiter")) if err != nil { return errors.Wrap(err, "creating limiter") } - webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{ - Writer: writer, - ListenAddress: conf.rwAddress, - Registry: reg, - Endpoint: conf.endpoint, - TenantHeader: conf.tenantHeader, - TenantField: conf.tenantField, - 
DefaultTenantID: conf.defaultTenantID, - ReplicaHeader: conf.replicaHeader, - ReplicationFactor: conf.replicationFactor, - RelabelConfigs: relabelConfig, - ReceiverMode: receiveMode, - Tracer: tracer, - TLSConfig: rwTLSConfig, - DialOpts: dialOpts, - ForwardTimeout: time.Duration(*conf.forwardTimeout), - TSDBStats: dbs, - Limiter: limiter, - }) - grpcProbe := prober.NewGRPC() httpProbe := prober.NewHTTP() statusProber := prober.Combine( @@ -253,26 +199,99 @@ func runReceive( prober.NewInstrumentation(comp, logger, extprom.WrapRegistererWithPrefix("thanos_", reg)), ) - // Start all components while we wait for TSDB to open but only load - // initial config and mark ourselves as ready after it completes. + writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs, conf.writerInterning) - // hashringChangedChan signals when TSDB needs to be flushed and updated due to hashring config change. - hashringChangedChan := make(chan struct{}, 1) + upload := len(confContentYaml) > 0 + // Has this thanos receive instance been configured to ingest metrics into a local TSDB? + enableIngestion := receiveMode == receive.IngestorOnly || receiveMode == receive.RouterIngestor if enableIngestion { - // uploadC signals when new blocks should be uploaded. - uploadC := make(chan struct{}, 1) - // uploadDone signals when uploading has finished. 
- uploadDone := make(chan struct{}, 1) + if enableIngestion { + err := ingestionMode(bkt, upload, tsdbOpts, conf, logger, confContentYaml, reg, comp, dbs, lset, hashFunc, g, hashringChangedChan, statusProber, + relabelConfig, receiveMode, tracer, dialOpts, limiter, httpProbe, grpcProbe, limitsConfig, grpcLogOpts, tagOpts, enableIngestion, writer) + if err != nil { + return err + } - level.Debug(logger).Log("msg", "setting up TSDB") - { - if err := startTSDBAndUpload(g, logger, reg, dbs, uploadC, hashringChangedChan, upload, uploadDone, statusProber, bkt, receive.HashringAlgorithm(conf.hashringsAlgorithm)); err != nil { + } else { + err := routerMode(conf, logger, reg, comp, dbs, writer, g, hashringChangedChan, statusProber, + relabelConfig, receiveMode, tracer, dialOpts, limiter, httpProbe, grpcProbe, grpcLogOpts, tagOpts, enableIngestion) + if err != nil { return err } } } + return nil +} + +func ingestionMode(bkt objstore.Bucket, + upload bool, + tsdbOpts *tsdb.Options, + conf *receiveConfig, + logger log.Logger, + confContentYaml []byte, + reg *prometheus.Registry, + comp component.SourceStoreAPI, + dbs *receive.MultiTSDB, + lset labels.Labels, + hashFunc metadata.HashFunc, + g *run.Group, + hashringChangedChan chan struct{}, + statusProber prober.Probe, + relabelConfig []*relabel.Config, + receiveMode receive.ReceiverMode, + tracer opentracing.Tracer, + dialOpts []grpc.DialOption, + limiter *receive.Limiter, + httpProbe *prober.HTTPProbe, + grpcProbe *prober.GRPCProbe, + limitsConfig *receive.RootLimitsConfig, + grpcLogOpts []grpc_logging.Option, + tagOpts []tags.Option, + enableIngestion bool, + writer *receive.Writer, + +) error { + if upload { + var err error + bkt, err = uploadToBucket(tsdbOpts, conf, logger, confContentYaml, reg, comp) + if err != nil { + return err + } + } else { + level.Info(logger).Log("msg", "no supported bucket was configured, uploads will be disabled") + } + + dbs = receive.NewMultiTSDB( + conf.dataDir, + logger, + reg, + tsdbOpts, + 
lset, + conf.tenantLabelName, + bkt, + conf.allowOutOfOrderUpload, + hashFunc, + ) + + // uploadC signals when new blocks should be uploaded. + uploadC := make(chan struct{}, 1) + // uploadDone signals when uploading has finished. + uploadDone := make(chan struct{}, 1) + + level.Debug(logger).Log("msg", "setting up TSDB") + { + if err := startTSDBAndUpload(g, logger, reg, dbs, uploadC, hashringChangedChan, upload, uploadDone, statusProber, bkt, receive.HashringAlgorithm(conf.hashringsAlgorithm)); err != nil { + return err + } + } + + err, webHandler := getWebhandler(conf, logger, reg, dbs, writer, relabelConfig, receiveMode, tracer, dialOpts, limiter) + if err != nil { + return err + } + level.Debug(logger).Log("msg", "setting up hashring") { if err := setupHashring(g, logger, reg, conf, hashringChangedChan, webHandler, statusProber, enableIngestion); err != nil { @@ -282,98 +301,21 @@ func runReceive( level.Debug(logger).Log("msg", "setting up HTTP server") { - srv := httpserver.New(logger, reg, comp, httpProbe, - httpserver.WithListen(*conf.httpBindAddr), - httpserver.WithGracePeriod(time.Duration(*conf.httpGracePeriod)), - httpserver.WithTLSConfig(*conf.httpTLSConfig), - ) - g.Add(func() error { - statusProber.Healthy() - return srv.ListenAndServe() - }, func(err error) { - statusProber.NotReady(err) - defer statusProber.NotHealthy(err) - - srv.Shutdown(err) - }) + if err := setupHttpSever(logger, reg, comp, httpProbe, conf, g, statusProber); err != nil { + return err + } } level.Debug(logger).Log("msg", "setting up gRPC server") { - tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), conf.grpcConfig.tlsSrvCert, conf.grpcConfig.tlsSrvKey, conf.grpcConfig.tlsSrvClientCA) - if err != nil { - return errors.Wrap(err, "setup gRPC server") - } - - proxy := store.NewProxyStore( - logger, - reg, - dbs.TSDBLocalClients, - comp, - labels.Labels{}, - 0, - store.LazyRetrieval, - ) - mts := 
store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, conf.storeRateLimits) - rw := store.ReadWriteTSDBStore{ - StoreServer: mts, - WriteableStoreServer: webHandler, + if err := setupgRPC(logger, conf, webHandler, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe, statusProber, g, dbs, httpProbe); err != nil { + return err } - - infoSrv := info.NewInfoServer( - component.Receive.String(), - info.WithLabelSetFunc(func() []labelpb.ZLabelSet { return proxy.LabelSet() }), - info.WithStoreInfoFunc(func() *infopb.StoreInfo { - if httpProbe.IsReady() { - minTime, maxTime := proxy.TimeRange() - return &infopb.StoreInfo{ - MinTime: minTime, - MaxTime: maxTime, - SupportsSharding: true, - SupportsWithoutReplicaLabels: true, - } - } - return nil - }), - info.WithExemplarsInfoFunc(), - ) - - srv := grpcserver.New(logger, receive.NewUnRegisterer(reg), tracer, grpcLogOpts, tagOpts, comp, grpcProbe, - grpcserver.WithServer(store.RegisterStoreServer(rw, logger)), - grpcserver.WithServer(store.RegisterWritableStoreServer(rw)), - grpcserver.WithServer(exemplars.RegisterExemplarsServer(exemplars.NewMultiTSDB(dbs.TSDBExemplars))), - grpcserver.WithServer(info.RegisterInfoServer(infoSrv)), - grpcserver.WithListen(conf.grpcConfig.bindAddress), - grpcserver.WithGracePeriod(conf.grpcConfig.gracePeriod), - grpcserver.WithMaxConnAge(conf.grpcConfig.maxConnectionAge), - grpcserver.WithTLSConfig(tlsCfg), - ) - - g.Add( - func() error { - level.Info(logger).Log("msg", "listening for StoreAPI and WritableStoreAPI gRPC", "address", conf.grpcConfig.bindAddress) - statusProber.Healthy() - return srv.ListenAndServe() - }, - func(err error) { - statusProber.NotReady(err) - defer statusProber.NotHealthy(err) - - srv.Shutdown(err) - }, - ) } level.Debug(logger).Log("msg", "setting up receive HTTP handler") { - g.Add( - func() error { - return errors.Wrap(webHandler.Run(), "error starting web server") - }, - func(err error) { - webHandler.Close() - }, - ) + setupHttpHandler(g, 
webHandler) } if limitsConfig.AreHeadSeriesLimitsConfigured() { @@ -409,18 +351,71 @@ func runReceive( } { - if limiter.CanReload() { - ctx, cancel := context.WithCancel(context.Background()) - g.Add(func() error { - level.Debug(logger).Log("msg", "limits config initialized with file watcher.") - if err := limiter.StartConfigReloader(ctx); err != nil { - return err - } - <-ctx.Done() - return nil - }, func(err error) { - cancel() - }) + if err := configReloader(limiter, logger, g); err != nil { + return err + } + } + level.Info(logger).Log("msg", "starting receiver") + return nil +} + +func routerMode( + conf *receiveConfig, + logger log.Logger, + reg *prometheus.Registry, + comp component.SourceStoreAPI, + dbs *receive.MultiTSDB, + writer *receive.Writer, + g *run.Group, + hashringChangedChan chan struct{}, + statusProber prober.Probe, + relabelConfig []*relabel.Config, + receiveMode receive.ReceiverMode, + tracer opentracing.Tracer, + dialOpts []grpc.DialOption, + limiter *receive.Limiter, + httpProbe *prober.HTTPProbe, + grpcProbe *prober.GRPCProbe, + grpcLogOpts []grpc_logging.Option, + tagOpts []tags.Option, + enableIngestion bool, + +) error { + + err, webHandler := getWebhandler(conf, logger, reg, dbs, writer, relabelConfig, receiveMode, tracer, dialOpts, limiter) + if err != nil { + return err + } + + level.Debug(logger).Log("msg", "setting up hashring") + { + if err := setupHashring(g, logger, reg, conf, hashringChangedChan, webHandler, statusProber, enableIngestion); err != nil { + return err + } + } + + level.Debug(logger).Log("msg", "setting up HTTP server") + { + if err := setupHttpSever(logger, reg, comp, httpProbe, conf, g, statusProber); err != nil { + return err + } + } + + level.Debug(logger).Log("msg", "setting up gRPC server") + { + if err := setupgRPC(logger, conf, webHandler, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe, statusProber, g, dbs, httpProbe); err != nil { + return err + } + } + + level.Debug(logger).Log("msg", "setting up 
receive HTTP handler") + { + setupHttpHandler(g, webHandler) + } + + { + if err := configReloader(limiter, logger, g); err != nil { + return err } } @@ -428,6 +423,222 @@ func runReceive( return nil } +func setupgRPCandInfoSrv(rw store.ReadWriteTSDBStore, + infoSrvOpts []info.ServerOptionFunc, + grpcOpts []grpcserver.Option, + logger log.Logger, + reg *prometheus.Registry, + comp component.SourceStoreAPI, + dbs *receive.MultiTSDB, + httpProbe *prober.HTTPProbe, +) (store.ReadWriteTSDBStore, []info.ServerOptionFunc, []grpcserver.Option) { + mts := store.NewProxyStore( + logger, + reg, + dbs.TSDBLocalClients, + comp, + labels.Labels{}, + 0, + store.LazyRetrieval, + ) + rw.StoreServer = mts + + infoSrvOpts = append(infoSrvOpts, info.WithLabelSetFunc(func() []labelpb.ZLabelSet { return mts.LabelSet() }), + info.WithStoreInfoFunc(func() *infopb.StoreInfo { + if httpProbe.IsReady() { + minTime, maxTime := mts.TimeRange() + return &infopb.StoreInfo{ + MinTime: minTime, + MaxTime: maxTime, + SupportsSharding: true, + SupportsWithoutReplicaLabels: true, + } + } + return nil + }), + info.WithExemplarsInfoFunc(), + ) + grpcOpts = append(grpcOpts, + grpcserver.WithServer(store.RegisterStoreServer(rw, logger)), + grpcserver.WithServer(exemplars.RegisterExemplarsServer(exemplars.NewMultiTSDB(dbs.TSDBExemplars))), + ) + + return rw, infoSrvOpts, grpcOpts +} + +func uploadToBucket(tsdbOpts *tsdb.Options, + conf *receiveConfig, + logger log.Logger, + confContentYaml []byte, + reg *prometheus.Registry, + comp component.SourceStoreAPI, +) (objstore.Bucket, error) { + if tsdbOpts.MinBlockDuration != tsdbOpts.MaxBlockDuration { + if !conf.ignoreBlockSize { + return nil, errors.Errorf("found that TSDB Max time is %d and Min time is %d. "+ + "Compaction needs to be disabled (tsdb.min-block-duration = tsdb.max-block-duration)", tsdbOpts.MaxBlockDuration, tsdbOpts.MinBlockDuration) + } + level.Warn(logger).Log("msg", "flag to ignore min/max block duration flags differing is being used. 
If the upload of a 2h block fails and a tsdb compaction happens that block may be missing from your Thanos bucket storage.") + } + // The background shipper continuously scans the data directory and uploads + // new blocks to object storage service. + bkt, err := client.NewBucket(logger, confContentYaml, reg, comp.String()) + if err != nil { + return nil, err + } + return bkt, nil +} + +func configReloader(limiter *receive.Limiter, logger log.Logger, g *run.Group) error { + if limiter.CanReload() { + ctx, cancel := context.WithCancel(context.Background()) + g.Add(func() error { + level.Debug(logger).Log("msg", "limits config initialized with file watcher.") + if err := limiter.StartConfigReloader(ctx); err != nil { + return err + } + <-ctx.Done() + return nil + }, func(err error) { + cancel() + }) + } + return nil +} + +func setupHttpHandler(g *run.Group, webHandler *receive.Handler) { + g.Add( + func() error { + return errors.Wrap(webHandler.Run(), "error starting web server") + }, + func(err error) { + webHandler.Close() + }, + ) +} + +func setupgRPC(logger log.Logger, + conf *receiveConfig, + webHandler *receive.Handler, + reg *prometheus.Registry, + tracer opentracing.Tracer, + grpcLogOpts []grpc_logging.Option, + tagOpts []tags.Option, + comp component.SourceStoreAPI, + grpcProbe *prober.GRPCProbe, + statusProber prober.Probe, + g *run.Group, + dbs *receive.MultiTSDB, + httpProbe *prober.HTTPProbe, +) error { + tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), conf.grpcConfig.tlsSrvCert, conf.grpcConfig.tlsSrvKey, conf.grpcConfig.tlsSrvClientCA) + if err != nil { + return errors.Wrap(err, "setup gRPC server") + } + + grpcOpts := []grpcserver.Option{ + grpcserver.WithListen(conf.grpcConfig.bindAddress), + grpcserver.WithGracePeriod(time.Duration(conf.grpcConfig.gracePeriod)), + grpcserver.WithTLSConfig(tlsCfg), + grpcserver.WithMaxConnAge(conf.grpcConfig.maxConnectionAge), + } + + infoSrvOpts := []info.ServerOptionFunc{} + + rw := 
store.ReadWriteTSDBStore{ + WriteableStoreServer: webHandler, + } + + // dbs is non-nil only when ingestion is enabled; register the StoreAPI/exemplars servers in that case. + if dbs != nil { + rw, infoSrvOpts, grpcOpts = setupgRPCandInfoSrv(rw, infoSrvOpts, grpcOpts, logger, reg, comp, dbs, httpProbe) + } + + grpcOpts = append(grpcOpts, + grpcserver.WithServer(store.RegisterWritableStoreServer(rw)), + grpcserver.WithServer(info.RegisterInfoServer(info.NewInfoServer( + component.Receive.String(), + infoSrvOpts..., + ))), + ) + + srv := grpcserver.New(logger, receive.NewUnRegisterer(reg), tracer, grpcLogOpts, tagOpts, comp, grpcProbe, grpcOpts...) + + g.Add( + func() error { + level.Info(logger).Log("msg", "listening for StoreAPI and WritableStoreAPI gRPC", "address", conf.grpcConfig.bindAddress) + statusProber.Healthy() + return srv.ListenAndServe() + }, + func(err error) { + statusProber.NotReady(err) + defer statusProber.NotHealthy(err) + + srv.Shutdown(err) + }, + ) + + return nil +} + +func setupHttpSever(logger log.Logger, reg *prometheus.Registry, comp component.SourceStoreAPI, httpProbe *prober.HTTPProbe, conf *receiveConfig, g *run.Group, statusProber prober.Probe) error { + + srv := httpserver.New(logger, reg, comp, httpProbe, + httpserver.WithListen(*conf.httpBindAddr), + httpserver.WithGracePeriod(time.Duration(*conf.httpGracePeriod)), + httpserver.WithTLSConfig(*conf.httpTLSConfig), + ) + g.Add(func() error { + statusProber.Healthy() + return srv.ListenAndServe() + }, func(err error) { + statusProber.NotReady(err) + defer statusProber.NotHealthy(err) + + srv.Shutdown(err) + }) + + return nil +} + +func getWebhandler( + conf *receiveConfig, + logger log.Logger, + reg *prometheus.Registry, + dbs *receive.MultiTSDB, + writer *receive.Writer, + relabelConfig []*relabel.Config, + receiveMode receive.ReceiverMode, + tracer opentracing.Tracer, + dialOpts []grpc.DialOption, + limiter *receive.Limiter, +) (error, *receive.Handler) { + rwTLSConfig, err := tls.NewServerConfig(log.With(logger, "protocol", "HTTP"), 
conf.rwServerCert, conf.rwServerKey, conf.rwServerClientCA) + if err != nil { + return err, nil + } + webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{ + Writer: writer, + ListenAddress: conf.rwAddress, + Registry: reg, + Endpoint: conf.endpoint, + TenantHeader: conf.tenantHeader, + TenantField: conf.tenantField, + DefaultTenantID: conf.defaultTenantID, + ReplicaHeader: conf.replicaHeader, + ReplicationFactor: conf.replicationFactor, + RelabelConfigs: relabelConfig, + ReceiverMode: receiveMode, + Tracer: tracer, + TLSConfig: rwTLSConfig, + DialOpts: dialOpts, + ForwardTimeout: time.Duration(*conf.forwardTimeout), + TSDBStats: dbs, + Limiter: limiter, + }) + return nil, webHandler +} + // setupHashring sets up the hashring configuration provided. // If no hashring is provided, we setup a single node hashring with local endpoint. func setupHashring(g *run.Group,