// Publish message with expiration
channel.sendToQueue('my-queue', Buffer.from('Message Expires in 5s'), {
expiration: 5000,
});
// Consume message before expiration
channel.consume('my-queue', (msg) => {
console.log(msg.content.toString());
});
// Declare dead letter exchange
channel.assertExchange('my-dead-letter-exchange', 'direct');
channel.bindQueue('my-dead-letter-queue', 'my-dead-letter-exchange', 'my-dl-routing-key');
// Declare original exchange and bind to dead letter exchange
channel.assertExchange('my-exchange', 'topic');
channel.bindExchange('my-exchange', 'my-dead-letter-exchange', 'my-dl-routing-key');
// Publish message to original exchange
channel.publish('my-exchange', 'my-routing-key', Buffer.from('Message to Dead Letter Exchange'));
// Define retry policy with max 5 attempts and delay between attempts
channel.publish('my-exchange', 'my-routing-key', Buffer.from('Message with Retries'), {
retry: {
max: 5,
delay: 100,
},
});
// Set prefetch count
channel.prefetch(10);
// Consume messages
channel.consume('my-queue', (msg) => {
// Process message
});
// Consume message and acknowledge it
channel.consume('my-queue', (msg) => {
channel.ack(msg);
});
// Consume message and nack it
channel.consume('my-queue', (msg) => {
channel.nack(msg);
});
// Declare direct exchange
channel.assertExchange('my-direct-exchange', 'direct');
// Bind queue to exchange
channel.bindQueue('my-direct-queue', 'my-direct-exchange', 'my-direct-routing-key');
// Publish message to exchange
channel.publish('my-direct-exchange', 'my-direct-routing-key', Buffer.from('Message to Direct Exchange'));
// Declare fanout exchange
channel.assertExchange('my-fanout-exchange', 'fanout');
// Bind queue to exchange
channel.bindQueue('my-fanout-queue', 'my-fanout-exchange', '');
// Publish message to exchange
channel.publish('my-fanout-exchange', '', Buffer.from('Message to Fanout Exchange'));
// Declare topic exchange
channel.assertExchange('my-topic-exchange', 'topic');
// Bind queue to exchange
channel.bindQueue('my-topic-queue', 'my-topic-exchange', 'my-topic-routing-key.*');
// Publish message to exchange
channel.publish('my-topic-exchange', 'my-topic-routing-key.test', Buffer.from('Message to Topic Exchange'));
// Set queue durability
channel.assertQueue('my-durable-queue', {
durable: true,
});
// Enable publisher confirms
channel.confirmChannel();
// Publish message and wait for confirmation
channel.waitForConfirms(true);
channel.sendToQueue('my-durable-queue', Buffer.from('Message with Persistence Confirmation'));