BunBun-Worker

A multithreaded _panic-safe_ job runner built on the AMQP. Bunbun-worker features RPC and nonRPC calls, and can handle multiple types of messages at a time. > ❗ Disclaimer > This crate is still under development, meaning api's may change on every commit... ## Installation Since this package is not published to crates.io you will need to add it manually to `Cargo.toml` ```toml [dependencies] bunbun-worker = { git = "https://git.4o1x5.dev/4o1x5/bunbun-worker", version = "0.12" } ``` or ```toml [dependencies] bunbun-worker = { git = "https://git.4o1x5.dev/4o1x5/bunbun-worker", branch = "main" } ``` ## Motivation I have a project called _Mediarose_ in development and I needed a way to have efficient and simple communication between services to kill of circular dependencies. ## Usage Here is a basic implementation of an RPC job in bunbun-worker ```rust // server // First let's create a state that will be used inside a job. // Imagine this holding a database connection, some context that may need to be changed. Anything really #[derive(Clone, Debug)] pub struct State { pub something: String, } /// Second, let's create a job, with field that can be serialized/deserialized into JSON /// This is what the server will receive from a client and will do the job based on these properties #[derive(Deserialize, Serialize, Clone, Debug)] pub struct EmailJob { send_to: String, contents: String, } // We also create a result for it, since it's an RPC job #[derive(Deserialize, Serialize, Clone, Debug, PartialEq)] pub struct EmailJobResult { contents: String, } // And an error type so know if the other end errored out, what to do. #[derive(Deserialize, Serialize, Clone, Debug)] pub enum EmailJobResultError { Errored, } /// After all this we implement a Jobrunner/Taskrunner to the type, so when the listener receives it, it can run this piece of code. impl RPCServerTask for EmailJob { type ErroredResult = EmailJobResultError; type Result = EmailJobResult; type State = State; fn run( self, state: Arc>, ) -> futures::prelude::future::BoxFuture<'static, Result> { Box::pin(async move { tracing::info!("Sent email to {}", self.send_to); tokio::time::sleep(Duration::from_secs(2)).await; return Ok(EmailJobResult { contents: self.contents.clone(), }); }) } } // Finally, we can define an async main function to run the listener in. #[tokio::main] async fn main(){ // Define a listener, with a hard-limit of 100 jobs at once. let mut listener = BunBunWorker::new(env::var("AMQP_SERVER_URL").unwrap(), 100.into()).await; // Add the defined sturct to the worker listener .add_rpc_consumer::( "email-emailjob-v1.0.0", // queue name "emailjob", // consumer tag Arc::new(Mutex::new(State { something: "test".into(), // putting our state into a Arc> for thread safety })), ) .await; tracing::debug!("Starting listener"); // Starting the listener listener.start_all_listeners().await; } ``` ```rust // client // Define the same structs we did. These are DTO's after all.. #[derive(Deserialize, Serialize, Clone, Debug)] pub struct EmailJob { send_to: String, contents: String, } #[derive(Deserialize, Serialize, Clone, Debug, PartialEq)] pub struct EmailJobResult { contents: String, } #[derive(Deserialize, Serialize, Clone, Debug)] pub enum EmailJobResultError { Errored, } // Now we implement the clientside task for it. This reduces generics when defining the calling.. impl RPCClientTask for EmailJob { type ErroredResult = EmailJobResultError; type Result = EmailJobResult; } #[tokio::main] async fn main(){ // Define a client let mut client = BunBunClient::new(env::var("AMQP_SERVER_URL").unwrap().as_str()) .await .unwrap(); // Make a call let result = client .rpc_call::( // Define the job EmailJob { send_to: "someone".into(), contents: "something".into(), }, "email-emailjob-v1.0.0", // the queue name ) .await .unwrap(); } ``` ## License Licensed under [GNU AFFERO GENERAL PUBLIC LICENSE](https://www.gnu.org/licenses/agpl-3.0.en.html) ### Contribution Currently this library does not accept any contributors, as it's hosted on a private registry and git server. # Credits This package was made with the help of- - [This readme template you are reading right now](https://github.com/webern/cargo-readme/blob/master/README.md) - [Lapin, an extensive easy to use rabbitmq client](https://crates.io/crates/lapin)