# BunBun-Worker

`bunbun-worker` is a panic-safe, multithreaded job-runner server/client library for Rust. It supports [RPC](https://wikipedia.org/wiki/Remote_procedure_call) and regular (non-RPC) calls. Currently only [RabbitMQ](https://www.rabbitmq.com/) is supported, but [gRPC](https://grpc.io/) support is planned.

> ❗ Disclaimer
> This crate is still under development, so APIs may change with any commit.

# Introduction

`bunbun-worker` was made to give microservices written in Rust a way to communicate with each other by dispatching jobs, both those that return data and those that don't.

### Rpc

A remote procedure call, as its name says, is a message sent to a remote microservice, which processes it and returns the result.
In `bunbun-worker` it works as follows:

```mermaid
sequenceDiagram
    ServiceA->>+ServiceB: Hey, could you do this job for me?
    Note right of ServiceB: ServiceB does the job
    ServiceB->>-ServiceA: Sure, here is the data result
```

1. ServiceA creates a callback queue that the response will be sent to.
2. ServiceA sends a JSON job message to an **already declared** queue on a RabbitMQ server.
3. ServiceB is listening on that queue for messages and spawns a new thread for each one received.
4. Once ServiceB has finished the work, it uses the received message's headers to respond to the callback queue, including the correlation ID.

### Non-rpc

In `bunbun-worker`, regular jobs are called _non-rpc_ jobs, indicating that the response is not awaited.

## Installation

Get it directly from crates.io

```toml
[dependencies]
bunbun-worker = { version = "0.1.0" }
```

or get it directly from source

```toml
[dependencies]
bunbun-worker = { git = "https://git.4o1x5.dev/4o1x5/bunbun-worker", branch = "master" }
```

## Usage

Here is a basic implementation of an RPC job in `bunbun-worker`:

```rust
// server
use std::{env, time::Duration};

use serde::{Deserialize, Serialize};
// `BunBunWorker` and `RPCServerTask` must also be imported from `bunbun-worker`.

// First, let's create a state that will be used inside a job.
// Imagine this holding a database connection or some context that may need to be changed.
// Anything, really.
#[derive(Clone, Debug)]
pub struct State {
    pub something: String,
}

/// Second, let's create a job with fields that can be serialized/deserialized into JSON.
/// This is what the server receives from a client; it does the job based on these properties.
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct EmailJob {
    send_to: String,
    contents: String,
}

// We also create a result type for it, since it's an RPC job.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
pub struct EmailJobResult {
    contents: String,
}

// And an error type, so the caller knows what to do if the other end errored out.
#[derive(Deserialize, Serialize, Clone, Debug)]
pub enum EmailJobResultError {
    Errored,
}

/// After all this, we implement the task runner for the type,
/// so that the listener can run this piece of code when it receives a job.
impl RPCServerTask for EmailJob {
    type ErroredResult = EmailJobResultError;
    type Result = EmailJobResult;
    type State = State;

    fn run(
        self,
        state: Self::State,
    ) -> futures::prelude::future::BoxFuture<'static, Result<Self::Result, Self::ErroredResult>>
    {
        Box::pin(async move {
            tracing::info!("Sent email to {}", self.send_to);
            tokio::time::sleep(Duration::from_secs(2)).await;
            Ok(EmailJobResult {
                contents: self.contents.clone(),
            })
        })
    }
}

// Finally, we can define an async main function to run the listener in.
#[tokio::main]
async fn main() {
    // Define a listener with a hard limit of 100 jobs at once.
    let mut listener =
        BunBunWorker::new(env::var("AMQP_SERVER_URL").unwrap(), 100.into()).await;

    // Add the defined struct to the worker
    listener
        .add_rpc_consumer::<EmailJob>(
            "email-emailjob-v1.0.0", // queue name
            "emailjob",              // consumer tag
            State {
                something: "test".into(), // the state is placed behind an Arc for thread safety
            },
        )
        .await;

    tracing::debug!("Starting listener");
    // Start the listener
    listener.start_all_listeners().await;
}
```

Install `futures_util` for the client:

```
cargo add futures_util
```

```rust
// client
use std::env;

use serde::{Deserialize, Serialize};
// `BunBunClient` and `RPCClientTask` must also be imported from `bunbun-worker`.

// Define the same structs as on the server. These are DTOs, after all.
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct EmailJob {
    send_to: String,
    contents: String,
}

#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
pub struct EmailJobResult {
    contents: String,
}

#[derive(Deserialize, Serialize, Clone, Debug)]
pub enum EmailJobResultError {
    Errored,
}

// Now we implement the client-side task for it.
// This reduces the number of generics needed at the call site.
impl RPCClientTask for EmailJob {
    type ErroredResult = EmailJobResultError;
    type Result = EmailJobResult;
}

#[tokio::main]
async fn main() {
    // Define a client
    let mut client = BunBunClient::new(env::var("AMQP_SERVER_URL").unwrap().as_str())
        .await
        .unwrap();

    // Make a call
    let result = client
        .rpc_call::<EmailJob>(
            // Define the job
            EmailJob {
                send_to: "someone".into(),
                contents: "something".into(),
            },
            "email-emailjob-v1.0.0", // the queue name
        )
        .await
        .unwrap();
}
```

# Limitations

1. Currently some `unwrap()`s are called inside the code and may result in panics (not in the job-runner).
2. No TLS support
3. No settings, and a very limited API
4. The RabbitMQ RPC logic is very basic, with no message versioning (aside from using different queue names, see [usage](#usage))

# Bugs department

Since the code is hosted on a private git instance (as of right now), any bugs should be discussed in [4o1x5's project room](https://matrix.to/#/#projects:4o1x5.dev).

## License

Licensed under [GNU AFFERO GENERAL PUBLIC LICENSE](https://www.gnu.org/licenses/agpl-3.0.en.html)

### Contribution

Currently this library does not accept any contributors, as it's hosted on a private git server.

# Credits

This package was made with the help of:

- [This readme template you are reading right now](https://github.com/webern/cargo-readme/blob/master/README.md)
- [Lapin, an extensive easy to use rabbitmq client](https://crates.io/crates/lapin)
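
# Appendix: RPC flow sketch

To make the four RPC steps from the introduction more concrete, here is a minimal in-process sketch of the same pattern: a callback queue, a shared job queue, one thread per received job, and replies tagged with a correlation ID. It uses `std::sync::mpsc` channels in place of RabbitMQ queues and is only an illustration, not how `bunbun-worker` is implemented. The names `Envelope`, `Reply`, `serve` and `call_all` are hypothetical and do not exist in the crate.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Hypothetical job message: a payload plus the two properties the RPC
/// steps rely on (where to reply, and which call the reply belongs to).
struct Envelope {
    correlation_id: u64,
    reply_to: Sender<Reply>, // stands in for the callback queue name
    payload: String,
}

/// Hypothetical reply message, tagged with the caller's correlation ID.
struct Reply {
    correlation_id: u64,
    payload: String,
}

/// "ServiceB": listens on the job queue and spawns one thread per job (step 3).
/// Each worker replies to the job's callback queue with the same
/// correlation ID it received (step 4).
fn serve(jobs: Receiver<Envelope>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        let mut workers = Vec::new();
        for job in jobs {
            workers.push(thread::spawn(move || {
                let result = job.payload.to_uppercase(); // the "work"
                let _ = job.reply_to.send(Reply {
                    correlation_id: job.correlation_id,
                    payload: result,
                });
            }));
        }
        for worker in workers {
            let _ = worker.join();
        }
    })
}

/// "ServiceA": creates a callback queue (step 1), sends the jobs (step 2),
/// then matches the replies to calls by correlation ID.
fn call_all(queue: &Sender<Envelope>, payloads: &[&str]) -> HashMap<u64, String> {
    let (reply_tx, reply_rx) = channel();
    for (id, payload) in payloads.iter().enumerate() {
        queue
            .send(Envelope {
                correlation_id: id as u64,
                reply_to: reply_tx.clone(),
                payload: payload.to_string(),
            })
            .expect("the server should be listening");
    }
    // Drop our own sender so the reply iterator ends once every worker has replied.
    drop(reply_tx);
    reply_rx
        .iter()
        .map(|reply| (reply.correlation_id, reply.payload))
        .collect()
}

fn main() {
    let (queue_tx, queue_rx) = channel();
    let server = serve(queue_rx);
    let replies = call_all(&queue_tx, &["hello", "world"]);
    drop(queue_tx); // closing the job queue lets the server loop finish
    server.join().unwrap();
    println!("{:?}", replies.get(&0));
}
```

Replies may arrive in any order because each job runs on its own thread, which is why the caller keys them by correlation ID instead of relying on arrival order.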