From 90906a701cd3e9e499ee5319cbb00fe264e9cd1c Mon Sep 17 00:00:00 2001 From: 4o1x5 <4o1x5@4o1x5.dev> Date: Sun, 25 Aug 2024 03:41:13 +0200 Subject: [PATCH] consumer: it now responds to the corrent id --- consumer/src/main.rs | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/consumer/src/main.rs b/consumer/src/main.rs index 72c9ef4..11287db 100644 --- a/consumer/src/main.rs +++ b/consumer/src/main.rs @@ -1,9 +1,12 @@ use std::str::from_utf8; +use common::EmailJob; use futures_lite::stream::StreamExt; use lapin::{ - options::*, publisher_confirm::Confirmation, types::FieldTable, BasicProperties, Connection, - ConnectionProperties, Result, + options::*, + publisher_confirm::Confirmation, + types::{FieldTable, ShortString}, + BasicProperties, Connection, ConnectionProperties, Result, }; use log::*; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -26,6 +29,7 @@ async fn main() -> Result<()> { info!("CONNECTED"); let channel_b = conn.create_channel().await.unwrap(); + // declare a consumer let mut consumer = channel_b .basic_consume( "email", @@ -35,21 +39,24 @@ async fn main() -> Result<()> { ) .await?; + // watch for incoming messages while let Some(delivery) = consumer.next().await { let delivery = delivery.expect("error in consumer"); - let reply = delivery.properties.reply_to(); + let reply = delivery.properties.correlation_id(); log::info!("Reply id: {:?}", reply); + let data = serde_json::from_str::(from_utf8(&delivery.data).unwrap()).unwrap(); channel_b .basic_publish( "", - reply.clone().unwrap().as_str(), + delivery.properties.reply_to().clone().unwrap().as_str(), BasicPublishOptions::default(), - "asd".as_bytes(), - BasicProperties::default().with_reply_to(consumer.queue()), + format!("Sent email to {}", data.to()).as_bytes(), + BasicProperties::default(), ) .await .unwrap(); - info!("Received message {:?}", delivery); + delivery.ack(BasicAckOptions::default()).await.unwrap(); + info!("Received emailjob {:?} and replied back", data); } Ok(())