diff --git a/src/client.rs b/src/client.rs index 662233a..b42d27b 100644 --- a/src/client.rs +++ b/src/client.rs @@ -60,7 +60,7 @@ impl BunBunClient { exclusive: true, ..Default::default() }, - FieldTable::default(), + create_timeout_headers(), ) .await .unwrap(); @@ -70,6 +70,7 @@ impl BunBunClient { "Creating consumer to listen for error/result messages {}", callback_queue.name() ); + let mut consumer = channel .basic_consume( callback_queue.name().as_str(), @@ -246,6 +247,12 @@ impl BunBunClient { } Ok(confirmation) => { let _ = channel.close(0, "byebye").await; + tracing::info!( + "Sent nonRPC job of type {} to channel {} Ack: {}", + std::any::type_name::(), + queue_name, + confirmation.is_ack() + ); tracing::debug!( "AMQP confirmed dispatch of job | Acknowledged? {}", confirmation.is_ack() @@ -288,3 +295,10 @@ pub trait RPCClientTask: Sized + Debug + DeserializeOwned { format!("{:?}", self) } } + +fn create_timeout_headers() -> FieldTable { + let mut table = FieldTable::default(); + // 60 second expiry + table.insert("x-expires".into(), lapin::types::AMQPValue::LongInt(6_000)); + table +} diff --git a/src/lib.rs b/src/lib.rs index 9791945..2964dfc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -328,7 +328,10 @@ pub trait NonRPCServerTask: Sized + Debug + DeserializeOwned { return Err(RabbitDecodeError::NotUtf8); } Ok(data) => match serde_json::from_str::(data) { - Err(_) => return Err(RabbitDecodeError::NotJson), + Err(e) => { + tracing::error!("Failed to decode job: {e} \n {:?}", data); + return Err(RabbitDecodeError::NotJson); + } Ok(data) => data, }, };