Skip to content

Commit

Permalink
Add S3Uploader to Flow Aggregator
Browse files Browse the repository at this point in the history
This PR adds S3Uploader as a new exporter of Flow Aggregator. It
periodically exports expired flow records from Flow Aggregator
to AWS S3 storage bucket.

Signed-off-by: heanlan <hanlan@vmware.com>
  • Loading branch information
heanlan committed Aug 22, 2022
1 parent 37bc9ac commit 5c03a61
Show file tree
Hide file tree
Showing 18 changed files with 1,844 additions and 619 deletions.
11 changes: 10 additions & 1 deletion build/charts/flow-aggregator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Kubernetes: `>= 1.16.0-0`
| clickHouse.commitInterval | string | `"8s"` | CommitInterval is the periodical interval between batch commit of flow records to DB. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". |
| clickHouse.compress | bool | `true` | Compress enables lz4 compression when committing flow records. |
| clickHouse.connectionSecret | object | `{"password":"clickhouse_operator_password","username":"clickhouse_operator"}` | Credentials to connect to ClickHouse. They will be stored in a Secret. |
| clickHouse.databaseURL | string | `"tcp://clickhouse-clickhouse.flow-visibility.svc:9000"` | |
| clickHouse.databaseURL | string | `"tcp://clickhouse-clickhouse.flow-visibility.svc:9000"` | DatabaseURL is the url to the database. TCP protocol is required. |
| clickHouse.debug | bool | `false` | Debug enables debug logs from ClickHouse sql driver. |
| clickHouse.enable | bool | `false` | Determine whether to enable exporting flow records to ClickHouse. |
| flowAggregatorAddress | string | `"flow-aggregator.flow-aggregator.svc"` | Provide DNS name or IP address of flow aggregator for generating TLS certificate. It must match the flowCollectorAddr parameter in the antrea-agent config. |
Expand All @@ -38,6 +38,15 @@ Kubernetes: `>= 1.16.0-0`
| inactiveFlowRecordTimeout | string | `"90s"` | Provide the inactive flow record timeout as a duration string. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". |
| logVerbosity | int | `0` | |
| recordContents.podLabels | bool | `false` | Determine whether source and destination Pod labels will be included in the flow records. |
| s3Uploader.awsCredentials | object | `{"aws_access_key_id":"changeme","aws_secret_access_key":"changeme","aws_session_token":""}` | Credentials to authenticate to AWS. They will be stored in a Secret and injected into the Pod as environment variables. |
| s3Uploader.bucketName | string | `""` | BucketName is the name of the S3 bucket to which flow records will be uploaded. It is required. |
| s3Uploader.bucketPrefix | string | `""` | BucketPrefix is the prefix ("folder") under which flow records will be uploaded. |
| s3Uploader.compress | bool | `true` | Compress enables gzip compression when uploading files to S3. |
| s3Uploader.enable | bool | `false` | Determine whether to enable exporting flow records to AWS S3. |
| s3Uploader.maxRecordsPerFile | int | `11.9.0-dev0` | MaxRecordsPerFile is the maximum number of records per file uploaded. It is not recommended to change this value. |
| s3Uploader.recordFormat | string | `"CSV"` | RecordFormat defines the format of the flow records uploaded to S3. Only "CSV" is supported at the moment. |
| s3Uploader.region | string | `"us-west-2"` | Region is used as a "hint" to get the region in which the provided bucket is located. An error will occur if the bucket does not exist in the AWS partition the region hint belongs to. |
| s3Uploader.uploadInterval | string | `"60s"` | UploadInterval is the duration between each file upload to S3. If the number of pending records reaches maxRecordsPerFile, we will not wait for this full duration before uploading. |
| testing.coverage | bool | `false` | |

----------------------------------------------
Expand Down
5 changes: 5 additions & 0 deletions build/images/flow-aggregator/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,9 @@ LABEL description="The docker image for the flow aggregator"
COPY --from=flow-aggregator-build /antrea/bin/flow-aggregator /
COPY --from=flow-aggregator-build /antrea/bin/antctl /usr/local/bin/

# install ca-certificates
RUN apt-get update \
&& apt-get install -y --no-install-recommends ca-certificates \
&& update-ca-certificates

ENTRYPOINT ["/flow-aggregator"]
4 changes: 4 additions & 0 deletions build/images/flow-aggregator/Dockerfile.coverage
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,7 @@ USER root
COPY --from=flow-aggregator-build /antrea/bin/flow-aggregator* /usr/local/bin/
COPY --from=flow-aggregator-build /antrea/test/e2e/coverage/flow-aggregator-arg-file /
COPY --from=flow-aggregator-build /antrea/bin/antctl* /usr/local/bin/

RUN apt-get update \
&& apt-get install -y --no-install-recommends ca-certificates \
&& update-ca-certificates
21 changes: 20 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ require (
github.com/Microsoft/hcsshim v0.8.9
github.com/TomCodeLV/OVSDB-golang-lib v0.0.0-20200116135253-9bbdfadcd881
github.com/awalterschulze/gographviz v2.0.1+incompatible
github.com/aws/aws-sdk-go-v2 v1.16.10
github.com/aws/aws-sdk-go-v2/config v1.16.0
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.23
github.com/aws/aws-sdk-go-v2/service/s3 v1.27.4
github.com/blang/semver v3.5.1+incompatible
github.com/cheggaaa/pb/v3 v3.0.8
github.com/confluentinc/bincover v0.1.0
Expand Down Expand Up @@ -92,6 +96,20 @@ require (
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/VividCortex/ewma v1.1.1 // indirect
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.4 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.12.12 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.11 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.17 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.11 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.18 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.8 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.4 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.12 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.11 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.11 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.11.15 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.16.12 // indirect
github.com/aws/smithy-go v1.12.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/cenk/hub v1.0.1 // indirect
Expand All @@ -115,7 +133,7 @@ require (
github.com/go-openapi/swag v0.19.14 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/gnostic v0.5.7-v3refs // indirect
github.com/google/go-cmp v0.5.5 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
Expand All @@ -127,6 +145,7 @@ require (
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/josharian/native v0.0.0-20200817173448-b6b71def0850 // indirect
github.com/json-iterator/go v1.1.12 // indirect
Expand Down
44 changes: 43 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,43 @@ github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf/go.mod h1:l
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/awalterschulze/gographviz v2.0.1+incompatible h1:XIECBRq9VPEQqkQL5pw2OtjCAdrtIgFKoJU8eT98AS8=
github.com/awalterschulze/gographviz v2.0.1+incompatible/go.mod h1:GEV5wmg4YquNw7v1kkyoX9etIk8yVmXj+AkDHuuETHs=
github.com/aws/aws-sdk-go-v2 v1.16.10 h1:+yDD0tcuHRQZgqONkpDwzepqmElQaSlFPymHRHR9mrc=
github.com/aws/aws-sdk-go-v2 v1.16.10/go.mod h1:WTACcleLz6VZTp7fak4EO5b9Q4foxbn+8PIz3PmyKlo=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.4 h1:zfT11pa7ifu/VlLDpmc5OY2W4nYmnKkFDGeMVnmqAI0=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.4/go.mod h1:ES0I1GBs+YYgcDS1ek47Erbn4TOL811JKqBXtgzqyZ8=
github.com/aws/aws-sdk-go-v2/config v1.15.17/go.mod h1:eatrtwIm5WdvASoYCy5oPkinfiwiYFg2jLG9tJoKzkE=
github.com/aws/aws-sdk-go-v2/config v1.16.0 h1:LxHC50cwOLxYo67NEpwpNUiOi6ngXfDpEETphSZ6bAw=
github.com/aws/aws-sdk-go-v2/config v1.16.0/go.mod h1:eatrtwIm5WdvASoYCy5oPkinfiwiYFg2jLG9tJoKzkE=
github.com/aws/aws-sdk-go-v2/credentials v1.12.12 h1:iShu6VaWZZZfUZvlGtRjl+g1lWk44g1QmiCTD4KS0jI=
github.com/aws/aws-sdk-go-v2/credentials v1.12.12/go.mod h1:vFHC2HifIWHebmoVsfpqliKuqbAY2LaVlvy03JzF4c4=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.11 h1:zZHPdM2x09/0F8D7XyVvQnP2/jaW7bEMmtcSCPYq/iI=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.11/go.mod h1:38Asv/UyQbDNpSXCurZRlDMjzIl6J+wUe8vY3TtUuzA=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.23 h1:lzS1GSHBzvBMlCA030/ecL5tF2ip8RLr/LBq5fBpv/4=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.23/go.mod h1:yGuKwoNVv2eGUHlp7ciCQLHmFNeESebnHucZfRL9EkA=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.17 h1:U8DZvyFFesBmK62dYC6BRXm4Cd/wPP3aPcecu3xv/F4=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.17/go.mod h1:6qtGip7sJEyvgsLjphRZWF9qPe3xJf1mL/MM01E35Wc=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.11 h1:GMp98usVW5tzQhxd26KWhoNQPlR2noIlfbzqjVGBhLU=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.11/go.mod h1:cYAfnB+9ZkmZWpQWmPDsuIGm4EA+6k2ZVtxKjw/XJBY=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.18 h1:/spg6h3tG4pefphbvhpgdMtFMegSajPPSEJd1t8lnpc=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.18/go.mod h1:hTHq8hL4bAxJyng364s9d4IUGXZOs7Y5LSqAhIiIQ2A=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.8 h1:9PY5a+kHQzC6d9eR+KLNSJP3DHDLYmPFA5/+eSDBo9o=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.8/go.mod h1:pcQfUOFVK4lMnSzgX3dCA81UsA9YCilRUSYgkjSU2i8=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.4 h1:akfcyqM9SvrBKWZOkBcXAGDrHfKaEP4Aca8H/bCiLW8=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.4/go.mod h1:oehQLbMQkppKLXvpx/1Eo0X47Fe+0971DXC9UjGnKcI=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.12 h1:eNQYkKjDSLDjIbBQ85rIkjpBGgnavrl/U3YKDdxAz14=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.12/go.mod h1:k2HaF2yfT082M+kKo3Xdf4rd5HGKvDmrPC5Kwzc2KUw=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.11 h1:GkYtp4gi4wdWUV+pPetjk5y2aDxbr0t8n5OjVBwZdII=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.11/go.mod h1:OEofCUKF7Hri4ShOCokF6k6hGq9PCB2sywt/9rLSXjY=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.11 h1:ZBLEKweAzBBtJa8H+MTFfVyvo+eHdM8xec5oTm9IlqI=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.11/go.mod h1:mNS1VHxYXPNqxIdCTxf87j9ROfTMa4fNpIkA+iAfz0g=
github.com/aws/aws-sdk-go-v2/service/s3 v1.27.4 h1:0RPAahwT63znFepvhfS+/WYtT+gEuAwaeNcCrzTQMH0=
github.com/aws/aws-sdk-go-v2/service/s3 v1.27.4/go.mod h1:wcpDmROpK5W7oWI6JcJIYGrVpHbF/Pu+FHxyBXyoa1E=
github.com/aws/aws-sdk-go-v2/service/sso v1.11.15 h1:HaIE5/TtKr66qZTJpvMifDxH4lRt2JZawbkLYOo1F+Y=
github.com/aws/aws-sdk-go-v2/service/sso v1.11.15/go.mod h1:dDVD4ElJRTQXx7dOQ59EkqGyNU9tnwy1RKln+oLIOTU=
github.com/aws/aws-sdk-go-v2/service/sts v1.16.12 h1:YU9UHPukkCCnETHEExOptF/BxPvGJKXO/NBx+RMQ/2A=
github.com/aws/aws-sdk-go-v2/service/sts v1.16.12/go.mod h1:b53qpmhHk7mTL2J/tfG6f38neZiyBQSiNXGCuNKq4+4=
github.com/aws/smithy-go v1.12.1 h1:yQRC55aXN/y1W10HgwHle01DRuV9Dpf31iGkotjt3Ag=
github.com/aws/smithy-go v1.12.1/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
Expand Down Expand Up @@ -440,8 +477,9 @@ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.1.0 h1:Hsa8mG0dQ46ij8Sl2AYJDUv1oA9/d6Vk+3LG99Oe02g=
github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
Expand Down Expand Up @@ -537,6 +575,10 @@ github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANyt
github.com/j-keck/arping v0.0.0-20160618110441-2cf9dc699c56/go.mod h1:ymszkNOg6tORTn+6F6j+Jc8TOr5osrynvN6ivFWZ2GA=
github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/jonboulle/clockwork v0.2.2 h1:UOGuzwb1PwsrDAObMuhUnj0p5ULPj8V/xJ7Kx9qUBdQ=
Expand Down
18 changes: 18 additions & 0 deletions pkg/config/flowaggregator/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ const (
DefaultClickHouseCommitInterval = "8s"
MinClickHouseCommitInterval = 1 * time.Second
DefaultClickHouseDatabaseUrl = "tcp://clickhouse-clickhouse.flow-visibility.svc:9000"
DefaultS3Region = "us-west-2"
DefaultS3RecordFormat = "CSV"
DefaultS3MaxRecordsPerFile = 1000000
DefaultS3UploadInterval = "60s"
MinS3CommitInterval = 1 * time.Second
)

func SetConfigDefaults(flowAggregatorConf *FlowAggregatorConfig) {
Expand Down Expand Up @@ -66,4 +71,17 @@ func SetConfigDefaults(flowAggregatorConf *FlowAggregatorConfig) {
if flowAggregatorConf.ClickHouse.CommitInterval == "" {
flowAggregatorConf.ClickHouse.CommitInterval = DefaultClickHouseCommitInterval
}
if flowAggregatorConf.S3Uploader.Compress == nil {
flowAggregatorConf.S3Uploader.Compress = new(bool)
*flowAggregatorConf.S3Uploader.Compress = true
}
if flowAggregatorConf.S3Uploader.MaxRecordsPerFile == 0 {
flowAggregatorConf.S3Uploader.MaxRecordsPerFile = DefaultS3MaxRecordsPerFile
}
if flowAggregatorConf.S3Uploader.RecordFormat == "" {
flowAggregatorConf.S3Uploader.RecordFormat = DefaultS3RecordFormat
}
if flowAggregatorConf.S3Uploader.UploadInterval == "" {
flowAggregatorConf.S3Uploader.UploadInterval = DefaultS3UploadInterval
}
}
Loading

0 comments on commit 5c03a61

Please sign in to comment.