i have function queuing message rabbitmq so:
var amqp = require('amqplib/callback_api'); var _queueurl = 'amqp://127.0.0.1'; var _toblahblahqueuename = 'blahblah'; var self = module.exports = { queuemessage: function (msgobj, callback) { try { amqp.connect(_queueurl, function (err, connection) { if (err) { callback(err); } connection.createchannel(function (err, channel) { if (err) { callback(err); } channel.assertqueue(_toblahblahqueuename, { durable: true }, function (err, _ok) { if (err) { callback(err); } var msg = new buffer(json.stringify(msgobj)); channel.sendtoqueue(_toblahblahqueuename, msg, { persistent: true }, function (err, ok) { if (err) { console.log(err); callback(err); } console.log('published', ok); channel.connection.close(); callback(null, { message: 'queued' }); }); }); }); }); } catch (e) { console.log(e.stack); callback(e); } } };
i calling function queuemessage messages 250k in length.
the sendtoqueue call hanging every time. sits there without returning error. however, message seems queued!
the server log has error message: client unexpectedly closed tcp connection
thanks help!
amqplib not support callback sendtoqueue
or publish
.
the documentation shows not option:
channel#sendtoqueue promises , callbacks
sendtoqueue(queue, content, [options])
send single message content given buffer specific queue named, bypassing routing. options , return value same publish.
to work around this, need call sendtoqueue
if synchronous message.
if want exit app, have wait few milliseconds before doing so. failure result in message not being sent.
here example of how change code work way:
channel.sendtoqueue(_toblahblahqueuename, msg, { persistent: true }); settimeout(function () { channel.connection.close(); callback(null, { message: 'queued' }); }, 500);
Comments
Post a Comment