Skip to content

Commit

Permalink
Fix dynamodb HA lock race (#6512)
Browse files Browse the repository at this point in the history
* Fix DynamoDB HA race issue

* Add test for race condition (which fails on the released DynamoDB code)
  • Loading branch information
mahmoudm authored and briankassouf committed Apr 2, 2019
1 parent 5dbd372 commit 98079d8
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 12 deletions.
32 changes: 20 additions & 12 deletions physical/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ func (l *DynamoDBLock) tryToLock(stop, success chan struct{}, errors chan error)
case <-stop:
ticker.Stop()
case <-ticker.C:
err := l.writeItem()
err := l.updateItem(true)
if err != nil {
if err, ok := err.(awserr.Error); ok {
// Don't report a condition check failure, this means that the lock
Expand All @@ -633,7 +633,8 @@ func (l *DynamoDBLock) periodicallyRenewLock(done chan struct{}) {
for {
select {
case <-ticker.C:
l.writeItem()
// This should not renew the lock if the lock was deleted from under you.
l.updateItem(false)
case <-done:
ticker.Stop()
return
Expand All @@ -643,9 +644,24 @@ func (l *DynamoDBLock) periodicallyRenewLock(done chan struct{}) {

// Attempts to put/update the dynamodb item using condition expressions to
// evaluate the TTL.
func (l *DynamoDBLock) writeItem() error {
func (l *DynamoDBLock) updateItem(createIfMissing bool) error {
now := time.Now()

conditionExpression := ""
if createIfMissing {
conditionExpression += "attribute_not_exists(#path) or " +
"attribute_not_exists(#key) or "
} else {
conditionExpression += "attribute_exists(#path) and " +
"attribute_exists(#key) and "
}

// To work when upgrading from older versions that did not include the
// Identity attribute, we first check if the attr doesn't exist, and if
// it does, then we check if the identity is equal to our own.
// We also write if the lock expired.
conditionExpression += "(attribute_not_exists(#identity) or #identity = :identity or #expires <= :now)"

_, err := l.backend.client.UpdateItem(&dynamodb.UpdateItemInput{
TableName: aws.String(l.backend.table),
Key: map[string]*dynamodb.AttributeValue{
Expand All @@ -657,15 +673,7 @@ func (l *DynamoDBLock) writeItem() error {
// A. identity is equal to our identity (or the identity doesn't exist)
// or
// B. The ttl on the item is <= to the current time
ConditionExpression: aws.String(
"attribute_not_exists(#path) or " +
"attribute_not_exists(#key) or " +
// To work when upgrading from older versions that did not include the
// Identity attribute, we first check if the attr doesn't exist, and if
// it does, then we check if the identity is equal to our own.
"(attribute_not_exists(#identity) or #identity = :identity) or " +
"#expires <= :now",
),
ConditionExpression: aws.String(conditionExpression),
ExpressionAttributeNames: map[string]*string{
"#path": aws.String("Path"),
"#key": aws.String("Key"),
Expand Down
85 changes: 85 additions & 0 deletions physical/dynamodb/dynamodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func TestDynamoDBHABackend(t *testing.T) {

physical.ExerciseHABackend(t, b.(physical.HABackend), b2.(physical.HABackend))
testDynamoDBLockTTL(t, b.(physical.HABackend))
testDynamoDBLockRewewal(t, b.(physical.HABackend))
}

// Similar to testHABackend, but using internal implementation details to
Expand Down Expand Up @@ -276,6 +277,90 @@ func testDynamoDBLockTTL(t *testing.T, ha physical.HABackend) {
lock2.Unlock()
}

// Similar to testHABackend, but using internal implementation details to
// trigger a renewal before a "watch" check, which has been a source of
// race conditions.
func testDynamoDBLockRewewal(t *testing.T, ha physical.HABackend) {
renewInterval := time.Second * 1
watchInterval := time.Second * 5

// Get the lock
origLock, err := ha.LockWith("dynamodbrenewal", "bar")
if err != nil {
t.Fatalf("err: %v", err)
}

// customize the renewal and watch intervals
lock := origLock.(*DynamoDBLock)
lock.renewInterval = renewInterval
lock.watchRetryInterval = watchInterval

// Attempt to lock
leaderCh, err := lock.Lock(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if leaderCh == nil {
t.Fatalf("failed to get leader ch")
}

// Check the value
held, val, err := lock.Value()
if err != nil {
t.Fatalf("err: %v", err)
}
if !held {
t.Fatalf("should be held")
}
if val != "bar" {
t.Fatalf("bad value: %v", err)
}

// Release the lock, which will delete the stored item
if err := lock.Unlock(); err != nil {
t.Fatalf("err: %v", err)
}

// Wait longer than the renewal time, but less than the watch time
time.Sleep(1500 * time.Millisecond)

// Attempt to lock with new lock
newLock, err := ha.LockWith("dynamodbrenewal", "baz")
if err != nil {
t.Fatalf("err: %v", err)
}

// Cancel attempt in 6 sec so as not to block unit tests forever
stopCh := make(chan struct{})
time.AfterFunc(6*time.Second, func() {
close(stopCh)
})

// Attempt to lock should work
leaderCh2, err := newLock.Lock(stopCh)
if err != nil {
t.Fatalf("err: %v", err)
}
if leaderCh2 == nil {
t.Fatalf("should get leader ch")
}

// Check the value
held, val, err = newLock.Value()
if err != nil {
t.Fatalf("err: %v", err)
}
if !held {
t.Fatalf("should be held")
}
if val != "baz" {
t.Fatalf("bad value: %v", err)
}

// Cleanup
newLock.Unlock()
}

func prepareDynamoDBTestContainer(t *testing.T) (cleanup func(), retAddress string, creds *credentials.Credentials) {
// If environment variable is set, assume caller wants to target a real
// DynamoDB.
Expand Down

0 comments on commit 98079d8

Please sign in to comment.