Compare commits
No commits in common. "3fffaf3babbda4437c2f2e7110bf4ae5eb62180b" and "953f7c154bcdb5a23bc5bf5eee7f9cd529b1de8f" have entirely different histories.
3fffaf3bab
...
953f7c154b
|
@ -1,12 +1,9 @@
|
|||
use std::str::from_utf8;
|
||||
|
||||
use common::EmailJob;
|
||||
use futures_lite::stream::StreamExt;
|
||||
use lapin::{
|
||||
options::*,
|
||||
publisher_confirm::Confirmation,
|
||||
types::{FieldTable, ShortString},
|
||||
BasicProperties, Connection, ConnectionProperties, Result,
|
||||
options::*, publisher_confirm::Confirmation, types::FieldTable, BasicProperties, Connection,
|
||||
ConnectionProperties, Result,
|
||||
};
|
||||
use log::*;
|
||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||
|
@ -29,7 +26,6 @@ async fn main() -> Result<()> {
|
|||
info!("CONNECTED");
|
||||
let channel_b = conn.create_channel().await.unwrap();
|
||||
|
||||
// declare a consumer
|
||||
let mut consumer = channel_b
|
||||
.basic_consume(
|
||||
"email",
|
||||
|
@ -39,24 +35,21 @@ async fn main() -> Result<()> {
|
|||
)
|
||||
.await?;
|
||||
|
||||
// watch for incoming messages
|
||||
while let Some(delivery) = consumer.next().await {
|
||||
let delivery = delivery.expect("error in consumer");
|
||||
let reply = delivery.properties.correlation_id();
|
||||
let reply = delivery.properties.reply_to();
|
||||
log::info!("Reply id: {:?}", reply);
|
||||
let data = serde_json::from_str::<EmailJob>(from_utf8(&delivery.data).unwrap()).unwrap();
|
||||
channel_b
|
||||
.basic_publish(
|
||||
"",
|
||||
delivery.properties.reply_to().clone().unwrap().as_str(),
|
||||
reply.clone().unwrap().as_str(),
|
||||
BasicPublishOptions::default(),
|
||||
format!("Sent email to {}", data.to()).as_bytes(),
|
||||
BasicProperties::default(),
|
||||
"asd".as_bytes(),
|
||||
BasicProperties::default().with_reply_to(consumer.queue()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
delivery.ack(BasicAckOptions::default()).await.unwrap();
|
||||
info!("Received emailjob {:?} and replied back", data);
|
||||
info!("Received message {:?}", delivery);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
|
|
@ -3,10 +3,8 @@ use std::{str::from_utf8, thread, time::Duration};
|
|||
use common::EmailJobBuilder;
|
||||
use futures_lite::StreamExt;
|
||||
use lapin::{
|
||||
options::*,
|
||||
publisher_confirm::Confirmation,
|
||||
types::{FieldTable, ShortString},
|
||||
BasicProperties, Connection, ConnectionProperties, Result,
|
||||
options::*, publisher_confirm::Confirmation, types::FieldTable, BasicProperties, Connection,
|
||||
ConnectionProperties, Result,
|
||||
};
|
||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||
use uuid::Uuid;
|
||||
|
@ -30,14 +28,6 @@ async fn main() {
|
|||
.content("test".into())
|
||||
.build()
|
||||
.unwrap();
|
||||
let queue = channel_a
|
||||
.queue_declare(
|
||||
"email",
|
||||
QueueDeclareOptions::default(),
|
||||
FieldTable::default(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut consumer = channel_a
|
||||
.basic_consume(
|
||||
"email",
|
||||
|
@ -48,8 +38,44 @@ async fn main() {
|
|||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut request = Uuid::new_v4();
|
||||
let queue = channel_a
|
||||
.queue_declare(
|
||||
"email",
|
||||
QueueDeclareOptions::default(),
|
||||
FieldTable::default(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
// Sending request to client
|
||||
tokio::spawn(async move {
|
||||
//
|
||||
let mut c = channel_a
|
||||
.basic_consume(
|
||||
"email",
|
||||
"",
|
||||
BasicConsumeOptions::default(),
|
||||
FieldTable::default(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
while let Some(delivery) = consumer.next().await {
|
||||
let delivery = delivery.expect("error in consumer");
|
||||
log::info!("Received feedback: {:?}", from_utf8(&delivery.data));
|
||||
delivery.ack(BasicAckOptions::default()).await.expect("ack");
|
||||
}
|
||||
});
|
||||
let mut consumer = channel_b
|
||||
.basic_consume(
|
||||
"email",
|
||||
"",
|
||||
BasicConsumeOptions::default(),
|
||||
FieldTable::default(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
loop {
|
||||
let request = Uuid::new_v4();
|
||||
let confirm = channel_b
|
||||
.basic_publish(
|
||||
"",
|
||||
|
@ -64,34 +90,7 @@ async fn main() {
|
|||
.unwrap()
|
||||
.await
|
||||
.unwrap();
|
||||
while let Some(delivery) = consumer.next().await {
|
||||
if let Ok(delivery) = delivery {
|
||||
if delivery
|
||||
.properties
|
||||
.correlation_id()
|
||||
.as_ref()
|
||||
.unwrap_or(&ShortString::from("asd"))
|
||||
.eq(&ShortString::from(request.to_string()))
|
||||
{
|
||||
log::info!(
|
||||
"Received data from consumer: {:?}",
|
||||
from_utf8(&delivery.data)
|
||||
);
|
||||
break;
|
||||
} else {
|
||||
log::info!(
|
||||
"Received data from consumer: {:?}",
|
||||
from_utf8(&delivery.data)
|
||||
);
|
||||
log::error!("Received invalid delivery");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
request = Uuid::new_v4();
|
||||
log::info!("Sent request");
|
||||
|
||||
thread::sleep(Duration::from_secs(1))
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue