diff --git a/src/lib.rs b/src/lib.rs index 216367c..9791945 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -93,6 +93,7 @@ impl BunBunWorker { .await .expect("basic_consume error"); + let channel = self.channel.clone(); let handler: Arc< dyn Fn( lapin::message::Delivery, @@ -101,15 +102,19 @@ impl BunBunWorker { + Sync, > = Arc::new(move |delivery: lapin::message::Delivery| { let state = Arc::clone(&state); + let channel = channel.clone(); Box::pin(async move { if let Ok(job) = J::decode(delivery.data.clone()) { // Running job match tokio::task::spawn(async move { job.run(state).await }).await { Err(error) => { - tracing::error!("Failed to run non-rpc job: {}", error) + tracing::error!("Failed to run non-rpc job: {}", error); + + let _ = delivery.nack(BasicNackOptions::default()).await; } Ok(_) => { - tracing::info!("Non-rpc job has finished.") + tracing::info!("Non-rpc job has finished."); + let _ = delivery.ack(BasicAckOptions::default()).await; } }; } else {