Compare commits
2 commits
953f7c154b
...
3fffaf3bab
Author | SHA1 | Date | |
---|---|---|---|
2005 | 3fffaf3bab | ||
2005 | 90906a701c |
|
@ -1,9 +1,12 @@
|
||||||
use std::str::from_utf8;
|
use std::str::from_utf8;
|
||||||
|
|
||||||
|
use common::EmailJob;
|
||||||
use futures_lite::stream::StreamExt;
|
use futures_lite::stream::StreamExt;
|
||||||
use lapin::{
|
use lapin::{
|
||||||
options::*, publisher_confirm::Confirmation, types::FieldTable, BasicProperties, Connection,
|
options::*,
|
||||||
ConnectionProperties, Result,
|
publisher_confirm::Confirmation,
|
||||||
|
types::{FieldTable, ShortString},
|
||||||
|
BasicProperties, Connection, ConnectionProperties, Result,
|
||||||
};
|
};
|
||||||
use log::*;
|
use log::*;
|
||||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||||
|
@ -26,6 +29,7 @@ async fn main() -> Result<()> {
|
||||||
info!("CONNECTED");
|
info!("CONNECTED");
|
||||||
let channel_b = conn.create_channel().await.unwrap();
|
let channel_b = conn.create_channel().await.unwrap();
|
||||||
|
|
||||||
|
// declare a consumer
|
||||||
let mut consumer = channel_b
|
let mut consumer = channel_b
|
||||||
.basic_consume(
|
.basic_consume(
|
||||||
"email",
|
"email",
|
||||||
|
@ -35,21 +39,24 @@ async fn main() -> Result<()> {
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
// watch for incoming messages
|
||||||
while let Some(delivery) = consumer.next().await {
|
while let Some(delivery) = consumer.next().await {
|
||||||
let delivery = delivery.expect("error in consumer");
|
let delivery = delivery.expect("error in consumer");
|
||||||
let reply = delivery.properties.reply_to();
|
let reply = delivery.properties.correlation_id();
|
||||||
log::info!("Reply id: {:?}", reply);
|
log::info!("Reply id: {:?}", reply);
|
||||||
|
let data = serde_json::from_str::<EmailJob>(from_utf8(&delivery.data).unwrap()).unwrap();
|
||||||
channel_b
|
channel_b
|
||||||
.basic_publish(
|
.basic_publish(
|
||||||
"",
|
"",
|
||||||
reply.clone().unwrap().as_str(),
|
delivery.properties.reply_to().clone().unwrap().as_str(),
|
||||||
BasicPublishOptions::default(),
|
BasicPublishOptions::default(),
|
||||||
"asd".as_bytes(),
|
format!("Sent email to {}", data.to()).as_bytes(),
|
||||||
BasicProperties::default().with_reply_to(consumer.queue()),
|
BasicProperties::default(),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
info!("Received message {:?}", delivery);
|
delivery.ack(BasicAckOptions::default()).await.unwrap();
|
||||||
|
info!("Received emailjob {:?} and replied back", data);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
@ -3,8 +3,10 @@ use std::{str::from_utf8, thread, time::Duration};
|
||||||
use common::EmailJobBuilder;
|
use common::EmailJobBuilder;
|
||||||
use futures_lite::StreamExt;
|
use futures_lite::StreamExt;
|
||||||
use lapin::{
|
use lapin::{
|
||||||
options::*, publisher_confirm::Confirmation, types::FieldTable, BasicProperties, Connection,
|
options::*,
|
||||||
ConnectionProperties, Result,
|
publisher_confirm::Confirmation,
|
||||||
|
types::{FieldTable, ShortString},
|
||||||
|
BasicProperties, Connection, ConnectionProperties, Result,
|
||||||
};
|
};
|
||||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
@ -28,6 +30,14 @@ async fn main() {
|
||||||
.content("test".into())
|
.content("test".into())
|
||||||
.build()
|
.build()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
let queue = channel_a
|
||||||
|
.queue_declare(
|
||||||
|
"email",
|
||||||
|
QueueDeclareOptions::default(),
|
||||||
|
FieldTable::default(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
let mut consumer = channel_a
|
let mut consumer = channel_a
|
||||||
.basic_consume(
|
.basic_consume(
|
||||||
"email",
|
"email",
|
||||||
|
@ -38,44 +48,8 @@ async fn main() {
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let queue = channel_a
|
let mut request = Uuid::new_v4();
|
||||||
.queue_declare(
|
|
||||||
"email",
|
|
||||||
QueueDeclareOptions::default(),
|
|
||||||
FieldTable::default(),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
// Sending request to client
|
|
||||||
tokio::spawn(async move {
|
|
||||||
//
|
|
||||||
let mut c = channel_a
|
|
||||||
.basic_consume(
|
|
||||||
"email",
|
|
||||||
"",
|
|
||||||
BasicConsumeOptions::default(),
|
|
||||||
FieldTable::default(),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
while let Some(delivery) = consumer.next().await {
|
|
||||||
let delivery = delivery.expect("error in consumer");
|
|
||||||
log::info!("Received feedback: {:?}", from_utf8(&delivery.data));
|
|
||||||
delivery.ack(BasicAckOptions::default()).await.expect("ack");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
let mut consumer = channel_b
|
|
||||||
.basic_consume(
|
|
||||||
"email",
|
|
||||||
"",
|
|
||||||
BasicConsumeOptions::default(),
|
|
||||||
FieldTable::default(),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
loop {
|
loop {
|
||||||
let request = Uuid::new_v4();
|
|
||||||
let confirm = channel_b
|
let confirm = channel_b
|
||||||
.basic_publish(
|
.basic_publish(
|
||||||
"",
|
"",
|
||||||
|
@ -90,7 +64,34 @@ async fn main() {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
while let Some(delivery) = consumer.next().await {
|
||||||
|
if let Ok(delivery) = delivery {
|
||||||
|
if delivery
|
||||||
|
.properties
|
||||||
|
.correlation_id()
|
||||||
|
.as_ref()
|
||||||
|
.unwrap_or(&ShortString::from("asd"))
|
||||||
|
.eq(&ShortString::from(request.to_string()))
|
||||||
|
{
|
||||||
|
log::info!(
|
||||||
|
"Received data from consumer: {:?}",
|
||||||
|
from_utf8(&delivery.data)
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
log::info!(
|
||||||
|
"Received data from consumer: {:?}",
|
||||||
|
from_utf8(&delivery.data)
|
||||||
|
);
|
||||||
|
log::error!("Received invalid delivery");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
request = Uuid::new_v4();
|
||||||
log::info!("Sent request");
|
log::info!("Sent request");
|
||||||
|
|
||||||
thread::sleep(Duration::from_secs(1))
|
thread::sleep(Duration::from_secs(1))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue