lib.rs removed mutexguard as it can block parallel workers

This commit is contained in:
2005 2024-10-26 00:44:02 +02:00
parent d0eba8fa34
commit f6c2998d64
3 changed files with 16 additions and 16 deletions

View file

@ -39,7 +39,7 @@ impl BunBunClient {
} }
pub async fn rpc_call<T: RPCClientTask + Send + Debug>( pub async fn rpc_call<T: RPCClientTask + Send + Debug>(
&mut self, &self,
data: T, data: T,
queue_name: &str, queue_name: &str,
) -> Result<T::Result, RpcClientError<T::ErroredResult>> ) -> Result<T::Result, RpcClientError<T::ErroredResult>>
@ -218,7 +218,7 @@ impl BunBunClient {
/// let client = Client::new("amqp://127.0.0.1:5672"); /// let client = Client::new("amqp://127.0.0.1:5672");
/// let result = client.call(EmailJob::new("someone@example.com", "Hello there"), "email_channel"); /// let result = client.call(EmailJob::new("someone@example.com", "Hello there"), "email_channel");
/// ``` /// ```
pub async fn call<T>(&mut self, data: T, queue_name: &str) -> Result<(), ClientError> pub async fn call<T>(&self, data: T, queue_name: &str) -> Result<(), ClientError>
where where
T: Serialize + DeserializeOwned, T: Serialize + DeserializeOwned,
{ {

View file

@ -78,9 +78,9 @@ impl BunBunWorker {
pub async fn add_non_rpc_consumer<J: NonRPCServerTask + 'static + Send>( pub async fn add_non_rpc_consumer<J: NonRPCServerTask + 'static + Send>(
&mut self, &mut self,
queue_name: &str, queue_name: &str,
state: Arc<Mutex<J::State>>, state: Arc<J::State>,
) where ) where
<J as NonRPCServerTask>::State: std::marker::Send, <J as NonRPCServerTask>::State: std::marker::Send + Sync,
{ {
let consumer = self let consumer = self
.channel .channel
@ -124,11 +124,11 @@ impl BunBunWorker {
pub async fn add_rpc_consumer<J: RPCServerTask + 'static + Send>( pub async fn add_rpc_consumer<J: RPCServerTask + 'static + Send>(
&mut self, &mut self,
queue_name: &str, queue_name: &str,
state: Arc<Mutex<J::State>>, state: Arc<J::State>,
) where ) where
<J as RPCServerTask>::State: std::marker::Send, <J as RPCServerTask>::State: std::marker::Send + Sync,
<J as RPCServerTask>::Result: std::marker::Send, <J as RPCServerTask>::Result: std::marker::Send + Sync,
<J as RPCServerTask>::ErroredResult: std::marker::Send, <J as RPCServerTask>::ErroredResult: std::marker::Send + Sync,
{ {
let consumer = self let consumer = self
.channel .channel
@ -306,7 +306,7 @@ pub trait RPCServerTask: Sized + Debug + DeserializeOwned {
fn run( fn run(
self, self,
state: Arc<Mutex<Self::State>>, state: Arc<Self::State>,
) -> BoxFuture<'static, Result<Self::Result, Self::ErroredResult>>; ) -> BoxFuture<'static, Result<Self::Result, Self::ErroredResult>>;
/// A function to display the task /// A function to display the task
@ -330,7 +330,7 @@ pub trait NonRPCServerTask: Sized + Debug + DeserializeOwned {
Ok(job) Ok(job)
} }
fn run(self, state: Arc<Mutex<Self::State>>) -> BoxFuture<'static, Result<(), ()>>; fn run(self, state: Arc<Self::State>) -> BoxFuture<'static, Result<(), ()>>;
/// A function to display the task /// A function to display the task
fn display(&self) -> String { fn display(&self) -> String {

View file

@ -49,7 +49,7 @@ mod test {
type State = State; type State = State;
fn run( fn run(
self, self,
state: Arc<Mutex<Self::State>>, state: Arc<Self::State>,
) -> futures::prelude::future::BoxFuture<'static, Result<Self::Result, Self::ErroredResult>> ) -> futures::prelude::future::BoxFuture<'static, Result<Self::Result, Self::ErroredResult>>
{ {
Box::pin(async move { Box::pin(async move {
@ -67,7 +67,7 @@ mod test {
type State = State; type State = State;
fn run( fn run(
self, self,
state: Arc<Mutex<Self::State>>, state: Arc<Self::State>,
) -> futures::prelude::future::BoxFuture<'static, Result<Self::Result, Self::ErroredResult>> ) -> futures::prelude::future::BoxFuture<'static, Result<Self::Result, Self::ErroredResult>>
{ {
Box::pin(async move { panic!("Ooops..") }) Box::pin(async move { panic!("Ooops..") })
@ -88,9 +88,9 @@ mod test {
listener listener
.add_rpc_consumer::<EmailJob>( .add_rpc_consumer::<EmailJob>(
"email-emailjob-v1.0.0", "email-emailjob-v1.0.0",
Arc::new(Mutex::new(State { Arc::new(State {
something: "test".into(), something: "test".into(),
})), }),
) )
.await; .await;
tracing::debug!("Starting listener"); tracing::debug!("Starting listener");
@ -106,9 +106,9 @@ mod test {
listener listener
.add_rpc_consumer::<PanickingEmailJob>( .add_rpc_consumer::<PanickingEmailJob>(
"email-emailjob-v1.0.0", "email-emailjob-v1.0.0",
Arc::new(Mutex::new(State { Arc::new(State {
something: "test".into(), something: "test".into(),
})), }),
) )
.await; .await;
tracing::debug!("Starting listener"); tracing::debug!("Starting listener");