diff --git a/Cargo.toml b/Cargo.toml index 4856178..fe9a797 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bunbun-worker" -version = "0.1.0" +version = "0.1.1" description = "An rpc/non-rpc rabbitmq worker library" edition = "2021" license = "AGPL-3.0" diff --git a/README.md b/README.md index bfb43c6..aefa50a 100644 --- a/README.md +++ b/README.md @@ -118,6 +118,12 @@ async fn main(){ } ``` +Instal `futures_util` for the client + +``` +cargo add futures_util +``` + ```rust // client diff --git a/src/client.rs b/src/client.rs index b42d27b..88a9684 100644 --- a/src/client.rs +++ b/src/client.rs @@ -14,14 +14,15 @@ use uuid::Uuid; use crate::ResultHeader; -/// An remote procedure client for rabbitmq (AMQP) -/// With this client you can send messages to a queue that other microservices -/// will process +/// A client for the server part of `bunbun-worker` #[derive(Debug)] pub struct BunBunClient { conn: Connection, } +// TODO implement reconnect +// TODO implement tls + impl BunBunClient { /// Creates an rpc client /// @@ -29,8 +30,8 @@ impl BunBunClient { /// /// ``` /// // Create a client and send a message - /// use jobhandler::client::RpcClient; - /// let client = RpcClient::new("amqp://127.0.0.1:5672", "email_rpc"); + /// use bunbun_worker::client::BunBunClient; + /// let client = BunBunClient::new("amqp://127.0.0.1:5672"); /// ``` pub async fn new(address: &str) -> Result { let conn = Connection::connect(address, ConnectionProperties::default()).await?; @@ -42,7 +43,7 @@ impl BunBunClient { &self, data: T, queue_name: &str, - ) -> Result> + ) -> Result, RpcClientError> where T: Serialize + DeserializeOwned, { @@ -141,7 +142,7 @@ impl BunBunClient { }; }; - // What the fuck is this + // TODO better implementation of this tracing::debug!("Decoding headers"); let result_type = match del.properties.headers().to_owned() { None => { @@ -175,7 +176,7 @@ impl BunBunClient { }, }, }; - tracing::debug!("Result type is: {result_type}, decoding acorrdingly"); + tracing::debug!("Result type is: {result_type}, decoding..."); let utf8 = match from_utf8(&del.data) { Ok(r) => r, Err(error) => { @@ -193,7 +194,7 @@ impl BunBunClient { tracing::error!("Failed to decode response message to E"); return Err(RpcClientError::FailedDecode); } - Ok(res) => return Err(RpcClientError::ServerErrored(res)), + Ok(res) => return Ok(Err(res)), }, ResultHeader::Panic => return Err(RpcClientError::ServerPanicked), ResultHeader::Ok => @@ -204,7 +205,7 @@ impl BunBunClient { tracing::error!("Failed to decode response message to R"); return Err(RpcClientError::FailedDecode); } - Ok(res) => return Ok(res), + Ok(res) => return Ok(Ok(res)), } } } @@ -215,9 +216,9 @@ impl BunBunClient { /// # Examples /// /// ``` - /// use jobhandler::client::Client; + /// use bunbun_worker::client::Client; /// let client = Client::new("amqp://127.0.0.1:5672"); - /// let result = client.call(EmailJob::new("someone@example.com", "Hello there"), "email_channel"); + /// let result = client.call(EmailJob::new("someone@example.com", "Hello there"), "email-emailjob-v1.0.0"); /// ``` pub async fn call(&self, data: T, queue_name: &str) -> Result<(), ClientError> where @@ -264,20 +265,21 @@ impl BunBunClient { } } +/// An error that the bunbunclient returns #[derive(Debug)] -pub enum RpcClientError { - NoReply, +pub enum RpcClientError { + NoReply, // TODO timeout FailedDecode, FailedToSend, InvalidResponse, - ServerErrored(E), ServerPanicked, } - +/// An error for normal calls #[derive(Debug)] pub enum ClientError { FailedToSend, } + impl Display for ClientError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -298,7 +300,7 @@ pub trait RPCClientTask: Sized + Debug + DeserializeOwned { fn create_timeout_headers() -> FieldTable { let mut table = FieldTable::default(); - // 60 second expiry + // 60 second expiry, will not start counting down, only once there are no consumers on the channel table.insert("x-expires".into(), lapin::types::AMQPValue::LongInt(6_000)); table } diff --git a/src/lib.rs b/src/lib.rs index dc12b11..b197f24 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,14 +1,13 @@ use futures::{ future::{join_all, BoxFuture}, - FutureExt, StreamExt, + StreamExt, }; use lapin::{ options::{ BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions, BasicQosOptions, }, - protocol::basic::Consume, - types::{DeliveryTag, FieldTable, LongString, ShortString}, + types::{DeliveryTag, FieldTable, ShortString}, BasicProperties, Channel, Connection, ConnectionProperties, Consumer, }; use serde::{de::DeserializeOwned, Deserialize, Serialize}; @@ -18,12 +17,15 @@ use std::{ str::from_utf8, sync::Arc, }; -use tokio::{signal, sync::Mutex}; + +/// The client module that interacts with the server part of `bunbun-worker` pub mod client; + mod test; pub struct BunBunWorker { channel: Channel, + /// A consumer for each rpc handler rpc_consumers: Vec, rpc_handlers: Vec< Arc< @@ -34,6 +36,7 @@ pub struct BunBunWorker { + Sync, >, >, + /// A consumer for each non-rpc handler consumers: Vec, handlers: Vec< Arc< @@ -46,7 +49,13 @@ pub struct BunBunWorker { >, } +// TODO implement reconnect +// TODO implement tls impl BunBunWorker { + /// Create a new instance of `bunbun-worker` + /// # Arguments + /// * `amqp_server_url` - A string slice that holds the url of the amqp server (e.g. amqp://localhost:5672) + /// * `limit` - An optional u16 that holds the limit of the number of messages to prefetch 0 by default pub async fn new(amqp_server_url: impl Into, limit: Option) -> Self { let channel = Self::create_channel(amqp_server_url.into(), limit).await; BunBunWorker { @@ -75,6 +84,11 @@ impl BunBunWorker { None => conn.create_channel().await.expect("create channel error"), } } + /// Add a non-rpc listener to the worker object + /// + /// # Arguments + /// * `queue_name` - A string slice that holds the name of the queue to listen to (e.g. service-serviceJobName-v1.0.0) + /// * `state` - An Arc of the state object that will be passed to the listener pub async fn add_non_rpc_consumer( &mut self, queue_name: &str, @@ -126,6 +140,20 @@ impl BunBunWorker { self.handlers.push(handler); self.consumers.push(consumer); } + /// Add an rpc job listener to the worker object + /// + /// # Arguments + /// * `queue_name` - A string slice that holds the name of the queue to listen to (e.g. service-serviceJobName-v1.0.0) + /// * `state` - An Arc of the state object that will be passed to the listener + /// + /// # Examples + /// + /// ``` + /// + /// let server = BunBunWorker::new("amqp://localhost:5672", None).await; + /// server.add_rpc_consumer::("service-serviceJobName-v1.0.0", SomeState{} )).await; + /// server.start_all_listeners().await; + /// ``` pub async fn add_rpc_consumer( &mut self, queue_name: &str, @@ -231,14 +259,16 @@ impl BunBunWorker { self.rpc_consumers.push(consumer); } + /// Start all the listeners added to the worker object + pub async fn start_all_listeners(&self) { let mut listeners = vec![]; for (handler, consumer) in self.handlers.iter().zip(self.consumers.iter()) { let consumer = consumer.clone(); let handler = Arc::clone(handler); - tracing::debug!( - "Listening for incoming messages for queue: {}", + tracing::info!( + "Started listening for incoming messages on queue: {} | Non-rpc", consumer.queue().as_str() ); listeners.push(tokio::spawn(async move { @@ -265,7 +295,7 @@ impl BunBunWorker { let handler = Arc::clone(handler); tracing::debug!( - "Listening for incoming messages for queue: {}", + "Started listening for incoming messages on queue: {} | RPC", consumer.queue().as_str() ); listeners.push(tokio::spawn(async move { @@ -290,11 +320,31 @@ impl BunBunWorker { } } +/// A trait that defines the structure of a task that can be run by the worker +/// The task must be deserializable and serializable +/// The task must have a result and an errored result +/// # Examples +/// ``` +/// #[derive(Debug, Serialize, Deserialize)] +/// struct MyRPCServerTask { +/// pub name: String, +/// } +/// impl RPCServerTask for MyRPCServerTask { +/// type Result = String; +/// type ErroredResult = String; +/// type State = SomeState; +/// +/// fn run(self, state: Arc) -> BoxFuture<'static, Result> { +/// async move { +/// Ok("Hello".to_string()) +/// }.boxed() +/// } pub trait RPCServerTask: Sized + Debug + DeserializeOwned { type Result: Serialize + DeserializeOwned + Debug; type ErroredResult: Serialize + DeserializeOwned + Debug; type State: Clone + Debug; + /// Decoding for the message. Overriding is possible. fn decode(data: Vec) -> Result { let job = match from_utf8(&data) { Err(_) => { @@ -308,6 +358,7 @@ pub trait RPCServerTask: Sized + Debug + DeserializeOwned { Ok(job) } + /// The function that will run once a message is received fn run( self, state: Arc, @@ -317,7 +368,35 @@ pub trait RPCServerTask: Sized + Debug + DeserializeOwned { fn display(&self) -> String { format!("{:?}", self) } + // TODO add a function that runs after a job is ran + // TODO add a function that runs before a job is about to run } + +/// A NonRPCServer task +/// Implement this trait to any struct to make it a runnable `non-rpc` job. +/// +/// Examples +/// +/// ``` +/// +/// #[derive(Deserialize, Serialize, Clone, Debug)] +/// pub struct EmailJob { +/// send_to: String, +/// contents: String, +/// } +/// impl NonRPCServerTask for EmailJob { +/// type State = State; +/// fn run( +/// self, +/// state: Self::State, +/// ) -> futures::prelude::future::BoxFuture<'static, Result<(), ()>> +/// { +/// Box::pin(async move { +/// todo!(); +/// }) +/// } +/// } +/// ``` pub trait NonRPCServerTask: Sized + Debug + DeserializeOwned { type State: Clone + Debug; @@ -343,9 +422,13 @@ pub trait NonRPCServerTask: Sized + Debug + DeserializeOwned { fn display(&self) -> String { format!("{:?}", self) } + + // TODO add a function that runs after a job is ran + // TODO add a function that runs before a job is about to run } #[derive(Debug)] +/// A decode error pub enum RabbitDecodeError { NotJson, InvalidField, @@ -415,6 +498,8 @@ fn create_header(header: ResultHeader) -> FieldTable { ); headers } +/// A result header that is included in the header of the AMQP message. +/// It indicates the status of the returned message #[derive(Debug, Serialize, Deserialize)] pub enum ResultHeader { Ok, diff --git a/src/test/mod.rs b/src/test/mod.rs index e95e76a..56ad479 100644 --- a/src/test/mod.rs +++ b/src/test/mod.rs @@ -33,7 +33,7 @@ mod test { pub struct EmailJobResult { contents: String, } - #[derive(Deserialize, Serialize, Clone, Debug)] + #[derive(Deserialize, Serialize, Clone, Debug, PartialEq)] pub enum EmailJobResultError { Errored, } @@ -134,17 +134,16 @@ mod test { .unwrap(); assert_eq!( result, - EmailJobResult { + Ok(EmailJobResult { contents: "something".to_string() - } + }) ) } #[test(tokio::test)] #[traced_test] async fn rpc_client_spam_multithread() { - // - let mut client = BunBunClient::new(env::var("AMQP_SERVER_URL").unwrap().as_str()) + let client = BunBunClient::new(env::var("AMQP_SERVER_URL").unwrap().as_str()) .await .unwrap(); let client = Arc::new(Mutex::new(client));