Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow overriding Promise library #265 #599

Closed
wants to merge 33 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
b35f0ce
Update to lock file
Worthaboutapig Oct 24, 2019
59bd4a3
Update to use custom Promise libray
Worthaboutapig Oct 29, 2019
c0eb7ed
Update to use custom Promise libra
Worthaboutapig Oct 29, 2019
7e67e50
Update lock file
Worthaboutapig Oct 29, 2019
f21cf70
Merge branch 'master' of github.com:squaremo/amqp.node
Worthaboutapig Dec 13, 2020
9a3f3e7
Use pnpm
Worthaboutapig Dec 13, 2020
5d6c6ab
Update Promise usage
Worthaboutapig Dec 13, 2020
73dfe6a
Return to npm
Worthaboutapig Dec 15, 2020
0e70245
Update lockfile
Worthaboutapig Dec 15, 2020
92e29f2
use pnpm
Worthaboutapig Dec 17, 2020
88a6cd7
Update to npm and force all deps to be installed
Worthaboutapig Dec 17, 2020
2181e94
Update package versions
Worthaboutapig Dec 22, 2020
302aea8
Revert package version
Worthaboutapig Dec 22, 2020
754e08d
Merge branch 'master' of github.com:Worthaboutapig/amqp.node
Worthaboutapig Dec 22, 2020
13191b4
Max uglify to 2.x.x
Worthaboutapig Dec 22, 2020
38ed5f2
Remove the unnecessary node engine restriction
Worthaboutapig Dec 23, 2020
b86270a
Ensure the consumer registration runs before messages may arrive
Worthaboutapig Jan 13, 2021
ca9d6d0
Swap Promise.joint to Promise.all
Worthaboutapig Jan 13, 2021
df1e86b
Add mocha.json to run tests without make and other updates
Worthaboutapig Jan 13, 2021
aabeb25
Correct this scoping
Worthaboutapig Jan 13, 2021
764e0ad
Remove port
Worthaboutapig Jan 13, 2021
4ed3d35
Swap Promise.join to Promise.all
Worthaboutapig Jan 13, 2021
d52cef9
Update packages
Worthaboutapig Jan 13, 2021
cb63680
Update lockfile
Worthaboutapig May 25, 2021
c89c73f
Scope the package
Worthaboutapig Jan 18, 2022
6aaa99f
Minor tweaks pre-publish
Worthaboutapig Jan 18, 2022
763e763
Bump version for publish
Worthaboutapig Jan 18, 2022
b388133
Add some simple TypeScript typedefs
Worthaboutapig Jan 18, 2022
644bea7
Bump version
Worthaboutapig Jan 18, 2022
3f6c465
Remove unnecessary "declare"
Worthaboutapig Jan 18, 2022
968b6c1
Remove from build
Worthaboutapig Jan 18, 2022
6e365dc
We Do need declare module :D
Worthaboutapig Jan 18, 2022
eb73e6e
Improve the typedefs further
Worthaboutapig Jan 18, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
root = true

[*]
indent_style = space
indent_size = 2
end_of_line = lf
charset = utf-8
trim_trailing_whitespace = true
insert_final_newline = true

[*.md]
trim_trailing_whitespace = false
5 changes: 5 additions & 0 deletions .eslintignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
.vscode/
config/
dist/
node_modules/
webpack
15 changes: 15 additions & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module.exports = {
env: {
es6: true,
node: true,
},
extends: ["eslint:recommended", "plugin:import/errors", "plugin:import/warnings", "prettier"],
parserOptions: {
ecmaVersion: 2017,
sourceType: "module",
},
rules: {
quotes: ["error", "double", { avoidEscape: true }],
semi: ["error", "always"],
},
};
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ node_modules/
bin/amqp-rabbitmq-0.9.1.json
etc/
coverage/
.nyc_output
5 changes: 5 additions & 0 deletions .mocharc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"extension": ["js"],
"ui": "tdd",
"spec": "test/**/*.js"
}
1 change: 1 addition & 0 deletions .npmignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ scratch
bin/amqp-rabbitmq-0.9.1.json
etc/
coverage/
.nyc_output/
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ bin/amqp-rabbitmq-0.9.1.json:
curl -L $(AMQP_JSON) > $@

$(ISTANBUL):
npm install
npm install --production=false

$(UGLIFY):
npm install
npm install --production=false
36 changes: 36 additions & 0 deletions amqplib.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
declare module "@worthaboutapig/amqplib" {
export type CloseHandler = () => void;
export type ErrorHandler = (error: Error) => void;
export type IMessage = { content: string };

export interface CommonChannelConnection {
close(): Promise<void>;
on(event: string, handler: CloseHandler | ErrorHandler): void;
removeListener(name: string, handler: CloseHandler | ErrorHandler): void;
valid: boolean;
}

export interface IChannel extends CommonChannelConnection {
handlers?: ChannelEventHandlers;
assertQueue(name: string, options: unknown): Promise<void>;
consume(name: string, handler: (message: IMessage) => Promise<void>, options: { noAck: boolean }): Promise<void>;
sendToQueue(name: string, data: unknown, options: { persistent: true }): boolean;
}

export interface IConnection extends CommonChannelConnection {
handlers?: ConnectionEventHandlers;
createChannel(): Promise<IChannel>;
}

export type ConnectionEventHandlers = {
onConnectionError(error: Error): Promise<void>;
onConnectionClose(): Promise<void>;
};

export type ChannelEventHandlers = {
onChannelError(error: Error): Promise<void>;
onChannelClose(): Promise<void>;
};

export function connect(url: string, connOptions: unknown): Promise<IConnection>;
}
28 changes: 17 additions & 11 deletions channel_api.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
var raw_connect = require('./lib/connect').connect;
var ChannelModel = require('./lib/channel_model').ChannelModel;
var Promise = require('bluebird');
var raw_connect = require("./lib/connect").connect;
var ChannelModel = require("./lib/channel_model").ChannelModel;

function connect(url, connOptions) {
return Promise.fromCallback(function(cb) {
return raw_connect(url, connOptions, cb);
})
.then(function(conn) {
return new ChannelModel(conn);
var options = { Promise: (connOptions && connOptions.Promise) || Promise };

return new options.Promise(function (resolve, reject) {
raw_connect(url, connOptions, function (err, result) {
if (err) {
reject(err);
} else {
resolve(result);
}
});
}).then(function (conn) {
return new ChannelModel(conn, options);
});
};
}

module.exports.connect = connect;
module.exports.credentials = require('./lib/credentials');
module.exports.IllegalOperationError = require('./lib/error').IllegalOperationError;
module.exports.credentials = require("./lib/credentials");
module.exports.IllegalOperationError = require("./lib/error").IllegalOperationError;
74 changes: 40 additions & 34 deletions examples/tutorials/receive_logs_direct.js
Original file line number Diff line number Diff line change
@@ -1,45 +1,51 @@
#!/usr/bin/env node

var amqp = require('amqplib');
var all = require('bluebird').all;
var basename = require('path').basename;
var amqp = require("amqplib");
var basename = require("path").basename;

var severities = process.argv.slice(2);
if (severities.length < 1) {
console.warn('Usage: %s [info] [warning] [error]',
basename(process.argv[1]));
console.warn("Usage: %s [info] [warning] [error]", basename(process.argv[1]));
process.exit(1);
}

amqp.connect('amqp://localhost').then(function(conn) {
process.once('SIGINT', function() { conn.close(); });
return conn.createChannel().then(function(ch) {
var ex = 'direct_logs';

var ok = ch.assertExchange(ex, 'direct', {durable: false});

ok = ok.then(function() {
return ch.assertQueue('', {exclusive: true});
});

ok = ok.then(function(qok) {
var queue = qok.queue;
return all(severities.map(function(sev) {
ch.bindQueue(queue, ex, sev);
})).then(function() { return queue; });
amqp
.connect("amqp://localhost")
.then(function (conn) {
process.once("SIGINT", function () {
conn.close();
});

ok = ok.then(function(queue) {
return ch.consume(queue, logMessage, {noAck: true});
});
return ok.then(function() {
console.log(' [*] Waiting for logs. To exit press CTRL+C.');
return conn.createChannel().then(function (ch) {
var ex = "direct_logs";

var ok = ch.assertExchange(ex, "direct", { durable: false });

ok = ok.then(function () {
return ch.assertQueue("", { exclusive: true });
});

ok = ok.then(function (qok) {
var queue = qok.queue;
return Promise.all(
severities.map(function (sev) {
ch.bindQueue(queue, ex, sev);
}),
).then(function () {
return queue;
});
});

ok = ok.then(function (queue) {
return ch.consume(queue, logMessage, { noAck: true });
});
return ok.then(function () {
console.log(" [*] Waiting for logs. To exit press CTRL+C.");
});

function logMessage(msg) {
console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
}
});

function logMessage(msg) {
console.log(" [x] %s:'%s'",
msg.fields.routingKey,
msg.content.toString());
}
});
}).catch(console.warn);
})
.catch(console.warn);
69 changes: 37 additions & 32 deletions examples/tutorials/receive_logs_topic.js
Original file line number Diff line number Diff line change
@@ -1,44 +1,49 @@
#!/usr/bin/env node

var amqp = require('amqplib');
var basename = require('path').basename;
var all = require('bluebird').all;
var amqp = require("amqplib");
var basename = require("path").basename;

var keys = process.argv.slice(2);
if (keys.length < 1) {
console.log('Usage: %s pattern [pattern...]',
basename(process.argv[1]));
console.log("Usage: %s pattern [pattern...]", basename(process.argv[1]));
process.exit(1);
}

amqp.connect('amqp://localhost').then(function(conn) {
process.once('SIGINT', function() { conn.close(); });
return conn.createChannel().then(function(ch) {
var ex = 'topic_logs';
var ok = ch.assertExchange(ex, 'topic', {durable: false});

ok = ok.then(function() {
return ch.assertQueue('', {exclusive: true});
amqp
.connect("amqp://localhost")
.then(function (conn) {
process.once("SIGINT", function () {
conn.close();
});
return conn.createChannel().then(function (ch) {
var ex = "topic_logs";
var ok = ch.assertExchange(ex, "topic", { durable: false });

ok = ok.then(function(qok) {
var queue = qok.queue;
return all(keys.map(function(rk) {
ch.bindQueue(queue, ex, rk);
})).then(function() { return queue; });
});
ok = ok.then(function () {
return ch.assertQueue("", { exclusive: true });
});

ok = ok.then(function(queue) {
return ch.consume(queue, logMessage, {noAck: true});
});
return ok.then(function() {
console.log(' [*] Waiting for logs. To exit press CTRL+C.');
});
ok = ok.then(function (qok) {
var queue = qok.queue;
return Promise.all(
keys.map(function (rk) {
ch.bindQueue(queue, ex, rk);
}),
).then(function () {
return queue;
});
});

ok = ok.then(function (queue) {
return ch.consume(queue, logMessage, { noAck: true });
});
return ok.then(function () {
console.log(" [*] Waiting for logs. To exit press CTRL+C.");
});

function logMessage(msg) {
console.log(" [x] %s:'%s'",
msg.fields.routingKey,
msg.content.toString());
}
});
}).catch(console.warn);
function logMessage(msg) {
console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
}
});
})
.catch(console.warn);
74 changes: 41 additions & 33 deletions examples/tutorials/rpc_client.js
Original file line number Diff line number Diff line change
@@ -1,53 +1,61 @@
#!/usr/bin/env node

var amqp = require('amqplib');
var basename = require('path').basename;
var Promise = require('bluebird');
var uuid = require('node-uuid');
var amqp = require("amqplib");
var basename = require("path").basename;
var uuid = require("node-uuid");

// I've departed from the form of the original RPC tutorial, which
// needlessly introduces a class definition, and doesn't even
// parameterise the request.

var n;
try {
if (process.argv.length < 3) throw Error('Too few args');
if (process.argv.length < 3) throw Error("Too few args");
n = parseInt(process.argv[2]);
}
catch (e) {
} catch (e) {
console.error(e);
console.warn('Usage: %s number', basename(process.argv[1]));
console.warn("Usage: %s number", basename(process.argv[1]));
process.exit(1);
}

amqp.connect('amqp://localhost').then(function(conn) {
return conn.createChannel().then(function(ch) {
return new Promise(function(resolve) {
var corrId = uuid();
function maybeAnswer(msg) {
if (msg.properties.correlationId === corrId) {
resolve(msg.content.toString());
}
}
amqp
.connect("amqp://localhost")
.then(function (conn) {
return conn
.createChannel()
.then(function (ch) {
return new Promise(function (resolve) {
var corrId = uuid();
function maybeAnswer(msg) {
if (msg.properties.correlationId === corrId) {
resolve(msg.content.toString());
}
}

var ok = ch.assertQueue('', {exclusive: true})
.then(function(qok) { return qok.queue; });
var ok = ch.assertQueue("", { exclusive: true }).then(function (qok) {
return qok.queue;
});

ok = ok.then(function(queue) {
return ch.consume(queue, maybeAnswer, {noAck: true})
.then(function() { return queue; });
});
ok = ok.then(function (queue) {
return ch.consume(queue, maybeAnswer, { noAck: true }).then(function () {
return queue;
});
});

ok = ok.then(function(queue) {
console.log(' [x] Requesting fib(%d)', n);
ch.sendToQueue('rpc_queue', Buffer.from(n.toString()), {
correlationId: corrId, replyTo: queue
ok = ok.then(function (queue) {
console.log(" [x] Requesting fib(%d)", n);
ch.sendToQueue("rpc_queue", Buffer.from(n.toString()), {
correlationId: corrId,
replyTo: queue,
});
});
});
})
.then(function (fibN) {
console.log(" [.] Got %d", fibN);
})
.finally(function () {
conn.close();
});
});
})
.then(function(fibN) {
console.log(' [.] Got %d', fibN);
})
.finally(function() { conn.close(); });
}).catch(console.warn);
.catch(console.warn);
Loading