Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add multi-database support #450

Merged
merged 4 commits into from
Apr 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions src/driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,13 @@ class Driver {

/**
* Verifies connectivity of this driver by trying to open a connection with the provided driver options.
* @param {string} [db=''] the target database to verify connectivity for.
* @returns {Promise<object>} promise resolved with server info or rejected with error.
*/
verifyConnectivity () {
verifyConnectivity ({ db = '' } = {}) {
const connectionProvider = this._getOrCreateConnectionProvider()
const connectivityVerifier = new ConnectivityVerifier(connectionProvider)
return connectivityVerifier.verify()
return connectivityVerifier.verify({ db })
}

/**
Expand Down Expand Up @@ -178,18 +179,29 @@ class Driver {
* it is closed, the underlying connection will be released to the connection
* pool and made available for others to use.
*
* @param {string} [mode=WRITE] the access mode of this session, allowed values are {@link READ} and {@link WRITE}.
* @param {string|string[]} [bookmarkOrBookmarks=null] the initial reference or references to some previous
* @param {string} [defaultAccessMode=WRITE] the access mode of this session, allowed values are {@link READ} and {@link WRITE}.
* @param {string|string[]} [bookmarks=null] the initial reference or references to some previous
* transactions. Value is optional and absence indicates that that the bookmarks do not exist or are unknown.
* @param {string} [db=''] the database this session will connect to.
* @return {Session} new session.
*/
session (mode, bookmarkOrBookmarks) {
const sessionMode = Driver._validateSessionMode(mode)
session ({
defaultAccessMode = WRITE,
bookmarks: bookmarkOrBookmarks,
db = ''
} = {}) {
const sessionMode = Driver._validateSessionMode(defaultAccessMode)
const connectionProvider = this._getOrCreateConnectionProvider()
const bookmark = bookmarkOrBookmarks
? new Bookmark(bookmarkOrBookmarks)
: Bookmark.empty()
return new Session(sessionMode, connectionProvider, bookmark, this._config)
return new Session({
mode: sessionMode,
db,
connectionProvider,
bookmark,
config: this._config
})
}

static _validateSessionMode (rawMode) {
Expand Down
60 changes: 60 additions & 0 deletions src/internal/bolt-protocol-util.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { newError } from '../error'

/**
* @param {TxConfig} txConfig the auto-commit transaction configuration.
* @param {Connection} connection the connection.
* @param {StreamObserver} observer the response observer.
*/
function assertTxConfigIsEmpty (txConfig, connection, observer) {
if (txConfig && !txConfig.isEmpty()) {
const error = newError(
'Driver is connected to the database that does not support transaction configuration. ' +
'Please upgrade to neo4j 3.5.0 or later in order to use this functionality'
)

// unsupported API was used, consider this a fatal error for the current connection
connection._handleFatalError(error)
observer.onError(error)
throw error
}
}

/**
* Asserts that the passed-in database name is empty.
* @param {string} db
* @param {Connection} connection
* @param {StreamObserver} observer
*/
function assertDbIsEmpty (db, connection, observer) {
if (db) {
const error = newError(
'Driver is connected to the database that does not support multiple databases. ' +
'Please upgrade to neo4j 4.0.0 or later in order to use this functionality'
)

// unsupported API was used, consider this a fatal error for the current connection
connection._handleFatalError(error)
observer.onError(error)
throw error
}
}

export { assertDbIsEmpty, assertTxConfigIsEmpty }
62 changes: 21 additions & 41 deletions src/internal/bolt-protocol-v1.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
*/
import RequestMessage from './request-message'
import * as v1 from './packstream-v1'
import { newError } from '../error'
import Bookmark from './bookmark'
import TxConfig from './tx-config'
import { ACCESS_MODE_WRITE } from './constants'
import { assertDbIsEmpty, assertTxConfigIsEmpty } from './bolt-protocol-util'

export default class BoltProtocol {
/**
Expand Down Expand Up @@ -78,13 +78,15 @@ export default class BoltProtocol {

/**
* Begin an explicit transaction.
* @param {StreamObserver} observer the response observer.
* @param {Bookmark} bookmark the bookmark.
* @param {TxConfig} txConfig the configuration.
* @param {string} db the target database name.
* @param {string} mode the access mode.
* @param {StreamObserver} observer the response observer.
*/
beginTransaction (bookmark, txConfig, mode, observer) {
beginTransaction (observer, { bookmark, txConfig, db, mode }) {
assertTxConfigIsEmpty(txConfig, this._connection, observer)
assertDbIsEmpty(db, this._connection, observer)

const runMessage = RequestMessage.run(
'BEGIN',
Expand All @@ -103,14 +105,11 @@ export default class BoltProtocol {
commitTransaction (observer) {
// WRITE access mode is used as a place holder here, it has
// no effect on behaviour for Bolt V1 & V2
this.run(
'COMMIT',
{},
Bookmark.empty(),
TxConfig.empty(),
ACCESS_MODE_WRITE,
observer
)
this.run('COMMIT', {}, observer, {
bookmark: Bookmark.empty(),
txConfig: TxConfig.empty(),
mode: ACCESS_MODE_WRITE
})
}

/**
Expand All @@ -120,28 +119,28 @@ export default class BoltProtocol {
rollbackTransaction (observer) {
// WRITE access mode is used as a place holder here, it has
// no effect on behaviour for Bolt V1 & V2
this.run(
'ROLLBACK',
{},
Bookmark.empty(),
TxConfig.empty(),
ACCESS_MODE_WRITE,
observer
)
this.run('ROLLBACK', {}, observer, {
bookmark: Bookmark.empty(),
txConfig: TxConfig.empty(),
mode: ACCESS_MODE_WRITE
})
}

/**
* Send a Cypher statement through the underlying connection.
* @param {string} statement the cypher statement.
* @param {object} parameters the statement parameters.
* @param {StreamObserver} observer the response observer.
* @param {Bookmark} bookmark the bookmark.
* @param {TxConfig} txConfig the auto-commit transaction configuration.
* @param {string} db the target database name.
* @param {string} mode the access mode.
* @param {StreamObserver} observer the response observer.
*/
run (statement, parameters, bookmark, txConfig, mode, observer) {
// bookmark and mode are ignored in this versioon of the protocol
run (statement, parameters, observer, { bookmark, txConfig, db, mode }) {
// bookmark and mode are ignored in this version of the protocol
assertTxConfigIsEmpty(txConfig, this._connection, observer)
// passing in a db name on this protocol version throws an error
assertDbIsEmpty(db, this._connection, observer)

const runMessage = RequestMessage.run(statement, parameters)
const pullAllMessage = RequestMessage.pullAll()
Expand All @@ -167,22 +166,3 @@ export default class BoltProtocol {
return new v1.Unpacker(disableLosslessIntegers)
}
}

/**
* @param {TxConfig} txConfig the auto-commit transaction configuration.
* @param {Connection} connection the connection.
* @param {StreamObserver} observer the response observer.
*/
function assertTxConfigIsEmpty (txConfig, connection, observer) {
if (!txConfig.isEmpty()) {
const error = newError(
'Driver is connected to the database that does not support transaction configuration. ' +
'Please upgrade to neo4j 3.5.0 or later in order to use this functionality'
)

// unsupported API was used, consider this a fatal error for the current connection
connection._handleFatalError(error)
observer.onError(error)
throw error
}
}
17 changes: 10 additions & 7 deletions src/internal/bolt-protocol-v3.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
import BoltProtocolV2 from './bolt-protocol-v2'
import RequestMessage from './request-message'
import { assertDbIsEmpty } from './bolt-protocol-util'

export default class BoltProtocol extends BoltProtocolV2 {
transformMetadata (metadata) {
Expand Down Expand Up @@ -47,9 +48,10 @@ export default class BoltProtocol extends BoltProtocolV2 {
this._connection.write(message, observer, true)
}

beginTransaction (bookmark, txConfig, mode, observer) {
beginTransaction (observer, { bookmark, txConfig, db, mode }) {
assertDbIsEmpty(db, this._connection, observer)
prepareToHandleSingleResponse(observer)
const message = RequestMessage.begin(bookmark, txConfig, mode)
const message = RequestMessage.begin({ bookmark, txConfig, mode })
this._connection.write(message, observer, true)
}

Expand All @@ -65,14 +67,15 @@ export default class BoltProtocol extends BoltProtocolV2 {
this._connection.write(message, observer, true)
}

run (statement, parameters, bookmark, txConfig, mode, observer) {
const runMessage = RequestMessage.runWithMetadata(
statement,
parameters,
run (statement, parameters, observer, { bookmark, txConfig, db, mode }) {
// passing in a db name on this protocol version throws an error
assertDbIsEmpty(db, this._connection, observer)

const runMessage = RequestMessage.runWithMetadata(statement, parameters, {
bookmark,
txConfig,
mode
)
})
const pullAllMessage = RequestMessage.pullAll()

this._connection.write(runMessage, observer, false)
Expand Down
40 changes: 40 additions & 0 deletions src/internal/bolt-protocol-v4.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import BoltProtocolV3 from './bolt-protocol-v3'
import RequestMessage from './request-message'

export default class BoltProtocol extends BoltProtocolV3 {
beginTransaction (observer, { bookmark, txConfig, db, mode }) {
const message = RequestMessage.begin({ bookmark, txConfig, db, mode })
this._connection.write(message, observer, true)
}

run (statement, parameters, observer, { bookmark, txConfig, db, mode }) {
const runMessage = RequestMessage.runWithMetadata(statement, parameters, {
bookmark,
txConfig,
db,
mode
})
const pullMessage = RequestMessage.pull()

this._connection.write(runMessage, observer, false)
this._connection.write(pullMessage, observer, true)
}
}
17 changes: 15 additions & 2 deletions src/internal/connection-holder.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/

import { newError } from '../error'
import { assertString } from './util'
import { ACCESS_MODE_WRITE } from './constants'

/**
* Utility to lazily initialize connections and return them back to the pool when unused.
Expand All @@ -26,10 +28,12 @@ export default class ConnectionHolder {
/**
* @constructor
* @param {string} mode - the access mode for new connection holder.
* @param {string} db - the target database name.
* @param {ConnectionProvider} connectionProvider - the connection provider to acquire connections from.
*/
constructor (mode, connectionProvider) {
constructor ({ mode = ACCESS_MODE_WRITE, db = '', connectionProvider } = {}) {
this._mode = mode
this._db = db ? assertString(db, 'db') : ''
this._connectionProvider = connectionProvider
this._referenceCount = 0
this._connectionPromise = Promise.resolve(null)
Expand All @@ -43,14 +47,23 @@ export default class ConnectionHolder {
return this._mode
}

/**
* Returns the target database name
* @returns {string} db name
*/
db () {
return this._db
}

/**
* Make this holder initialize new connection if none exists already.
* @return {undefined}
*/
initializeConnection () {
if (this._referenceCount === 0) {
this._connectionPromise = this._connectionProvider.acquireConnection(
this._mode
this._mode,
this._db
)
}
this._referenceCount++
Expand Down
Loading