Skip to content

Commit

Permalink
feat: switch to native Promise
Browse files Browse the repository at this point in the history
  • Loading branch information
alexkvak committed May 18, 2021
1 parent d87348d commit 1e59be1
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 72 deletions.
2 changes: 0 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
language: node_js
node_js:
- "0.8"
- "0.10"
- "0.12"
- "iojs-v1"
- "iojs-v2"
Expand Down
11 changes: 8 additions & 3 deletions channel_api.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
var raw_connect = require('./lib/connect').connect;
var ChannelModel = require('./lib/channel_model').ChannelModel;
var Promise = require('bluebird');

function connect(url, connOptions) {
return Promise.fromCallback(function(cb) {
return raw_connect(url, connOptions, cb);
return new Promise(function (resolve, reject) {
raw_connect(url, connOptions, function (err, result) {
if (err) {
reject(err);
} else {
resolve(result);
}
})
})
.then(function(conn) {
return new ChannelModel(conn);
Expand Down
3 changes: 1 addition & 2 deletions examples/tutorials/receive_logs_direct.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#!/usr/bin/env node

var amqp = require('amqplib');
var all = require('bluebird').all;
var basename = require('path').basename;

var severities = process.argv.slice(2);
Expand All @@ -24,7 +23,7 @@ amqp.connect('amqp://localhost').then(function(conn) {

ok = ok.then(function(qok) {
var queue = qok.queue;
return all(severities.map(function(sev) {
return Promise.all(severities.map(function(sev) {
ch.bindQueue(queue, ex, sev);
})).then(function() { return queue; });
});
Expand Down
3 changes: 1 addition & 2 deletions examples/tutorials/receive_logs_topic.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

var amqp = require('amqplib');
var basename = require('path').basename;
var all = require('bluebird').all;

var keys = process.argv.slice(2);
if (keys.length < 1) {
Expand All @@ -23,7 +22,7 @@ amqp.connect('amqp://localhost').then(function(conn) {

ok = ok.then(function(qok) {
var queue = qok.queue;
return all(keys.map(function(rk) {
return Promise.all(keys.map(function(rk) {
ch.bindQueue(queue, ex, rk);
})).then(function() { return queue; });
});
Expand Down
1 change: 0 additions & 1 deletion examples/tutorials/rpc_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

var amqp = require('amqplib');
var basename = require('path').basename;
var Promise = require('bluebird');
var uuid = require('node-uuid');

// I've departed from the form of the original RPC tutorial, which
Expand Down
1 change: 0 additions & 1 deletion lib/callback_model.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
'use strict';

var defs = require('./defs');
var Promise = require('bluebird');
var inherits = require('util').inherits;
var EventEmitter = require('events').EventEmitter;
var BaseChannel = require('./channel').BaseChannel;
Expand Down
78 changes: 66 additions & 12 deletions lib/channel_model.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
'use strict';

var defs = require('./defs');
var Promise = require('bluebird');
var Bluebird = require('bluebird');
var inherits = require('util').inherits;
var EventEmitter = require('events').EventEmitter;
var BaseChannel = require('./channel').BaseChannel;
Expand All @@ -29,7 +29,17 @@ module.exports.ChannelModel = ChannelModel;
var CM = ChannelModel.prototype;

CM.close = function() {
return Promise.fromCallback(this.connection.close.bind(this.connection));
var close = this.connection.close.bind(this.connection);

return new Promise(function (resolve, reject) {
close(function (err, result) {
if (err) {
reject(err);
} else {
resolve(result);
}
});
});
};

// Channels
Expand All @@ -55,17 +65,31 @@ var C = Channel.prototype;
// API procedures.
C.rpc = function(method, fields, expect) {
var self = this;
return Promise.fromCallback(function(cb) {
return self._rpc(method, fields, expect, cb);
})
.then(function(f) {

return new Promise(function (resolve, reject) {
self._rpc(method, fields, expect, function (err, result) {
if (err) {
reject(err);
} else {
resolve(result);
}
});
}).then(function (f) {
return f.fields;
});
};

// Do the remarkably simple channel open handshake
C.open = function() {
return Promise.try(this.allocate.bind(this)).then(
var allocate = this.allocate.bind(this);

return new Promise(function (resolve, reject) {
try {
resolve(allocate());
} catch (e) {
reject(e);
}
}).then(
function(ch) {
return ch.rpc(defs.ChannelOpen, {outOfBand: ""},
defs.ChannelOpenOk);
Expand All @@ -74,10 +98,16 @@ C.open = function() {

C.close = function() {
var self = this;
return Promise.fromCallback(function(cb) {

return Bluebird.fromCallback(function(cb) {
return self.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS,
cb);
});
// return new Promise(function(resolve, reject) {
// return self.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS,
// (err, result) => err ? reject(err) : resolve(result)
// );
// });
};

// === Public API, declaring queues and stuff ===
Expand Down Expand Up @@ -167,9 +197,18 @@ C.consume = function(queue, callback, options) {
// NB we want the callback to be run synchronously, so that we've
// registered the consumerTag before any messages can arrive.
var fields = Args.consume(queue, options);
return Promise.fromCallback(function(cb) {
return Bluebird.fromCallback(function(cb) {
self._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, cb);
})
// return new Promise(function(resolve, reject) {
// self._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, function (err, result) {
// if (err) {
// reject(err);
// } else {
// resolve(result);
// }
// });
// })
.then(function(ok) {
self.registerConsumer(ok.fields.consumerTag, callback);
return ok.fields;
Expand All @@ -178,10 +217,16 @@ C.consume = function(queue, callback, options) {

C.cancel = function(consumerTag) {
var self = this;
return Promise.fromCallback(function(cb) {
return new Promise(function(resolve, reject) {
self._rpc(defs.BasicCancel, Args.cancel(consumerTag),
defs.BasicCancelOk,
cb);
function (err, result) {
if (err) {
reject(err);
} else {
resolve(result);
}
});
})
.then(function(ok) {
self.unregisterConsumer(consumerTag);
Expand All @@ -192,9 +237,18 @@ C.cancel = function(consumerTag) {
C.get = function(queue, options) {
var self = this;
var fields = Args.get(queue, options);
return Promise.fromCallback(function(cb) {
return Bluebird.fromCallback(function(cb) {
return self.sendOrEnqueue(defs.BasicGet, fields, cb);
})
// return new Promise(function(resolve, reject) {
// return self.sendOrEnqueue(defs.BasicGet, fields, function (err, result) {
// if (err) {
// reject(err);
// } else {
// resolve(result);
// }
// });
// })
.then(function(f) {
if (f.id === defs.BasicGetEmpty) {
return false;
Expand Down
21 changes: 14 additions & 7 deletions test/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
'use strict';

var assert = require('assert');
var Promise = require('bluebird');
var Channel = require('../lib/channel').Channel;
var Connection = require('../lib/connection').Connection;
var util = require('./util');
Expand Down Expand Up @@ -77,11 +76,19 @@ var DELIVER_FIELDS = {
};

function open(ch) {
return Promise.try(function() {
ch.allocate();
return Promise.fromCallback(function(cb) {
ch._rpc(defs.ChannelOpen, {outOfBand: ''}, defs.ChannelOpenOk, cb);
});
return new Promise(function (resolve, reject) {
try {
ch.allocate();
ch._rpc(defs.ChannelOpen, {outOfBand: ''}, defs.ChannelOpenOk, function (err, done) {
if (err) {
reject(err);
} else {
resolve(done);
}
});
} catch (e) {
reject(e);
}
});
}

Expand Down Expand Up @@ -286,7 +293,7 @@ test("RPC on closed channel", channelTest(
failureCb(resolve, reject));
});

Promise.join(close, fail1, fail2)
Promise.all([close, fail1, fail2])
.then(succeed(done))
.catch(fail(done));
},
Expand Down
Loading

0 comments on commit 1e59be1

Please sign in to comment.