diff --git a/README.md b/README.md index a2ac7b3..46c9c89 100644 --- a/README.md +++ b/README.md @@ -1,13 +1,11 @@

BunBun-Worker

-`bunbun-worker` is a panic-safe multithreaded job-runner server/client library for rust. It supports [RPC](https://wikipedia.org/wiki/Remote_procedure_call) and regular (non-rpc) calls. As of right now only [rabbitmq](https://www.rabbitmq.com/) is supported but [gRPC](https://grpc.io/) will be added too. - > ❗ Disclaimer > This crate is still under development, meaning api's may change on every commit... # Introduction -`bunbun-worker` was made to provide a way for microservices written in rust to communicate to each other by dispatching jobs that may return data and those who don't. +`bunbun-worker` was made to provide a _panic-safe_ multithreaded job-runner server & client for microservices and alike. It supports [RPC](https://wikipedia.org/wiki/Remote_procedure_call) and regular (non-rpc) calls. As of right now only [rabbitmq](https://www.rabbitmq.com/) is supported but [gRPC](https://grpc.io/) will be added too. ### Rpc @@ -35,7 +33,7 @@ Get directly from crates.io ```toml [dependencies] -bunbun-worker = { version = "0.1.1" } +bunbun-worker = { version = "0.2.0" } ``` or get it directly from source @@ -47,6 +45,101 @@ bunbun-worker = { git = "https://git.4o1x5.dev/4o1x5/bunbun-worker", branch = "m ## Usage +This example uses [DTO](https://en.wikipedia.org/wiki/Data_transfer_object) as a way to transfer data between services. +Add `futures` via `cargo add futures` + +```rust +// server.rs +#[derive(Clone, Debug)] +pub struct State { + pub something: String, +} + +#[derive(Deserialize, Serialize, Clone, Debug)] +pub struct EmailJob { + send_to: String, + contents: String, +} +#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)] +pub struct EmailJobResult { + contents: String, +} +impl RPCTask for EmailJob { + type ErroredResult = EmailJobResultError; + type Result = EmailJobResult; + type State = State; + fn run( + self, + state: Arc, + ) -> futures::prelude::future::BoxFuture<'static, Result> + { + async move { + tracing::info!("Sent email to {}", self.send_to); + tokio::time::sleep(Duration::from_secs(2)).await; + return Ok(EmailJobResult { + contents: self.contents.clone(), + }); + } + .boxed() + } +} + +#[tokio::main] +async fn main() { + let mut listener = Worker::new( + env::var("AMQP_SERVER_URL").unwrap(), + WorkerConfig::default(), + ) + .await; + listener + .add_rpc_consumer::( + Arc::new(State { + something: "test".into(), + }), + ListenerConfig::default("emailjob").set_prefetch_count(100), + ); + listener.start_all_listeners().await; +} +``` + +```rust +// client.rs +#[derive(Deserialize, Serialize, Clone, Debug)] +pub struct EmailJob { + send_to: String, + contents: String, +} +#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)] +pub struct EmailJobResult { + contents: String, +} + +// Implement the client side trait, so the caller knows what the return types are +impl RPCClientTask for EmailJob { + type ErroredResult = EmailJobResultError; + type Result = EmailJobResult; +} + + +#[tokio::main] +async fn main() { + let client = Client::new(env::var("AMQP_SERVER_URL").unwrap().as_str()) + .await + .unwrap(); + let result = client + .rpc_call::( + EmailJob { + send_to: "someone".into(), + contents: "something".into(), + }, + BasicCallOptions::default("emailjob").timeout(Duration::from_secs(3)), + ) + .await + .unwrap(); + println!("{:?}",result); +} +``` + ### Message versioning In this crate message versioning is done by including `v1.0.0` or such on the end of the queue name, instead of including it in the headers of a message. This reduces the amount of redelivered messages. diff --git a/src/lib.rs b/src/lib.rs index e581e21..97f9d76 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -270,7 +270,7 @@ impl Worker { /// server.add_rpc_consumer::(ListenerConfig::default("service-jobname").set_message_version("v2.0.0") )); /// server.start_all_listeners().await; /// ``` - pub async fn add_rpc_consumer( + pub fn add_rpc_consumer( &mut self, state: Arc, listener_config: ListenerConfig, @@ -489,7 +489,7 @@ impl Worker { } /// A trait that defines the structure of a task that can be run by the worker -/// BoxFuture is from a crate called `futures_util` +/// BoxFuture is from a crate called `futures` /// /// # Examples /// ``` diff --git a/src/test/mod.rs b/src/test/mod.rs index 3f01bec..2768865 100644 --- a/src/test/mod.rs +++ b/src/test/mod.rs @@ -107,14 +107,12 @@ mod test { WorkerConfig::default(), ) .await; - listener - .add_rpc_consumer::( - Arc::new(State { - something: "test".into(), - }), - ListenerConfig::default("emailjob").set_prefetch_count(100), - ) - .await; + listener.add_rpc_consumer::( + Arc::new(State { + something: "test".into(), + }), + ListenerConfig::default("emailjob").set_prefetch_count(100), + ); tracing::debug!("Starting listener"); listener.start_all_listeners().await; signal::ctrl_c().await.expect("failed to listen for event"); @@ -128,14 +126,12 @@ mod test { WorkerConfig::default(), ) .await; - listener - .add_rpc_consumer::( - Arc::new(State { - something: "test".into(), - }), - ListenerConfig::default("emailjob").set_prefetch_count(100), - ) - .await; + listener.add_rpc_consumer::( + Arc::new(State { + something: "test".into(), + }), + ListenerConfig::default("emailjob").set_prefetch_count(100), + ); tracing::debug!("Starting listener"); let out = timeout(Duration::from_secs(5), listener.start_all_listeners()).await; }