switched to lapin

if both clients run at the same time the server producer seems to acknowledge the feedback but any time the producer dies the consumer begins to flood the server
This commit is contained in:
Barna Máté 2024-08-25 02:43:29 +02:00
parent 8045480513
commit 953f7c154b
2 changed files with 100 additions and 74 deletions

View file

@ -1,73 +1,56 @@
// Port of https://www.rabbitmq.com/tutorials/tutorial-six-python.html. Start this
// example in one shell, then the rpc_client example in another.
use amiquip::{
AmqpProperties, Connection, ConsumerMessage, ConsumerOptions, Exchange, Publish,
QueueDeclareOptions, Result,
use std::str::from_utf8;
use futures_lite::stream::StreamExt;
use lapin::{
options::*, publisher_confirm::Confirmation, types::FieldTable, BasicProperties, Connection,
ConnectionProperties, Result,
};
use log::*;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
fn fib(n: u64) -> u64 {
match n {
0 => 0,
1 => 1,
n => fib(n - 1) + fib(n - 2),
}
}
fn main() -> Result<()> {
env_logger::init();
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new(
std::env::var("RUST_LOG").unwrap_or_else(|_| "debug".into()),
))
.with(tracing_subscriber::fmt::layer())
.init();
// Open connection.
let mut connection = Connection::insecure_open("amqp://guest:guest@localhost:5672")?;
// Open a channel - None says let the library choose the channel ID.
let channel = connection.open_channel(None)?;
let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
let conn = Connection::connect(&addr, ConnectionProperties::default())
.await
.unwrap();
// Get a handle to the default direct exchange.
let exchange = Exchange::direct(&channel);
info!("CONNECTED");
let channel_b = conn.create_channel().await.unwrap();
// Declare the queue that will receive RPC requests.
let queue = channel.queue_declare("rpc_queue", QueueDeclareOptions::default())?;
let mut consumer = channel_b
.basic_consume(
"email",
"",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await?;
// Start a consumer.
let consumer = queue.consume(ConsumerOptions::default())?;
println!("Awaiting RPC requests");
for (i, message) in consumer.receiver().iter().enumerate() {
match message {
ConsumerMessage::Delivery(delivery) => {
let body = String::from_utf8_lossy(&delivery.body);
println!("({:>3}) fib({})", i, body);
let (reply_to, corr_id) = match (
delivery.properties.reply_to(),
delivery.properties.correlation_id(),
) {
(Some(r), Some(c)) => (r.clone(), c.clone()),
_ => {
println!("received delivery without reply_to or correlation_id");
consumer.ack(delivery)?;
continue;
}
};
let response = match body.parse() {
Ok(n) => format!("{}", fib(n)),
Err(_) => "invalid input".to_string(),
};
exchange.publish(Publish::with_properties(
response.as_bytes(),
reply_to,
AmqpProperties::default().with_correlation_id(corr_id),
))?;
consumer.ack(delivery)?;
}
other => {
println!("Consumer ended: {:?}", other);
break;
}
}
while let Some(delivery) = consumer.next().await {
let delivery = delivery.expect("error in consumer");
let reply = delivery.properties.reply_to();
log::info!("Reply id: {:?}", reply);
channel_b
.basic_publish(
"",
reply.clone().unwrap().as_str(),
BasicPublishOptions::default(),
"asd".as_bytes(),
BasicProperties::default().with_reply_to(consumer.queue()),
)
.await
.unwrap();
info!("Received message {:?}", delivery);
}
connection.close()
Ok(())
}

View file

@ -1,6 +1,7 @@
use std::{thread, time::Duration};
use std::{str::from_utf8, thread, time::Duration};
use common::EmailJobBuilder;
use futures_lite::StreamExt;
use lapin::{
options::*, publisher_confirm::Confirmation, types::FieldTable, BasicProperties, Connection,
ConnectionProperties, Result,
@ -20,34 +21,76 @@ async fn main() {
.await
.unwrap();
let channel_a = conn.create_channel().await.unwrap();
let channel_b = conn.create_channel().await.unwrap();
let email = EmailJobBuilder::default()
.to("someone@example.com".into())
.content("test".into())
.build()
.unwrap();
let mut consumer = channel_a
.basic_consume(
"email",
"",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.unwrap();
let queue = channel_a
.queue_declare(
"email",
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await
.unwrap();
// Sending request to client
tokio::spawn(async move {
//
let mut c = channel_a
.basic_consume(
"email",
"",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.unwrap();
while let Some(delivery) = consumer.next().await {
let delivery = delivery.expect("error in consumer");
log::info!("Received feedback: {:?}", from_utf8(&delivery.data));
delivery.ack(BasicAckOptions::default()).await.expect("ack");
}
});
let mut consumer = channel_b
.basic_consume(
"email",
"",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.unwrap();
loop {
let request = Uuid::new_v4();
let confirm = channel_a
let confirm = channel_b
.basic_publish(
"",
"email",
BasicPublishOptions::default(),
serde_json::to_string(&email).unwrap().as_bytes(),
BasicProperties::default().with_correlation_id(request.to_string().into()),
BasicProperties::default()
.with_correlation_id(request.to_string().into())
.with_reply_to(consumer.queue()),
)
.await
.unwrap()
.await
.unwrap();
log::info!(
"Got response for :{} \n {:?}",
request,
confirm.take_message()
);
thread::sleep(Duration::from_millis(250))
log::info!("Sent request");
thread::sleep(Duration::from_secs(1))
}
}