From f6c2998d64eb033880db0659eef0ca0cd5c0b0d5 Mon Sep 17 00:00:00 2001 From: 4o1x5 <4o1x5@4o1x5.dev> Date: Sat, 26 Oct 2024 00:44:02 +0200 Subject: [PATCH] lib.rs removed mutexguard as it can block parallel workers --- src/client.rs | 4 ++-- src/lib.rs | 16 ++++++++-------- src/test/mod.rs | 12 ++++++------ 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/client.rs b/src/client.rs index 16b72be..662233a 100644 --- a/src/client.rs +++ b/src/client.rs @@ -39,7 +39,7 @@ impl BunBunClient { } pub async fn rpc_call( - &mut self, + &self, data: T, queue_name: &str, ) -> Result> @@ -218,7 +218,7 @@ impl BunBunClient { /// let client = Client::new("amqp://127.0.0.1:5672"); /// let result = client.call(EmailJob::new("someone@example.com", "Hello there"), "email_channel"); /// ``` - pub async fn call(&mut self, data: T, queue_name: &str) -> Result<(), ClientError> + pub async fn call(&self, data: T, queue_name: &str) -> Result<(), ClientError> where T: Serialize + DeserializeOwned, { diff --git a/src/lib.rs b/src/lib.rs index 18e7f14..216367c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -78,9 +78,9 @@ impl BunBunWorker { pub async fn add_non_rpc_consumer( &mut self, queue_name: &str, - state: Arc>, + state: Arc, ) where - ::State: std::marker::Send, + ::State: std::marker::Send + Sync, { let consumer = self .channel @@ -124,11 +124,11 @@ impl BunBunWorker { pub async fn add_rpc_consumer( &mut self, queue_name: &str, - state: Arc>, + state: Arc, ) where - ::State: std::marker::Send, - ::Result: std::marker::Send, - ::ErroredResult: std::marker::Send, + ::State: std::marker::Send + Sync, + ::Result: std::marker::Send + Sync, + ::ErroredResult: std::marker::Send + Sync, { let consumer = self .channel @@ -306,7 +306,7 @@ pub trait RPCServerTask: Sized + Debug + DeserializeOwned { fn run( self, - state: Arc>, + state: Arc, ) -> BoxFuture<'static, Result>; /// A function to display the task @@ -330,7 +330,7 @@ pub trait NonRPCServerTask: Sized + Debug + DeserializeOwned { Ok(job) } - fn run(self, state: Arc>) -> BoxFuture<'static, Result<(), ()>>; + fn run(self, state: Arc) -> BoxFuture<'static, Result<(), ()>>; /// A function to display the task fn display(&self) -> String { diff --git a/src/test/mod.rs b/src/test/mod.rs index 419a8ac..e95e76a 100644 --- a/src/test/mod.rs +++ b/src/test/mod.rs @@ -49,7 +49,7 @@ mod test { type State = State; fn run( self, - state: Arc>, + state: Arc, ) -> futures::prelude::future::BoxFuture<'static, Result> { Box::pin(async move { @@ -67,7 +67,7 @@ mod test { type State = State; fn run( self, - state: Arc>, + state: Arc, ) -> futures::prelude::future::BoxFuture<'static, Result> { Box::pin(async move { panic!("Ooops..") }) @@ -88,9 +88,9 @@ mod test { listener .add_rpc_consumer::( "email-emailjob-v1.0.0", - Arc::new(Mutex::new(State { + Arc::new(State { something: "test".into(), - })), + }), ) .await; tracing::debug!("Starting listener"); @@ -106,9 +106,9 @@ mod test { listener .add_rpc_consumer::( "email-emailjob-v1.0.0", - Arc::new(Mutex::new(State { + Arc::new(State { something: "test".into(), - })), + }), ) .await; tracing::debug!("Starting listener");