consumer: it now responds to the corrent id

This commit is contained in:
2005 2024-08-25 03:41:13 +02:00
parent 953f7c154b
commit 90906a701c

View file

@ -1,9 +1,12 @@
use std::str::from_utf8; use std::str::from_utf8;
use common::EmailJob;
use futures_lite::stream::StreamExt; use futures_lite::stream::StreamExt;
use lapin::{ use lapin::{
options::*, publisher_confirm::Confirmation, types::FieldTable, BasicProperties, Connection, options::*,
ConnectionProperties, Result, publisher_confirm::Confirmation,
types::{FieldTable, ShortString},
BasicProperties, Connection, ConnectionProperties, Result,
}; };
use log::*; use log::*;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
@ -26,6 +29,7 @@ async fn main() -> Result<()> {
info!("CONNECTED"); info!("CONNECTED");
let channel_b = conn.create_channel().await.unwrap(); let channel_b = conn.create_channel().await.unwrap();
// declare a consumer
let mut consumer = channel_b let mut consumer = channel_b
.basic_consume( .basic_consume(
"email", "email",
@ -35,21 +39,24 @@ async fn main() -> Result<()> {
) )
.await?; .await?;
// watch for incoming messages
while let Some(delivery) = consumer.next().await { while let Some(delivery) = consumer.next().await {
let delivery = delivery.expect("error in consumer"); let delivery = delivery.expect("error in consumer");
let reply = delivery.properties.reply_to(); let reply = delivery.properties.correlation_id();
log::info!("Reply id: {:?}", reply); log::info!("Reply id: {:?}", reply);
let data = serde_json::from_str::<EmailJob>(from_utf8(&delivery.data).unwrap()).unwrap();
channel_b channel_b
.basic_publish( .basic_publish(
"", "",
reply.clone().unwrap().as_str(), delivery.properties.reply_to().clone().unwrap().as_str(),
BasicPublishOptions::default(), BasicPublishOptions::default(),
"asd".as_bytes(), format!("Sent email to {}", data.to()).as_bytes(),
BasicProperties::default().with_reply_to(consumer.queue()), BasicProperties::default(),
) )
.await .await
.unwrap(); .unwrap();
info!("Received message {:?}", delivery); delivery.ack(BasicAckOptions::default()).await.unwrap();
info!("Received emailjob {:?} and replied back", data);
} }
Ok(()) Ok(())