[FAB-5854] Add additional unit tests for ApplyUpdates()
Add unit tests for additional ApplyUpdates() scenarios:

1) If an individual update in a CouchDB bulk update fails, we fall back to
one-by-one processing of the failed updates with SaveDoc(). This scenario
previously lacked unit test coverage.

Change-Id: I817f6672d5f1336a468e12e6875bf12841a1aaae
Signed-off-by: Chris Elder <chris.elder@us.ibm.com>
Chris Elder committed Oct 23, 2017
1 parent 7f4f74d commit 329e392
Showing 4 changed files with 260 additions and 76 deletions.
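The fallback the commit message describes is easier to follow when condensed into one place. The sketch below is simplified and is not the committed code: the helper name retryFailedDocs is invented for illustration, the types are assumed to come from the fabric couchdb package and the BatchableDocument map built earlier in processUpdateBatch, and the savepoint handling and "_rev" stripping shown in the full diff below are omitted.

// retryFailedDocs condenses the retry flow added to processUpdateBatch:
// any document the CouchDB bulk update rejects is retried one at a time,
// either as a delete or as an individual SaveDoc.
func retryFailedDocs(db *couchdb.CouchDatabase, responses []*couchdb.BatchUpdateResponse,
	batchUpdateMap map[string]*BatchableDocument) error {

	for _, respDoc := range responses {
		if respDoc.Ok {
			continue // committed by the bulk update, nothing more to do
		}
		doc := batchUpdateMap[respDoc.ID]
		if doc.Deleted {
			// retry the delete; a document that is already gone (404) is not treated as an error
			if err := db.DeleteDoc(respDoc.ID, ""); err != nil {
				return err
			}
			continue
		}
		// retry the update; SaveDoc performs its own revision-conflict retries
		if _, err := db.SaveDoc(respDoc.ID, "", &doc.CouchDoc); err != nil {
			return err
		}
	}
	return nil
}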
108 changes: 108 additions & 0 deletions core/ledger/kvledger/txmgmt/statedb/commontests/test_common.go
@@ -614,5 +614,113 @@ func TestSmallBatchSize(t *testing.T, dbProvider statedb.VersionedDBProvider) {

vv, _ = db.GetState("ns1", "key11")
testutil.AssertEquals(t, vv.Value, jsonValue11)
}

// TestBatchWithIndividualRetry tests a single failure in a batch
func TestBatchWithIndividualRetry(t *testing.T, dbProvider statedb.VersionedDBProvider) {

db, err := dbProvider.GetDBHandle("testbatchretry")
testutil.AssertNoError(t, err, "")

batch := statedb.NewUpdateBatch()
vv1 := statedb.VersionedValue{Value: []byte("value1"), Version: version.NewHeight(1, 1)}
vv2 := statedb.VersionedValue{Value: []byte("value2"), Version: version.NewHeight(1, 2)}
vv3 := statedb.VersionedValue{Value: []byte("value3"), Version: version.NewHeight(1, 3)}
vv4 := statedb.VersionedValue{Value: []byte("value4"), Version: version.NewHeight(1, 4)}

batch.Put("ns", "key1", vv1.Value, vv1.Version)
batch.Put("ns", "key2", vv2.Value, vv2.Version)
batch.Put("ns", "key3", vv3.Value, vv3.Version)
batch.Put("ns", "key4", vv4.Value, vv4.Version)
savePoint := version.NewHeight(1, 5)
err = db.ApplyUpdates(batch, savePoint)
testutil.AssertNoError(t, err, "")

// Clear the cache for the next batch, in place of simulation
if bulkdb, ok := db.(statedb.BulkOptimizable); ok {
//clear the cached versions, this will force a read when getVersion is called
bulkdb.ClearCachedVersions()
}

batch = statedb.NewUpdateBatch()
batch.Put("ns", "key1", vv1.Value, vv1.Version)
batch.Put("ns", "key2", vv2.Value, vv2.Version)
batch.Put("ns", "key3", vv3.Value, vv3.Version)
batch.Put("ns", "key4", vv4.Value, vv4.Version)
savePoint = version.NewHeight(1, 6)
err = db.ApplyUpdates(batch, savePoint)
testutil.AssertNoError(t, err, "")

// Delete key2 and update document key3
batch = statedb.NewUpdateBatch()
batch.Delete("ns", "key2", vv2.Version)
batch.Put("ns", "key3", vv3.Value, vv3.Version)
savePoint = version.NewHeight(1, 7)
err = db.ApplyUpdates(batch, savePoint)
testutil.AssertNoError(t, err, "")

// This should force a retry due to a CouchDB revision conflict for both the delete and the update
// The retry logic should correct the update and prevent the delete from returning an error
batch = statedb.NewUpdateBatch()
batch.Delete("ns", "key2", vv2.Version)
batch.Put("ns", "key3", vv3.Value, vv3.Version)
savePoint = version.NewHeight(1, 8)
err = db.ApplyUpdates(batch, savePoint)
testutil.AssertNoError(t, err, "")

//Create a new set of values that use JSON instead of binary
jsonValue5 := []byte(`{"asset_name": "marble5","color": "blue","size": 5,"owner": "fred"}`)
jsonValue6 := []byte(`{"asset_name": "marble6","color": "blue","size": 6,"owner": "elaine"}`)
jsonValue7 := []byte(`{"asset_name": "marble7","color": "blue","size": 7,"owner": "fred"}`)
jsonValue8 := []byte(`{"asset_name": "marble8","color": "blue","size": 8,"owner": "elaine"}`)

// Clear the cache for the next batch, in place of simulation
if bulkdb, ok := db.(statedb.BulkOptimizable); ok {
//clear the cached versions, this will force a read when getVersion is called
bulkdb.ClearCachedVersions()
}

batch = statedb.NewUpdateBatch()
batch.Put("ns1", "key5", jsonValue5, version.NewHeight(1, 9))
batch.Put("ns1", "key6", jsonValue6, version.NewHeight(1, 10))
batch.Put("ns1", "key7", jsonValue7, version.NewHeight(1, 11))
batch.Put("ns1", "key8", jsonValue8, version.NewHeight(1, 12))
savePoint = version.NewHeight(1, 6)
err = db.ApplyUpdates(batch, savePoint)
testutil.AssertNoError(t, err, "")

// Clear the cache for the next batch, in place of simulation
if bulkdb, ok := db.(statedb.BulkOptimizable); ok {
//clear the cached versions, this will force a read when getVersion is called
bulkdb.ClearCachedVersions()
}

//Send the batch through again to test updates
batch = statedb.NewUpdateBatch()
batch.Put("ns1", "key5", jsonValue5, version.NewHeight(1, 9))
batch.Put("ns1", "key6", jsonValue6, version.NewHeight(1, 10))
batch.Put("ns1", "key7", jsonValue7, version.NewHeight(1, 11))
batch.Put("ns1", "key8", jsonValue8, version.NewHeight(1, 12))
savePoint = version.NewHeight(1, 6)
err = db.ApplyUpdates(batch, savePoint)
testutil.AssertNoError(t, err, "")

// Delete key6 and update document key7
// this will cause an inconsistent cache entry for the next batch
batch = statedb.NewUpdateBatch()
batch.Delete("ns1", "key6", version.NewHeight(1, 13))
batch.Put("ns1", "key7", jsonValue7, version.NewHeight(1, 14))
savePoint = version.NewHeight(1, 15)
err = db.ApplyUpdates(batch, savePoint)
testutil.AssertNoError(t, err, "")

// This should force a retry due to a CouchDB revision conflict for both the delete and the update
// The retry logic should correct the update and prevent the delete from returning an error
batch = statedb.NewUpdateBatch()
batch.Delete("ns1", "key6", version.NewHeight(1, 16))
batch.Put("ns1", "key7", jsonValue7, version.NewHeight(1, 17))
savePoint = version.NewHeight(1, 18)
err = db.ApplyUpdates(batch, savePoint)
testutil.AssertNoError(t, err, "")

}
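The common test above is only the shared body; it is invoked from the backend-specific test files, which are among the changed files not shown in this view. Assuming the existing NewTestVDBEnv and Cleanup helpers in statecouchdb_test.go, the CouchDB driver would look roughly like this (the test name here is illustrative):

func TestCouchDBBatchWithIndividualRetry(t *testing.T) {
	env := NewTestVDBEnv(t)
	env.Cleanup("testbatchretry")       // remove any database left over from a prior run
	defer env.Cleanup("testbatchretry") // clean up after the test
	commontests.TestBatchWithIndividualRetry(t, env.DBProvider)
}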
63 changes: 52 additions & 11 deletions core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go
@@ -34,7 +34,7 @@ var binaryWrapper = "valueBytes"
// currently defaulted to 0 and is not used
var querySkip = 0

// BatchDocument defines a document for a batch
//BatchableDocument defines a document for a batch
type BatchableDocument struct {
CouchDoc couchdb.CouchDoc
Deleted bool
@@ -158,7 +158,7 @@ func (vdb *VersionedDB) GetState(namespace string, key string) (*statedb.Version
return &statedb.VersionedValue{Value: returnValue, Version: returnVersion}, nil
}

// GetVersion implements method in VersionedDB interface
//GetCachedVersion implements method in VersionedDB interface
func (vdb *VersionedDB) GetCachedVersion(namespace string, key string) (*version.Height, bool) {

logger.Debugf("Retrieving cached version: %s~%s", key, namespace)
@@ -214,9 +214,6 @@ func removeDataWrapper(wrappedValue []byte, attachments []*couchdb.AttachmentInf
// initialize the return value
returnValue := []byte{}

// initialize a default return version
returnVersion := version.NewHeight(0, 0)

// create a generic map for the json
jsonResult := make(map[string]interface{})

@@ -242,7 +239,7 @@ func removeDataWrapper(wrappedValue []byte, attachments []*couchdb.AttachmentInf

}

returnVersion = createVersionHeightFromVersionString(jsonResult["version"].(string))
returnVersion := createVersionHeightFromVersionString(jsonResult["version"].(string))

return returnValue, returnVersion

@@ -417,7 +414,7 @@ func (vdb *VersionedDB) processUpdateBatch(updateBatch *statedb.UpdateBatch, mis

// Use the batchUpdateMap for tracking couchdb updates by ID
// this will be used in case there are retries required
batchUpdateMap := make(map[string]interface{})
batchUpdateMap := make(map[string]*BatchableDocument)

namespaces := updateBatch.GetUpdatedNamespaces()
for _, ns := range namespaces {
@@ -465,8 +462,8 @@ }
}
}

// Add the current docment to the update map
batchUpdateMap[string(compositeKey)] = BatchableDocument{CouchDoc: *couchDoc, Deleted: isDelete}
// Add the current document, revision and delete flag to the update map
batchUpdateMap[string(compositeKey)] = &BatchableDocument{CouchDoc: *couchDoc, Deleted: isDelete}

}
}
@@ -476,7 +473,7 @@
//Add the documents to the batch update array
batchUpdateDocs := []*couchdb.CouchDoc{}
for _, updateDocument := range batchUpdateMap {
batchUpdateDocument := updateDocument.(BatchableDocument)
batchUpdateDocument := updateDocument
batchUpdateDocs = append(batchUpdateDocs, &batchUpdateDocument.CouchDoc)
}

@@ -495,17 +492,35 @@
// If the document returned an error, retry the individual document
if respDoc.Ok != true {

batchUpdateDocument := batchUpdateMap[respDoc.ID].(BatchableDocument)
batchUpdateDocument := batchUpdateMap[respDoc.ID]

var err error

//Remove the "_rev" from the JSON before saving
//this will allow the CouchDB retry logic to retry revisions without encountering
//a mismatch between the "If-Match" and the "_rev" tag in the JSON
if batchUpdateDocument.CouchDoc.JSONValue != nil {
err = removeJSONRevision(&batchUpdateDocument.CouchDoc.JSONValue)
if err != nil {
return err
}
}

// Check to see if the document was added to the batch as a delete type document
if batchUpdateDocument.Deleted {

//Log a warning that a retry is being attempted for the failed batch delete
logger.Warningf("CouchDB batch document delete encountered a problem. Retrying delete for document ID:%s", respDoc.ID)

// If this is a deleted document, then retry the delete
// If the delete fails due to a document not being found (404 error),
// the document has already been deleted and the DeleteDoc will not return an error
err = vdb.db.DeleteDoc(respDoc.ID, "")
} else {

//Log a warning that a retry is being attempted for the failed batch update
logger.Warningf("CouchDB batch document update encountered a problem. Retrying update for document ID:%s", respDoc.ID)

// Save the individual document to couchdb
// Note that this will do retries as needed
_, err = vdb.db.SaveDoc(respDoc.ID, "", &batchUpdateDocument.CouchDoc)
@@ -674,6 +689,32 @@ func createCouchdbDocJSON(id, revision string, value []byte, chaincodeID string,
return documentJSON
}

// removeJSONRevision removes the "_rev" field if the value is a JSON document
func removeJSONRevision(jsonValue *[]byte) error {

jsonMap := make(map[string]interface{})

//Unmarshal the value into a map
err := json.Unmarshal(*jsonValue, &jsonMap)
if err != nil {
logger.Errorf("Failed to unmarshal couchdb JSON data %s\n", err.Error())
return err
}

//delete the "_rev" entry
delete(jsonMap, "_rev")

//marshal the updated map back into the byte array
*jsonValue, err = json.Marshal(jsonMap)
if err != nil {
logger.Errorf("Failed to marshal couchdb JSON data %s\n", err.Error())
return err
}

return nil

}

// Savepoint docid (key) for couchdb
const savepointDocID = "statedb_savepoint"

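For reference, here is a minimal self-contained illustration of what the new removeJSONRevision helper does to a document body. The helper is unexported, so this snippet replicates its unmarshal/delete/marshal steps on a sample document rather than calling it directly:

package main

import (
	"encoding/json"
	"fmt"
)

// stripRev mirrors removeJSONRevision: unmarshal the JSON, drop the
// CouchDB "_rev" field, and marshal the result back into the byte slice.
func stripRev(jsonValue *[]byte) error {
	jsonMap := make(map[string]interface{})
	if err := json.Unmarshal(*jsonValue, &jsonMap); err != nil {
		return err
	}
	delete(jsonMap, "_rev")
	updated, err := json.Marshal(jsonMap)
	if err != nil {
		return err
	}
	*jsonValue = updated
	return nil
}

func main() {
	doc := []byte(`{"_id":"key3","_rev":"2-abc123","asset_name":"marble3","owner":"fred"}`)
	if err := stripRev(&doc); err != nil {
		panic(err)
	}
	// prints the document with "_rev" removed and all other fields intact
	fmt.Println(string(doc))
}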
(The remaining two changed files in this commit are not shown in this view.)
