diff --git a/consumer/src/main.rs b/consumer/src/main.rs index b34a10d..72c9ef4 100644 --- a/consumer/src/main.rs +++ b/consumer/src/main.rs @@ -1,73 +1,56 @@ -// Port of https://www.rabbitmq.com/tutorials/tutorial-six-python.html. Start this -// example in one shell, then the rpc_client example in another. -use amiquip::{ - AmqpProperties, Connection, ConsumerMessage, ConsumerOptions, Exchange, Publish, - QueueDeclareOptions, Result, +use std::str::from_utf8; + +use futures_lite::stream::StreamExt; +use lapin::{ + options::*, publisher_confirm::Confirmation, types::FieldTable, BasicProperties, Connection, + ConnectionProperties, Result, }; +use log::*; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; -fn fib(n: u64) -> u64 { - match n { - 0 => 0, - 1 => 1, - n => fib(n - 1) + fib(n - 2), - } -} - -fn main() -> Result<()> { - env_logger::init(); - +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::registry() + .with(tracing_subscriber::EnvFilter::new( + std::env::var("RUST_LOG").unwrap_or_else(|_| "debug".into()), + )) + .with(tracing_subscriber::fmt::layer()) + .init(); // Open connection. - let mut connection = Connection::insecure_open("amqp://guest:guest@localhost:5672")?; - // Open a channel - None says let the library choose the channel ID. - let channel = connection.open_channel(None)?; + let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into()); + let conn = Connection::connect(&addr, ConnectionProperties::default()) + .await + .unwrap(); - // Get a handle to the default direct exchange. - let exchange = Exchange::direct(&channel); + info!("CONNECTED"); + let channel_b = conn.create_channel().await.unwrap(); - // Declare the queue that will receive RPC requests. - let queue = channel.queue_declare("rpc_queue", QueueDeclareOptions::default())?; + let mut consumer = channel_b + .basic_consume( + "email", + "", + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await?; - // Start a consumer. - let consumer = queue.consume(ConsumerOptions::default())?; - println!("Awaiting RPC requests"); - - for (i, message) in consumer.receiver().iter().enumerate() { - match message { - ConsumerMessage::Delivery(delivery) => { - let body = String::from_utf8_lossy(&delivery.body); - println!("({:>3}) fib({})", i, body); - - let (reply_to, corr_id) = match ( - delivery.properties.reply_to(), - delivery.properties.correlation_id(), - ) { - (Some(r), Some(c)) => (r.clone(), c.clone()), - _ => { - println!("received delivery without reply_to or correlation_id"); - consumer.ack(delivery)?; - continue; - } - }; - - let response = match body.parse() { - Ok(n) => format!("{}", fib(n)), - Err(_) => "invalid input".to_string(), - }; - - exchange.publish(Publish::with_properties( - response.as_bytes(), - reply_to, - AmqpProperties::default().with_correlation_id(corr_id), - ))?; - consumer.ack(delivery)?; - } - other => { - println!("Consumer ended: {:?}", other); - break; - } - } + while let Some(delivery) = consumer.next().await { + let delivery = delivery.expect("error in consumer"); + let reply = delivery.properties.reply_to(); + log::info!("Reply id: {:?}", reply); + channel_b + .basic_publish( + "", + reply.clone().unwrap().as_str(), + BasicPublishOptions::default(), + "asd".as_bytes(), + BasicProperties::default().with_reply_to(consumer.queue()), + ) + .await + .unwrap(); + info!("Received message {:?}", delivery); } - connection.close() + Ok(()) } diff --git a/producer/src/main.rs b/producer/src/main.rs index 1885a8c..1e531dd 100644 --- a/producer/src/main.rs +++ b/producer/src/main.rs @@ -1,6 +1,7 @@ -use std::{thread, time::Duration}; +use std::{str::from_utf8, thread, time::Duration}; use common::EmailJobBuilder; +use futures_lite::StreamExt; use lapin::{ options::*, publisher_confirm::Confirmation, types::FieldTable, BasicProperties, Connection, ConnectionProperties, Result, @@ -20,34 +21,76 @@ async fn main() { .await .unwrap(); let channel_a = conn.create_channel().await.unwrap(); + let channel_b = conn.create_channel().await.unwrap(); let email = EmailJobBuilder::default() .to("someone@example.com".into()) .content("test".into()) .build() .unwrap(); + let mut consumer = channel_a + .basic_consume( + "email", + "", + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await + .unwrap(); + let queue = channel_a + .queue_declare( + "email", + QueueDeclareOptions::default(), + FieldTable::default(), + ) + .await + .unwrap(); + // Sending request to client + tokio::spawn(async move { + // + let mut c = channel_a + .basic_consume( + "email", + "", + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await + .unwrap(); + + while let Some(delivery) = consumer.next().await { + let delivery = delivery.expect("error in consumer"); + log::info!("Received feedback: {:?}", from_utf8(&delivery.data)); + delivery.ack(BasicAckOptions::default()).await.expect("ack"); + } + }); + let mut consumer = channel_b + .basic_consume( + "email", + "", + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await + .unwrap(); loop { let request = Uuid::new_v4(); - let confirm = channel_a + let confirm = channel_b .basic_publish( "", "email", BasicPublishOptions::default(), serde_json::to_string(&email).unwrap().as_bytes(), - BasicProperties::default().with_correlation_id(request.to_string().into()), + BasicProperties::default() + .with_correlation_id(request.to_string().into()) + .with_reply_to(consumer.queue()), ) .await .unwrap() .await .unwrap(); - - log::info!( - "Got response for :{} \n {:?}", - request, - confirm.take_message() - ); - - thread::sleep(Duration::from_millis(250)) + log::info!("Sent request"); + thread::sleep(Duration::from_secs(1)) } }