Skip to content

Commit

Permalink
FAB-3046 Add CouchDB batch select operations
Browse files Browse the repository at this point in the history
This is change 2 of 4 for FAB-2725 CouchDB optimizations

Motivation for this change:
Interactions with CouchDB are currently done individually.
Need to switch to using bulk operations to get optimal performance
from CouchDB. Need to performance test and stress test.

- Add bulk select methods to couchdb

Change-Id: I463530e0875176de0e0ab3cdf7971b1e3d7b4b36
Signed-off-by: Chris Elder <chris.elder@us.ibm.com>
  • Loading branch information
Chris Elder committed Apr 10, 2017
1 parent 0640d43 commit a932b54
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 1 deletion.
64 changes: 64 additions & 0 deletions core/ledger/util/couchdb/couchdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,70 @@ func (dbclient *CouchDatabase) QueryDocuments(query string) (*[]QueryResult, err

}

//BatchRetrieveIDRevision - batch method to retrieve IDs and revisions
func (dbclient *CouchDatabase) BatchRetrieveIDRevision(keys []string) ([]*DocMetadata, error) {

batchURL, err := url.Parse(dbclient.CouchInstance.conf.URL)
if err != nil {
logger.Errorf("URL parse error: %s", err.Error())
return nil, err
}
batchURL.Path = dbclient.DBName + "/_all_docs"

queryParms := batchURL.Query()
queryParms.Add("include_docs", "true")
batchURL.RawQuery = queryParms.Encode()

keymap := make(map[string]interface{})

keymap["keys"] = keys

jsonKeys, err := json.Marshal(keymap)
if err != nil {
return nil, err
}

//Set up a buffer for the data response from CouchDB
data := new(bytes.Buffer)

data.ReadFrom(bytes.NewReader(jsonKeys))

resp, _, err := dbclient.CouchInstance.handleRequest(http.MethodPost, batchURL.String(), data, "", "")
if err != nil {
return nil, err
}
defer resp.Body.Close()

if logger.IsEnabledFor(logging.DEBUG) {
dump, _ := httputil.DumpResponse(resp, false)
// compact debug log by replacing carriage return / line feed with dashes to separate http headers
logger.Debugf("HTTP Response: %s", bytes.Replace(dump, []byte{0x0d, 0x0a}, []byte{0x20, 0x7c, 0x20}, -1))
}

//handle as JSON document
jsonResponseRaw, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}

var jsonResponse = &BatchRetrieveDocMedatadataResponse{}

err2 := json.Unmarshal(jsonResponseRaw, &jsonResponse)
if err2 != nil {
return nil, err2
}

revisionDocs := []*DocMetadata{}

for _, row := range jsonResponse.Rows {
revisionDoc := &DocMetadata{ID: row.ID, Rev: row.Doc.Rev, Version: row.Doc.Version}
revisionDocs = append(revisionDocs, revisionDoc)
}

return revisionDocs, nil

}

//BatchUpdateDocuments - batch method to batch update documents
func (dbclient *CouchDatabase) BatchUpdateDocuments(documents []*CouchDoc) ([]*BatchUpdateResponse, error) {

Expand Down
112 changes: 111 additions & 1 deletion core/ledger/util/couchdb/couchdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,7 @@ func TestRichQuery(t *testing.T) {
}
}

func TestBatchCreateRetrieve(t *testing.T) {
func TestBatchBatchOperations(t *testing.T) {

if ledgerconfig.IsCouchDBEnabled() == true {

Expand Down Expand Up @@ -893,5 +893,115 @@ func TestBatchCreateRetrieve(t *testing.T) {
testutil.AssertEquals(t, updateDoc.Error, updateDocumentConflictError)
testutil.AssertEquals(t, updateDoc.Reason, updateDocumentConflictReason)
}

//----------------------------------------------
//Test Batch Retrieve Keys and Update

var keys []string

keys = append(keys, "marble01")
keys = append(keys, "marble03")

batchRevs, err := db.BatchRetrieveIDRevision(keys)
testutil.AssertNoError(t, err, fmt.Sprintf("Error when attempting retrieve revisions"))

batchUpdateDocs = []*CouchDoc{}

//iterate through the revision docs
for _, revdoc := range batchRevs {
if revdoc.ID == "marble01" {
//update the json with the rev and add to the batch
marble01Doc := addRevisionAndDeleteStatus(revdoc.Rev, byteJSON01, false)
batchUpdateDocs = append(batchUpdateDocs, &CouchDoc{JSONValue: marble01Doc, Attachments: attachments1})
}

if revdoc.ID == "marble03" {
//update the json with the rev and add to the batch
marble03Doc := addRevisionAndDeleteStatus(revdoc.Rev, byteJSON03, false)
batchUpdateDocs = append(batchUpdateDocs, &CouchDoc{JSONValue: marble03Doc, Attachments: attachments3})
}
}

//Update couchdb with the batch
batchUpdateResp, err = db.BatchUpdateDocuments(batchUpdateDocs)
testutil.AssertNoError(t, err, fmt.Sprintf("Error when attempting to update a batch of documents"))
//check to make sure each batch update response was successful
for _, updateDoc := range batchUpdateResp {
testutil.AssertEquals(t, updateDoc.Ok, true)
}

//----------------------------------------------
//Test Batch Delete

keys = []string{}

keys = append(keys, "marble02")
keys = append(keys, "marble04")

batchRevs, err = db.BatchRetrieveIDRevision(keys)
testutil.AssertNoError(t, err, fmt.Sprintf("Error when attempting retrieve revisions"))

batchUpdateDocs = []*CouchDoc{}

//iterate through the revision docs
for _, revdoc := range batchRevs {
if revdoc.ID == "marble02" {
//update the json with the rev and add to the batch
marble02Doc := addRevisionAndDeleteStatus(revdoc.Rev, byteJSON02, true)
batchUpdateDocs = append(batchUpdateDocs, &CouchDoc{JSONValue: marble02Doc, Attachments: attachments1})
}
if revdoc.ID == "marble04" {
//update the json with the rev and add to the batch
marble04Doc := addRevisionAndDeleteStatus(revdoc.Rev, byteJSON04, true)
batchUpdateDocs = append(batchUpdateDocs, &CouchDoc{JSONValue: marble04Doc, Attachments: attachments3})
}
}

//Update couchdb with the batch
batchUpdateResp, err = db.BatchUpdateDocuments(batchUpdateDocs)
testutil.AssertNoError(t, err, fmt.Sprintf("Error when attempting to update a batch of documents"))

//check to make sure each batch update response was successful
for _, updateDoc := range batchUpdateResp {
testutil.AssertEquals(t, updateDoc.Ok, true)
}

//Retrieve the test document
dbGetResp, _, geterr = db.ReadDoc("marble02")
testutil.AssertNoError(t, geterr, fmt.Sprintf("Error when trying to retrieve a document"))

//assert the value was deleted
testutil.AssertNil(t, dbGetResp)

//Retrieve the test document
dbGetResp, _, geterr = db.ReadDoc("marble04")
testutil.AssertNoError(t, geterr, fmt.Sprintf("Error when trying to retrieve a document"))

//assert the value was deleted
testutil.AssertNil(t, dbGetResp)
}
}

//addRevisionAndDeleteStatus adds keys for version and chaincodeID to the JSON value
func addRevisionAndDeleteStatus(revision string, value []byte, deleted bool) []byte {

//create a version mapping
jsonMap := make(map[string]interface{})

json.Unmarshal(value, &jsonMap)

//add the revision
if revision != "" {
jsonMap["_rev"] = revision
}

//If this record is to be deleted, set the "_deleted" property to true
if deleted {
jsonMap["_deleted"] = true
}
//marshal the data to a byte array
returnJSON, _ := json.Marshal(jsonMap)

return returnJSON

}

0 comments on commit a932b54

Please sign in to comment.