Skip to content

Commit

Permalink
FAB-2724: Fix CouchDB max open connections
Browse files Browse the repository at this point in the history
Client and Transport are reusable and thread safe, so
initialize once and reuse across requests. Also close the response
body properly by consuming it fully before closing it to allow
the underlying RoundTripper to reuse the transport for subsequent
requests.

Change-Id: I1d1692ee96db1ed91a0eaccbe81439312c6935ac
Signed-off-by: Balaji Viswanathan <balaji.viswanathan@gmail.com>
Signed-off-by: denyeart <enyeart@us.ibm.com>
  • Loading branch information
bviswana101 authored and ghaskins committed Apr 24, 2017
1 parent 8ce1073 commit 3dcc32f
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 27 deletions.
55 changes: 29 additions & 26 deletions core/ledger/util/couchdb/couchdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ type CouchConnectionDef struct {

//CouchInstance represents a CouchDB instance
type CouchInstance struct {
conf CouchConnectionDef //connection configuration
conf CouchConnectionDef //connection configuration
client *http.Client // a client to connect to this instance
}

//CouchDatabase represents a database within a CouchDB instance
Expand Down Expand Up @@ -206,6 +207,13 @@ type Base64Attachment struct {
AttachmentData string `json:"data"`
}

// closeResponseBody discards the body and then closes it to enable returning it to
// connection pool
func closeResponseBody(resp *http.Response) {
io.Copy(ioutil.Discard, resp.Body) // discard whatever is remaining of body
resp.Body.Close()
}

//CreateConnectionDefinition for a new client connection
func CreateConnectionDefinition(couchDBAddress, username, password string, maxRetries,
maxRetriesOnStartup int, requestTimeout time.Duration) (*CouchConnectionDef, error) {
Expand Down Expand Up @@ -264,7 +272,7 @@ func (dbclient *CouchDatabase) CreateDatabaseIfNotExist() (*DBOperationResponse,
if err != nil {
return nil, err
}
defer resp.Body.Close()
defer closeResponseBody(resp)

//Get the response from the create REST call
dbResponse := &DBOperationResponse{}
Expand Down Expand Up @@ -305,7 +313,7 @@ func (dbclient *CouchDatabase) GetDatabaseInfo() (*DBInfo, *DBReturn, error) {
if err != nil {
return nil, couchDBReturn, err
}
defer resp.Body.Close()
defer closeResponseBody(resp)

dbResponse := &DBInfo{}
json.NewDecoder(resp.Body).Decode(&dbResponse)
Expand Down Expand Up @@ -344,7 +352,7 @@ func (couchInstance *CouchInstance) VerifyCouchConfig() (*ConnectionInfo, *DBRet
if err != nil {
return nil, couchDBReturn, fmt.Errorf("Unable to connect to CouchDB, check the hostname and port: %s", err.Error())
}
defer resp.Body.Close()
defer closeResponseBody(resp)

dbResponse := &ConnectionInfo{}
errJSON := json.NewDecoder(resp.Body).Decode(&dbResponse)
Expand All @@ -371,7 +379,6 @@ func (couchInstance *CouchInstance) VerifyCouchConfig() (*ConnectionInfo, *DBRet
}

return dbResponse, couchDBReturn, nil

}

//DropDatabase provides method to drop an existing database
Expand All @@ -393,7 +400,7 @@ func (dbclient *CouchDatabase) DropDatabase() (*DBOperationResponse, error) {
if err != nil {
return nil, err
}
defer resp.Body.Close()
defer closeResponseBody(resp)

dbResponse := &DBOperationResponse{}
json.NewDecoder(resp.Body).Decode(&dbResponse)
Expand Down Expand Up @@ -434,7 +441,7 @@ func (dbclient *CouchDatabase) EnsureFullCommit() (*DBOperationResponse, error)
logger.Errorf("Failed to invoke _ensure_full_commit Error: %s\n", err.Error())
return nil, err
}
defer resp.Body.Close()
defer closeResponseBody(resp)

dbResponse := &DBOperationResponse{}
json.NewDecoder(resp.Body).Decode(&dbResponse)
Expand Down Expand Up @@ -528,7 +535,7 @@ func (dbclient *CouchDatabase) SaveDoc(id string, rev string, couchDoc *CouchDoc
if err != nil {
return "", err
}
defer resp.Body.Close()
defer closeResponseBody(resp)

//get the revision and return
revision, err := getRevisionHeader(resp)
Expand Down Expand Up @@ -666,7 +673,7 @@ func (dbclient *CouchDatabase) ReadDoc(id string) (*CouchDoc, string, error) {
logger.Debugf("couchDBReturn=%v\n", couchDBReturn)
return nil, "", err
}
defer resp.Body.Close()
defer closeResponseBody(resp)

//Get the media type from the Content-Type header
mediaType, params, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
Expand Down Expand Up @@ -813,7 +820,7 @@ func (dbclient *CouchDatabase) ReadDocRange(startKey, endKey string, limit, skip
if err != nil {
return nil, err
}
defer resp.Body.Close()
defer closeResponseBody(resp)

if logger.IsEnabledFor(logging.DEBUG) {
dump, err2 := httputil.DumpResponse(resp, true)
Expand Down Expand Up @@ -918,7 +925,7 @@ func (dbclient *CouchDatabase) DeleteDoc(id, rev string) error {
}
return err
}
defer resp.Body.Close()
defer closeResponseBody(resp)

logger.Debugf("Exiting DeleteDoc()")

Expand Down Expand Up @@ -948,7 +955,7 @@ func (dbclient *CouchDatabase) QueryDocuments(query string) (*[]QueryResult, err
if err != nil {
return nil, err
}
defer resp.Body.Close()
defer closeResponseBody(resp)

if logger.IsEnabledFor(logging.DEBUG) {
dump, err2 := httputil.DumpResponse(resp, true)
Expand Down Expand Up @@ -1034,7 +1041,7 @@ func (dbclient *CouchDatabase) BatchRetrieveIDRevision(keys []string) ([]*DocMet
if err != nil {
return nil, err
}
defer resp.Body.Close()
defer closeResponseBody(resp)

if logger.IsEnabledFor(logging.DEBUG) {
dump, _ := httputil.DumpResponse(resp, false)
Expand Down Expand Up @@ -1129,7 +1136,7 @@ func (dbclient *CouchDatabase) BatchUpdateDocuments(documents []*CouchDoc) ([]*B
if err != nil {
return nil, err
}
defer resp.Body.Close()
defer closeResponseBody(resp)

if logger.IsEnabledFor(logging.DEBUG) {
dump, _ := httputil.DumpResponse(resp, false)
Expand All @@ -1156,7 +1163,9 @@ func (dbclient *CouchDatabase) BatchUpdateDocuments(documents []*CouchDoc) ([]*B

}

//handleRequest method is a generic http request handler
//handleRequest method is a generic http request handler.
// if it returns an error, it ensures that the response body is closed, else it is the
// callee's responsibility to close response correctly
func (couchInstance *CouchInstance) handleRequest(method, connectURL string, data []byte, rev string,
multipartBoundary string, maxRetries int) (*http.Response, *DBReturn, error) {

Expand All @@ -1170,9 +1179,6 @@ func (couchInstance *CouchInstance) handleRequest(method, connectURL string, dat
//set initial wait duration for retries
waitDuration := retryWaitTime * time.Millisecond

//get the connection timeout
requestTimeout := couchInstance.conf.RequestTimeout

//attempt the http request for the max number of retries
for attempts := 0; attempts < maxRetries; attempts++ {

Expand Down Expand Up @@ -1225,14 +1231,8 @@ func (couchInstance *CouchInstance) handleRequest(method, connectURL string, dat
logger.Debugf("HTTP Request: %s", bytes.Replace(dump, []byte{0x0d, 0x0a}, []byte{0x20, 0x7c, 0x20}, -1))
}

//Create the http client
client := &http.Client{Timeout: requestTimeout}

transport := &http.Transport{Proxy: http.ProxyFromEnvironment}
transport.DisableCompression = false
client.Transport = transport
//Execute http request
resp, errResp = client.Do(req)
resp, errResp = couchInstance.client.Do(req)

//if an error is not detected then drop out of the retry
if errResp == nil && resp != nil && resp.StatusCode < 500 {
Expand All @@ -1248,8 +1248,9 @@ func (couchInstance *CouchInstance) handleRequest(method, connectURL string, dat

} else {

//Read the response body
//Read the response body and close it for next attempt
jsonError, err := ioutil.ReadAll(resp.Body)
closeResponseBody(resp)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -1283,6 +1284,8 @@ func (couchInstance *CouchInstance) handleRequest(method, connectURL string, dat
//check to see if the status code is 400 or higher
//response codes 4XX and 500 will be treated as errors
if resp.StatusCode >= 400 {
// close the response before returning error
defer closeResponseBody(resp)

//Read the response body
jsonError, err := ioutil.ReadAll(resp.Body)
Expand Down
12 changes: 11 additions & 1 deletion core/ledger/util/couchdb/couchdbutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package couchdb

import (
"fmt"
"net/http"
"regexp"
"strconv"
"strings"
Expand All @@ -38,8 +39,17 @@ func CreateCouchInstance(couchDBConnectURL, id, pw string, maxRetries,
return nil, err
}

// Create the http client once
// Clients and Transports are safe for concurrent use by multiple goroutines
// and for efficiency should only be created once and re-used.
client := &http.Client{Timeout: couchConf.RequestTimeout}

transport := &http.Transport{Proxy: http.ProxyFromEnvironment}
transport.DisableCompression = false
client.Transport = transport

//Create the CouchDB instance
couchInstance := &CouchInstance{conf: *couchConf}
couchInstance := &CouchInstance{conf: *couchConf, client: client}

connectInfo, retVal, verifyErr := couchInstance.VerifyCouchConfig()
if verifyErr != nil {
Expand Down

0 comments on commit 3dcc32f

Please sign in to comment.