diff --git a/.github/workflows/kind.yml b/.github/workflows/kind.yml index 02b89cce7b0..89baa920f27 100755 --- a/.github/workflows/kind.yml +++ b/.github/workflows/kind.yml @@ -189,6 +189,60 @@ jobs: path: log.tar.gz retention-days: 30 + test-e2e-encap-endpointslice: + name: E2e tests on a Kind cluster on Linux with endpointslice enabled + needs: build-antrea-coverage-image + runs-on: [ubuntu-18.04] + steps: + - name: Free disk space + # https://github.com/actions/virtual-environments/issues/709 + run: | + sudo apt-get clean + df -h + - uses: actions/checkout@v2 + - uses: actions/setup-go@v1 + with: + go-version: 1.13 + - name: Download Antrea image from previous job + uses: actions/download-artifact@v1 + with: + name: antrea-ubuntu + - name: Load Antrea image + run: docker load -i antrea-ubuntu/antrea-ubuntu.tar + - name: Install Kind + run: | + curl -Lo ./kind https://github.com/kubernetes-sigs/kind/releases/download/${KIND_VERSION}/kind-$(uname)-amd64 + chmod +x ./kind + sudo mv kind /usr/local/bin + - name: Run e2e tests + run: | + mkdir log + mkdir test-e2e-encap-endpointslice-coverage + ANTREA_LOG_DIR=$PWD/log ANTREA_COV_DIR=$PWD/test-e2e-encap-endpointslice-coverage ./ci/kind/test-e2e-kind.sh --encap-mode encap --endpointslice --coverage + - name: Tar coverage files + run: tar -czf test-e2e-encap-endpointslice-coverage.tar.gz test-e2e-encap-endpointslice-coverage + - name: Upload coverage for test-e2e-encap-endpointslice-coverage + uses: actions/upload-artifact@v2 + with: + name: test-e2e-encap-endpointslice-coverage + path: test-e2e-encap-endpointslice-coverage.tar.gz + - name: Codecov + uses: codecov/codecov-action@v1 + with: + file: '*antrea*' + flags: kind-e2e-tests + name: codecov-test-e2e-encap-endpointslice + directory: test-e2e-encap-endpointslice-coverage + - name: Tar log files + if: ${{ failure() }} + run: tar -czf log.tar.gz log + - name: Upload test log + uses: actions/upload-artifact@v2 + if: ${{ failure() }} + with: + name: e2e-kind-encap-endpointslice.tar.gz + path: log.tar.gz + test-e2e-noencap: name: E2e tests on a Kind cluster on Linux (noEncap) needs: [build-antrea-coverage-image, build-flow-aggregator-image] diff --git a/Makefile b/Makefile index a72a52fca19..075f428f1d9 100644 --- a/Makefile +++ b/Makefile @@ -152,6 +152,7 @@ endif -v $(CURDIR)/.coverage:/usr/src/github.com/vmware-tanzu/antrea/.coverage \ -v $(CURDIR):/usr/src/github.com/vmware-tanzu/antrea:ro \ -v /lib/modules:/lib/modules \ + --sysctl net.ipv6.conf.all.disable_ipv6=0 \ antrea/test test-integration $(USERID) $(GRPID) .PHONY: docker-tidy diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index 5f4013292b9..8e3f0804104 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -883,6 +883,14 @@ rules: - get - watch - list +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - watch + - list - apiGroups: - clusterinformation.antrea.tanzu.vmware.com resources: @@ -1170,6 +1178,10 @@ data: # Service traffic. # AntreaProxy: true + # Enable EndpointSlice support in AntreaProxy. If AntreaProxy is not enabled, this + # flag will not take effect. + # EndpointSlice: false + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -1332,7 +1344,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-b8hh7hm486 + name: antrea-config-8hc98g2f9b namespace: kube-system --- apiVersion: v1 @@ -1443,7 +1455,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-b8hh7hm486 + name: antrea-config-8hc98g2f9b name: antrea-config - name: antrea-controller-tls secret: @@ -1708,7 +1720,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-b8hh7hm486 + name: antrea-config-8hc98g2f9b name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index c1c243bd170..cd3b9644918 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -883,6 +883,14 @@ rules: - get - watch - list +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - watch + - list - apiGroups: - clusterinformation.antrea.tanzu.vmware.com resources: @@ -1170,6 +1178,10 @@ data: # Service traffic. # AntreaProxy: true + # Enable EndpointSlice support in AntreaProxy. If AntreaProxy is not enabled, this + # flag will not take effect. + # EndpointSlice: false + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -1332,7 +1344,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-b8hh7hm486 + name: antrea-config-8hc98g2f9b namespace: kube-system --- apiVersion: v1 @@ -1443,7 +1455,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-b8hh7hm486 + name: antrea-config-8hc98g2f9b name: antrea-config - name: antrea-controller-tls secret: @@ -1710,7 +1722,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-b8hh7hm486 + name: antrea-config-8hc98g2f9b name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 6dbdee6102e..9c3741ff030 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -883,6 +883,14 @@ rules: - get - watch - list +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - watch + - list - apiGroups: - clusterinformation.antrea.tanzu.vmware.com resources: @@ -1170,6 +1178,10 @@ data: # Service traffic. # AntreaProxy: true + # Enable EndpointSlice support in AntreaProxy. If AntreaProxy is not enabled, this + # flag will not take effect. + # EndpointSlice: false + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -1332,7 +1344,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-hhfkgg2fg5 + name: antrea-config-tkhmdbc7hc namespace: kube-system --- apiVersion: v1 @@ -1443,7 +1455,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-hhfkgg2fg5 + name: antrea-config-tkhmdbc7hc name: antrea-config - name: antrea-controller-tls secret: @@ -1711,7 +1723,7 @@ spec: path: /home/kubernetes/bin name: host-cni-bin - configMap: - name: antrea-config-hhfkgg2fg5 + name: antrea-config-tkhmdbc7hc name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index e752315b49e..79c4c609b26 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -883,6 +883,14 @@ rules: - get - watch - list +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - watch + - list - apiGroups: - clusterinformation.antrea.tanzu.vmware.com resources: @@ -1170,6 +1178,10 @@ data: # Service traffic. # AntreaProxy: true + # Enable EndpointSlice support in AntreaProxy. If AntreaProxy is not enabled, this + # flag will not take effect. + # EndpointSlice: false + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -1337,7 +1349,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-bdc66g4872 + name: antrea-config-75d52g7h69 namespace: kube-system --- apiVersion: v1 @@ -1457,7 +1469,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-bdc66g4872 + name: antrea-config-75d52g7h69 name: antrea-config - name: antrea-controller-tls secret: @@ -1757,7 +1769,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-bdc66g4872 + name: antrea-config-75d52g7h69 name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index d0f9ac2615a..7888f4bf2c1 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -883,6 +883,14 @@ rules: - get - watch - list +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - watch + - list - apiGroups: - clusterinformation.antrea.tanzu.vmware.com resources: @@ -1170,6 +1178,10 @@ data: # Service traffic. # AntreaProxy: true + # Enable EndpointSlice support in AntreaProxy. If AntreaProxy is not enabled, this + # flag will not take effect. + # EndpointSlice: false + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true @@ -1337,7 +1349,7 @@ metadata: annotations: {} labels: app: antrea - name: antrea-config-9964gfgbb4 + name: antrea-config-c62t4mcf7k namespace: kube-system --- apiVersion: v1 @@ -1448,7 +1460,7 @@ spec: key: node-role.kubernetes.io/master volumes: - configMap: - name: antrea-config-9964gfgbb4 + name: antrea-config-c62t4mcf7k name: antrea-config - name: antrea-controller-tls secret: @@ -1713,7 +1725,7 @@ spec: operator: Exists volumes: - configMap: - name: antrea-config-9964gfgbb4 + name: antrea-config-c62t4mcf7k name: antrea-config - hostPath: path: /etc/cni/net.d diff --git a/build/yamls/base/agent-rbac.yml b/build/yamls/base/agent-rbac.yml index 5a2e60bde33..09bb8c283a7 100644 --- a/build/yamls/base/agent-rbac.yml +++ b/build/yamls/base/agent-rbac.yml @@ -36,6 +36,14 @@ rules: - get - watch - list + - apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - watch + - list - apiGroups: - clusterinformation.antrea.tanzu.vmware.com resources: diff --git a/build/yamls/base/conf/antrea-agent.conf b/build/yamls/base/conf/antrea-agent.conf index 8316dfa6fc0..37a45a54125 100644 --- a/build/yamls/base/conf/antrea-agent.conf +++ b/build/yamls/base/conf/antrea-agent.conf @@ -5,6 +5,10 @@ featureGates: # Service traffic. # AntreaProxy: true +# Enable EndpointSlice support in AntreaProxy. If AntreaProxy is not enabled, this +# flag will not take effect. +# EndpointSlice: false + # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true diff --git a/ci/kind/test-e2e-kind.sh b/ci/kind/test-e2e-kind.sh index 0cfa7fcc22b..03b99a2bf7c 100755 --- a/ci/kind/test-e2e-kind.sh +++ b/ci/kind/test-e2e-kind.sh @@ -25,6 +25,7 @@ function echoerr { _usage="Usage: $0 [--encap-mode ] [--no-proxy] [--np] [--coverage] [--help|-h] --encap-mode Traffic encapsulation mode. (default is 'encap') --no-proxy Disables Antrea proxy. + --endpointslice Enables Antrea proxy and EndpointSlice support --np Enables Namespaced Antrea NetworkPolicy CRDs and ClusterNetworkPolicy related CRDs. --coverage Enables measure Antrea code coverage when run e2e tests on kind. --help, -h Print this message and exit @@ -49,6 +50,7 @@ trap "quit" INT EXIT mode="" proxy=true +endpointslice=false np=false coverage=false while [[ $# -gt 0 ]] @@ -60,6 +62,10 @@ case $key in proxy=false shift ;; + --endpointslice) + endpointslice=true + shift + ;; --np) np=true shift @@ -87,6 +93,9 @@ manifest_args="" if ! $proxy; then manifest_args="$manifest_args --no-proxy" fi +if $endpointslice; then + manifest_args="$manifest_args --endpointslice" +fi if $np; then # See https://github.com/vmware-tanzu/antrea/issues/897 manifest_args="$manifest_args --np --tun vxlan" diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index c01ddad3a71..d3b8f03c977 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -186,9 +186,9 @@ func run(o *Options) error { case v4Enabled && v6Enabled: proxier = proxy.NewDualStackProxier(nodeConfig.Name, informerFactory, ofClient) case v4Enabled: - proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, false) + proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, features.DefaultFeatureGate.Enabled(features.EndpointSlice), false) case v6Enabled: - proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, true) + proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, features.DefaultFeatureGate.Enabled(features.EndpointSlice), true) default: return fmt.Errorf("at least one of IPv4 or IPv6 should be enabled") } diff --git a/docs/feature-gates.md b/docs/feature-gates.md index 9ee1abca8ef..569a4d8bde9 100644 --- a/docs/feature-gates.md +++ b/docs/feature-gates.md @@ -36,6 +36,7 @@ example, to enable `AntreaProxy` on Linux, edit the Agent configuration in the | Feature Name | Component | Default | Stage | Alpha Release | Beta Release | GA Release | Extra Requirements | Notes | | ----------------------- | ------------------ | ------- | ----- | ------------- | ------------ | ---------- | ------------------ | ----- | | `AntreaProxy` | Agent | `false` | Alpha | v0.8 | v0.11 | N/A | Yes | Must be enabled for Windows. | +| `EndpointSlice` | Agent | `false` | Alpha | v0.13.0 | N/A | N/A | Yes | | | `AntreaPolicy` | Agent + Controller | `false` | Alpha | v0.8 | N/A | N/A | No | Agent side config required from v0.9.0+. | | `Traceflow` | Agent + Controller | `false` | Alpha | v0.8 | v0.11 | N/A | Yes | | | `FlowExporter` | Agent | `false` | Alpha | v0.9 | N/A | N/A | Yes | | @@ -55,6 +56,15 @@ manifest provided as part of releases enables this feature by default. If you edit the manifest, make sure you do not disable it, as it is needed for correct NetworkPolicy implementation for Pod-to-Service traffic. +### EndpointSlice + +`EndpointSlice` enable Service EndpointSlice support in Antrea proxy. The +EndpointSlice API was introduced in Kubernetes 1.16 (alpha) and it is enabled +by default in Kubernetes 1.17 (beta). This flag will take no effect if AntreaProxy +is not enabled. Refer to this +[link](https://kubernetes.io/docs/tasks/administer-cluster/enabling-endpointslices/) +for more information. + #### Requirements for this Feature When using the OVS built-in kernel module (which is the most common case), your diff --git a/hack/generate-manifest.sh b/hack/generate-manifest.sh index 2b6adb31b4d..3de6ac954d0 100755 --- a/hack/generate-manifest.sh +++ b/hack/generate-manifest.sh @@ -29,6 +29,7 @@ Generate a YAML manifest for Antrea using Kustomize and print it to stdout. --ipsec Generate a manifest with IPSec encryption of tunnel traffic enabled --all-features Generate a manifest with all alpha features enabled --no-proxy Generate a manifest with Antrea proxy disabled + --endpointslice Generate a manifest with EndpointSlice support enabled --np Generate a manifest with ClusterNetworkPolicy and Antrea NetworkPolicy features enabled --k8s-1.15 Generates a manifest which supports Kubernetes 1.15. --keep Debug flag which will preserve the generated kustomization.yml @@ -62,6 +63,7 @@ KIND=false IPSEC=false ALLFEATURES=false PROXY=true +ENDPOINTSLICE=false NP=false KEEP=false ENCAP_MODE="" @@ -106,6 +108,11 @@ case $key in PROXY=false shift ;; + --endpointslice) + PROXY=true + ENDPOINTSLICE=true + shift + ;; --np) NP=true shift @@ -149,6 +156,12 @@ case $key in esac done +if [ "$PROXY" == false ] && [ "$ENDPOINTSLICE" == true ]; then + echoerr "--endpointslice works only without --no-proxy" + print_help + exit 1 +fi + if [ "$MODE" != "dev" ] && [ "$MODE" != "release" ]; then echoerr "--mode must be one of 'dev' or 'release'" print_help @@ -230,12 +243,17 @@ if $ALLFEATURES; then sed -i.bak -E "s/^[[:space:]]*#[[:space:]]*AntreaPolicy[[:space:]]*:[[:space:]]*[a-z]+[[:space:]]*$/ AntreaPolicy: true/" antrea-agent.conf sed -i.bak -E "s/^[[:space:]]*#[[:space:]]*FlowExporter[[:space:]]*:[[:space:]]*[a-z]+[[:space:]]*$/ FlowExporter: true/" antrea-agent.conf sed -i.bak -E "s/^[[:space:]]*#[[:space:]]*NetworkPolicyStats[[:space:]]*:[[:space:]]*[a-z]+[[:space:]]*$/ NetworkPolicyStats: true/" antrea-agent.conf + sed -i.bak -E "s/^[[:space:]]*#[[:space:]]*EndpointSlice[[:space:]]*:[[:space:]]*[a-z]+[[:space:]]*$/ EndpointSlice: true/" antrea-agent.conf fi if ! $PROXY; then sed -i.bak -E "s/^[[:space:]]*#[[:space:]]*AntreaProxy[[:space:]]*:[[:space:]]*[a-z]+[[:space:]]*$/ AntreaProxy: false/" antrea-agent.conf fi +if $ENDPOINTSLICE; then + sed -i.bak -E "s/^[[:space:]]*#[[:space:]]*EndpointSlice[[:space:]]*:[[:space:]]*[a-z]+[[:space:]]*$/ EndpointSlice: true/" antrea-agent.conf +fi + if $NP; then sed -i.bak -E "s/^[[:space:]]*#[[:space:]]*AntreaPolicy[[:space:]]*:[[:space:]]*[a-z]+[[:space:]]*$/ AntreaPolicy: true/" antrea-controller.conf sed -i.bak -E "s/^[[:space:]]*#[[:space:]]*AntreaPolicy[[:space:]]*:[[:space:]]*[a-z]+[[:space:]]*$/ AntreaPolicy: true/" antrea-agent.conf diff --git a/pkg/agent/proxy/endpoints.go b/pkg/agent/proxy/endpoints.go index f4f72443555..29b2aae5a9a 100644 --- a/pkg/agent/proxy/endpoints.go +++ b/pkg/agent/proxy/endpoints.go @@ -21,6 +21,7 @@ import ( "sync" corev1 "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1beta1" apimachinerytypes "k8s.io/apimachinery/pkg/types" "k8s.io/klog" @@ -28,6 +29,12 @@ import ( k8sproxy "github.com/vmware-tanzu/antrea/third_party/proxy" ) +var supportedEndpointSliceAddressTypes = map[string]struct{}{ + string(discovery.AddressTypeIP): {}, // IP is a deprecated address type + string(discovery.AddressTypeIPv4): {}, + string(discovery.AddressTypeIPv6): {}, +} + // endpointsChange describes an Endpoints change, previous is the state from before // all of them, current is state after applying all of those. type endpointsChange struct { @@ -44,17 +51,23 @@ type endpointsChangesTracker struct { // initialized tells whether Endpoints have been synced. initialized bool // changes contains endpoints changes since the last checkoutChanges call. - changes map[apimachinerytypes.NamespacedName]*endpointsChange + changes map[apimachinerytypes.NamespacedName]*endpointsChange + sliceCache *EndpointSliceCache } -func newEndpointsChangesTracker(hostname string) *endpointsChangesTracker { - return &endpointsChangesTracker{ +func newEndpointsChangesTracker(hostname string, enableEndpointSlice bool) *endpointsChangesTracker { + tracker := &endpointsChangesTracker{ hostname: hostname, changes: map[apimachinerytypes.NamespacedName]*endpointsChange{}, } + ipv6 := false + if enableEndpointSlice { + tracker.sliceCache = NewEndpointSliceCache(hostname, &ipv6) + } + return tracker } -// OnEndpointUpdate updates given Service's Endpoints change map based on the +// OnEndpointUpdate updates the given Service's endpointsChange map based on the // Endpoints pair. It returns true if items changed, // otherwise it returns false. // Update can be used to add/update/delete items of EndpointsChangeMap. @@ -95,10 +108,42 @@ func (t *endpointsChangesTracker) OnEndpointUpdate(previous, current *corev1.End return len(t.changes) > 0 } +// EndpointSliceUpdate updates the given service's endpoints change map based on the endpoints pair. +// It returns true if items changed, otherwise it returns false. Will add/update/delete items of endpointsChange Map. +// If removeSlice is true, slice will be removed, otherwise it will be added or updated. +func (t *endpointsChangesTracker) OnEndpointSliceUpdate(endpointSlice *discovery.EndpointSlice, removeSlice bool) bool { + // This should never happen. + if endpointSlice == nil { + klog.Error("Nil endpointSlice passed to EndpointSliceUpdate") + return false + } + + if _, has := supportedEndpointSliceAddressTypes[string(endpointSlice.AddressType)]; !has { + klog.V(4).Infof("EndpointSlice address type not supported: %s", endpointSlice.AddressType) + return false + } + + if _, _, err := endpointSliceCacheKeys(endpointSlice); err != nil { + klog.Warningf("Error getting endpoint slice cache keys: %v", err) + return false + } + + t.Lock() + defer t.Unlock() + + changeNeeded := t.sliceCache.updatePending(endpointSlice, removeSlice) + + return changeNeeded +} + func (t *endpointsChangesTracker) checkoutChanges() []*endpointsChange { t.Lock() defer t.Unlock() + if t.sliceCache != nil { + return t.sliceCache.checkoutChanges() + } + var changes []*endpointsChange for _, change := range t.changes { changes = append(changes, change) diff --git a/pkg/agent/proxy/endpointslicecache.go b/pkg/agent/proxy/endpointslicecache.go new file mode 100644 index 00000000000..aa988b7d4c3 --- /dev/null +++ b/pkg/agent/proxy/endpointslicecache.go @@ -0,0 +1,365 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Copyright 2020 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Original file https://raw.githubusercontent.com/kubernetes/kubernetes/0c0d4fea8dd6bdcd16b9e1d35da3f7d209341a6f/pkg/proxy/endpointslicecache.go +// Remove makeEndpointInfo and recorder in fields. +// Remove unused standardEndpointInfo. +// Remove unneeded sort.Sort in endpointsMapFromEndpointInfo. +// Update import paths. + +package proxy + +import ( + "fmt" + "reflect" + "sort" + "strings" + "sync" + + v1 "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1beta1" + apimachinerytypes "k8s.io/apimachinery/pkg/types" + "k8s.io/klog" + utilnet "k8s.io/utils/net" + + "github.com/vmware-tanzu/antrea/pkg/agent/proxy/types" + "github.com/vmware-tanzu/antrea/third_party/proxy" +) + +// EndpointSliceCache is used as a cache of EndpointSlice information. +type EndpointSliceCache struct { + // lock protects trackerByServiceMap. + lock sync.Mutex + + // trackerByServiceMap is the basis of this cache. It contains endpoint + // slice trackers grouped by service name and endpoint slice name. The first + // key represents a namespaced service name while the second key represents + // an endpoint slice name. Since endpoints can move between slices, we + // require slice specific caching to prevent endpoints being removed from + // the cache when they may have just moved to a different slice. + trackerByServiceMap map[apimachinerytypes.NamespacedName]*endpointSliceTracker + + hostname string + isIPv6Mode *bool +} + +// endpointSliceTracker keeps track of EndpointSlices as they have been applied +// by a proxier along with any pending EndpointSlices that have been updated +// in this cache but not yet applied by a proxier. +type endpointSliceTracker struct { + applied endpointSliceInfoByName + pending endpointSliceInfoByName +} + +// endpointSliceInfoByName groups endpointSliceInfo by the names of the +// corresponding EndpointSlices. +type endpointSliceInfoByName map[string]*endpointSliceInfo + +// endpointSliceInfo contains just the attributes kube-proxy cares about. +// Used for caching. Intentionally small to limit memory util. +type endpointSliceInfo struct { + Ports []discovery.EndpointPort + Endpoints []*endpointInfo + Remove bool +} + +// endpointInfo contains just the attributes kube-proxy cares about. +// Used for caching. Intentionally small to limit memory util. +// Addresses and Topology are copied from EndpointSlice Endpoints. +type endpointInfo struct { + Addresses []string + Topology map[string]string +} + +// spToEndpointMap stores groups Endpoint objects by ServicePortName and +// EndpointSlice name. +type spToEndpointMap map[proxy.ServicePortName]map[string]proxy.Endpoint + +// NewEndpointSliceCache initializes an EndpointSliceCache. +func NewEndpointSliceCache(hostname string, isIPv6Mode *bool) *EndpointSliceCache { + return &EndpointSliceCache{ + trackerByServiceMap: map[apimachinerytypes.NamespacedName]*endpointSliceTracker{}, + hostname: hostname, + isIPv6Mode: isIPv6Mode, + } +} + +// newEndpointSliceTracker initializes an endpointSliceTracker. +func newEndpointSliceTracker() *endpointSliceTracker { + return &endpointSliceTracker{ + applied: endpointSliceInfoByName{}, + pending: endpointSliceInfoByName{}, + } +} + +// newEndpointSliceInfo generates endpointSliceInfo from an EndpointSlice. +func newEndpointSliceInfo(endpointSlice *discovery.EndpointSlice, remove bool) *endpointSliceInfo { + esInfo := &endpointSliceInfo{ + Ports: make([]discovery.EndpointPort, len(endpointSlice.Ports)), + Endpoints: []*endpointInfo{}, + Remove: remove, + } + + // copy here to avoid mutating shared EndpointSlice object. + copy(esInfo.Ports, endpointSlice.Ports) + sort.Sort(byPort(esInfo.Ports)) + + if !remove { + for _, endpoint := range endpointSlice.Endpoints { + if endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready { + esInfo.Endpoints = append(esInfo.Endpoints, &endpointInfo{ + Addresses: endpoint.Addresses, + Topology: endpoint.Topology, + }) + } + } + + sort.Sort(byAddress(esInfo.Endpoints)) + } + + return esInfo +} + +// updatePending updates a pending slice in the cache. +func (cache *EndpointSliceCache) updatePending(endpointSlice *discovery.EndpointSlice, remove bool) bool { + serviceKey, sliceKey, err := endpointSliceCacheKeys(endpointSlice) + if err != nil { + klog.Warningf("Error getting endpoint slice cache keys: %v", err) + return false + } + + esInfo := newEndpointSliceInfo(endpointSlice, remove) + + cache.lock.Lock() + defer cache.lock.Unlock() + + if _, ok := cache.trackerByServiceMap[serviceKey]; !ok { + cache.trackerByServiceMap[serviceKey] = newEndpointSliceTracker() + } + + changed := cache.esInfoChanged(serviceKey, sliceKey, esInfo) + + if changed { + cache.trackerByServiceMap[serviceKey].pending[sliceKey] = esInfo + } + + return changed +} + +// checkoutChanges returns a list of all endpointsChanges that are +// pending and then marks them as applied. +func (cache *EndpointSliceCache) checkoutChanges() []*endpointsChange { + changes := []*endpointsChange{} + + cache.lock.Lock() + defer cache.lock.Unlock() + + for serviceNN, esTracker := range cache.trackerByServiceMap { + if len(esTracker.pending) == 0 { + continue + } + + change := &endpointsChange{} + + change.previous = cache.getEndpointsMap(serviceNN, esTracker.applied) + + for name, sliceInfo := range esTracker.pending { + if sliceInfo.Remove { + delete(esTracker.applied, name) + } else { + esTracker.applied[name] = sliceInfo + } + + delete(esTracker.pending, name) + } + + change.current = cache.getEndpointsMap(serviceNN, esTracker.applied) + changes = append(changes, change) + } + + return changes +} + +// getEndpointsMap computes an EndpointsMap for a given set of EndpointSlices. +func (cache *EndpointSliceCache) getEndpointsMap(serviceNN apimachinerytypes.NamespacedName, sliceInfoByName endpointSliceInfoByName) types.EndpointsMap { + endpointInfoBySP := cache.endpointInfoByServicePort(serviceNN, sliceInfoByName) + return endpointsMapFromEndpointInfo(endpointInfoBySP) +} + +// endpointInfoByServicePort groups endpoint info by service port name and address. +func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN apimachinerytypes.NamespacedName, sliceInfoByName endpointSliceInfoByName) spToEndpointMap { + endpointInfoBySP := spToEndpointMap{} + + for _, sliceInfo := range sliceInfoByName { + for _, port := range sliceInfo.Ports { + if port.Name == nil { + klog.Warningf("Ignoring port with nil name %v", port) + continue + } + // TODO: handle nil ports to mean "all" + if port.Port == nil || *port.Port == int32(0) { + klog.Warningf("Ignoring invalid endpoint port %s", *port.Name) + continue + } + + svcPortName := proxy.ServicePortName{ + NamespacedName: serviceNN, + Port: *port.Name, + Protocol: *port.Protocol, + } + + endpointInfoBySP[svcPortName] = cache.addEndpointsByIP(serviceNN, int(*port.Port), endpointInfoBySP[svcPortName], sliceInfo.Endpoints) + } + } + + return endpointInfoBySP +} + +// addEndpointsByIP adds endpointInfo for each IP. +func (cache *EndpointSliceCache) addEndpointsByIP(serviceNN apimachinerytypes.NamespacedName, portNum int, endpointsByIP map[string]proxy.Endpoint, endpoints []*endpointInfo) map[string]proxy.Endpoint { + if endpointsByIP == nil { + endpointsByIP = map[string]proxy.Endpoint{} + } + + // iterate through endpoints to add them to endpointsByIP. + for _, endpoint := range endpoints { + if len(endpoint.Addresses) == 0 { + klog.Warningf("Ignoring invalid endpoint port %s with empty addresses", endpoint) + continue + } + + // Filter out the incorrect IP version case. Any endpoint port that + // contains incorrect IP version will be ignored. + if cache.isIPv6Mode != nil && utilnet.IsIPv6String(endpoint.Addresses[0]) != *cache.isIPv6Mode { + continue + } + + isLocal := cache.isLocal(endpoint.Topology[v1.LabelHostname]) + endpointInfo := proxy.NewBaseEndpointInfo(endpoint.Addresses[0], portNum, isLocal, endpoint.Topology) + + // This logic ensures we're deduping potential overlapping endpoints + // isLocal should not vary between matching IPs, but if it does, we + // favor a true value here if it exists. + if _, exists := endpointsByIP[endpointInfo.IP()]; !exists || isLocal { + endpointsByIP[endpointInfo.IP()] = endpointInfo + } + } + + return endpointsByIP +} + +func (cache *EndpointSliceCache) isLocal(hostname string) bool { + return len(cache.hostname) > 0 && hostname == cache.hostname +} + +// esInfoChanged returns true if the esInfo parameter should be set as a new +// pending value in the cache. +func (cache *EndpointSliceCache) esInfoChanged(serviceKey apimachinerytypes.NamespacedName, sliceKey string, esInfo *endpointSliceInfo) bool { + if _, ok := cache.trackerByServiceMap[serviceKey]; ok { + appliedInfo, appliedOk := cache.trackerByServiceMap[serviceKey].applied[sliceKey] + pendingInfo, pendingOk := cache.trackerByServiceMap[serviceKey].pending[sliceKey] + + // If there's already a pending value, return whether or not this would + // change that. + if pendingOk { + return !reflect.DeepEqual(esInfo, pendingInfo) + } + + // If there's already an applied value, return whether or not this would + // change that. + if appliedOk { + return !reflect.DeepEqual(esInfo, appliedInfo) + } + } + + // If this is marked for removal and does not exist in the cache, no changes + // are necessary. + if esInfo.Remove { + return false + } + + // If not in the cache, and not marked for removal, it should be added. + return true +} + +// endpointsMapFromEndpointInfo computes an endpointsMap from endpointInfo that +// has been grouped by service port and IP. +func endpointsMapFromEndpointInfo(endpointInfoBySP map[proxy.ServicePortName]map[string]proxy.Endpoint) types.EndpointsMap { + endpointsMap := types.EndpointsMap{} + + // transform endpointInfoByServicePort into an endpointsMap. + for svcPortName, endpointInfoByIP := range endpointInfoBySP { + if len(endpointInfoByIP) > 0 { + endpointsMap[svcPortName] = map[string]proxy.Endpoint{} + for _, endpointInfo := range endpointInfoByIP { + endpointsMap[svcPortName][endpointInfo.String()] = endpointInfo + } + } + } + + return endpointsMap +} + +// endpointSliceCacheKeys returns cache keys used for a given EndpointSlice. +func endpointSliceCacheKeys(endpointSlice *discovery.EndpointSlice) (apimachinerytypes.NamespacedName, string, error) { + var err error + serviceName, ok := endpointSlice.Labels[discovery.LabelServiceName] + if !ok || serviceName == "" { + err = fmt.Errorf("no %s label set on endpoint slice: %s", discovery.LabelServiceName, endpointSlice.Name) + } else if endpointSlice.Namespace == "" || endpointSlice.Name == "" { + err = fmt.Errorf("expected EndpointSlice name and namespace to be set: %v", endpointSlice) + } + return apimachinerytypes.NamespacedName{Namespace: endpointSlice.Namespace, Name: serviceName}, endpointSlice.Name, err +} + +// byAddress helps sort endpointInfo +type byAddress []*endpointInfo + +func (e byAddress) Len() int { + return len(e) +} +func (e byAddress) Swap(i, j int) { + e[i], e[j] = e[j], e[i] +} +func (e byAddress) Less(i, j int) bool { + return strings.Join(e[i].Addresses, ",") < strings.Join(e[j].Addresses, ",") +} + +// byPort helps sort EndpointSlice ports by port number +type byPort []discovery.EndpointPort + +func (p byPort) Len() int { + return len(p) +} +func (p byPort) Swap(i, j int) { + p[i], p[j] = p[j], p[i] +} +func (p byPort) Less(i, j int) bool { + return *p[i].Port < *p[j].Port +} diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index 37c8c45f5e2..4705760d503 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -20,6 +20,7 @@ import ( "time" corev1 "k8s.io/api/core/v1" + "k8s.io/api/discovery/v1beta1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/informers" "k8s.io/client-go/tools/record" @@ -30,6 +31,7 @@ import ( "github.com/vmware-tanzu/antrea/pkg/agent/proxy/metrics" "github.com/vmware-tanzu/antrea/pkg/agent/proxy/types" "github.com/vmware-tanzu/antrea/pkg/agent/querier" + "github.com/vmware-tanzu/antrea/pkg/features" binding "github.com/vmware-tanzu/antrea/pkg/ovs/openflow" k8sproxy "github.com/vmware-tanzu/antrea/third_party/proxy" "github.com/vmware-tanzu/antrea/third_party/proxy/config" @@ -41,9 +43,10 @@ const ( ) type proxier struct { - once sync.Once - endpointsConfig *config.EndpointsConfig - serviceConfig *config.ServiceConfig + once sync.Once + endpointSliceConfig *config.EndpointSliceConfig + endpointsConfig *config.EndpointsConfig + serviceConfig *config.ServiceConfig // endpointsChanges and serviceChanges contains all changes to endpoints and // services that happened since last syncProxyRules call. For a single object, // changes are accumulated. Once both endpointsChanges and serviceChanges @@ -64,11 +67,12 @@ type proxier struct { // serviceStringMapMutex protects serviceStringMap object. serviceStringMapMutex sync.Mutex - runner *k8sproxy.BoundedFrequencyRunner - stopChan <-chan struct{} - agentQuerier querier.AgentQuerier - ofClient openflow.Client - isIPv6 bool + runner *k8sproxy.BoundedFrequencyRunner + stopChan <-chan struct{} + agentQuerier querier.AgentQuerier + ofClient openflow.Client + isIPv6 bool + enableEndpointSlice bool } func (p *proxier) isInitialized() bool { @@ -105,6 +109,7 @@ func (p *proxier) removeStaleServices() { } } delete(p.serviceInstalledMap, svcPortName) + delete(p.endpointInstalledMap, svcPortName) p.deleteServiceByIP(svcInfo.String()) p.groupCounter.Recycle(svcPortName) } @@ -175,6 +180,49 @@ func smallSliceDifference(s1, s2 []string) []string { return diff } +// calculateServiceEndpointUpdateList calculates the Endpoints of the Service to be installed. +// If Endpoints of the Service have changed, all current Endpoints of the Service will be returned. Otherwise, it will +// return nil. +func (p *proxier) calculateServiceEndpointUpdateList(svcPortName k8sproxy.ServicePortName) ([]k8sproxy.Endpoint, bool) { + if _, ok := p.serviceMap[svcPortName]; !ok { // The Service is not expected to be installed. + return nil, false + } + var needUpdate bool + endpointsExpected, ok := p.endpointsMap[svcPortName] + if !ok || len(endpointsExpected) == 0 { // No endpoint for this Service. + return nil, false + } + endpointInstalled, ok := p.endpointInstalledMap[svcPortName] + if !ok { + p.endpointInstalledMap[svcPortName] = map[string]struct{}{} + endpointInstalled = p.endpointInstalledMap[svcPortName] + } + var endpointUpdateList []k8sproxy.Endpoint + for _, endpoint := range endpointsExpected { + _, installed := endpointInstalled[endpoint.String()] + if !installed { + needUpdate = true + } + endpointUpdateList = append(endpointUpdateList, endpoint) + } + if !needUpdate { + endpointUpdateList = nil + } + return endpointUpdateList, needUpdate +} + +func (p *proxier) installEndpoints(endpoints []k8sproxy.Endpoint, svcPortName k8sproxy.ServicePortName, svcInfo *types.ServiceInfo) error { + groupID, _ := p.groupCounter.Get(svcPortName) + if err := p.ofClient.InstallEndpointFlows(svcInfo.OFProtocol, endpoints, true); err != nil { + return err + } + err := p.ofClient.InstallServiceGroup(groupID, svcInfo.StickyMaxAgeSeconds() != 0, endpoints) + if err != nil { + return err + } + return nil +} + func (p *proxier) installServices() { for svcPortName, svcPort := range p.serviceMap { svcInfo := svcPort.(*types.ServiceInfo) @@ -276,6 +324,9 @@ func (p *proxier) installServices() { } p.serviceInstalledMap[svcPortName] = svcPort + for _, endpoint := range endpointUpdateList { + p.endpointInstalledMap[svcPortName][endpoint.String()] = struct{}{} + } p.addServiceByIP(svcInfo.String(), svcPortName) } } @@ -353,6 +404,31 @@ func (p *proxier) OnEndpointsSynced() { } } +func (p *proxier) OnEndpointSliceAdd(endpointSlice *v1beta1.EndpointSlice) { + if p.endpointsChanges.OnEndpointSliceUpdate(endpointSlice, false) && p.isInitialized() { + p.runner.Run() + } +} + +func (p *proxier) OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice *v1beta1.EndpointSlice) { + if p.endpointsChanges.OnEndpointSliceUpdate(newEndpointSlice, false) && p.isInitialized() { + p.runner.Run() + } +} + +func (p *proxier) OnEndpointSliceDelete(endpointSlice *v1beta1.EndpointSlice) { + if p.endpointsChanges.OnEndpointSliceUpdate(endpointSlice, true) && p.isInitialized() { + p.runner.Run() + } +} + +func (p *proxier) OnEndpointSlicesSynced() { + p.endpointsChanges.OnEndpointsSynced() + if p.isInitialized() { + p.runner.Run() + } +} + func (p *proxier) OnServiceAdd(service *corev1.Service) { p.OnServiceUpdate(nil, service) } @@ -412,7 +488,11 @@ func (p *proxier) deleteServiceByIP(serviceStr string) { func (p *proxier) Run(stopCh <-chan struct{}) { p.once.Do(func() { go p.serviceConfig.Run(stopCh) - go p.endpointsConfig.Run(stopCh) + if p.enableEndpointSlice { + go p.endpointSliceConfig.Run(stopCh) + } else { + go p.endpointsConfig.Run(stopCh) + } p.stopChan = stopCh p.SyncLoop() }) @@ -422,6 +502,7 @@ func NewProxier( hostname string, informerFactory informers.SharedInformerFactory, ofClient openflow.Client, + enableEndpointSlice bool, isIPv6 bool) *proxier { recorder := record.NewBroadcaster().NewRecorder( runtime.NewScheme(), @@ -430,10 +511,10 @@ func NewProxier( metrics.Register() klog.Infof("Creating proxier with IPv6 enabled=%t", isIPv6) p := &proxier{ - endpointsConfig: config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), resyncPeriod), + enableEndpointSlice: enableEndpointSlice, serviceConfig: config.NewServiceConfig(informerFactory.Core().V1().Services(), resyncPeriod), - endpointsChanges: newEndpointsChangesTracker(hostname), serviceChanges: newServiceChangesTracker(recorder, isIPv6), + endpointsChanges: newEndpointsChangesTracker(hostname, enableEndpointSlice), serviceMap: k8sproxy.ServiceMap{}, serviceInstalledMap: k8sproxy.ServiceMap{}, endpointInstalledMap: map[k8sproxy.ServicePortName]map[string]struct{}{}, @@ -444,7 +525,13 @@ func NewProxier( isIPv6: isIPv6, } p.serviceConfig.RegisterEventHandler(p) - p.endpointsConfig.RegisterEventHandler(p) + if enableEndpointSlice { + p.endpointSliceConfig = config.NewEndpointSliceConfig(informerFactory.Discovery().V1beta1().EndpointSlices(), resyncPeriod) + p.endpointSliceConfig.RegisterEventHandler(p) + } else { + p.endpointsConfig = config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), resyncPeriod) + p.endpointsConfig.RegisterEventHandler(p) + } p.runner = k8sproxy.NewBoundedFrequencyRunner(componentName, p.syncProxyRules, 0, 30*time.Second, -1) return p } @@ -453,10 +540,10 @@ func NewDualStackProxier( hostname string, informerFactory informers.SharedInformerFactory, ofClient openflow.Client) k8sproxy.Provider { // Create an ipv4 instance of the single-stack proxier - ipv4Proxier := NewProxier(hostname, informerFactory, ofClient, false) + ipv4Proxier := NewProxier(hostname, informerFactory, ofClient, features.DefaultFeatureGate.Enabled(features.EndpointSlice), false) // Create an ipv6 instance of the single-stack proxier - ipv6Proxier := NewProxier(hostname, informerFactory, ofClient, true) + ipv6Proxier := NewProxier(hostname, informerFactory, ofClient, features.DefaultFeatureGate.Enabled(features.EndpointSlice), true) // Return a meta-proxier that dispatch calls between the two // single-stack proxier instances diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index 3efe5349d49..d22ba5fb21c 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -88,7 +88,7 @@ func NewFakeProxier(ofClient openflow.Client, isIPv6 bool) *proxier { corev1.EventSource{Component: componentName, Host: hostname}, ) p := &proxier{ - endpointsChanges: newEndpointsChangesTracker(hostname), + endpointsChanges: newEndpointsChangesTracker(hostname, false), serviceChanges: newServiceChangesTracker(recorder, isIPv6), serviceMap: k8sproxy.ServiceMap{}, serviceInstalledMap: k8sproxy.ServiceMap{}, @@ -152,6 +152,61 @@ func testClusterIP(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) { fp.syncProxyRules() } +func TestLoadbalancer(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockOFClient := ofmock.NewMockClient(ctrl) + fp := NewFakeProxier(mockOFClient, false) + + svcIPv4 := net.ParseIP("10.20.30.41") + svcPort := 80 + loadBalancerIPv4 := net.ParseIP("169.254.0.1") + svcPortName := k8sproxy.ServicePortName{ + NamespacedName: makeNamespaceName("ns1", "svc1"), + Port: "80", + Protocol: corev1.ProtocolTCP, + } + makeServiceMap(fp, + makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *corev1.Service) { + svc.Spec.ClusterIP = svcIPv4.String() + svc.Spec.LoadBalancerIP = loadBalancerIPv4.String() + svc.Spec.Type = corev1.ServiceTypeLoadBalancer + ingress := []corev1.LoadBalancerIngress{{IP: loadBalancerIPv4.String()}} + svc.Status.LoadBalancer.Ingress = ingress + svc.Spec.Ports = []corev1.ServicePort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: corev1.ProtocolTCP, + }} + }), + ) + + epIP := net.ParseIP("10.180.0.1") + makeEndpointsMap(fp, + makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *corev1.Endpoints) { + ept.Subsets = []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{ + IP: epIP.String(), + }}, + Ports: []corev1.EndpointPort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: corev1.ProtocolTCP, + }}, + }} + }), + ) + + groupID, _ := fp.groupCounter.Get(svcPortName) + mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCP, gomock.Any(), false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIPv4, uint16(svcPort), binding.ProtocolTCP, uint16(0)).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIPv4, uint16(svcPort), binding.ProtocolTCP, uint16(0)).Times(1) + mockOFClient.EXPECT().InstallLoadBalancerServiceFromOutsideFlows(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + + fp.syncProxyRules() +} + func TestClusterIPv4(t *testing.T) { testClusterIP(t, net.ParseIP("10.20.30.41"), net.ParseIP("10.180.0.1"), false) } @@ -250,7 +305,6 @@ func testClusterIPNoEndpoint(t *testing.T, svcIP net.IP, isIPv6 bool) { }), ) makeEndpointsMap(fp) - fp.syncProxyRules() } diff --git a/pkg/features/antrea_features.go b/pkg/features/antrea_features.go index 1ae07e0d4b1..a8e4aca2370 100644 --- a/pkg/features/antrea_features.go +++ b/pkg/features/antrea_features.go @@ -33,6 +33,11 @@ const ( // Allows to apply ClusterNetworkPolicy and AntreaNetworkPolicy CRDs. AntreaPolicy featuregate.Feature = "AntreaPolicy" + // alpha: v0.13 + // Enable EndpointSlice support in AntreaProxy. If AntreaProxy is not enabled, this + // flag will not take effect. + EndpointSlice featuregate.Feature = "EndpointSlice" + // alpha: v0.8 // beta: v0.11 // Enable antrea proxy which provides ServiceLB for in-cluster services in antrea agent. @@ -72,6 +77,7 @@ var ( defaultAntreaFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ AntreaPolicy: {Default: false, PreRelease: featuregate.Alpha}, AntreaProxy: {Default: true, PreRelease: featuregate.Beta}, + EndpointSlice: {Default: false, PreRelease: featuregate.Alpha}, Traceflow: {Default: true, PreRelease: featuregate.Beta}, FlowExporter: {Default: false, PreRelease: featuregate.Alpha}, NetworkPolicyStats: {Default: false, PreRelease: featuregate.Alpha}, diff --git a/third_party/proxy/config/config.go b/third_party/proxy/config/config.go index 16b4acfa4c2..9cc67c6fa94 100644 --- a/third_party/proxy/config/config.go +++ b/third_party/proxy/config/config.go @@ -39,8 +39,10 @@ import ( "time" "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1beta1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" coreinformers "k8s.io/client-go/informers/core/v1" + discoveryinformers "k8s.io/client-go/informers/discovery/v1beta1" "k8s.io/client-go/tools/cache" "k8s.io/klog" ) @@ -79,6 +81,23 @@ type EndpointsHandler interface { OnEndpointsSynced() } +// EndpointSliceHandler is an abstract interface of objects which receive +// notifications about endpoint slice object changes. +type EndpointSliceHandler interface { + // OnEndpointSliceAdd is called whenever creation of new endpoint slice + // object is observed. + OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) + // OnEndpointSliceUpdate is called whenever modification of an existing + // endpoint slice object is observed. + OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice *discovery.EndpointSlice) + // OnEndpointSliceDelete is called whenever deletion of an existing + // endpoint slice object is observed. + OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) + // OnEndpointSlicesSynced is called once all the initial event handlers were + // called and the state is fully propagated to local cache. + OnEndpointSlicesSynced() +} + // EndpointsConfig tracks a set of endpoints configurations. type EndpointsConfig struct { listerSynced cache.InformerSynced @@ -260,3 +279,94 @@ func (c *ServiceConfig) handleDeleteService(obj interface{}) { c.eventHandlers[i].OnServiceDelete(service) } } + +// EndpointSliceConfig tracks a set of endpoints configurations. +type EndpointSliceConfig struct { + listerSynced cache.InformerSynced + eventHandlers []EndpointSliceHandler +} + +// NewEndpointSliceConfig creates a new EndpointSliceConfig. +func NewEndpointSliceConfig(endpointSliceInformer discoveryinformers.EndpointSliceInformer, resyncPeriod time.Duration) *EndpointSliceConfig { + result := &EndpointSliceConfig{ + listerSynced: endpointSliceInformer.Informer().HasSynced, + } + + endpointSliceInformer.Informer().AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: result.handleAddEndpointSlice, + UpdateFunc: result.handleUpdateEndpointSlice, + DeleteFunc: result.handleDeleteEndpointSlice, + }, + resyncPeriod, + ) + + return result +} + +// RegisterEventHandler registers a handler which is called on every endpoint slice change. +func (c *EndpointSliceConfig) RegisterEventHandler(handler EndpointSliceHandler) { + c.eventHandlers = append(c.eventHandlers, handler) +} + +// Run waits for cache synced and invokes handlers after syncing. +func (c *EndpointSliceConfig) Run(stopCh <-chan struct{}) { + klog.Info("Starting endpoint slice config controller") + + if !cache.WaitForNamedCacheSync("endpoint slice config", stopCh, c.listerSynced) { + return + } + + for _, h := range c.eventHandlers { + klog.V(3).Infof("Calling handler.OnEndpointSlicesSynced()") + h.OnEndpointSlicesSynced() + } +} + +func (c *EndpointSliceConfig) handleAddEndpointSlice(obj interface{}) { + endpointSlice, ok := obj.(*discovery.EndpointSlice) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj)) + return + } + for _, h := range c.eventHandlers { + klog.V(4).Infof("Calling handler.OnEndpointSliceAdd %+v", endpointSlice) + h.OnEndpointSliceAdd(endpointSlice) + } +} + +func (c *EndpointSliceConfig) handleUpdateEndpointSlice(oldObj, newObj interface{}) { + oldEndpointSlice, ok := oldObj.(*discovery.EndpointSlice) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", newObj)) + return + } + newEndpointSlice, ok := newObj.(*discovery.EndpointSlice) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", newObj)) + return + } + for _, h := range c.eventHandlers { + klog.V(4).Infof("Calling handler.OnEndpointSliceUpdate") + h.OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice) + } +} + +func (c *EndpointSliceConfig) handleDeleteEndpointSlice(obj interface{}) { + endpointSlice, ok := obj.(*discovery.EndpointSlice) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj)) + return + } + if endpointSlice, ok = tombstone.Obj.(*discovery.EndpointSlice); !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj)) + return + } + } + for _, h := range c.eventHandlers { + klog.V(4).Infof("Calling handler.OnEndpointsDelete") + h.OnEndpointSliceDelete(endpointSlice) + } +} diff --git a/third_party/proxy/endpoints.go b/third_party/proxy/endpoints.go index 85e1383454b..7cc121e3bb4 100644 --- a/third_party/proxy/endpoints.go +++ b/third_party/proxy/endpoints.go @@ -40,6 +40,9 @@ Modifies: package proxy import ( + "net" + "strconv" + utilproxy "github.com/vmware-tanzu/antrea/third_party/proxy/util" ) @@ -85,3 +88,11 @@ func (info *BaseEndpointInfo) Port() (int, error) { func (info *BaseEndpointInfo) Equal(other Endpoint) bool { return info.String() == other.String() && info.GetIsLocal() == other.GetIsLocal() } + +func NewBaseEndpointInfo(IP string, port int, isLocal bool, topology map[string]string) *BaseEndpointInfo { + return &BaseEndpointInfo{ + Endpoint: net.JoinHostPort(IP, strconv.Itoa(port)), + IsLocal: isLocal, + Topology: topology, + } +}