producer: sends msg and await response

This commit is contained in:
2005 2024-08-25 03:41:29 +02:00
parent 90906a701c
commit 3fffaf3bab

View file

@ -3,8 +3,10 @@ use std::{str::from_utf8, thread, time::Duration};
use common::EmailJobBuilder; use common::EmailJobBuilder;
use futures_lite::StreamExt; use futures_lite::StreamExt;
use lapin::{ use lapin::{
options::*, publisher_confirm::Confirmation, types::FieldTable, BasicProperties, Connection, options::*,
ConnectionProperties, Result, publisher_confirm::Confirmation,
types::{FieldTable, ShortString},
BasicProperties, Connection, ConnectionProperties, Result,
}; };
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use uuid::Uuid; use uuid::Uuid;
@ -28,6 +30,14 @@ async fn main() {
.content("test".into()) .content("test".into())
.build() .build()
.unwrap(); .unwrap();
let queue = channel_a
.queue_declare(
"email",
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await
.unwrap();
let mut consumer = channel_a let mut consumer = channel_a
.basic_consume( .basic_consume(
"email", "email",
@ -38,44 +48,8 @@ async fn main() {
.await .await
.unwrap(); .unwrap();
let queue = channel_a let mut request = Uuid::new_v4();
.queue_declare(
"email",
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await
.unwrap();
// Sending request to client
tokio::spawn(async move {
//
let mut c = channel_a
.basic_consume(
"email",
"",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.unwrap();
while let Some(delivery) = consumer.next().await {
let delivery = delivery.expect("error in consumer");
log::info!("Received feedback: {:?}", from_utf8(&delivery.data));
delivery.ack(BasicAckOptions::default()).await.expect("ack");
}
});
let mut consumer = channel_b
.basic_consume(
"email",
"",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.unwrap();
loop { loop {
let request = Uuid::new_v4();
let confirm = channel_b let confirm = channel_b
.basic_publish( .basic_publish(
"", "",
@ -90,7 +64,34 @@ async fn main() {
.unwrap() .unwrap()
.await .await
.unwrap(); .unwrap();
while let Some(delivery) = consumer.next().await {
if let Ok(delivery) = delivery {
if delivery
.properties
.correlation_id()
.as_ref()
.unwrap_or(&ShortString::from("asd"))
.eq(&ShortString::from(request.to_string()))
{
log::info!(
"Received data from consumer: {:?}",
from_utf8(&delivery.data)
);
break;
} else {
log::info!(
"Received data from consumer: {:?}",
from_utf8(&delivery.data)
);
log::error!("Received invalid delivery");
break;
}
}
}
request = Uuid::new_v4();
log::info!("Sent request"); log::info!("Sent request");
thread::sleep(Duration::from_secs(1)) thread::sleep(Duration::from_secs(1))
} }
} }