From 6e6449f80b0a804a35e63256a05cd1bd34a847de Mon Sep 17 00:00:00 2001 From: 4o1x5 <4o1x5@4o1x5.dev> Date: Sat, 26 Oct 2024 01:01:30 +0200 Subject: [PATCH] lib.rs: changed so non-rpc jobs are arcked/nacked based on result --- src/lib.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 216367c..9791945 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -93,6 +93,7 @@ impl BunBunWorker { .await .expect("basic_consume error"); + let channel = self.channel.clone(); let handler: Arc< dyn Fn( lapin::message::Delivery, @@ -101,15 +102,19 @@ impl BunBunWorker { + Sync, > = Arc::new(move |delivery: lapin::message::Delivery| { let state = Arc::clone(&state); + let channel = channel.clone(); Box::pin(async move { if let Ok(job) = J::decode(delivery.data.clone()) { // Running job match tokio::task::spawn(async move { job.run(state).await }).await { Err(error) => { - tracing::error!("Failed to run non-rpc job: {}", error) + tracing::error!("Failed to run non-rpc job: {}", error); + + let _ = delivery.nack(BasicNackOptions::default()).await; } Ok(_) => { - tracing::info!("Non-rpc job has finished.") + tracing::info!("Non-rpc job has finished."); + let _ = delivery.ack(BasicAckOptions::default()).await; } }; } else {