From 3fffaf3babbda4437c2f2e7110bf4ae5eb62180b Mon Sep 17 00:00:00 2001 From: 4o1x5 <4o1x5@4o1x5.dev> Date: Sun, 25 Aug 2024 03:41:29 +0200 Subject: [PATCH] producer: sends msg and await response --- producer/src/main.rs | 79 ++++++++++++++++++++++---------------------- 1 file changed, 40 insertions(+), 39 deletions(-) diff --git a/producer/src/main.rs b/producer/src/main.rs index 1e531dd..6044285 100644 --- a/producer/src/main.rs +++ b/producer/src/main.rs @@ -3,8 +3,10 @@ use std::{str::from_utf8, thread, time::Duration}; use common::EmailJobBuilder; use futures_lite::StreamExt; use lapin::{ - options::*, publisher_confirm::Confirmation, types::FieldTable, BasicProperties, Connection, - ConnectionProperties, Result, + options::*, + publisher_confirm::Confirmation, + types::{FieldTable, ShortString}, + BasicProperties, Connection, ConnectionProperties, Result, }; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use uuid::Uuid; @@ -28,6 +30,14 @@ async fn main() { .content("test".into()) .build() .unwrap(); + let queue = channel_a + .queue_declare( + "email", + QueueDeclareOptions::default(), + FieldTable::default(), + ) + .await + .unwrap(); let mut consumer = channel_a .basic_consume( "email", @@ -38,44 +48,8 @@ async fn main() { .await .unwrap(); - let queue = channel_a - .queue_declare( - "email", - QueueDeclareOptions::default(), - FieldTable::default(), - ) - .await - .unwrap(); - // Sending request to client - tokio::spawn(async move { - // - let mut c = channel_a - .basic_consume( - "email", - "", - BasicConsumeOptions::default(), - FieldTable::default(), - ) - .await - .unwrap(); - - while let Some(delivery) = consumer.next().await { - let delivery = delivery.expect("error in consumer"); - log::info!("Received feedback: {:?}", from_utf8(&delivery.data)); - delivery.ack(BasicAckOptions::default()).await.expect("ack"); - } - }); - let mut consumer = channel_b - .basic_consume( - "email", - "", - BasicConsumeOptions::default(), - FieldTable::default(), - ) - .await - .unwrap(); + let mut request = Uuid::new_v4(); loop { - let request = Uuid::new_v4(); let confirm = channel_b .basic_publish( "", @@ -90,7 +64,34 @@ async fn main() { .unwrap() .await .unwrap(); + while let Some(delivery) = consumer.next().await { + if let Ok(delivery) = delivery { + if delivery + .properties + .correlation_id() + .as_ref() + .unwrap_or(&ShortString::from("asd")) + .eq(&ShortString::from(request.to_string())) + { + log::info!( + "Received data from consumer: {:?}", + from_utf8(&delivery.data) + ); + break; + } else { + log::info!( + "Received data from consumer: {:?}", + from_utf8(&delivery.data) + ); + log::error!("Received invalid delivery"); + break; + } + } + } + + request = Uuid::new_v4(); log::info!("Sent request"); + thread::sleep(Duration::from_secs(1)) } }