lib.rs: changed so non-rpc jobs are arcked/nacked based on result

This commit is contained in:
2005 2024-10-26 01:01:30 +02:00
parent f6c2998d64
commit 6e6449f80b

View file

@ -93,6 +93,7 @@ impl BunBunWorker {
.await .await
.expect("basic_consume error"); .expect("basic_consume error");
let channel = self.channel.clone();
let handler: Arc< let handler: Arc<
dyn Fn( dyn Fn(
lapin::message::Delivery, lapin::message::Delivery,
@ -101,15 +102,19 @@ impl BunBunWorker {
+ Sync, + Sync,
> = Arc::new(move |delivery: lapin::message::Delivery| { > = Arc::new(move |delivery: lapin::message::Delivery| {
let state = Arc::clone(&state); let state = Arc::clone(&state);
let channel = channel.clone();
Box::pin(async move { Box::pin(async move {
if let Ok(job) = J::decode(delivery.data.clone()) { if let Ok(job) = J::decode(delivery.data.clone()) {
// Running job // Running job
match tokio::task::spawn(async move { job.run(state).await }).await { match tokio::task::spawn(async move { job.run(state).await }).await {
Err(error) => { Err(error) => {
tracing::error!("Failed to run non-rpc job: {}", error) tracing::error!("Failed to run non-rpc job: {}", error);
let _ = delivery.nack(BasicNackOptions::default()).await;
} }
Ok(_) => { Ok(_) => {
tracing::info!("Non-rpc job has finished.") tracing::info!("Non-rpc job has finished.");
let _ = delivery.ack(BasicAckOptions::default()).await;
} }
}; };
} else { } else {