From 953f7c154bcdb5a23bc5bf5eee7f9cd529b1de8f Mon Sep 17 00:00:00 2001 From: 4o1x5 <4o1x5@4o1x5.dev> Date: Sun, 25 Aug 2024 02:43:29 +0200 Subject: [PATCH] switched to lapin if both clients run at the same time the server producer seems to acknowledge the feedback but any time the producer dies the consumer begins to flood the server --- consumer/src/main.rs | 109 ++++++++++++++++++------------------------- producer/src/main.rs | 65 +++++++++++++++++++++----- 2 files changed, 100 insertions(+), 74 deletions(-) diff --git a/consumer/src/main.rs b/consumer/src/main.rs index b34a10d..72c9ef4 100644 --- a/consumer/src/main.rs +++ b/consumer/src/main.rs @@ -1,73 +1,56 @@ -// Port of https://www.rabbitmq.com/tutorials/tutorial-six-python.html. Start this -// example in one shell, then the rpc_client example in another. -use amiquip::{ - AmqpProperties, Connection, ConsumerMessage, ConsumerOptions, Exchange, Publish, - QueueDeclareOptions, Result, +use std::str::from_utf8; + +use futures_lite::stream::StreamExt; +use lapin::{ + options::*, publisher_confirm::Confirmation, types::FieldTable, BasicProperties, Connection, + ConnectionProperties, Result, }; +use log::*; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; -fn fib(n: u64) -> u64 { - match n { - 0 => 0, - 1 => 1, - n => fib(n - 1) + fib(n - 2), - } -} - -fn main() -> Result<()> { - env_logger::init(); - +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::registry() + .with(tracing_subscriber::EnvFilter::new( + std::env::var("RUST_LOG").unwrap_or_else(|_| "debug".into()), + )) + .with(tracing_subscriber::fmt::layer()) + .init(); // Open connection. - let mut connection = Connection::insecure_open("amqp://guest:guest@localhost:5672")?; - // Open a channel - None says let the library choose the channel ID. - let channel = connection.open_channel(None)?; + let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into()); + let conn = Connection::connect(&addr, ConnectionProperties::default()) + .await + .unwrap(); - // Get a handle to the default direct exchange. - let exchange = Exchange::direct(&channel); + info!("CONNECTED"); + let channel_b = conn.create_channel().await.unwrap(); - // Declare the queue that will receive RPC requests. - let queue = channel.queue_declare("rpc_queue", QueueDeclareOptions::default())?; + let mut consumer = channel_b + .basic_consume( + "email", + "", + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await?; - // Start a consumer. - let consumer = queue.consume(ConsumerOptions::default())?; - println!("Awaiting RPC requests"); - - for (i, message) in consumer.receiver().iter().enumerate() { - match message { - ConsumerMessage::Delivery(delivery) => { - let body = String::from_utf8_lossy(&delivery.body); - println!("({:>3}) fib({})", i, body); - - let (reply_to, corr_id) = match ( - delivery.properties.reply_to(), - delivery.properties.correlation_id(), - ) { - (Some(r), Some(c)) => (r.clone(), c.clone()), - _ => { - println!("received delivery without reply_to or correlation_id"); - consumer.ack(delivery)?; - continue; - } - }; - - let response = match body.parse() { - Ok(n) => format!("{}", fib(n)), - Err(_) => "invalid input".to_string(), - }; - - exchange.publish(Publish::with_properties( - response.as_bytes(), - reply_to, - AmqpProperties::default().with_correlation_id(corr_id), - ))?; - consumer.ack(delivery)?; - } - other => { - println!("Consumer ended: {:?}", other); - break; - } - } + while let Some(delivery) = consumer.next().await { + let delivery = delivery.expect("error in consumer"); + let reply = delivery.properties.reply_to(); + log::info!("Reply id: {:?}", reply); + channel_b + .basic_publish( + "", + reply.clone().unwrap().as_str(), + BasicPublishOptions::default(), + "asd".as_bytes(), + BasicProperties::default().with_reply_to(consumer.queue()), + ) + .await + .unwrap(); + info!("Received message {:?}", delivery); } - connection.close() + Ok(()) } diff --git a/producer/src/main.rs b/producer/src/main.rs index 1885a8c..1e531dd 100644 --- a/producer/src/main.rs +++ b/producer/src/main.rs @@ -1,6 +1,7 @@ -use std::{thread, time::Duration}; +use std::{str::from_utf8, thread, time::Duration}; use common::EmailJobBuilder; +use futures_lite::StreamExt; use lapin::{ options::*, publisher_confirm::Confirmation, types::FieldTable, BasicProperties, Connection, ConnectionProperties, Result, @@ -20,34 +21,76 @@ async fn main() { .await .unwrap(); let channel_a = conn.create_channel().await.unwrap(); + let channel_b = conn.create_channel().await.unwrap(); let email = EmailJobBuilder::default() .to("someone@example.com".into()) .content("test".into()) .build() .unwrap(); + let mut consumer = channel_a + .basic_consume( + "email", + "", + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await + .unwrap(); + let queue = channel_a + .queue_declare( + "email", + QueueDeclareOptions::default(), + FieldTable::default(), + ) + .await + .unwrap(); + // Sending request to client + tokio::spawn(async move { + // + let mut c = channel_a + .basic_consume( + "email", + "", + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await + .unwrap(); + + while let Some(delivery) = consumer.next().await { + let delivery = delivery.expect("error in consumer"); + log::info!("Received feedback: {:?}", from_utf8(&delivery.data)); + delivery.ack(BasicAckOptions::default()).await.expect("ack"); + } + }); + let mut consumer = channel_b + .basic_consume( + "email", + "", + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await + .unwrap(); loop { let request = Uuid::new_v4(); - let confirm = channel_a + let confirm = channel_b .basic_publish( "", "email", BasicPublishOptions::default(), serde_json::to_string(&email).unwrap().as_bytes(), - BasicProperties::default().with_correlation_id(request.to_string().into()), + BasicProperties::default() + .with_correlation_id(request.to_string().into()) + .with_reply_to(consumer.queue()), ) .await .unwrap() .await .unwrap(); - - log::info!( - "Got response for :{} \n {:?}", - request, - confirm.take_message() - ); - - thread::sleep(Duration::from_millis(250)) + log::info!("Sent request"); + thread::sleep(Duration::from_secs(1)) } }