From 25acf6c8fc8c69bcbdbf0e20c8d8748621dc9394 Mon Sep 17 00:00:00 2001
From: Yun-Tang Hsu <59460118+yuntanghsu@users.noreply.github.com>
Date: Mon, 8 Jan 2024 10:03:56 -0800
Subject: [PATCH] Improve flow-visibility e2e tests (#5770)

1. Change the order in which we append expired records before exporting them
   from the Flow Exporter. For inter-Node traffic hitting an ingress or egress
   NetworkPolicy with a drop action, we receive records from both PacketIn and
   the conntrack table. For the egress case, we need to ensure that the
   PacketIn record arrives at the Flow Aggregator (FA) first, or the FA will be
   stuck waiting for correlation until it times out.
2. Add checks to verify that records can successfully be sent by the Flow
   Exporter before generating traffic in e2e tests.
3. Add labels to the ToExternal subtests so that irrelevant records can be
   filtered out when querying the IPFIX collector Pod.
4. Confirm that the label has been correctly added to a specific Pod after
   updating the Pod.
5. Remove the octetDeltaCount check and, instead, filter out all records with
   octetDeltaCount=0 when retrieving records from the IPFIX collector Pod and
   ClickHouse.
6. Use a new version of the go-ipfix test collector. The new collector no
   longer prints all received records; instead, records are kept in memory and
   can be queried over HTTP. The list of records can also be reset with an HTTP
   call. This drastically reduces the time needed to retrieve records in tests.

Signed-off-by: Yun-Tang Hsu
---
 ci/kind/test-e2e-kind.sh                    |   2 +-
 go.mod                                      |  24 +-
 go.sum                                      |  47 +--
 pkg/agent/flowexporter/exporter/exporter.go |   8 +-
 pkg/antctl/transform/common/transform.go    |   4 +
 .../handlers/recordmetrics/handler.go       |  30 +-
 .../handlers/recordmetrics/handler_test.go  |  26 +-
 .../exporter/testing/mock_exporter.go       |   2 +-
 pkg/flowaggregator/flowaggregator.go        |  12 +-
 pkg/flowaggregator/flowaggregator_test.go   |  20 +-
 pkg/flowaggregator/querier/querier.go       |  12 +-
 test/e2e/flowaggregator_test.go             | 320 ++++++++++--------
 test/e2e/framework.go                       |  76 +++--
 13 files changed, 343 insertions(+), 240 deletions(-)

diff --git a/ci/kind/test-e2e-kind.sh b/ci/kind/test-e2e-kind.sh
index 43301936b6a..19f9913e753 100755
--- a/ci/kind/test-e2e-kind.sh
+++ b/ci/kind/test-e2e-kind.sh
@@ -201,7 +201,7 @@ COMMON_IMAGES_LIST=("registry.k8s.io/e2e-test-images/agnhost:2.29" \
   "projects.registry.vmware.com/antrea/nginx:1.21.6-alpine" \
   "projects.registry.vmware.com/antrea/toolbox:1.1-0")

-FLOW_VISIBILITY_IMAGE_LIST=("projects.registry.vmware.com/antrea/ipfix-collector:v0.6.2" \
+FLOW_VISIBILITY_IMAGE_LIST=("projects.registry.vmware.com/antrea/ipfix-collector:v0.8.2" \
   "projects.registry.vmware.com/antrea/clickhouse-operator:0.21.0" \
   "projects.registry.vmware.com/antrea/metrics-exporter:0.21.0" \
   "projects.registry.vmware.com/antrea/clickhouse-server:23.4")
diff --git a/go.mod b/go.mod
index cf54ac7e22f..bdc51997fa0 100644
--- a/go.mod
+++ b/go.mod
@@ -50,16 +50,16 @@ require (
 	github.com/spf13/pflag v1.0.5
 	github.com/stretchr/testify v1.8.4
 	github.com/ti-mo/conntrack v0.4.0
-	github.com/vishvananda/netlink v1.1.1-0.20211101163509-b10eb8fe5cf6
-	github.com/vmware/go-ipfix v0.7.0
-	go.uber.org/mock v0.3.0
-	golang.org/x/crypto v0.14.0
-	golang.org/x/mod v0.13.0
-	golang.org/x/net v0.17.0
-	golang.org/x/sync v0.4.0
-	golang.org/x/sys v0.13.0
-	golang.org/x/time v0.3.0
-	golang.org/x/tools v0.14.0
+	github.com/vishvananda/netlink v1.2.1-beta.2
+	github.com/vmware/go-ipfix v0.8.2
+	go.uber.org/mock v0.4.0
+	golang.org/x/crypto v0.17.0
+	golang.org/x/mod v0.14.0
+	golang.org/x/net v0.19.0
+	golang.org/x/sync v0.6.0
+	golang.org/x/sys v0.16.0
+	golang.org/x/time v0.5.0
+	
golang.org/x/tools v0.16.1 golang.zx2c4.com/wireguard/wgctrl v0.0.0-20210506160403-92e472f520a5 google.golang.org/grpc v1.59.0 google.golang.org/protobuf v1.31.0 @@ -218,8 +218,8 @@ require ( go.uber.org/multierr v1.9.0 // indirect go.uber.org/zap v1.24.0 // indirect golang.org/x/oauth2 v0.12.0 // indirect - golang.org/x/term v0.13.0 // indirect - golang.org/x/text v0.13.0 // indirect + golang.org/x/term v0.15.0 // indirect + golang.org/x/text v0.14.0 // indirect golang.zx2c4.com/wireguard v0.0.0-20210427022245-097af6e1351b // indirect gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect google.golang.org/appengine v1.6.7 // indirect diff --git a/go.sum b/go.sum index 5e9e9aa00c2..16f53b01be2 100644 --- a/go.sum +++ b/go.sum @@ -1109,15 +1109,16 @@ github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv github.com/vishvananda/netlink v0.0.0-20181108222139-023a6dafdcdf/go.mod h1:+SR5DhBJrl6ZM7CoCKvpw5BKroDKQ+PJqOg65H/2ktk= github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE= github.com/vishvananda/netlink v1.1.1-0.20201029203352-d40f9887b852/go.mod h1:twkDnbuQxJYemMlGd4JFIcuhgX83tXhKS2B/PRMpOho= -github.com/vishvananda/netlink v1.1.1-0.20211101163509-b10eb8fe5cf6 h1:167a2omrzz+nN9Of6lN/0yOB9itzw+IOioRThNZ30jA= github.com/vishvananda/netlink v1.1.1-0.20211101163509-b10eb8fe5cf6/go.mod h1:twkDnbuQxJYemMlGd4JFIcuhgX83tXhKS2B/PRMpOho= +github.com/vishvananda/netlink v1.2.1-beta.2 h1:Llsql0lnQEbHj0I1OuKyp8otXp0r3q0mPkuhwHfStVs= +github.com/vishvananda/netlink v1.2.1-beta.2/go.mod h1:twkDnbuQxJYemMlGd4JFIcuhgX83tXhKS2B/PRMpOho= github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc/go.mod h1:ZjcWmFBXmLKZu9Nxj3WKYEafiSqer2rnvPr0en9UNpI= github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU= github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f h1:p4VB7kIXpOQvVn1ZaTIVp+3vuYAXFe3OJEvjbUYJLaA= github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= -github.com/vmware/go-ipfix v0.7.0 h1:7dOth2p5eL01GKzyXg2sibJcD9Fhb8KeLrn/ysctiwE= -github.com/vmware/go-ipfix v0.7.0/go.mod h1:Y3YKMFN/Nec6QwmXcDae+uy6xuDgbejwRAZv9RTzS9c= +github.com/vmware/go-ipfix v0.8.2 h1:7pnmXZpI0995psJgno4Bur5fr9PCxGQuKjCI/RYurzA= +github.com/vmware/go-ipfix v0.8.2/go.mod h1:NvEehcpptPOTBaLSkMA+88l2Oe8YNelVBdvj8PA/1d0= github.com/willf/bitset v1.1.11-0.20200630133818-d5bec3311243/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= @@ -1203,8 +1204,8 @@ go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= -go.uber.org/mock v0.3.0 h1:3mUxI1No2/60yUYax92Pt8eNOEecx2D3lcXZh2NEZJo= -go.uber.org/mock v0.3.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= +go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= +go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= go.uber.org/multierr v1.1.0/go.mod 
h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= @@ -1240,8 +1241,8 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU= -golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= -golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= +golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= +golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -1278,8 +1279,8 @@ golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= -golang.org/x/mod v0.13.0 h1:I/DsJXRlw/8l/0c24sM9yb0T4z9liZTduXvdAWYiysY= -golang.org/x/mod v0.13.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0= +golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20170114055629-f2499483f923/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1342,8 +1343,8 @@ golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= -golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= -golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= +golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1368,8 +1369,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ= -golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= +golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= +golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1486,15 +1487,15 @@ golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= -golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ= -golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= -golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= +golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4= +golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1507,16 +1508,16 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= -golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time 
v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= -golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20181011042414-1f849cf54d09/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -1584,8 +1585,8 @@ golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/tools v0.14.0 h1:jvNa2pY0M4r62jkRQ6RwEZZyPcymeL9XZMLBbV7U2nc= -golang.org/x/tools v0.14.0/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg= +golang.org/x/tools v0.16.1 h1:TLyB3WofjdOEepBHAU20JdNC1Zbg87elYofWYAY5oZA= +golang.org/x/tools v0.16.1/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/pkg/agent/flowexporter/exporter/exporter.go b/pkg/agent/flowexporter/exporter/exporter.go index fb853f895c5..5d7c6322252 100644 --- a/pkg/agent/flowexporter/exporter/exporter.go +++ b/pkg/agent/flowexporter/exporter/exporter.go @@ -256,8 +256,14 @@ func (exp *FlowExporter) Run(stopCh <-chan struct{}) { func (exp *FlowExporter) sendFlowRecords() (time.Duration, error) { currTime := time.Now() var expireTime1, expireTime2 time.Duration - exp.expiredConns, expireTime1 = exp.conntrackConnStore.GetExpiredConns(exp.expiredConns, currTime, maxConnsToExport) + // We export records from denyConnStore first, then conntrackConnStore. We enforce the ordering to handle a + // special case: for an inter-node connection with egress drop network policy, both conntrackConnStore and + // denyConnStore from the same Node will send out records to Flow Aggregator. If the record from conntrackConnStore + // arrives FA first, FA will not be able to capture the deny network policy metadata, and it will keep waiting + // for a record from destination Node to finish flow correlation until timeout. Later on we probably should + // consider doing a record deduplication between conntrackConnStore and denyConnStore before exporting records. exp.expiredConns, expireTime2 = exp.denyConnStore.GetExpiredConns(exp.expiredConns, currTime, maxConnsToExport) + exp.expiredConns, expireTime1 = exp.conntrackConnStore.GetExpiredConns(exp.expiredConns, currTime, maxConnsToExport) // Select the shorter time out among two connection stores to do the next round of export. 
nextExpireTime := getMinTime(expireTime1, expireTime2) for i := range exp.expiredConns { diff --git a/pkg/antctl/transform/common/transform.go b/pkg/antctl/transform/common/transform.go index bf0a5df5acc..9f9a1dd7e4c 100644 --- a/pkg/antctl/transform/common/transform.go +++ b/pkg/antctl/transform/common/transform.go @@ -57,6 +57,10 @@ func Int64ToString(val int64) string { return strconv.Itoa(int(val)) } +func BoolToString(val bool) string { + return strconv.FormatBool(val) +} + func GenerateTableElementWithSummary(list []string, maxColumnLength int) string { element := "" sort.Strings(list) diff --git a/pkg/flowaggregator/apiserver/handlers/recordmetrics/handler.go b/pkg/flowaggregator/apiserver/handlers/recordmetrics/handler.go index 960de52d3d9..b41448f08a0 100644 --- a/pkg/flowaggregator/apiserver/handlers/recordmetrics/handler.go +++ b/pkg/flowaggregator/apiserver/handlers/recordmetrics/handler.go @@ -26,10 +26,14 @@ import ( // Response is the response struct of recordmetrics command. type Response struct { - NumRecordsExported int64 `json:"numRecordsExported,omitempty"` - NumRecordsReceived int64 `json:"numRecordsReceived,omitempty"` - NumFlows int64 `json:"numFlows,omitempty"` - NumConnToCollector int64 `json:"numConnToCollector,omitempty"` + NumRecordsExported int64 `json:"numRecordsExported,omitempty"` + NumRecordsReceived int64 `json:"numRecordsReceived,omitempty"` + NumFlows int64 `json:"numFlows,omitempty"` + NumConnToCollector int64 `json:"numConnToCollector,omitempty"` + WithClickHouseExporter bool `json:"withClickHouseExporter,omitempty"` + WithS3Exporter bool `json:"withS3Exporter,omitempty"` + WithLogExporter bool `json:"withLogExporter,omitempty"` + WithIPFIXExporter bool `json:"withIPFIXExporter,omitempty"` } // HandleFunc returns the function which can handle the /recordmetrics API request. 
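The exporter-presence fields added to Response above are what the e2e tests now poll before generating any traffic (see getAndCheckFlowAggregatorMetrics in the test diff below). The following is a minimal, self-contained sketch of how that JSON surface can be consumed; it is not part of the patch, the payload is made up, and the readiness condition is an assumption for illustration only.

// recordmetrics_sketch.go: decode the JSON emitted by the /recordmetrics
// handler above and decide whether the Flow Aggregator looks ready.
package main

import (
	"encoding/json"
	"fmt"
)

// Mirrors the Response struct added by this patch (same json tags as the hunk above).
type recordMetricsResponse struct {
	NumRecordsExported     int64 `json:"numRecordsExported,omitempty"`
	NumRecordsReceived     int64 `json:"numRecordsReceived,omitempty"`
	NumFlows               int64 `json:"numFlows,omitempty"`
	NumConnToCollector     int64 `json:"numConnToCollector,omitempty"`
	WithClickHouseExporter bool  `json:"withClickHouseExporter,omitempty"`
	WithS3Exporter         bool  `json:"withS3Exporter,omitempty"`
	WithLogExporter        bool  `json:"withLogExporter,omitempty"`
	WithIPFIXExporter      bool  `json:"withIPFIXExporter,omitempty"`
}

func main() {
	// Illustrative output of the /recordmetrics handler; the values are made up.
	payload := `{"numRecordsExported":20,"numRecordsReceived":15,"numFlows":30,"numConnToCollector":1,"withClickHouseExporter":true,"withIPFIXExporter":true}`
	var m recordMetricsResponse
	if err := json.Unmarshal([]byte(payload), &m); err != nil {
		panic(err)
	}
	// The e2e tests only start iperf traffic once the exporters they rely on are
	// reported as configured and at least one Flow Exporter is connected; the
	// exact condition below is an assumption, not the actual e2e helper logic.
	ready := m.WithIPFIXExporter && m.WithClickHouseExporter && m.NumConnToCollector > 0
	fmt.Println("flow aggregator ready:", ready)
}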
@@ -37,10 +41,14 @@ func HandleFunc(faq querier.FlowAggregatorQuerier) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { metrics := faq.GetRecordMetrics() metricsResponse := Response{ - NumRecordsExported: metrics.NumRecordsExported, - NumRecordsReceived: metrics.NumRecordsReceived, - NumFlows: metrics.NumFlows, - NumConnToCollector: metrics.NumConnToCollector, + NumRecordsExported: metrics.NumRecordsExported, + NumRecordsReceived: metrics.NumRecordsReceived, + NumFlows: metrics.NumFlows, + NumConnToCollector: metrics.NumConnToCollector, + WithClickHouseExporter: metrics.WithClickHouseExporter, + WithS3Exporter: metrics.WithS3Exporter, + WithLogExporter: metrics.WithLogExporter, + WithIPFIXExporter: metrics.WithIPFIXExporter, } err := json.NewEncoder(w).Encode(metricsResponse) if err != nil { @@ -51,7 +59,7 @@ func HandleFunc(faq querier.FlowAggregatorQuerier) http.HandlerFunc { } func (r Response) GetTableHeader() []string { - return []string{"RECORDS-EXPORTED", "RECORDS-RECEIVED", "FLOWS", "EXPORTERS-CONNECTED"} + return []string{"RECORDS-EXPORTED", "RECORDS-RECEIVED", "FLOWS", "EXPORTERS-CONNECTED", "CLICKHOUSE-EXPORTER", "S3-EXPORTER", "LOG-EXPORTER", "IPFIX-EXPORTER"} } func (r Response) GetTableRow(maxColumnLength int) []string { @@ -60,6 +68,10 @@ func (r Response) GetTableRow(maxColumnLength int) []string { common.Int64ToString(r.NumRecordsReceived), common.Int64ToString(r.NumFlows), common.Int64ToString(r.NumConnToCollector), + common.BoolToString(r.WithClickHouseExporter), + common.BoolToString(r.WithS3Exporter), + common.BoolToString(r.WithLogExporter), + common.BoolToString(r.WithIPFIXExporter), } } diff --git a/pkg/flowaggregator/apiserver/handlers/recordmetrics/handler_test.go b/pkg/flowaggregator/apiserver/handlers/recordmetrics/handler_test.go index eb1af81495f..45a699e7fa1 100644 --- a/pkg/flowaggregator/apiserver/handlers/recordmetrics/handler_test.go +++ b/pkg/flowaggregator/apiserver/handlers/recordmetrics/handler_test.go @@ -31,10 +31,14 @@ func TestRecordMetricsQuery(t *testing.T) { ctrl := gomock.NewController(t) faq := queriertest.NewMockFlowAggregatorQuerier(ctrl) faq.EXPECT().GetRecordMetrics().Return(querier.Metrics{ - NumRecordsExported: 20, - NumRecordsReceived: 15, - NumFlows: 30, - NumConnToCollector: 1, + NumRecordsExported: 20, + NumRecordsReceived: 15, + NumFlows: 30, + NumConnToCollector: 1, + WithClickHouseExporter: true, + WithS3Exporter: true, + WithLogExporter: true, + WithIPFIXExporter: true, }) handler := HandleFunc(faq) @@ -48,12 +52,16 @@ func TestRecordMetricsQuery(t *testing.T) { err = json.Unmarshal(recorder.Body.Bytes(), &received) assert.Nil(t, err) assert.Equal(t, Response{ - NumRecordsExported: 20, - NumRecordsReceived: 15, - NumFlows: 30, - NumConnToCollector: 1, + NumRecordsExported: 20, + NumRecordsReceived: 15, + NumFlows: 30, + NumConnToCollector: 1, + WithClickHouseExporter: true, + WithS3Exporter: true, + WithLogExporter: true, + WithIPFIXExporter: true, }, received) - assert.Equal(t, received.GetTableRow(0), []string{"20", "15", "30", "1"}) + assert.Equal(t, received.GetTableRow(0), []string{"20", "15", "30", "1", "true", "true", "true", "true"}) } diff --git a/pkg/flowaggregator/exporter/testing/mock_exporter.go b/pkg/flowaggregator/exporter/testing/mock_exporter.go index b19c002b0f9..8307816b5a8 100644 --- a/pkg/flowaggregator/exporter/testing/mock_exporter.go +++ b/pkg/flowaggregator/exporter/testing/mock_exporter.go @@ -1,4 +1,4 @@ -// Copyright 2023 Antrea Authors +// Copyright 2024 Antrea Authors // // 
Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/flowaggregator/flowaggregator.go b/pkg/flowaggregator/flowaggregator.go index e674c1e6f63..e11ea72945b 100644 --- a/pkg/flowaggregator/flowaggregator.go +++ b/pkg/flowaggregator/flowaggregator.go @@ -553,10 +553,14 @@ func (fa *flowAggregator) GetFlowRecords(flowKey *ipfixintermediate.FlowKey) []m func (fa *flowAggregator) GetRecordMetrics() querier.Metrics { return querier.Metrics{ - NumRecordsExported: fa.numRecordsExported, - NumRecordsReceived: fa.collectingProcess.GetNumRecordsReceived(), - NumFlows: fa.aggregationProcess.GetNumFlows(), - NumConnToCollector: fa.collectingProcess.GetNumConnToCollector(), + NumRecordsExported: fa.numRecordsExported, + NumRecordsReceived: fa.collectingProcess.GetNumRecordsReceived(), + NumFlows: fa.aggregationProcess.GetNumFlows(), + NumConnToCollector: fa.collectingProcess.GetNumConnToCollector(), + WithClickHouseExporter: fa.clickHouseExporter != nil, + WithS3Exporter: fa.s3Exporter != nil, + WithLogExporter: fa.logExporter != nil, + WithIPFIXExporter: fa.ipfixExporter != nil, } } diff --git a/pkg/flowaggregator/flowaggregator_test.go b/pkg/flowaggregator/flowaggregator_test.go index 392ce829618..99e1951a50a 100644 --- a/pkg/flowaggregator/flowaggregator_test.go +++ b/pkg/flowaggregator/flowaggregator_test.go @@ -725,17 +725,29 @@ func TestFlowAggregator_GetRecordMetrics(t *testing.T) { ctrl := gomock.NewController(t) mockCollectingProcess := ipfixtesting.NewMockIPFIXCollectingProcess(ctrl) mockAggregationProcess := ipfixtesting.NewMockIPFIXAggregationProcess(ctrl) + mockIPFIXExporter := exportertesting.NewMockInterface(ctrl) + mockClickHouseExporter := exportertesting.NewMockInterface(ctrl) + mockS3Exporter := exportertesting.NewMockInterface(ctrl) + mockLogExporter := exportertesting.NewMockInterface(ctrl) want := querier.Metrics{ - NumRecordsExported: 1, - NumRecordsReceived: 1, - NumFlows: 1, - NumConnToCollector: 1, + NumRecordsExported: 1, + NumRecordsReceived: 1, + NumFlows: 1, + NumConnToCollector: 1, + WithClickHouseExporter: true, + WithS3Exporter: true, + WithLogExporter: true, + WithIPFIXExporter: true, } fa := &flowAggregator{ collectingProcess: mockCollectingProcess, aggregationProcess: mockAggregationProcess, numRecordsExported: 1, + clickHouseExporter: mockClickHouseExporter, + s3Exporter: mockS3Exporter, + logExporter: mockLogExporter, + ipfixExporter: mockIPFIXExporter, } mockCollectingProcess.EXPECT().GetNumRecordsReceived().Return(int64(1)) diff --git a/pkg/flowaggregator/querier/querier.go b/pkg/flowaggregator/querier/querier.go index de694375a1d..349f5ed9fbd 100644 --- a/pkg/flowaggregator/querier/querier.go +++ b/pkg/flowaggregator/querier/querier.go @@ -19,10 +19,14 @@ import ( ) type Metrics struct { - NumRecordsExported int64 - NumRecordsReceived int64 - NumFlows int64 - NumConnToCollector int64 + NumRecordsExported int64 + NumRecordsReceived int64 + NumFlows int64 + NumConnToCollector int64 + WithClickHouseExporter bool + WithS3Exporter bool + WithLogExporter bool + WithIPFIXExporter bool } type FlowAggregatorQuerier interface { diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go index baf631c715d..bc895944b05 100644 --- a/test/e2e/flowaggregator_test.go +++ b/test/e2e/flowaggregator_test.go @@ -19,7 +19,6 @@ import ( "encoding/json" "fmt" "net" - "regexp" "strconv" "strings" "testing" @@ -30,6 +29,7 @@ import ( ipfixregistry 
"github.com/vmware/go-ipfix/pkg/registry" corev1 "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/utils/strings/slices" @@ -39,6 +39,7 @@ import ( "antrea.io/antrea/pkg/antctl" "antrea.io/antrea/pkg/antctl/runtime" secv1beta1 "antrea.io/antrea/pkg/apis/crd/v1beta1" + "antrea.io/antrea/pkg/flowaggregator/apiserver/handlers/recordmetrics" "antrea.io/antrea/test/e2e/utils" ) @@ -168,6 +169,10 @@ type testFlow struct { checkDstSvc bool } +type IPFIXCollectorResponse struct { + FlowRecords []string `json:"flowRecords"` +} + func TestFlowAggregatorSecureConnection(t *testing.T) { skipIfNotFlowVisibilityTest(t) skipIfHasWindowsNodes(t) @@ -209,6 +214,11 @@ func TestFlowAggregatorSecureConnection(t *testing.T) { if err != nil { t.Fatalf("Error when setting up test: %v", err) } + // Check recordmetrics of Flow Aggregator to make sure Antrea-agent Pods/ClickHouse/IPFIX collector and Flow Aggregator + // are correctly connected + if err := getAndCheckFlowAggregatorMetrics(t, data); err != nil { + t.Fatalf("Error when checking metrics of Flow Aggregator: %v", err) + } t.Run(o.name, func(t *testing.T) { defer func() { teardownTest(t, data) @@ -240,6 +250,9 @@ func TestFlowAggregator(t *testing.T) { if err != nil { t.Fatalf("Error when setting up test: %v", err) } + if err := getAndCheckFlowAggregatorMetrics(t, data); err != nil { + t.Fatalf("Error when checking metrics of Flow Aggregator: %v", err) + } defer func() { teardownTest(t, data) // Execute teardownFlowAggregator later than teardownTest to ensure that the log @@ -306,7 +319,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // records from previous subtests. To mitigate this, we add a different label to perftest Pods during each subtest // before initiating traffic. This label is then employed as a filter when collecting records from either the // ClickHouse or the IPFIX collector Pod. 
- addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) checkIntraNodeFlows(t, data, podAIPs, podBIPs, isIPv6, label) }) @@ -316,7 +329,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("IntraNodeDenyConnIngressANP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "IntraNodeDenyConnIngressANP" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName(), true) defer func() { if anp1 != nil { @@ -353,7 +366,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("IntraNodeDenyConnEgressANP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "IntraNodeDenyConnEgressANP" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName(), false) defer func() { if anp1 != nil { @@ -390,7 +403,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("IntraNodeDenyConnNP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "IntraNodeDenyConnNP" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) np1, np2 := deployDenyNetworkPolicies(t, data, "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName()) defer func() { if np1 != nil { @@ -428,7 +441,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("IntraNodeDenyConnIngressANPThroughSvc", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "IntraNodeDenyConnIngressANPThroughSvc" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName(), true) defer func() { if anp1 != nil { @@ -470,7 +483,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("IntraNodeDenyConnEgressANPThroughSvc", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "IntraNodeDenyConnEgressANPThroughSvc" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-b", "perftest-d", controlPlaneNodeName(), controlPlaneNodeName(), false) defer func() { if anp1 != nil { @@ -511,7 +524,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("InterNodeFlows", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "InterNodeFlows" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", controlPlaneNodeName(), workerNodeName(1)) defer func() { if anp1 != nil { @@ -534,7 +547,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("InterNodeDenyConnIngressANP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "InterNodeDenyConnIngressANP" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", controlPlaneNodeName(), workerNodeName(1), true) defer func() { if anp1 != nil { @@ -571,7 +584,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { 
t.Run("InterNodeDenyConnEgressANP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "InterNodeDenyConnEgressANP" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", controlPlaneNodeName(), workerNodeName(1), false) defer func() { if anp1 != nil { @@ -608,7 +621,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("InterNodeDenyConnNP", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "InterNodeDenyConnNP" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) np1, np2 := deployDenyNetworkPolicies(t, data, "perftest-c", "perftest-b", workerNodeName(1), controlPlaneNodeName()) defer func() { if np1 != nil { @@ -646,7 +659,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("InterNodeDenyConnIngressANPThroughSvc", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "InterNodeDenyConnIngressANPThroughSvc" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", controlPlaneNodeName(), workerNodeName(1), true) defer func() { if anp1 != nil { @@ -693,7 +706,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("InterNodeDenyConnEgressANPThroughSvc", func(t *testing.T) { skipIfAntreaPolicyDisabled(t) label := "InterNodeDenyConnEgressANPThroughSvc" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) anp1, anp2 := deployDenyAntreaNetworkPolicies(t, data, "perftest-a", "perftest-c", "perftest-e", controlPlaneNodeName(), workerNodeName(1), false) defer func() { if anp1 != nil { @@ -744,6 +757,8 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // Deploy the client Pod on the control-plane node clientName, clientIPs, clientCleanupFunc := createAndWaitForPod(t, data, data.createBusyboxPodOnNode, "test-client-", nodeName(0), data.testNamespace, false) defer clientCleanupFunc() + label := "ToExternalEgressOnSourceNode" + addLabelToTestPods(t, data, label, []string{clientName}) // Create an Egress and the Egress IP is assigned to the Node running the client Pods var egressNodeIP string @@ -759,14 +774,13 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { } t.Logf("Egress %s is realized with Egress IP %s", egress.Name, egressNodeIP) defer data.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}) - if !isIPv6 { if clientIPs.ipv4 != nil && serverIPs.ipv4 != nil { - checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.ipv4.String(), serverIPs.ipv4.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP) + checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.ipv4.String(), serverIPs.ipv4.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP, label) } } else { if clientIPs.ipv6 != nil && serverIPs.ipv6 != nil { - checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.ipv6.String(), serverIPs.ipv6.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP) + checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.ipv6.String(), serverIPs.ipv6.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP, label) } } }) @@ -784,6 +798,8 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // Deploy the client Pod on the control-plane node 
clientName, clientIPs, clientCleanupFunc := createAndWaitForPod(t, data, data.createBusyboxPodOnNode, "test-client-", nodeName(0), data.testNamespace, false) defer clientCleanupFunc() + label := "ToExternalEgressOnOtherNode" + addLabelToTestPods(t, data, label, []string{clientName}) // Create an Egress and the Egress IP is assigned to the Node not running the client Pods var egressNodeIP string @@ -799,14 +815,13 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { } t.Logf("Egress %s is realized with Egress IP %s", egress.Name, egressNodeIP) defer data.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), egress.Name, metav1.DeleteOptions{}) - if !isIPv6 { if clientIPs.ipv4 != nil && serverIPs.ipv4 != nil { - checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.ipv4.String(), serverIPs.ipv4.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP) + checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.ipv4.String(), serverIPs.ipv4.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP, label) } } else { if clientIPs.ipv6 != nil && serverIPs.ipv6 != nil { - checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.ipv6.String(), serverIPs.ipv6.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP) + checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.ipv6.String(), serverIPs.ipv6.String(), serverPodPort, isIPv6, egress.Name, egressNodeIP, label) } } }) @@ -817,14 +832,15 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { // Deploy the client Pod on the control-plane node clientName, clientIPs, clientCleanupFunc := createAndWaitForPod(t, data, data.createBusyboxPodOnNode, "test-client-", nodeName(0), data.testNamespace, false) defer clientCleanupFunc() - + label := "ToExternalFlows" + addLabelToTestPods(t, data, label, []string{clientName}) if !isIPv6 { if clientIPs.ipv4 != nil && serverIPs.ipv4 != nil { - checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.ipv4.String(), serverIPs.ipv4.String(), serverPodPort, isIPv6, "", "") + checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.ipv4.String(), serverIPs.ipv4.String(), serverPodPort, isIPv6, "", "", label) } } else { if clientIPs.ipv6 != nil && serverIPs.ipv6 != nil { - checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.ipv6.String(), serverIPs.ipv6.String(), serverPodPort, isIPv6, "", "") + checkRecordsForToExternalFlows(t, data, nodeName(0), clientName, clientIPs.ipv6.String(), serverIPs.ipv6.String(), serverPodPort, isIPv6, "", "", label) } } }) @@ -833,7 +849,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("LocalServiceAccess", func(t *testing.T) { skipIfProxyDisabled(t, data) label := "LocalServiceAccess" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) // In dual stack cluster, Service IP can be assigned as different IP family from specified. // In that case, source IP and destination IP will align with IP family of Service IP. // For IPv4-only and IPv6-only cluster, IP family of Service IP will be same as Pod IPs. @@ -849,7 +865,7 @@ func testHelper(t *testing.T, data *TestData, isIPv6 bool) { t.Run("RemoteServiceAccess", func(t *testing.T) { skipIfProxyDisabled(t, data) label := "RemoteServiceAccess" - addLabelToPerftestPods(t, data, label) + addLabelToTestPods(t, data, label, podNames) // In dual stack cluster, Service IP can be assigned as different IP family from specified. 
// In that case, source IP and destination IP will align with IP family of Service IP. // For IPv4-only and IPv6-only cluster, IP family of Service IP will be same as Pod IPs. @@ -987,79 +1003,71 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri func checkRecordsForFlowsCollector(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isIPv6, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy bool, bandwidthInMbps float64, labelFilter string) { collectorOutput, recordSlices := getCollectorOutput(t, srcIP, dstIP, srcPort, checkService, true, isIPv6, data, labelFilter) + // Checking only data records as data records cannot be decoded without template + // record. + assert.GreaterOrEqualf(t, len(recordSlices), expectedNumDataRecords, "IPFIX collector should receive expected number of flow records. Considered records: %s \n Collector output: %s", recordSlices, collectorOutput) // Iterate over recordSlices and build some results to test with expected results - dataRecordsCount := 0 - src, dst := matchSrcAndDstAddress(srcIP, dstIP, checkService, isIPv6) for _, record := range recordSlices { - // Check the source port along with source and destination IPs as there - // are flow records for control flows during the iperf with same IPs - // and destination port. - if strings.Contains(record, src) && strings.Contains(record, dst) && strings.Contains(record, srcPort) { - dataRecordsCount = dataRecordsCount + 1 - // Check if record has both Pod name of source and destination Pod. + // Check if record has both Pod name of source and destination Pod. + if isIntraNode { + checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-b", controlPlaneNodeName(), data.testNamespace) + checkFlowType(t, record, ipfixregistry.FlowTypeIntraNode) + } else { + checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-c", workerNodeName(1), data.testNamespace) + checkFlowType(t, record, ipfixregistry.FlowTypeInterNode) + } + assert := assert.New(t) + if checkService { if isIntraNode { - checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-b", controlPlaneNodeName(), data.testNamespace) - checkFlowType(t, record, ipfixregistry.FlowTypeIntraNode) + assert.Contains(record, data.testNamespace+"/perftest-b", "Record with ServiceIP does not have Service name") } else { - checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-c", workerNodeName(1), data.testNamespace) - checkFlowType(t, record, ipfixregistry.FlowTypeInterNode) - } - assert := assert.New(t) - if checkService { - if isIntraNode { - assert.Contains(record, data.testNamespace+"/perftest-b", "Record with ServiceIP does not have Service name") - } else { - assert.Contains(record, data.testNamespace+"/perftest-c", "Record with ServiceIP does not have Service name") - } - } - if checkK8sNetworkPolicy { - // Check if records have both ingress and egress network policies. 
- assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyName: %s", ingressAllowNetworkPolicyName), "Record does not have the correct NetworkPolicy name with the ingress rule") - assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyNamespace: %s", data.testNamespace), "Record does not have the correct NetworkPolicy Namespace with the ingress rule") - assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyType: %d", ipfixregistry.PolicyTypeK8sNetworkPolicy), "Record does not have the correct NetworkPolicy Type with the ingress rule") - assert.Contains(record, fmt.Sprintf("egressNetworkPolicyName: %s", egressAllowNetworkPolicyName), "Record does not have the correct NetworkPolicy name with the egress rule") - assert.Contains(record, fmt.Sprintf("egressNetworkPolicyNamespace: %s", data.testNamespace), "Record does not have the correct NetworkPolicy Namespace with the egress rule") - assert.Contains(record, fmt.Sprintf("egressNetworkPolicyType: %d", ipfixregistry.PolicyTypeK8sNetworkPolicy), "Record does not have the correct NetworkPolicy Type with the egress rule") - } - if checkAntreaNetworkPolicy { - // Check if records have both ingress and egress network policies. - assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyName: %s", ingressAntreaNetworkPolicyName), "Record does not have the correct NetworkPolicy name with the ingress rule") - assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyNamespace: %s", data.testNamespace), "Record does not have the correct NetworkPolicy Namespace with the ingress rule") - assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyType: %d", ipfixregistry.PolicyTypeAntreaNetworkPolicy), "Record does not have the correct NetworkPolicy Type with the ingress rule") - assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyRuleName: %s", testIngressRuleName), "Record does not have the correct NetworkPolicy RuleName with the ingress rule") - assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyRuleAction: %d", ipfixregistry.NetworkPolicyRuleActionAllow), "Record does not have the correct NetworkPolicy RuleAction with the ingress rule") - assert.Contains(record, fmt.Sprintf("egressNetworkPolicyName: %s", egressAntreaNetworkPolicyName), "Record does not have the correct NetworkPolicy name with the egress rule") - assert.Contains(record, fmt.Sprintf("egressNetworkPolicyNamespace: %s", data.testNamespace), "Record does not have the correct NetworkPolicy Namespace with the egress rule") - assert.Contains(record, fmt.Sprintf("egressNetworkPolicyType: %d", ipfixregistry.PolicyTypeAntreaNetworkPolicy), "Record does not have the correct NetworkPolicy Type with the egress rule") - assert.Contains(record, fmt.Sprintf("egressNetworkPolicyRuleName: %s", testEgressRuleName), "Record does not have the correct NetworkPolicy RuleName with the egress rule") - assert.Contains(record, fmt.Sprintf("egressNetworkPolicyRuleAction: %d", ipfixregistry.NetworkPolicyRuleActionAllow), "Record does not have the correct NetworkPolicy RuleAction with the egress rule") + assert.Contains(record, data.testNamespace+"/perftest-c", "Record with ServiceIP does not have Service name") } + } + if checkK8sNetworkPolicy { + // Check if records have both ingress and egress network policies. 
+ assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyName: %s", ingressAllowNetworkPolicyName), "Record does not have the correct NetworkPolicy name with the ingress rule") + assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyNamespace: %s", data.testNamespace), "Record does not have the correct NetworkPolicy Namespace with the ingress rule") + assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyType: %d", ipfixregistry.PolicyTypeK8sNetworkPolicy), "Record does not have the correct NetworkPolicy Type with the ingress rule") + assert.Contains(record, fmt.Sprintf("egressNetworkPolicyName: %s", egressAllowNetworkPolicyName), "Record does not have the correct NetworkPolicy name with the egress rule") + assert.Contains(record, fmt.Sprintf("egressNetworkPolicyNamespace: %s", data.testNamespace), "Record does not have the correct NetworkPolicy Namespace with the egress rule") + assert.Contains(record, fmt.Sprintf("egressNetworkPolicyType: %d", ipfixregistry.PolicyTypeK8sNetworkPolicy), "Record does not have the correct NetworkPolicy Type with the egress rule") + } + if checkAntreaNetworkPolicy { + // Check if records have both ingress and egress network policies. + assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyName: %s", ingressAntreaNetworkPolicyName), "Record does not have the correct NetworkPolicy name with the ingress rule") + assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyNamespace: %s", data.testNamespace), "Record does not have the correct NetworkPolicy Namespace with the ingress rule") + assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyType: %d", ipfixregistry.PolicyTypeAntreaNetworkPolicy), "Record does not have the correct NetworkPolicy Type with the ingress rule") + assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyRuleName: %s", testIngressRuleName), "Record does not have the correct NetworkPolicy RuleName with the ingress rule") + assert.Contains(record, fmt.Sprintf("ingressNetworkPolicyRuleAction: %d", ipfixregistry.NetworkPolicyRuleActionAllow), "Record does not have the correct NetworkPolicy RuleAction with the ingress rule") + assert.Contains(record, fmt.Sprintf("egressNetworkPolicyName: %s", egressAntreaNetworkPolicyName), "Record does not have the correct NetworkPolicy name with the egress rule") + assert.Contains(record, fmt.Sprintf("egressNetworkPolicyNamespace: %s", data.testNamespace), "Record does not have the correct NetworkPolicy Namespace with the egress rule") + assert.Contains(record, fmt.Sprintf("egressNetworkPolicyType: %d", ipfixregistry.PolicyTypeAntreaNetworkPolicy), "Record does not have the correct NetworkPolicy Type with the egress rule") + assert.Contains(record, fmt.Sprintf("egressNetworkPolicyRuleName: %s", testEgressRuleName), "Record does not have the correct NetworkPolicy RuleName with the egress rule") + assert.Contains(record, fmt.Sprintf("egressNetworkPolicyRuleAction: %d", ipfixregistry.NetworkPolicyRuleActionAllow), "Record does not have the correct NetworkPolicy RuleAction with the egress rule") + } - // Skip the bandwidth check for the iperf control flow records which have 0 throughput. 
- if !strings.Contains(record, "throughput: 0") { - flowStartTime := int64(getUint64FieldFromRecord(t, record, "flowStartSeconds")) - exportTime := int64(getUint64FieldFromRecord(t, record, "flowEndSeconds")) - flowEndReason := int64(getUint64FieldFromRecord(t, record, "flowEndReason")) - var recBandwidth float64 - // flowEndReason == 3 means the end of flow detected - if exportTime >= flowStartTime+iperfTimeSec || flowEndReason == 3 { - // Check average bandwidth on the last record. - octetTotalCount := getUint64FieldFromRecord(t, record, "octetTotalCount") - recBandwidth = float64(octetTotalCount) * 8 / float64(iperfTimeSec) / 1000000 - } else { - // Check bandwidth with the field "throughput" except for the last record, - // as their throughput may be significantly lower than the average Iperf throughput. - throughput := getUint64FieldFromRecord(t, record, "throughput") - recBandwidth = float64(throughput) / 1000000 - } - t.Logf("Throughput check on record with flowEndSeconds-flowStartSeconds: %v, Iperf throughput: %.2f Mbits/s, IPFIX record throughput: %.2f Mbits/s", exportTime-flowStartTime, bandwidthInMbps, recBandwidth) - assert.InDeltaf(recBandwidth, bandwidthInMbps, bandwidthInMbps*0.15, "Difference between Iperf bandwidth and IPFIX record bandwidth should be lower than 15%%, record: %s", record) + // Skip the bandwidth check for the iperf control flow records which have 0 throughput. + if !strings.Contains(record, "throughput: 0") { + flowStartTime := int64(getUint64FieldFromRecord(t, record, "flowStartSeconds")) + exportTime := int64(getUint64FieldFromRecord(t, record, "flowEndSeconds")) + flowEndReason := int64(getUint64FieldFromRecord(t, record, "flowEndReason")) + var recBandwidth float64 + // flowEndReason == 3 means the end of flow detected + if flowEndReason == 3 { + // Check average bandwidth on the last record. + octetTotalCount := getUint64FieldFromRecord(t, record, "octetTotalCount") + recBandwidth = float64(octetTotalCount) * 8 / float64(iperfTimeSec) / 1000000 + } else { + // Check bandwidth with the field "throughput" except for the last record, + // as their throughput may be significantly lower than the average Iperf throughput. + throughput := getUint64FieldFromRecord(t, record, "throughput") + recBandwidth = float64(throughput) / 1000000 } + t.Logf("Throughput check on record with flowEndSeconds-flowStartSeconds: %v, Iperf throughput: %.2f Mbits/s, IPFIX record throughput: %.2f Mbits/s", exportTime-flowStartTime, bandwidthInMbps, recBandwidth) + assert.InDeltaf(recBandwidth, bandwidthInMbps, bandwidthInMbps*0.15, "Difference between Iperf bandwidth and IPFIX record bandwidth should be lower than 15%%, record: %s", record) } } - // Checking only data records as data records cannot be decoded without template - // record. - assert.GreaterOrEqualf(t, dataRecordsCount, expectedNumDataRecords, "IPFIX collector should receive expected number of flow records. 
Considered records: %s \n Collector output: %s", recordSlices, collectorOutput) } func checkRecordsForFlowsClickHouse(t *testing.T, data *TestData, srcIP, dstIP, srcPort string, isIntraNode, checkService, checkK8sNetworkPolicy, checkAntreaNetworkPolicy bool, bandwidthInMbps float64, labelFilter string) { @@ -1114,7 +1122,7 @@ func checkRecordsForFlowsClickHouse(t *testing.T, data *TestData, srcIP, dstIP, exportTime := record.FlowEndSeconds.Unix() var recBandwidth float64 // flowEndReason == 3 means the end of flow detected - if exportTime >= flowStartTime+iperfTimeSec || record.FlowEndReason == 3 { + if record.FlowEndReason == 3 { octetTotalCount := record.OctetTotalCount recBandwidth = float64(octetTotalCount) * 8 / float64(exportTime-flowStartTime) / 1000000 } else { @@ -1132,7 +1140,7 @@ func checkRecordsForFlowsClickHouse(t *testing.T, data *TestData, srcIP, dstIP, assert.GreaterOrEqualf(t, len(clickHouseRecords), expectedNumDataRecords, "ClickHouse should receive expected number of flow records. Considered records: %s", clickHouseRecords) } -func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName string, srcPodName string, srcIP string, dstIP string, dstPort int32, isIPv6 bool, egressName, egressIP string) { +func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName string, srcPodName string, srcIP string, dstIP string, dstPort int32, isIPv6 bool, egressName, egressIP, labelFilter string) { var cmd string if !isIPv6 { cmd = fmt.Sprintf("wget -O- %s:%d", dstIP, dstPort) @@ -1141,24 +1149,19 @@ func checkRecordsForToExternalFlows(t *testing.T, data *TestData, srcNodeName st } stdout, stderr, err := data.RunCommandFromPod(data.testNamespace, srcPodName, busyboxContainerName, strings.Fields(cmd)) require.NoErrorf(t, err, "Error when running wget command, stdout: %s, stderr: %s", stdout, stderr) - - _, recordSlices := getCollectorOutput(t, srcIP, dstIP, "", false, false, isIPv6, data, "") + _, recordSlices := getCollectorOutput(t, srcIP, dstIP, "", false, false, isIPv6, data, labelFilter) for _, record := range recordSlices { - if strings.Contains(record, srcIP) && strings.Contains(record, dstIP) { - checkPodAndNodeData(t, record, srcPodName, srcNodeName, "", "", data.testNamespace) - checkFlowType(t, record, ipfixregistry.FlowTypeToExternal) - assert.NotContains(t, record, "octetDeltaCount: 0", "octetDeltaCount should be non-zero") - if egressName != "" { - checkEgressInfo(t, record, egressName, egressIP) - } + checkPodAndNodeData(t, record, srcPodName, srcNodeName, "", "", data.testNamespace) + checkFlowType(t, record, ipfixregistry.FlowTypeToExternal) + if egressName != "" { + checkEgressInfo(t, record, egressName, egressIP) } } - clickHouseRecords := getClickHouseOutput(t, data, srcIP, dstIP, "", false, false, "") + clickHouseRecords := getClickHouseOutput(t, data, srcIP, dstIP, "", false, false, labelFilter) for _, record := range clickHouseRecords { checkPodAndNodeDataClickHouse(data, t, record, srcPodName, srcNodeName, "", "") checkFlowTypeClickHouse(t, record, ipfixregistry.FlowTypeToExternal) - assert.Greater(t, record.OctetDeltaCount, uint64(0), "octetDeltaCount should be non-zero") if egressName != "" { checkEgressInfoClickHouse(t, record, egressName, egressIP) } @@ -1418,31 +1421,36 @@ func getCollectorOutput(t *testing.T, srcIP, dstIP, srcPort string, isDstService err := wait.PollImmediate(500*time.Millisecond, exporterActiveFlowExportTimeout+aggregatorActiveFlowRecordTimeout*2, func() (bool, error) { var rc int var err error - // 
`pod-running-timeout` option is added to cover scenarios where ipfix flow-collector has crashed after being deployed - rc, collectorOutput, _, err = data.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl logs --pod-running-timeout=%v ipfix-collector -n %s", aggregatorInactiveFlowRecordTimeout.String(), data.testNamespace)) + var cmd string + ipfixCollectorIP, err := testData.podWaitForIPs(defaultTimeout, "ipfix-collector", testData.testNamespace) + if err != nil || len(ipfixCollectorIP.ipStrings) == 0 { + require.NoErrorf(t, err, "Should be able to get IP from IPFIX collector Pod") + } + if !isIPv6 { + cmd = fmt.Sprintf("curl http://%s:8080/records", ipfixCollectorIP.ipv4.String()) + } else { + cmd = fmt.Sprintf("curl http://[%s]:8080/records", ipfixCollectorIP.ipv6.String()) + } + rc, collectorOutput, _, err = data.RunCommandOnNode(controlPlaneNodeName(), cmd) if err != nil || rc != 0 { return false, err } // Checking that all the data records which correspond to the iperf flow are received - recordSlices = getRecordsFromOutput(t, collectorOutput, labelFilter) src, dst := matchSrcAndDstAddress(srcIP, dstIP, isDstService, isIPv6) + recordSlices = getRecordsFromOutput(t, collectorOutput, labelFilter, src, dst, srcPort) if checkAllRecords { for _, record := range recordSlices { - flowStartTime := int64(getUint64FieldFromRecord(t, record, "flowStartSeconds")) - exportTime := int64(getUint64FieldFromRecord(t, record, "flowEndSeconds")) flowEndReason := int64(getUint64FieldFromRecord(t, record, "flowEndReason")) - if strings.Contains(record, src) && strings.Contains(record, dst) && strings.Contains(record, srcPort) { - // flowEndReason == 3 means the end of flow detected - if exportTime >= flowStartTime+iperfTimeSec || flowEndReason == 3 { - return true, nil - } + // flowEndReason == 3 means the end of flow detected + if flowEndReason == 3 { + return true, nil } } return false, nil } - return strings.Contains(collectorOutput, src) && strings.Contains(collectorOutput, dst) && strings.Contains(collectorOutput, srcPort), nil + return len(recordSlices) != 0, nil }) - require.NoErrorf(t, err, "IPFIX collector did not receive the expected records in collector output: %v iperf source port: %s", collectorOutput, srcPort) + require.NoErrorf(t, err, "IPFIX collector did not receive the expected records in collector, recordSlices ares: %v, output: %v iperf source port: %s", recordSlices, collectorOutput, srcPort) return collectorOutput, recordSlices } @@ -1454,9 +1462,9 @@ func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort str var flowRecords []*ClickHouseFullRow var queryOutput string - query := fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationIP = '%s')", srcIP, dstIP) + query := fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationIP = '%s') AND (octetDeltaCount != 0)", srcIP, dstIP) if isDstService { - query = fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationClusterIP = '%s')", srcIP, dstIP) + query = fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationClusterIP = '%s') AND (octetDeltaCount != 0)", srcIP, dstIP) } if len(srcPort) > 0 { query = fmt.Sprintf("%s AND (sourceTransportPort = %s)", query, srcPort) @@ -1477,7 +1485,6 @@ func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort str if err != nil { return false, err } - rows := strings.Split(queryOutput, "\n") flowRecords = make([]*ClickHouseFullRow, 0, len(rows)) for _, row := range rows { @@ 
@@ -1454,9 +1462,9 @@ func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort str
     var flowRecords []*ClickHouseFullRow
     var queryOutput string
 
-    query := fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationIP = '%s')", srcIP, dstIP)
+    query := fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationIP = '%s') AND (octetDeltaCount != 0)", srcIP, dstIP)
     if isDstService {
-        query = fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationClusterIP = '%s')", srcIP, dstIP)
+        query = fmt.Sprintf("SELECT * FROM flows WHERE (sourceIP = '%s') AND (destinationClusterIP = '%s') AND (octetDeltaCount != 0)", srcIP, dstIP)
     }
     if len(srcPort) > 0 {
         query = fmt.Sprintf("%s AND (sourceTransportPort = %s)", query, srcPort)
     }
@@ -1477,7 +1485,6 @@ func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort str
         if err != nil {
             return false, err
         }
-
         rows := strings.Split(queryOutput, "\n")
         flowRecords = make([]*ClickHouseFullRow, 0, len(rows))
         for _, row := range rows {
@@ -1495,10 +1502,8 @@ func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort str
         if checkAllRecords {
             for _, record := range flowRecords {
-                flowStartTime := record.FlowStartSeconds.Unix()
-                exportTime := record.FlowEndSeconds.Unix()
                 // flowEndReason == 3 means the end of flow detected
-                if exportTime >= flowStartTime+iperfTimeSec || record.FlowEndReason == 3 {
+                if record.FlowEndReason == 3 {
                     return true, nil
                 }
             }
@@ -1510,17 +1515,24 @@ func getClickHouseOutput(t *testing.T, data *TestData, srcIP, dstIP, srcPort str
     return flowRecords
 }
 
-func getRecordsFromOutput(t *testing.T, output, labelFilter string) []string {
-    re := regexp.MustCompile("(?m)^.*" + "#" + ".*$[\r\n]+")
-    output = re.ReplaceAllString(output, "")
-    output = strings.TrimSpace(output)
-    recordSlices := strings.Split(output, "IPFIX-HDR:")
-    if labelFilter == "" {
-        return recordSlices
+func getRecordsFromOutput(t *testing.T, output, labelFilter, src, dst, srcPort string) []string {
+    var response IPFIXCollectorResponse
+    err := json.Unmarshal([]byte(output), &response)
+    if err != nil {
+        require.NoErrorf(t, err, "error when unmarshalling output from IPFIX collector Pod")
     }
+    recordSlices := response.FlowRecords
     records := []string{}
     for _, recordSlice := range recordSlices {
-        if strings.Contains(recordSlice, labelFilter) {
+        // Skip records with octetDeltaCount of 0 (e.g. the last record exported for a flow).
+        if strings.Contains(recordSlice, "octetDeltaCount: 0") {
+            continue
+        }
+        // Skip records that do not match the expected srcIP, dstIP and srcPort.
+        if !strings.Contains(recordSlice, src) || !strings.Contains(recordSlice, dst) || !strings.Contains(recordSlice, srcPort) {
+            continue
+        }
+        if labelFilter == "" || strings.Contains(recordSlice, labelFilter) {
             records = append(records, recordSlice)
         }
     }
@@ -1753,14 +1765,24 @@ func deletePerftestServices(t *testing.T, data *TestData) {
     }
 }
 
-func addLabelToPerftestPods(t *testing.T, data *TestData, label string) {
-    perftestPods, err := data.clientset.CoreV1().Pods(data.testNamespace).List(context.TODO(), metav1.ListOptions{LabelSelector: "app=iperf"})
-    require.NoError(t, err, "Error when getting perftest Pods")
-    for i := range perftestPods.Items {
-        pod := &perftestPods.Items[i]
-        pod.Labels["targetLabel"] = label
-        _, err = data.clientset.CoreV1().Pods(data.testNamespace).Update(context.TODO(), pod, metav1.UpdateOptions{})
-        require.NoErrorf(t, err, "Error when adding label to %s", pod.Name)
+func addLabelToTestPods(t *testing.T, data *TestData, label string, podNames []string) {
+    for _, podName := range podNames {
+        testPod, err := data.clientset.CoreV1().Pods(data.testNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
+        require.NoErrorf(t, err, "Error when getting Pod %s in %s", podName, data.testNamespace)
+        testPod.Labels["targetLabel"] = label
+        _, err = data.clientset.CoreV1().Pods(data.testNamespace).Update(context.TODO(), testPod, metav1.UpdateOptions{})
+        require.NoErrorf(t, err, "Error when adding label to %s", testPod.Name)
+        err = wait.Poll(defaultInterval, timeout, func() (bool, error) {
+            pod, err := data.clientset.CoreV1().Pods(data.testNamespace).Get(context.TODO(), testPod.Name, metav1.GetOptions{})
+            if err != nil {
+                if errors.IsNotFound(err) {
+                    return false, nil
+                }
+                return false, fmt.Errorf("error when getting Pod '%s': %w", testPod.Name, err)
+            }
+            return pod.Labels["targetLabel"] == label, nil
+        })
+        require.NoErrorf(t, err, "Error when verifying the label on %s", testPod.Name)
     }
 }
@@ -1815,6 +1837,34 @@ func createToExternalTestServer(t *testing.T, data *TestData) *PodIPs {
     return serverIPs
 }
 
+func getAndCheckFlowAggregatorMetrics(t *testing.T, data *TestData) error {
+    flowAggPod, err := data.getFlowAggregator()
+    if err != nil {
+        return fmt.Errorf("error when getting flow-aggregator Pod: %w", err)
+    }
+    podName := flowAggPod.Name
+    command := []string{"antctl", "get", "recordmetrics", "-o", "json"}
+    if err := wait.Poll(defaultInterval, 2*defaultTimeout, func() (bool, error) {
+        stdout, _, err := runAntctl(podName, command, data)
+        if err != nil {
+            t.Logf("Error when requesting recordmetrics, %v", err)
+            return false, nil
+        }
+        metrics := &recordmetrics.Response{}
+        if err := json.Unmarshal([]byte(stdout), metrics); err != nil {
+            return false, fmt.Errorf("error when decoding recordmetrics: %w", err)
+        }
+        if metrics.NumConnToCollector != int64(clusterInfo.numNodes) || !metrics.WithClickHouseExporter || !metrics.WithIPFIXExporter || metrics.NumRecordsExported == 0 {
+            t.Logf("Metrics are not correct. Current metrics: NumConnToCollector=%d, ClickHouseExporter=%v, IPFIXExporter=%v, NumRecordsExported=%d", metrics.NumConnToCollector, metrics.WithClickHouseExporter, metrics.WithIPFIXExporter, metrics.NumRecordsExported)
+            return false, nil
+        }
+        return true, nil
+    }); err != nil {
+        return fmt.Errorf("error when checking recordmetrics for Flow Aggregator: %w", err)
+    }
+    return nil
+}
+
 type ClickHouseFullRow struct {
     TimeInserted     time.Time `json:"timeInserted"`
     FlowStartSeconds time.Time `json:"flowStartSeconds"`
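The getAndCheckFlowAggregatorMetrics helper above polls `antctl get recordmetrics` until the Flow Aggregator reports a connection from every Node and at least one exported record. A short sketch of how a test could use it as a readiness gate before generating traffic (the wrapper function name is illustrative, not part of this patch):

    // checkFlowExporterReady blocks until the Flow Aggregator reports that it is
    // receiving and exporting records, so later record assertions do not race
    // the first export.
    func checkFlowExporterReady(t *testing.T, data *TestData) {
        if err := getAndCheckFlowAggregatorMetrics(t, data); err != nil {
            t.Fatalf("Flow Aggregator is not ready to export records: %v", err)
        }
        // From here on it is safe to start iperf/wget traffic and assert on the
        // records received by the IPFIX collector and ClickHouse.
    }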
"flow-aggregator-config" + antreaDaemonSet = "antrea-agent" + antreaWindowsDaemonSet = "antrea-agent-windows" + antreaDeployment = "antrea-controller" + flowAggregatorDeployment = "flow-aggregator" + flowAggregatorCHSecret = "clickhouse-ca" + antreaDefaultGW = "antrea-gw0" + testAntreaIPAMNamespace = "antrea-ipam-test" + testAntreaIPAMNamespace11 = "antrea-ipam-test-11" + testAntreaIPAMNamespace12 = "antrea-ipam-test-12" + busyboxContainerName = "busybox" + mcjoinContainerName = "mcjoin" + agnhostContainerName = "agnhost" + toolboxContainerName = "toolbox" + nginxContainerName = "nginx" + controllerContainerName = "antrea-controller" + ovsContainerName = "antrea-ovs" + agentContainerName = "antrea-agent" + flowAggregatorContainerName = "flow-aggregator" + + antreaYML = "antrea.yml" + antreaIPSecYML = "antrea-ipsec.yml" + antreaCovYML = "antrea-coverage.yml" + antreaIPSecCovYML = "antrea-ipsec-coverage.yml" + flowAggregatorYML = "flow-aggregator.yml" + flowAggregatorCovYML = "flow-aggregator-coverage.yml" + flowVisibilityYML = "flow-visibility.yml" + flowVisibilityTLSYML = "flow-visibility-tls.yml" + chOperatorYML = "clickhouse-operator-install-bundle.yml" + flowVisibilityCHPodName = "chi-clickhouse-clickhouse-0-0-0" + flowVisibilityNamespace = "flow-visibility" + defaultBridgeName = "br-int" + monitoringNamespace = "monitoring" antreaControllerCovBinary = "antrea-controller-coverage" antreaAgentCovBinary = "antrea-agent-coverage" @@ -132,7 +134,7 @@ const ( nginxImage = "projects.registry.vmware.com/antrea/nginx:1.21.6-alpine" iisImage = "mcr.microsoft.com/windows/servercore/iis" toolboxImage = "projects.registry.vmware.com/antrea/toolbox:1.2-1" - ipfixCollectorImage = "projects.registry.vmware.com/antrea/ipfix-collector:v0.6.2" + ipfixCollectorImage = "projects.registry.vmware.com/antrea/ipfix-collector:v0.8.2" ipfixCollectorPort = "4739" clickHouseHTTPPort = "8123"