diff --git a/core/ledger/kvledger/kv_ledger.go b/core/ledger/kvledger/kv_ledger.go index afb1d18d3fd..84599647b9d 100644 --- a/core/ledger/kvledger/kv_ledger.go +++ b/core/ledger/kvledger/kv_ledger.go @@ -77,12 +77,13 @@ func NewKVLedger(conf *Conf) (*KVLedger, error) { if kvledgerconfig.IsCouchDBEnabled() == true { //By default we can talk to CouchDB with empty id and pw (""), or you can add your own id and password to talk to a secured CouchDB logger.Debugf("===COUCHDB=== NewKVLedger() Using CouchDB instead of RocksDB...hardcoding and passing connection config for now") + //TODO Hardcoding and passing connection config for now, eventually this will be passed from external config txmgmt := couchdbtxmgmt.NewCouchDBTxMgr(&couchdbtxmgmt.Conf{DBPath: conf.txMgrDBPath}, - "127.0.0.1", //couchDB host - 5984, //couchDB port - "marbles_app", //couchDB db name - "", //enter couchDB id here - "") //enter couchDB pw here + "127.0.0.1", //couchDB host + 5984, //couchDB port + "system", //couchDB db name matches ledger name, TODO for now use system ledger, eventually allow passing in subledger name + "", //enter couchDB id here + "") //enter couchDB pw here return &KVLedger{blockStore, txmgmt, nil}, nil } diff --git a/core/ledger/kvledger/kvledgerconfig/kv_ledger_config.go b/core/ledger/kvledger/kvledgerconfig/kv_ledger_config.go index 713a3472d48..2683329dcfe 100644 --- a/core/ledger/kvledger/kvledgerconfig/kv_ledger_config.go +++ b/core/ledger/kvledger/kvledgerconfig/kv_ledger_config.go @@ -17,6 +17,8 @@ limitations under the License. package kvledgerconfig // Change this feature toggle to true to use CouchDB for state database +// TODO Eventually this feature toggle will be externalized via a real +// config option on the peer var useCouchDB = false //IsCouchDBEnabled exposes the useCouchDB variable diff --git a/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb/couchdb.go b/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb/couchdb.go index 6885ce0a8dc..71b5d8ed371 100644 --- a/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb/couchdb.go +++ b/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb/couchdb.go @@ -62,8 +62,8 @@ type DBInfo struct { InstanceStartTime string `json:"instance_start_time"` } -// DBConnectionDef contains parameters -type DBConnectionDef struct { +// CouchDBConnectionDef contains parameters +type CouchDBConnectionDef struct { URL string Username string Password string @@ -78,7 +78,7 @@ type CouchDBReturn struct { } //CreateConnectionDefinition for a new client connection -func CreateConnectionDefinition(host string, port int, databaseName, username, password string) (*DBConnectionDef, error) { +func CreateConnectionDefinition(host string, port int, databaseName, username, password string) (*CouchDBConnectionDef, error) { logger.Debugf("===COUCHDB=== Entering CreateConnectionDefinition()") @@ -94,6 +94,7 @@ func CreateConnectionDefinition(host string, port int, databaseName, username, p //parse the constructed URL to verify no errors finalURL, err := url.Parse(urlConcat) if err != nil { + logger.Errorf("===COUCHDB=== URL parse error: %s", err.Error()) return nil, err } @@ -102,11 +103,11 @@ func CreateConnectionDefinition(host string, port int, databaseName, username, p logger.Debugf("===COUCHDB=== Exiting CreateConnectionDefinition()") //return an object containing the connection information - return &DBConnectionDef{finalURL.String(), username, password, databaseName}, nil + return &CouchDBConnectionDef{finalURL.String(), username, password, databaseName}, nil } //CreateDatabaseIfNotExist method provides function to create database -func (dbclient *DBConnectionDef) CreateDatabaseIfNotExist() (*DBOperationResponse, error) { +func (dbclient *CouchDBConnectionDef) CreateDatabaseIfNotExist() (*DBOperationResponse, error) { logger.Debugf("===COUCHDB=== Entering CreateDatabaseIfNotExist()") @@ -154,7 +155,7 @@ func (dbclient *DBConnectionDef) CreateDatabaseIfNotExist() (*DBOperationRespons } //GetDatabaseInfo method provides function to retrieve database information -func (dbclient *DBConnectionDef) GetDatabaseInfo() (*DBInfo, *CouchDBReturn, error) { +func (dbclient *CouchDBConnectionDef) GetDatabaseInfo() (*DBInfo, *CouchDBReturn, error) { url := fmt.Sprintf("%s/%s", dbclient.URL, dbclient.Database) @@ -167,12 +168,20 @@ func (dbclient *DBConnectionDef) GetDatabaseInfo() (*DBInfo, *CouchDBReturn, err dbResponse := &DBInfo{} json.NewDecoder(resp.Body).Decode(&dbResponse) + // trace the database info response + if logger.IsEnabledFor(logging.DEBUG) { + dbResponseJSON, err := json.Marshal(dbResponse) + if err == nil { + logger.Debugf("===COUCHDB=== GetDatabaseInfo() dbResponseJSON: %s", dbResponseJSON) + } + } + return dbResponse, couchDBReturn, nil } //DropDatabase provides method to drop an existing database -func (dbclient *DBConnectionDef) DropDatabase() (*DBOperationResponse, error) { +func (dbclient *CouchDBConnectionDef) DropDatabase() (*DBOperationResponse, error) { logger.Debugf("===COUCHDB=== Entering DropDatabase()") @@ -206,7 +215,7 @@ func (dbclient *DBConnectionDef) DropDatabase() (*DBOperationResponse, error) { } //SaveDoc method provides a function to save a document, id and byte array -func (dbclient *DBConnectionDef) SaveDoc(id string, bytesDoc []byte) (string, error) { +func (dbclient *CouchDBConnectionDef) SaveDoc(id string, bytesDoc []byte) (string, error) { logger.Debugf("===COUCHDB=== Entering SaveDoc()") @@ -246,7 +255,7 @@ func getRevisionHeader(resp *http.Response) (string, error) { } //ReadDoc method provides function to retrieve a document from the database by id -func (dbclient *DBConnectionDef) ReadDoc(id string) ([]byte, string, error) { +func (dbclient *CouchDBConnectionDef) ReadDoc(id string) ([]byte, string, error) { logger.Debugf("===COUCHDB=== Entering ReadDoc() id=%s", id) @@ -277,7 +286,7 @@ func (dbclient *DBConnectionDef) ReadDoc(id string) ([]byte, string, error) { } //handleRequest method is a generic http request handler -func (dbclient *DBConnectionDef) handleRequest(method, url string, data io.Reader) (*http.Response, *CouchDBReturn, error) { +func (dbclient *CouchDBConnectionDef) handleRequest(method, url string, data io.Reader) (*http.Response, *CouchDBReturn, error) { logger.Debugf("===COUCHDB=== Entering handleRequest() method=%s url=%s", method, url) diff --git a/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_tx_simulator.go b/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_tx_simulator.go index e28d3b3fae4..51a215fb753 100644 --- a/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_tx_simulator.go +++ b/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_tx_simulator.go @@ -21,6 +21,7 @@ import ( "reflect" "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt" + logging "github.com/op/go-logging" ) type kvReadCache struct { @@ -62,19 +63,41 @@ func (s *CouchDBTxSimulator) GetState(ns string, key string) ([]byte, error) { // check if it was written kvWrite, ok := nsRWs.writeMap[key] if ok { - logger.Debugf("Returing value for key=[%s:%s] from write set", ns, key, kvWrite.Value) + // trace the first 500 bytes of value only, in case it is huge + if logger.IsEnabledFor(logging.DEBUG) { + if len(kvWrite.Value) < 500 { + logger.Debugf("Returing value for key=[%s:%s] from write set", ns, key, kvWrite.Value) + } else { + logger.Debugf("Returing value for key=[%s:%s] from write set", ns, key, kvWrite.Value[0:500]) + } + } return kvWrite.Value, nil } // check if it was read readCache, ok := nsRWs.readMap[key] if ok { - logger.Debugf("Returing value for key=[%s:%s] from read set", ns, key, readCache.cachedValue) + // trace the first 500 bytes of value only, in case it is huge + if logger.IsEnabledFor(logging.DEBUG) { + if len(readCache.cachedValue) < 500 { + logger.Debugf("Returing value for key=[%s:%s] from read set", ns, key, readCache.cachedValue) + } else { + logger.Debugf("Returing value for key=[%s:%s] from read set", ns, key, readCache.cachedValue[0:500]) + } + } return readCache.cachedValue, nil } // read from storage value, version, err := s.txmgr.getCommittedValueAndVersion(ns, key) - logger.Debugf("Read state from storage key=[%s:%s], value=[%#v], version=[%d]", ns, key, value, version) + + // trace the first 500 bytes of value only, in case it is huge + if logger.IsEnabledFor(logging.DEBUG) { + if len(value) < 500 { + logger.Debugf("Read state from storage key=[%s:%s], value=[%#v], version=[%d]", ns, key, value, version) + } else { + logger.Debugf("Read state from storage key=[%s:%s], value=[%#v...], version=[%d]", ns, key, value[0:500], version) + } + } if err != nil { return nil, err } @@ -139,7 +162,17 @@ func (s *CouchDBTxSimulator) getTxReadWriteSet() *txmgmt.TxReadWriteSet { nsRWs := &txmgmt.NsReadWriteSet{NameSpace: ns, Reads: reads, Writes: writes} txRWSet.NsRWs = append(txRWSet.NsRWs, nsRWs) } - logger.Debugf("txRWSet = [%s]", txRWSet) + + // trace the first 2000 characters of RWSet only, in case it is huge + if logger.IsEnabledFor(logging.DEBUG) { + txRWSetString := txRWSet.String() + if len(txRWSetString) < 2000 { + logger.Debugf("txRWSet = [%s]", txRWSetString) + } else { + logger.Debugf("txRWSet = [%s...]", txRWSetString[0:2000]) + } + } + return txRWSet } @@ -155,6 +188,7 @@ func getSortedKeys(m interface{}) []string { // GetTxSimulationResults implements method in interface `ledger.TxSimulator` func (s *CouchDBTxSimulator) GetTxSimulationResults() ([]byte, error) { + logger.Debugf("Simulation completed, getting simulation results") return s.getTxReadWriteSet().Marshal() } diff --git a/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgmt_test.go b/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgmt_test.go new file mode 100644 index 00000000000..8f94c2b1eb0 --- /dev/null +++ b/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgmt_test.go @@ -0,0 +1,106 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package couchdbtxmgmt + +import ( + "fmt" + "os" + "testing" + + "github.com/hyperledger/fabric/core/ledger/kvledger/kvledgerconfig" + "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb" + "github.com/hyperledger/fabric/core/ledger/testutil" +) + +type testEnv struct { + conf *Conf + couchHost string + couchPort int + couchDatabaseName string + couchUsername string + couchPassword string +} + +func newTestEnv(t testing.TB) *testEnv { + conf := &Conf{"/tmp/tests/ledger/kvledger/txmgmt/couchdbtxmgmt"} + os.RemoveAll(conf.DBPath) + return &testEnv{ + conf: conf, + couchHost: "127.0.0.1", + couchPort: 5984, + couchDatabaseName: "system_test", + couchUsername: "", + couchPassword: "", + } +} + +func (env *testEnv) Cleanup() { + os.RemoveAll(env.conf.DBPath) + + //create a new connection + couchDB, _ := couchdb.CreateConnectionDefinition(env.couchHost, env.couchPort, env.couchDatabaseName, env.couchUsername, env.couchPassword) + + //drop the test database if it already existed + couchDB.DropDatabase() +} + +// couchdb_test.go tests couchdb functions already. This test just tests that a CouchDB state database is auto-created +// upon creating a new ledger transaction manager +func TestDatabaseAutoCreate(t *testing.T) { + + //Only run the tests if CouchDB is explitily enabled in the code, + //otherwise CouchDB may not be installed and all the tests would fail + //TODO replace this with external config property rather than config within the code + if kvledgerconfig.IsCouchDBEnabled() == true { + + env := newTestEnv(t) + env.Cleanup() //cleanup at the beginning to ensure the database doesn't exist already + defer env.Cleanup() //and cleanup at the end + + txMgr := NewCouchDBTxMgr(env.conf, + env.couchHost, //couchDB host + env.couchPort, //couchDB port + env.couchDatabaseName, //couchDB db name + env.couchUsername, //enter couchDB id + env.couchPassword) //enter couchDB pw + + //NewCouchDBTxMgr should have automatically created the database, let's make sure it has been created + //Retrieve the info for the new database and make sure the name matches + dbResp, _, errdb := txMgr.couchDB.GetDatabaseInfo() + testutil.AssertNoError(t, errdb, fmt.Sprintf("Error when trying to retrieve database information")) + testutil.AssertEquals(t, dbResp.DbName, env.couchDatabaseName) + + txMgr.Shutdown() + + //Call NewCouchDBTxMgr again, this time the database will already exist from last time + txMgr2 := NewCouchDBTxMgr(env.conf, + env.couchHost, //couchDB host + env.couchPort, //couchDB port + env.couchDatabaseName, //couchDB db name + env.couchUsername, //enter couchDB id + env.couchPassword) //enter couchDB pw + + //Retrieve the info for the database again, and make sure the name still matches + dbResp2, _, errdb2 := txMgr.couchDB.GetDatabaseInfo() + testutil.AssertNoError(t, errdb2, fmt.Sprintf("Error when trying to retrieve database information")) + testutil.AssertEquals(t, dbResp2.DbName, env.couchDatabaseName) + + txMgr2.Shutdown() + + } + +} diff --git a/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgr.go b/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgr.go index 6f2cb394eaa..bae6e7b5b34 100644 --- a/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgr.go +++ b/core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgr.go @@ -67,11 +67,11 @@ func (u *updateSet) get(compositeKey []byte) *versionedValue { // CouchDBTxMgr a simple implementation of interface `txmgmt.TxMgr`. // This implementation uses a read-write lock to prevent conflicts between transaction simulation and committing type CouchDBTxMgr struct { - db *db.DB - stateIndexCF *gorocksdb.ColumnFamilyHandle - updateSet *updateSet - commitRWLock sync.RWMutex - couchConnectionInfo CouchConnection // COUCHDB new properties for CouchDB + db *db.DB + stateIndexCF *gorocksdb.ColumnFamilyHandle + updateSet *updateSet + commitRWLock sync.RWMutex + couchDB *couchdb.CouchDBConnectionDef // COUCHDB new properties for CouchDB } // CouchConnection provides connection info for CouchDB @@ -85,11 +85,28 @@ type CouchConnection struct { // NewCouchDBTxMgr constructs a `CouchDBTxMgr` func NewCouchDBTxMgr(conf *Conf, host string, port int, dbName string, id string, pw string) *CouchDBTxMgr { - couchConnectionInfo := CouchConnection{host: host, port: port, dbName: dbName, id: id, pw: pw} + + // TODO cleanup this RocksDB handle db := db.CreateDB(&db.Conf{DBPath: conf.DBPath, CFNames: []string{}}) db.Open() + + couchDB, err := couchdb.CreateConnectionDefinition(host, + port, + dbName, + id, + pw) + if err != nil { + logger.Errorf("===COUCHDB=== Error during CreateConnectionDefinition(): %s\n", err.Error()) + } + + // Create CouchDB database upon ledger startup, if it doesn't already exist + _, err = couchDB.CreateDatabaseIfNotExist() + if err != nil { + logger.Errorf("===COUCHDB=== Error during CreateDatabaseIfNotExist(): %s\n", err.Error()) + } + // db and stateIndexCF will not be used for CouchDB. TODO to cleanup - return &CouchDBTxMgr{db: db, stateIndexCF: db.GetDefaultCFHandle(), couchConnectionInfo: couchConnectionInfo} + return &CouchDBTxMgr{db: db, stateIndexCF: db.GetDefaultCFHandle(), couchDB: couchDB} } // NewQueryExecutor implements method in interface `txmgmt.TxMgr` @@ -145,7 +162,16 @@ func (txmgr *CouchDBTxMgr) ValidateAndPrepare(block *protos.Block2) (*protos.Blo return nil, nil, err } - logger.Debugf("validating txRWSet:[%s]", txRWSet) + // trace the first 2000 characters of RWSet only, in case it is huge + if logger.IsEnabledFor(logging.DEBUG) { + txRWSetString := txRWSet.String() + if len(txRWSetString) < 2000 { + logger.Debugf("validating txRWSet:[%s]", txRWSetString) + } else { + logger.Debugf("validating txRWSet:[%s...]", txRWSetString[0:2000]) + } + } + if valid, err = txmgr.validateTx(txRWSet); err != nil { return nil, nil, err } @@ -170,7 +196,17 @@ func (txmgr *CouchDBTxMgr) Shutdown() { } func (txmgr *CouchDBTxMgr) validateTx(txRWSet *txmgmt.TxReadWriteSet) (bool, error) { - logger.Debugf("Validating txRWSet:%s", txRWSet) + + // trace the first 2000 characters of RWSet only, in case it is huge + if logger.IsEnabledFor(logging.DEBUG) { + txRWSetString := txRWSet.String() + if len(txRWSetString) < 2000 { + logger.Debugf("Validating txRWSet:%s", txRWSetString) + } else { + logger.Debugf("Validating txRWSet:%s...", txRWSetString[0:2000]) + } + } + var err error var currentVersion uint64 @@ -228,12 +264,6 @@ func (txmgr *CouchDBTxMgr) Commit() error { panic("validateAndPrepare() method should have been called before calling commit()") } - db, _ := couchdb.CreateConnectionDefinition(txmgr.couchConnectionInfo.host, - txmgr.couchConnectionInfo.port, - txmgr.couchConnectionInfo.dbName, - txmgr.couchConnectionInfo.id, - txmgr.couchConnectionInfo.pw) - for k, v := range txmgr.updateSet.m { txmgr.commitRWLock.Lock() @@ -241,7 +271,7 @@ func (txmgr *CouchDBTxMgr) Commit() error { defer func() { txmgr.updateSet = nil }() // SaveDoc using couchdb client - rev, err := db.SaveDoc(k, v.value) + rev, err := txmgr.couchDB.SaveDoc(k, v.value) if err != nil { logger.Errorf("===COUCHDB=== Error during Commit(): %s\n", err.Error()) @@ -286,13 +316,7 @@ func (txmgr *CouchDBTxMgr) getCommittedValueAndVersion(ns string, key string) ([ value, version := decodeValue(encodedValue) */ - db, _ := couchdb.CreateConnectionDefinition(txmgr.couchConnectionInfo.host, - txmgr.couchConnectionInfo.port, - txmgr.couchConnectionInfo.dbName, - txmgr.couchConnectionInfo.id, - txmgr.couchConnectionInfo.pw) - - jsonBytes, _, _ := db.ReadDoc(string(compositeKey)) // TODO add error handling + jsonBytes, _, _ := txmgr.couchDB.ReadDoc(string(compositeKey)) // TODO add error handling if jsonBytes != nil { jsonString := string(jsonBytes[:])