Skip to content

Commit

Permalink
Start ovs-vswitchd with flow-restore-wait
Browse files Browse the repository at this point in the history
This patch starts ovs-vswitchd with flow-restore-wait set to true and
removes the config after restoring necessary flows for the following
reasons:

1. It prevents packets from being mishandled by ovs-vswitchd in its
default fashion, which could affect existing connections' conntrack
state and cause issues like antrea-io#625.

2. It prevents ovs-vswitchd from flushing or expiring previously set
datapath flows, so existing connections can achieve 0 downtime during
OVS restart.

As a result, we remove the config here after restoring necessary flows.
  • Loading branch information
tnqn authored and GraysonWu committed Sep 18, 2020
1 parent c178b0c commit 4a9e156
Show file tree
Hide file tree
Showing 8 changed files with 202 additions and 3 deletions.
22 changes: 20 additions & 2 deletions build/images/scripts/start_ovs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,33 @@

source logging
source daemon_status
source /usr/share/openvswitch/scripts/ovs-lib

CONTAINER_NAME="antrea-ovs"
OVS_DB_FILE="/var/run/openvswitch/conf.db"

set -euo pipefail

function start_ovs {
log_info $CONTAINER_NAME "Starting OVS"
/usr/share/openvswitch/scripts/ovs-ctl --system-id=random start --db-file=$OVS_DB_FILE
if daemon_is_running ovsdb-server; then
log_info $CONTAINER_NAME "ovsdb-server is already running"
else
log_info $CONTAINER_NAME "Starting ovsdb-server"
/usr/share/openvswitch/scripts/ovs-ctl --no-ovs-vswitchd --system-id=random start --db-file=$OVS_DB_FILE
log_info $CONTAINER_NAME "Started ovsdb-server"
fi

if daemon_is_running ovs-vswitchd; then
log_info $CONTAINER_NAME "ovs-vswitchd is already running"
else
log_info $CONTAINER_NAME "Starting ovs-vswitchd"
# Start ovs-vswitchd with flow-restore-wait set to true so that packets won't be
# mishandled in its default fashion, the config will be removed after antrea-agent
# restoring flows.
ovs-vsctl --no-wait set open_vswitch . other_config:flow-restore-wait="true"
/usr/share/openvswitch/scripts/ovs-ctl --no-ovsdb-server --system-id=random start --db-file=$OVS_DB_FILE
log_info $CONTAINER_NAME "Started ovs-vswitchd"
fi
}

function stop_ovs {
Expand Down
4 changes: 4 additions & 0 deletions build/images/scripts/start_ovs_netdev
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ function del_br_phy {

function start_ovs {
log_info $CONTAINER_NAME "Starting OVS"
# Unlike the start_ovs script, we don't set flow-restore-wait when starting OVS
# with the netdev datapath. This is because the Node's network relies on the
# forwarding of OVS so we cannot get Node, Pod, NetworkPolicy data from
# Kubernetes API to install proper flows before removing flow-restore-wait.
/usr/share/openvswitch/scripts/ovs-ctl --system-id=random start --db-file=$OVS_DB_FILE
}

Expand Down
6 changes: 6 additions & 0 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ func run(o *Options) error {
return fmt.Errorf("error initializing CNI server: %v", err)
}

// TODO: we should call this after installing flows for initial node routes
// and initial NetworkPolicies so that no packets will be mishandled.
if err := agentInitializer.FlowRestoreComplete(); err != nil {
return err
}

// set up signal capture: the first SIGTERM / SIGINT signal is handled gracefully and will
// cause the stopCh channel to be closed; if another signal is received before the program
// exits, we will force exit.
Expand Down
32 changes: 32 additions & 0 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/containernetworking/plugins/pkg/ip"
"github.com/vishvananda/netlink"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog"

Expand Down Expand Up @@ -290,12 +291,43 @@ func (i *Initializer) initOpenFlowPipeline() error {
klog.Info("Replaying OF flows to OVS bridge")
i.ofClient.ReplayFlows()
klog.Info("Flow replay completed")

// ofClient and ovsBridgeClient have their own mechanisms to restore connections with OVS, and it could
// happen that ovsBridgeClient's connection is not ready when ofClient completes flow replay. We retry it
// with a timeout that is longer time than ovsBridgeClient's maximum connecting retry interval (8 seconds)
// to ensure the flag can be removed successfully.
err := wait.PollImmediate(200*time.Millisecond, 10*time.Second, func() (done bool, err error) {
if err := i.FlowRestoreComplete(); err != nil {
return false, nil
}
return true, nil
})
// This shouldn't happen unless OVS is disconnected again after replaying flows. If it happens, we will try
// to clean up the config again so an error log should be fine.
if err != nil {
klog.Errorf("Failed to clean up flow-restore-wait config: %v", err)
}
}
}()

return nil
}

func (i *Initializer) FlowRestoreComplete() error {
// ovs-vswitchd is started with flow-restore-wait set to true for the following reasons:
// 1. It prevents packets from being mishandled by ovs-vswitchd in its default fashion,
// which could affect existing connections' conntrack state and cause issues like #625.
// 2. It prevents ovs-vswitchd from flushing or expiring previously set datapath flows,
// so existing connections can achieve 0 downtime during OVS restart.
// As a result, we remove the config here after restoring necessary flows.
klog.Info("Cleaning up flow-restore-wait config")
if err := i.ovsBridgeClient.DeleteOVSOtherConfig(map[string]interface{}{"flow-restore-wait": "true"}); err != nil {
return fmt.Errorf("error when cleaning up flow-restore-wait config: %v", err)
}
klog.Info("Cleaned up flow-restore-wait config")
return nil
}

// setupGatewayInterface creates the host gateway interface which is an internal port on OVS. The ofport for host
// gateway interface is predefined, so invoke CreateInternalPort with a specific ofport_request
func (i *Initializer) setupGatewayInterface() error {
Expand Down
3 changes: 3 additions & 0 deletions pkg/ovs/ovsconfig/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,7 @@ type OVSBridgeClient interface {
GetPortList() ([]OVSPortData, Error)
SetInterfaceMTU(name string, MTU int) error
GetOVSVersion() (string, Error)
AddOVSOtherConfig(configs map[string]interface{}) Error
GetOVSOtherConfig() (map[string]string, Error)
DeleteOVSOtherConfig(configs map[string]interface{}) Error
}
63 changes: 63 additions & 0 deletions pkg/ovs/ovsconfig/ovs_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,3 +673,66 @@ func (br *OVSBridge) GetOVSVersion() (string, Error) {

return res[0].Rows[0].(map[string]interface{})["ovs_version"].(string), nil
}

// AddOVSOtherConfig adds the given configs to the "other_config" column of
// the single record of the "Open_vSwitch" table.
// For each config, it will only be added if its key doesn't already exist.
// No error is returned if configs already exist.
func (br *OVSBridge) AddOVSOtherConfig(configs map[string]interface{}) Error {
tx := br.ovsdb.Transaction(openvSwitchSchema)

mutateSet := helpers.MakeOVSDBMap(configs)
tx.Mutate(dbtransaction.Mutate{
Table: "Open_vSwitch",
Mutations: [][]interface{}{{"other_config", "insert", mutateSet}},
})

_, err, temporary := tx.Commit()
if err != nil {
klog.Error("Transaction failed: ", err)
return NewTransactionError(err, temporary)
}
return nil
}

func (br *OVSBridge) GetOVSOtherConfig() (map[string]string, Error) {
tx := br.ovsdb.Transaction(openvSwitchSchema)

tx.Select(dbtransaction.Select{
Table: "Open_vSwitch",
Columns: []string{"other_config"},
})

res, err, temporary := tx.Commit()
if err != nil {
klog.Error("Transaction failed: ", err)
return nil, NewTransactionError(err, temporary)
}
if len(res[0].Rows) == 0 {
klog.Warning("Could not find other_config")
return nil, nil
}
otherConfigs := res[0].Rows[0].(map[string]interface{})["other_config"].([]interface{})
return buildMapFromOVSDBMap(otherConfigs), nil
}

// DeleteOVSOtherConfig deletes the given configs from the "other_config" column of
// the single record of the "Open_vSwitch" table.
// For each config, it will only be deleted if its key exists and its value matches the stored one.
// No error is returned if configs don't exist or don't match.
func (br *OVSBridge) DeleteOVSOtherConfig(configs map[string]interface{}) Error {
tx := br.ovsdb.Transaction(openvSwitchSchema)

mutateSet := helpers.MakeOVSDBMap(configs)
tx.Mutate(dbtransaction.Mutate{
Table: "Open_vSwitch",
Mutations: [][]interface{}{{"other_config", "delete", mutateSet}},
})

_, err, temporary := tx.Commit()
if err != nil {
klog.Error("Transaction failed: ", err)
return NewTransactionError(err, temporary)
}
return nil
}
45 changes: 44 additions & 1 deletion pkg/ovs/ovsconfig/testing/mock_ovsconfig.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 30 additions & 0 deletions test/integration/ovs/ovs_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,36 @@ func TestOVSBridgeExternalIDs(t *testing.T) {
}
}

func TestOVSOtherConfig(t *testing.T) {
data := &testData{}
data.setup(t)
defer data.teardown(t)

otherConfigs := map[string]interface{}{"flow-restore-wait": "true", "foo1": "bar1"}
err := data.br.AddOVSOtherConfig(otherConfigs)
require.Nil(t, err, "Error when adding OVS other_config")

gotOtherConfigs, err := data.br.GetOVSOtherConfig()
require.Nil(t, err, "Error when getting OVS other_config")
require.Equal(t, map[string]string{"flow-restore-wait": "true", "foo1": "bar1"}, gotOtherConfigs, "other_config mismatched")

// Expect only the new config "foo2: bar2" will be added.
err = data.br.AddOVSOtherConfig(map[string]interface{}{"flow-restore-wait": "false", "foo2": "bar2"})
require.Nil(t, err, "Error when adding OVS other_config")

gotOtherConfigs, err = data.br.GetOVSOtherConfig()
require.Nil(t, err, "Error when getting OVS other_config")
require.Equal(t, map[string]string{"flow-restore-wait": "true", "foo1": "bar1", "foo2": "bar2"}, gotOtherConfigs, "other_config mismatched")

// Expect only the matched config "flow-restore-wait: true" will be deleted.
err = data.br.DeleteOVSOtherConfig(map[string]interface{}{"flow-restore-wait": "true", "foo1": "bar2"})
require.Nil(t, err, "Error when deleting OVS other_config")

gotOtherConfigs, err = data.br.GetOVSOtherConfig()
require.Nil(t, err, "Error when getting OVS other_config")
require.Equal(t, map[string]string{"foo1": "bar1", "foo2": "bar2"}, gotOtherConfigs, "other_config mismatched")
}

func deleteAllPorts(t *testing.T, br *ovsconfig.OVSBridge) {
portList, err := br.GetPortUUIDList()
require.Nil(t, err, "Error when retrieving port list")
Expand Down

0 comments on commit 4a9e156

Please sign in to comment.