Skip to content

Commit

Permalink
Merge pull request #450 from ali-ince/2.0-bolt-v4
Browse files Browse the repository at this point in the history
Add multi-database support
  • Loading branch information
zhenlineo committed Apr 17, 2019
2 parents 066a410 + a263240 commit 9a5440c
Show file tree
Hide file tree
Showing 33 changed files with 1,203 additions and 300 deletions.
26 changes: 19 additions & 7 deletions src/driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,13 @@ class Driver {

/**
* Verifies connectivity of this driver by trying to open a connection with the provided driver options.
* @param {string} [db=''] the target database to verify connectivity for.
* @returns {Promise<object>} promise resolved with server info or rejected with error.
*/
verifyConnectivity () {
verifyConnectivity ({ db = '' } = {}) {
const connectionProvider = this._getOrCreateConnectionProvider()
const connectivityVerifier = new ConnectivityVerifier(connectionProvider)
return connectivityVerifier.verify()
return connectivityVerifier.verify({ db })
}

/**
Expand Down Expand Up @@ -178,18 +179,29 @@ class Driver {
* it is closed, the underlying connection will be released to the connection
* pool and made available for others to use.
*
* @param {string} [mode=WRITE] the access mode of this session, allowed values are {@link READ} and {@link WRITE}.
* @param {string|string[]} [bookmarkOrBookmarks=null] the initial reference or references to some previous
* @param {string} [defaultAccessMode=WRITE] the access mode of this session, allowed values are {@link READ} and {@link WRITE}.
* @param {string|string[]} [bookmarks=null] the initial reference or references to some previous
* transactions. Value is optional and absence indicates that that the bookmarks do not exist or are unknown.
* @param {string} [db=''] the database this session will connect to.
* @return {Session} new session.
*/
session (mode, bookmarkOrBookmarks) {
const sessionMode = Driver._validateSessionMode(mode)
session ({
defaultAccessMode = WRITE,
bookmarks: bookmarkOrBookmarks,
db = ''
} = {}) {
const sessionMode = Driver._validateSessionMode(defaultAccessMode)
const connectionProvider = this._getOrCreateConnectionProvider()
const bookmark = bookmarkOrBookmarks
? new Bookmark(bookmarkOrBookmarks)
: Bookmark.empty()
return new Session(sessionMode, connectionProvider, bookmark, this._config)
return new Session({
mode: sessionMode,
db,
connectionProvider,
bookmark,
config: this._config
})
}

static _validateSessionMode (rawMode) {
Expand Down
60 changes: 60 additions & 0 deletions src/internal/bolt-protocol-util.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { newError } from '../error'

/**
* @param {TxConfig} txConfig the auto-commit transaction configuration.
* @param {Connection} connection the connection.
* @param {StreamObserver} observer the response observer.
*/
function assertTxConfigIsEmpty (txConfig, connection, observer) {
if (txConfig && !txConfig.isEmpty()) {
const error = newError(
'Driver is connected to the database that does not support transaction configuration. ' +
'Please upgrade to neo4j 3.5.0 or later in order to use this functionality'
)

// unsupported API was used, consider this a fatal error for the current connection
connection._handleFatalError(error)
observer.onError(error)
throw error
}
}

/**
* Asserts that the passed-in database name is empty.
* @param {string} db
* @param {Connection} connection
* @param {StreamObserver} observer
*/
function assertDbIsEmpty (db, connection, observer) {
if (db) {
const error = newError(
'Driver is connected to the database that does not support multiple databases. ' +
'Please upgrade to neo4j 4.0.0 or later in order to use this functionality'
)

// unsupported API was used, consider this a fatal error for the current connection
connection._handleFatalError(error)
observer.onError(error)
throw error
}
}

export { assertDbIsEmpty, assertTxConfigIsEmpty }
62 changes: 21 additions & 41 deletions src/internal/bolt-protocol-v1.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
*/
import RequestMessage from './request-message'
import * as v1 from './packstream-v1'
import { newError } from '../error'
import Bookmark from './bookmark'
import TxConfig from './tx-config'
import { ACCESS_MODE_WRITE } from './constants'
import { assertDbIsEmpty, assertTxConfigIsEmpty } from './bolt-protocol-util'

export default class BoltProtocol {
/**
Expand Down Expand Up @@ -78,13 +78,15 @@ export default class BoltProtocol {

/**
* Begin an explicit transaction.
* @param {StreamObserver} observer the response observer.
* @param {Bookmark} bookmark the bookmark.
* @param {TxConfig} txConfig the configuration.
* @param {string} db the target database name.
* @param {string} mode the access mode.
* @param {StreamObserver} observer the response observer.
*/
beginTransaction (bookmark, txConfig, mode, observer) {
beginTransaction (observer, { bookmark, txConfig, db, mode }) {
assertTxConfigIsEmpty(txConfig, this._connection, observer)
assertDbIsEmpty(db, this._connection, observer)

const runMessage = RequestMessage.run(
'BEGIN',
Expand All @@ -103,14 +105,11 @@ export default class BoltProtocol {
commitTransaction (observer) {
// WRITE access mode is used as a place holder here, it has
// no effect on behaviour for Bolt V1 & V2
this.run(
'COMMIT',
{},
Bookmark.empty(),
TxConfig.empty(),
ACCESS_MODE_WRITE,
observer
)
this.run('COMMIT', {}, observer, {
bookmark: Bookmark.empty(),
txConfig: TxConfig.empty(),
mode: ACCESS_MODE_WRITE
})
}

/**
Expand All @@ -120,28 +119,28 @@ export default class BoltProtocol {
rollbackTransaction (observer) {
// WRITE access mode is used as a place holder here, it has
// no effect on behaviour for Bolt V1 & V2
this.run(
'ROLLBACK',
{},
Bookmark.empty(),
TxConfig.empty(),
ACCESS_MODE_WRITE,
observer
)
this.run('ROLLBACK', {}, observer, {
bookmark: Bookmark.empty(),
txConfig: TxConfig.empty(),
mode: ACCESS_MODE_WRITE
})
}

/**
* Send a Cypher statement through the underlying connection.
* @param {string} statement the cypher statement.
* @param {object} parameters the statement parameters.
* @param {StreamObserver} observer the response observer.
* @param {Bookmark} bookmark the bookmark.
* @param {TxConfig} txConfig the auto-commit transaction configuration.
* @param {string} db the target database name.
* @param {string} mode the access mode.
* @param {StreamObserver} observer the response observer.
*/
run (statement, parameters, bookmark, txConfig, mode, observer) {
// bookmark and mode are ignored in this versioon of the protocol
run (statement, parameters, observer, { bookmark, txConfig, db, mode }) {
// bookmark and mode are ignored in this version of the protocol
assertTxConfigIsEmpty(txConfig, this._connection, observer)
// passing in a db name on this protocol version throws an error
assertDbIsEmpty(db, this._connection, observer)

const runMessage = RequestMessage.run(statement, parameters)
const pullAllMessage = RequestMessage.pullAll()
Expand All @@ -167,22 +166,3 @@ export default class BoltProtocol {
return new v1.Unpacker(disableLosslessIntegers)
}
}

/**
* @param {TxConfig} txConfig the auto-commit transaction configuration.
* @param {Connection} connection the connection.
* @param {StreamObserver} observer the response observer.
*/
function assertTxConfigIsEmpty (txConfig, connection, observer) {
if (!txConfig.isEmpty()) {
const error = newError(
'Driver is connected to the database that does not support transaction configuration. ' +
'Please upgrade to neo4j 3.5.0 or later in order to use this functionality'
)

// unsupported API was used, consider this a fatal error for the current connection
connection._handleFatalError(error)
observer.onError(error)
throw error
}
}
17 changes: 10 additions & 7 deletions src/internal/bolt-protocol-v3.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
import BoltProtocolV2 from './bolt-protocol-v2'
import RequestMessage from './request-message'
import { assertDbIsEmpty } from './bolt-protocol-util'

export default class BoltProtocol extends BoltProtocolV2 {
transformMetadata (metadata) {
Expand Down Expand Up @@ -47,9 +48,10 @@ export default class BoltProtocol extends BoltProtocolV2 {
this._connection.write(message, observer, true)
}

beginTransaction (bookmark, txConfig, mode, observer) {
beginTransaction (observer, { bookmark, txConfig, db, mode }) {
assertDbIsEmpty(db, this._connection, observer)
prepareToHandleSingleResponse(observer)
const message = RequestMessage.begin(bookmark, txConfig, mode)
const message = RequestMessage.begin({ bookmark, txConfig, mode })
this._connection.write(message, observer, true)
}

Expand All @@ -65,14 +67,15 @@ export default class BoltProtocol extends BoltProtocolV2 {
this._connection.write(message, observer, true)
}

run (statement, parameters, bookmark, txConfig, mode, observer) {
const runMessage = RequestMessage.runWithMetadata(
statement,
parameters,
run (statement, parameters, observer, { bookmark, txConfig, db, mode }) {
// passing in a db name on this protocol version throws an error
assertDbIsEmpty(db, this._connection, observer)

const runMessage = RequestMessage.runWithMetadata(statement, parameters, {
bookmark,
txConfig,
mode
)
})
const pullAllMessage = RequestMessage.pullAll()

this._connection.write(runMessage, observer, false)
Expand Down
40 changes: 40 additions & 0 deletions src/internal/bolt-protocol-v4.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import BoltProtocolV3 from './bolt-protocol-v3'
import RequestMessage from './request-message'

export default class BoltProtocol extends BoltProtocolV3 {
beginTransaction (observer, { bookmark, txConfig, db, mode }) {
const message = RequestMessage.begin({ bookmark, txConfig, db, mode })
this._connection.write(message, observer, true)
}

run (statement, parameters, observer, { bookmark, txConfig, db, mode }) {
const runMessage = RequestMessage.runWithMetadata(statement, parameters, {
bookmark,
txConfig,
db,
mode
})
const pullMessage = RequestMessage.pull()

this._connection.write(runMessage, observer, false)
this._connection.write(pullMessage, observer, true)
}
}
17 changes: 15 additions & 2 deletions src/internal/connection-holder.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/

import { newError } from '../error'
import { assertString } from './util'
import { ACCESS_MODE_WRITE } from './constants'

/**
* Utility to lazily initialize connections and return them back to the pool when unused.
Expand All @@ -26,10 +28,12 @@ export default class ConnectionHolder {
/**
* @constructor
* @param {string} mode - the access mode for new connection holder.
* @param {string} db - the target database name.
* @param {ConnectionProvider} connectionProvider - the connection provider to acquire connections from.
*/
constructor (mode, connectionProvider) {
constructor ({ mode = ACCESS_MODE_WRITE, db = '', connectionProvider } = {}) {
this._mode = mode
this._db = db ? assertString(db, 'db') : ''
this._connectionProvider = connectionProvider
this._referenceCount = 0
this._connectionPromise = Promise.resolve(null)
Expand All @@ -43,14 +47,23 @@ export default class ConnectionHolder {
return this._mode
}

/**
* Returns the target database name
* @returns {string} db name
*/
db () {
return this._db
}

/**
* Make this holder initialize new connection if none exists already.
* @return {undefined}
*/
initializeConnection () {
if (this._referenceCount === 0) {
this._connectionPromise = this._connectionProvider.acquireConnection(
this._mode
this._mode,
this._db
)
}
this._referenceCount++
Expand Down
Loading

0 comments on commit 9a5440c

Please sign in to comment.