BunBun-Worker

> ❗ Disclaimer > This crate is still under development, meaning api's may change on every commit... # Introduction `bunbun-worker` was made to provide a _panic-safe_ multithreaded job-runner server & client for microservices and alike. It supports [RPC](https://wikipedia.org/wiki/Remote_procedure_call) and regular (non-rpc) calls. As of right now only [rabbitmq](https://www.rabbitmq.com/) is supported but [gRPC](https://grpc.io/) will be added too. ### Rpc Remote procedure call, as it's name says is a message that can be sent to a remote microservice to be processed and the result to be returned. In `bunbun-worker` it's implemented by the following example: ```mermaid sequenceDiagram ServiceA->>+ServiceB: Hey, could you do this job for me? Note right of ServiceB: ServiceB does the job ServiceB->>+ServiceA: Sure, here is the data result ``` 1. ServiceA creates a callback queue that the response shall be sent to. 2. ServiceA sends a json job message to an **already declared** queue on a rabbitmq server. 3. ServiceB is listening on that queue for messages and spawns a new thread for each received. 4. Once ServiceB has finished the work, using the received messages header it responds to the callback queue with the correlation-id. ### Non-rpc In `bunbun-worker` regular jobs are called _non-rpc_ jobs, indicating that the response is not awaited. ## Installation Get directly from crates.io ```toml [dependencies] bunbun-worker = { version = "0.2.0" } ``` or get it directly from source ```toml [dependencies] bunbun-worker = { git = "https://git.4o1x5.dev/4o1x5/bunbun-worker", branch = "master" } ``` ## Usage This example uses [DTO](https://en.wikipedia.org/wiki/Data_transfer_object) as a way to transfer data between services. Add `futures` via `cargo add futures` ```rust // server.rs #[derive(Clone, Debug)] pub struct State { pub something: String, } #[derive(Deserialize, Serialize, Clone, Debug)] pub struct EmailJob { send_to: String, contents: String, } #[derive(Deserialize, Serialize, Clone, Debug, PartialEq)] pub struct EmailJobResult { contents: String, } impl RPCTask for EmailJob { type ErroredResult = EmailJobResultError; type Result = EmailJobResult; type State = State; fn run( self, state: Arc, ) -> futures::prelude::future::BoxFuture<'static, Result> { async move { tracing::info!("Sent email to {}", self.send_to); tokio::time::sleep(Duration::from_secs(2)).await; return Ok(EmailJobResult { contents: self.contents.clone(), }); } .boxed() } } #[tokio::main] async fn main() { let mut listener = Worker::new( env::var("AMQP_SERVER_URL").unwrap(), WorkerConfig::default(), ) .await; listener .add_rpc_consumer::( Arc::new(State { something: "test".into(), }), ListenerConfig::default("emailjob").set_prefetch_count(100), ); listener.start_all_listeners().await; } ``` ```rust // client.rs #[derive(Deserialize, Serialize, Clone, Debug)] pub struct EmailJob { send_to: String, contents: String, } #[derive(Deserialize, Serialize, Clone, Debug, PartialEq)] pub struct EmailJobResult { contents: String, } // Implement the client side trait, so the caller knows what the return types are impl RPCClientTask for EmailJob { type ErroredResult = EmailJobResultError; type Result = EmailJobResult; } #[tokio::main] async fn main() { let client = Client::new(env::var("AMQP_SERVER_URL").unwrap().as_str()) .await .unwrap(); let result = client .rpc_call::( EmailJob { send_to: "someone".into(), contents: "something".into(), }, BasicCallOptions::default("emailjob").timeout(Duration::from_secs(3)), ) .await .unwrap(); println!("{:?}",result); } ``` ### Message versioning In this crate message versioning is done by including `v1.0.0` or such on the end of the queue name, instead of including it in the headers of a message. This reduces the amount of redelivered messages. The following example will send a job to a queue named `emailjob-v1.0.0`. ```rust let result = client .rpc_call::( EmailJob { send_to: "someone".into(), contents: "something".into(), }, BasicCallOptions::default("emailjob") .timeout(Duration::from_secs(3)) .message_version("v1.0.0") ) .await .unwrap(); ``` # Limitations 1. Currently some `unwrap()`'s are called inside the code and may results in panics (not in the job-runner). 2. limited API # Bugs department Since the code is hosted on a private git instance (as of right now) any bugs shall be discussed in [4o1x5's project room](https://matrix.to/#/#projects:4o1x5.dev). ## License Licensed under [GNU AFFERO GENERAL PUBLIC LICENSE](https://www.gnu.org/licenses/agpl-3.0.en.html) ### Contribution Currently this library does not accept any contributors, as it's hosted on a private git server. # Credits This package was made with the help of- - [This readme template you are reading right now](https://github.com/webern/cargo-readme/blob/master/README.md) - [Lapin, an extensive easy to use rabbitmq client](https://crates.io/crates/lapin)