Compare commits

..

No commits in common. "3fffaf3babbda4437c2f2e7110bf4ae5eb62180b" and "953f7c154bcdb5a23bc5bf5eee7f9cd529b1de8f" have entirely different histories.

2 changed files with 46 additions and 54 deletions

View file

@ -1,12 +1,9 @@
use std::str::from_utf8;
use common::EmailJob;
use futures_lite::stream::StreamExt;
use lapin::{
options::*,
publisher_confirm::Confirmation,
types::{FieldTable, ShortString},
BasicProperties, Connection, ConnectionProperties, Result,
options::*, publisher_confirm::Confirmation, types::FieldTable, BasicProperties, Connection,
ConnectionProperties, Result,
};
use log::*;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
@ -29,7 +26,6 @@ async fn main() -> Result<()> {
info!("CONNECTED");
let channel_b = conn.create_channel().await.unwrap();
// declare a consumer
let mut consumer = channel_b
.basic_consume(
"email",
@ -39,24 +35,21 @@ async fn main() -> Result<()> {
)
.await?;
// watch for incoming messages
while let Some(delivery) = consumer.next().await {
let delivery = delivery.expect("error in consumer");
let reply = delivery.properties.correlation_id();
let reply = delivery.properties.reply_to();
log::info!("Reply id: {:?}", reply);
let data = serde_json::from_str::<EmailJob>(from_utf8(&delivery.data).unwrap()).unwrap();
channel_b
.basic_publish(
"",
delivery.properties.reply_to().clone().unwrap().as_str(),
reply.clone().unwrap().as_str(),
BasicPublishOptions::default(),
format!("Sent email to {}", data.to()).as_bytes(),
BasicProperties::default(),
"asd".as_bytes(),
BasicProperties::default().with_reply_to(consumer.queue()),
)
.await
.unwrap();
delivery.ack(BasicAckOptions::default()).await.unwrap();
info!("Received emailjob {:?} and replied back", data);
info!("Received message {:?}", delivery);
}
Ok(())

View file

@ -3,10 +3,8 @@ use std::{str::from_utf8, thread, time::Duration};
use common::EmailJobBuilder;
use futures_lite::StreamExt;
use lapin::{
options::*,
publisher_confirm::Confirmation,
types::{FieldTable, ShortString},
BasicProperties, Connection, ConnectionProperties, Result,
options::*, publisher_confirm::Confirmation, types::FieldTable, BasicProperties, Connection,
ConnectionProperties, Result,
};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use uuid::Uuid;
@ -30,14 +28,6 @@ async fn main() {
.content("test".into())
.build()
.unwrap();
let queue = channel_a
.queue_declare(
"email",
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await
.unwrap();
let mut consumer = channel_a
.basic_consume(
"email",
@ -48,8 +38,44 @@ async fn main() {
.await
.unwrap();
let mut request = Uuid::new_v4();
let queue = channel_a
.queue_declare(
"email",
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await
.unwrap();
// Sending request to client
tokio::spawn(async move {
//
let mut c = channel_a
.basic_consume(
"email",
"",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.unwrap();
while let Some(delivery) = consumer.next().await {
let delivery = delivery.expect("error in consumer");
log::info!("Received feedback: {:?}", from_utf8(&delivery.data));
delivery.ack(BasicAckOptions::default()).await.expect("ack");
}
});
let mut consumer = channel_b
.basic_consume(
"email",
"",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.unwrap();
loop {
let request = Uuid::new_v4();
let confirm = channel_b
.basic_publish(
"",
@ -64,34 +90,7 @@ async fn main() {
.unwrap()
.await
.unwrap();
while let Some(delivery) = consumer.next().await {
if let Ok(delivery) = delivery {
if delivery
.properties
.correlation_id()
.as_ref()
.unwrap_or(&ShortString::from("asd"))
.eq(&ShortString::from(request.to_string()))
{
log::info!(
"Received data from consumer: {:?}",
from_utf8(&delivery.data)
);
break;
} else {
log::info!(
"Received data from consumer: {:?}",
from_utf8(&delivery.data)
);
log::error!("Received invalid delivery");
break;
}
}
}
request = Uuid::new_v4();
log::info!("Sent request");
thread::sleep(Duration::from_secs(1))
}
}