lib.rs: added non-rpc functions and methology to file

This commit is contained in:
2005 2024-10-24 22:10:12 +02:00
parent 184e50d156
commit ee4af91c42

View file

@ -1,4 +1,7 @@
use futures::{future::BoxFuture, FutureExt, StreamExt}; use futures::{
future::{join_all, BoxFuture},
FutureExt, StreamExt,
};
use lapin::{ use lapin::{
options::{ options::{
BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions, BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions,
@ -21,6 +24,16 @@ mod test;
pub struct BunBunWorker { pub struct BunBunWorker {
channel: Channel, channel: Channel,
rpc_consumers: Vec<Consumer>,
rpc_handlers: Vec<
Arc<
dyn Fn(
lapin::message::Delivery,
) -> Pin<Box<dyn std::future::Future<Output = ()> + Send>>
+ Send
+ Sync,
>,
>,
consumers: Vec<Consumer>, consumers: Vec<Consumer>,
handlers: Vec< handlers: Vec<
Arc< Arc<
@ -40,6 +53,9 @@ impl BunBunWorker {
channel, channel,
handlers: Vec::new(), handlers: Vec::new(),
consumers: Vec::new(), consumers: Vec::new(),
rpc_handlers: Vec::new(),
rpc_consumers: Vec::new(),
} }
} }
@ -59,7 +75,53 @@ impl BunBunWorker {
None => conn.create_channel().await.expect("create channel error"), None => conn.create_channel().await.expect("create channel error"),
} }
} }
pub async fn add_non_rpc_consumer<J: NonRPCServerTask + 'static + Send>(
&mut self,
queue_name: &str,
consumer_tag: &str,
state: Arc<Mutex<J::State>>,
) where
<J as NonRPCServerTask>::State: std::marker::Send,
{
let consumer = self
.channel
.basic_consume(
queue_name,
consumer_tag,
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.expect("basic_consume error");
let handler: Arc<
dyn Fn(
lapin::message::Delivery,
) -> Pin<Box<dyn std::future::Future<Output = ()> + Send>>
+ Send
+ Sync,
> = Arc::new(move |delivery: lapin::message::Delivery| {
let state = Arc::clone(&state);
Box::pin(async move {
if let Ok(job) = J::decode(delivery.data.clone()) {
// Running job
match tokio::task::spawn(async move { job.run(state).await }).await {
Err(error) => {
tracing::error!("Failed to run non-rpc job: {}", error)
}
Ok(_) => {
tracing::info!("Non-rpc job has finished.")
}
};
} else {
delivery.nack(BasicNackOptions::default()).await.unwrap();
}
})
});
self.handlers.push(handler);
self.consumers.push(consumer);
}
pub async fn add_rpc_consumer<J: RPCServerTask + 'static + Send>( pub async fn add_rpc_consumer<J: RPCServerTask + 'static + Send>(
&mut self, &mut self,
queue_name: &str, queue_name: &str,
@ -163,11 +225,12 @@ impl BunBunWorker {
}) })
}); });
self.handlers.push(handler); self.rpc_handlers.push(handler);
self.consumers.push(consumer); self.rpc_consumers.push(consumer);
} }
pub async fn start_all_listeners(&self) { pub async fn start_all_listeners(&self) {
let mut listeners = vec![];
for (handler, consumer) in self.handlers.iter().zip(self.consumers.iter()) { for (handler, consumer) in self.handlers.iter().zip(self.consumers.iter()) {
let consumer = consumer.clone(); let consumer = consumer.clone();
let handler = Arc::clone(handler); let handler = Arc::clone(handler);
@ -176,7 +239,7 @@ impl BunBunWorker {
"Listening for incoming messages for queue: {}", "Listening for incoming messages for queue: {}",
consumer.queue().as_str() consumer.queue().as_str()
); );
tokio::spawn(async move { listeners.push(tokio::spawn(async move {
consumer consumer
.for_each_concurrent(None, move |delivery| { .for_each_concurrent(None, move |delivery| {
let handler = Arc::clone(&handler); let handler = Arc::clone(&handler);
@ -187,11 +250,31 @@ impl BunBunWorker {
} }
}) })
.await; .await;
}); }));
} }
signal::ctrl_c().await.expect("failed to listen for event"); for (handler, consumer) in self.rpc_handlers.iter().zip(self.rpc_consumers.iter()) {
// TODO hand the program let consumer = consumer.clone();
let handler = Arc::clone(handler);
tracing::debug!(
"Listening for incoming messages for queue: {}",
consumer.queue().as_str()
);
listeners.push(tokio::spawn(async move {
consumer
.for_each_concurrent(None, move |delivery| {
let handler = Arc::clone(&handler);
// TODO handle unwrap of delivery
let delivery = delivery.unwrap();
async move {
handler(delivery).await;
}
})
.await;
}));
}
join_all(listeners).await;
} }
} }
@ -223,6 +306,29 @@ pub trait RPCServerTask: Sized + Debug + DeserializeOwned {
format!("{:?}", self) format!("{:?}", self)
} }
} }
pub trait NonRPCServerTask: Sized + Debug + DeserializeOwned {
type State: Clone + Debug;
fn decode(data: Vec<u8>) -> Result<Self, RabbitDecodeError> {
let job = match from_utf8(&data) {
Err(_) => {
return Err(RabbitDecodeError::NotUtf8);
}
Ok(data) => match serde_json::from_str::<Self>(data) {
Err(_) => return Err(RabbitDecodeError::NotJson),
Ok(data) => data,
},
};
Ok(job)
}
fn run(self, state: Arc<Mutex<Self::State>>) -> BoxFuture<'static, Result<(), ()>>;
/// A function to display the task
fn display(&self) -> String {
format!("{:?}", self)
}
}
#[derive(Debug)] #[derive(Debug)]
pub enum RabbitDecodeError { pub enum RabbitDecodeError {