some changes

This commit is contained in:
2005 2024-11-09 12:09:34 +01:00
parent 6e6449f80b
commit bfe15cfa43
2 changed files with 19 additions and 2 deletions

View file

@ -60,7 +60,7 @@ impl BunBunClient {
exclusive: true, exclusive: true,
..Default::default() ..Default::default()
}, },
FieldTable::default(), create_timeout_headers(),
) )
.await .await
.unwrap(); .unwrap();
@ -70,6 +70,7 @@ impl BunBunClient {
"Creating consumer to listen for error/result messages {}", "Creating consumer to listen for error/result messages {}",
callback_queue.name() callback_queue.name()
); );
let mut consumer = channel let mut consumer = channel
.basic_consume( .basic_consume(
callback_queue.name().as_str(), callback_queue.name().as_str(),
@ -246,6 +247,12 @@ impl BunBunClient {
} }
Ok(confirmation) => { Ok(confirmation) => {
let _ = channel.close(0, "byebye").await; let _ = channel.close(0, "byebye").await;
tracing::info!(
"Sent nonRPC job of type {} to channel {} Ack: {}",
std::any::type_name::<T>(),
queue_name,
confirmation.is_ack()
);
tracing::debug!( tracing::debug!(
"AMQP confirmed dispatch of job | Acknowledged? {}", "AMQP confirmed dispatch of job | Acknowledged? {}",
confirmation.is_ack() confirmation.is_ack()
@ -288,3 +295,10 @@ pub trait RPCClientTask: Sized + Debug + DeserializeOwned {
format!("{:?}", self) format!("{:?}", self)
} }
} }
fn create_timeout_headers() -> FieldTable {
let mut table = FieldTable::default();
// 60 second expiry
table.insert("x-expires".into(), lapin::types::AMQPValue::LongInt(6_000));
table
}

View file

@ -328,7 +328,10 @@ pub trait NonRPCServerTask: Sized + Debug + DeserializeOwned {
return Err(RabbitDecodeError::NotUtf8); return Err(RabbitDecodeError::NotUtf8);
} }
Ok(data) => match serde_json::from_str::<Self>(data) { Ok(data) => match serde_json::from_str::<Self>(data) {
Err(_) => return Err(RabbitDecodeError::NotJson), Err(e) => {
tracing::error!("Failed to decode job: {e} \n {:?}", data);
return Err(RabbitDecodeError::NotJson);
}
Ok(data) => data, Ok(data) => data,
}, },
}; };