docs: added some examples

api: small changes, now returning result result instead of wrapping it in an resulted enum, easier to check
This commit is contained in:
2005 2024-11-13 21:35:14 +01:00
parent db8a40db68
commit 4da2a61046
5 changed files with 122 additions and 30 deletions

View file

@ -1,6 +1,6 @@
[package] [package]
name = "bunbun-worker" name = "bunbun-worker"
version = "0.1.0" version = "0.1.1"
description = "An rpc/non-rpc rabbitmq worker library" description = "An rpc/non-rpc rabbitmq worker library"
edition = "2021" edition = "2021"
license = "AGPL-3.0" license = "AGPL-3.0"

View file

@ -118,6 +118,12 @@ async fn main(){
} }
``` ```
Instal `futures_util` for the client
```
cargo add futures_util
```
```rust ```rust
// client // client

View file

@ -14,14 +14,15 @@ use uuid::Uuid;
use crate::ResultHeader; use crate::ResultHeader;
/// An remote procedure client for rabbitmq (AMQP) /// A client for the server part of `bunbun-worker`
/// With this client you can send messages to a queue that other microservices
/// will process
#[derive(Debug)] #[derive(Debug)]
pub struct BunBunClient { pub struct BunBunClient {
conn: Connection, conn: Connection,
} }
// TODO implement reconnect
// TODO implement tls
impl BunBunClient { impl BunBunClient {
/// Creates an rpc client /// Creates an rpc client
/// ///
@ -29,8 +30,8 @@ impl BunBunClient {
/// ///
/// ``` /// ```
/// // Create a client and send a message /// // Create a client and send a message
/// use jobhandler::client::RpcClient; /// use bunbun_worker::client::BunBunClient;
/// let client = RpcClient::new("amqp://127.0.0.1:5672", "email_rpc"); /// let client = BunBunClient::new("amqp://127.0.0.1:5672");
/// ``` /// ```
pub async fn new(address: &str) -> Result<Self, lapin::Error> { pub async fn new(address: &str) -> Result<Self, lapin::Error> {
let conn = Connection::connect(address, ConnectionProperties::default()).await?; let conn = Connection::connect(address, ConnectionProperties::default()).await?;
@ -42,7 +43,7 @@ impl BunBunClient {
&self, &self,
data: T, data: T,
queue_name: &str, queue_name: &str,
) -> Result<T::Result, RpcClientError<T::ErroredResult>> ) -> Result<Result<T::Result, T::ErroredResult>, RpcClientError>
where where
T: Serialize + DeserializeOwned, T: Serialize + DeserializeOwned,
{ {
@ -141,7 +142,7 @@ impl BunBunClient {
}; };
}; };
// What the fuck is this // TODO better implementation of this
tracing::debug!("Decoding headers"); tracing::debug!("Decoding headers");
let result_type = match del.properties.headers().to_owned() { let result_type = match del.properties.headers().to_owned() {
None => { None => {
@ -175,7 +176,7 @@ impl BunBunClient {
}, },
}, },
}; };
tracing::debug!("Result type is: {result_type}, decoding acorrdingly"); tracing::debug!("Result type is: {result_type}, decoding...");
let utf8 = match from_utf8(&del.data) { let utf8 = match from_utf8(&del.data) {
Ok(r) => r, Ok(r) => r,
Err(error) => { Err(error) => {
@ -193,7 +194,7 @@ impl BunBunClient {
tracing::error!("Failed to decode response message to E"); tracing::error!("Failed to decode response message to E");
return Err(RpcClientError::FailedDecode); return Err(RpcClientError::FailedDecode);
} }
Ok(res) => return Err(RpcClientError::ServerErrored(res)), Ok(res) => return Ok(Err(res)),
}, },
ResultHeader::Panic => return Err(RpcClientError::ServerPanicked), ResultHeader::Panic => return Err(RpcClientError::ServerPanicked),
ResultHeader::Ok => ResultHeader::Ok =>
@ -204,7 +205,7 @@ impl BunBunClient {
tracing::error!("Failed to decode response message to R"); tracing::error!("Failed to decode response message to R");
return Err(RpcClientError::FailedDecode); return Err(RpcClientError::FailedDecode);
} }
Ok(res) => return Ok(res), Ok(res) => return Ok(Ok(res)),
} }
} }
} }
@ -215,9 +216,9 @@ impl BunBunClient {
/// # Examples /// # Examples
/// ///
/// ``` /// ```
/// use jobhandler::client::Client; /// use bunbun_worker::client::Client;
/// let client = Client::new("amqp://127.0.0.1:5672"); /// let client = Client::new("amqp://127.0.0.1:5672");
/// let result = client.call(EmailJob::new("someone@example.com", "Hello there"), "email_channel"); /// let result = client.call(EmailJob::new("someone@example.com", "Hello there"), "email-emailjob-v1.0.0");
/// ``` /// ```
pub async fn call<T>(&self, data: T, queue_name: &str) -> Result<(), ClientError> pub async fn call<T>(&self, data: T, queue_name: &str) -> Result<(), ClientError>
where where
@ -264,20 +265,21 @@ impl BunBunClient {
} }
} }
/// An error that the bunbunclient returns
#[derive(Debug)] #[derive(Debug)]
pub enum RpcClientError<E> { pub enum RpcClientError {
NoReply, NoReply, // TODO timeout
FailedDecode, FailedDecode,
FailedToSend, FailedToSend,
InvalidResponse, InvalidResponse,
ServerErrored(E),
ServerPanicked, ServerPanicked,
} }
/// An error for normal calls
#[derive(Debug)] #[derive(Debug)]
pub enum ClientError { pub enum ClientError {
FailedToSend, FailedToSend,
} }
impl Display for ClientError { impl Display for ClientError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self { match self {
@ -298,7 +300,7 @@ pub trait RPCClientTask: Sized + Debug + DeserializeOwned {
fn create_timeout_headers() -> FieldTable { fn create_timeout_headers() -> FieldTable {
let mut table = FieldTable::default(); let mut table = FieldTable::default();
// 60 second expiry // 60 second expiry, will not start counting down, only once there are no consumers on the channel
table.insert("x-expires".into(), lapin::types::AMQPValue::LongInt(6_000)); table.insert("x-expires".into(), lapin::types::AMQPValue::LongInt(6_000));
table table
} }

View file

@ -1,14 +1,13 @@
use futures::{ use futures::{
future::{join_all, BoxFuture}, future::{join_all, BoxFuture},
FutureExt, StreamExt, StreamExt,
}; };
use lapin::{ use lapin::{
options::{ options::{
BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions, BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions,
BasicQosOptions, BasicQosOptions,
}, },
protocol::basic::Consume, types::{DeliveryTag, FieldTable, ShortString},
types::{DeliveryTag, FieldTable, LongString, ShortString},
BasicProperties, Channel, Connection, ConnectionProperties, Consumer, BasicProperties, Channel, Connection, ConnectionProperties, Consumer,
}; };
use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde::{de::DeserializeOwned, Deserialize, Serialize};
@ -18,12 +17,15 @@ use std::{
str::from_utf8, str::from_utf8,
sync::Arc, sync::Arc,
}; };
use tokio::{signal, sync::Mutex};
/// The client module that interacts with the server part of `bunbun-worker`
pub mod client; pub mod client;
mod test; mod test;
pub struct BunBunWorker { pub struct BunBunWorker {
channel: Channel, channel: Channel,
/// A consumer for each rpc handler
rpc_consumers: Vec<Consumer>, rpc_consumers: Vec<Consumer>,
rpc_handlers: Vec< rpc_handlers: Vec<
Arc< Arc<
@ -34,6 +36,7 @@ pub struct BunBunWorker {
+ Sync, + Sync,
>, >,
>, >,
/// A consumer for each non-rpc handler
consumers: Vec<Consumer>, consumers: Vec<Consumer>,
handlers: Vec< handlers: Vec<
Arc< Arc<
@ -46,7 +49,13 @@ pub struct BunBunWorker {
>, >,
} }
// TODO implement reconnect
// TODO implement tls
impl BunBunWorker { impl BunBunWorker {
/// Create a new instance of `bunbun-worker`
/// # Arguments
/// * `amqp_server_url` - A string slice that holds the url of the amqp server (e.g. amqp://localhost:5672)
/// * `limit` - An optional u16 that holds the limit of the number of messages to prefetch 0 by default
pub async fn new(amqp_server_url: impl Into<String>, limit: Option<u16>) -> Self { pub async fn new(amqp_server_url: impl Into<String>, limit: Option<u16>) -> Self {
let channel = Self::create_channel(amqp_server_url.into(), limit).await; let channel = Self::create_channel(amqp_server_url.into(), limit).await;
BunBunWorker { BunBunWorker {
@ -75,6 +84,11 @@ impl BunBunWorker {
None => conn.create_channel().await.expect("create channel error"), None => conn.create_channel().await.expect("create channel error"),
} }
} }
/// Add a non-rpc listener to the worker object
///
/// # Arguments
/// * `queue_name` - A string slice that holds the name of the queue to listen to (e.g. service-serviceJobName-v1.0.0)
/// * `state` - An Arc of the state object that will be passed to the listener
pub async fn add_non_rpc_consumer<J: NonRPCServerTask + 'static + Send>( pub async fn add_non_rpc_consumer<J: NonRPCServerTask + 'static + Send>(
&mut self, &mut self,
queue_name: &str, queue_name: &str,
@ -126,6 +140,20 @@ impl BunBunWorker {
self.handlers.push(handler); self.handlers.push(handler);
self.consumers.push(consumer); self.consumers.push(consumer);
} }
/// Add an rpc job listener to the worker object
///
/// # Arguments
/// * `queue_name` - A string slice that holds the name of the queue to listen to (e.g. service-serviceJobName-v1.0.0)
/// * `state` - An Arc of the state object that will be passed to the listener
///
/// # Examples
///
/// ```
///
/// let server = BunBunWorker::new("amqp://localhost:5672", None).await;
/// server.add_rpc_consumer::<MyRPCServerTask>("service-serviceJobName-v1.0.0", SomeState{} )).await;
/// server.start_all_listeners().await;
/// ```
pub async fn add_rpc_consumer<J: RPCServerTask + 'static + Send>( pub async fn add_rpc_consumer<J: RPCServerTask + 'static + Send>(
&mut self, &mut self,
queue_name: &str, queue_name: &str,
@ -231,14 +259,16 @@ impl BunBunWorker {
self.rpc_consumers.push(consumer); self.rpc_consumers.push(consumer);
} }
/// Start all the listeners added to the worker object
pub async fn start_all_listeners(&self) { pub async fn start_all_listeners(&self) {
let mut listeners = vec![]; let mut listeners = vec![];
for (handler, consumer) in self.handlers.iter().zip(self.consumers.iter()) { for (handler, consumer) in self.handlers.iter().zip(self.consumers.iter()) {
let consumer = consumer.clone(); let consumer = consumer.clone();
let handler = Arc::clone(handler); let handler = Arc::clone(handler);
tracing::debug!( tracing::info!(
"Listening for incoming messages for queue: {}", "Started listening for incoming messages on queue: {} | Non-rpc",
consumer.queue().as_str() consumer.queue().as_str()
); );
listeners.push(tokio::spawn(async move { listeners.push(tokio::spawn(async move {
@ -265,7 +295,7 @@ impl BunBunWorker {
let handler = Arc::clone(handler); let handler = Arc::clone(handler);
tracing::debug!( tracing::debug!(
"Listening for incoming messages for queue: {}", "Started listening for incoming messages on queue: {} | RPC",
consumer.queue().as_str() consumer.queue().as_str()
); );
listeners.push(tokio::spawn(async move { listeners.push(tokio::spawn(async move {
@ -290,11 +320,31 @@ impl BunBunWorker {
} }
} }
/// A trait that defines the structure of a task that can be run by the worker
/// The task must be deserializable and serializable
/// The task must have a result and an errored result
/// # Examples
/// ```
/// #[derive(Debug, Serialize, Deserialize)]
/// struct MyRPCServerTask {
/// pub name: String,
/// }
/// impl RPCServerTask for MyRPCServerTask {
/// type Result = String;
/// type ErroredResult = String;
/// type State = SomeState;
///
/// fn run(self, state: Arc<Self::State>) -> BoxFuture<'static, Result<Self::Result, Self::ErroredResult>> {
/// async move {
/// Ok("Hello".to_string())
/// }.boxed()
/// }
pub trait RPCServerTask: Sized + Debug + DeserializeOwned { pub trait RPCServerTask: Sized + Debug + DeserializeOwned {
type Result: Serialize + DeserializeOwned + Debug; type Result: Serialize + DeserializeOwned + Debug;
type ErroredResult: Serialize + DeserializeOwned + Debug; type ErroredResult: Serialize + DeserializeOwned + Debug;
type State: Clone + Debug; type State: Clone + Debug;
/// Decoding for the message. Overriding is possible.
fn decode(data: Vec<u8>) -> Result<Self, RabbitDecodeError> { fn decode(data: Vec<u8>) -> Result<Self, RabbitDecodeError> {
let job = match from_utf8(&data) { let job = match from_utf8(&data) {
Err(_) => { Err(_) => {
@ -308,6 +358,7 @@ pub trait RPCServerTask: Sized + Debug + DeserializeOwned {
Ok(job) Ok(job)
} }
/// The function that will run once a message is received
fn run( fn run(
self, self,
state: Arc<Self::State>, state: Arc<Self::State>,
@ -317,7 +368,35 @@ pub trait RPCServerTask: Sized + Debug + DeserializeOwned {
fn display(&self) -> String { fn display(&self) -> String {
format!("{:?}", self) format!("{:?}", self)
} }
// TODO add a function that runs after a job is ran
// TODO add a function that runs before a job is about to run
} }
/// A NonRPCServer task
/// Implement this trait to any struct to make it a runnable `non-rpc` job.
///
/// Examples
///
/// ```
///
/// #[derive(Deserialize, Serialize, Clone, Debug)]
/// pub struct EmailJob {
/// send_to: String,
/// contents: String,
/// }
/// impl NonRPCServerTask for EmailJob {
/// type State = State;
/// fn run(
/// self,
/// state: Self::State,
/// ) -> futures::prelude::future::BoxFuture<'static, Result<(), ()>>
/// {
/// Box::pin(async move {
/// todo!();
/// })
/// }
/// }
/// ```
pub trait NonRPCServerTask: Sized + Debug + DeserializeOwned { pub trait NonRPCServerTask: Sized + Debug + DeserializeOwned {
type State: Clone + Debug; type State: Clone + Debug;
@ -343,9 +422,13 @@ pub trait NonRPCServerTask: Sized + Debug + DeserializeOwned {
fn display(&self) -> String { fn display(&self) -> String {
format!("{:?}", self) format!("{:?}", self)
} }
// TODO add a function that runs after a job is ran
// TODO add a function that runs before a job is about to run
} }
#[derive(Debug)] #[derive(Debug)]
/// A decode error
pub enum RabbitDecodeError { pub enum RabbitDecodeError {
NotJson, NotJson,
InvalidField, InvalidField,
@ -415,6 +498,8 @@ fn create_header(header: ResultHeader) -> FieldTable {
); );
headers headers
} }
/// A result header that is included in the header of the AMQP message.
/// It indicates the status of the returned message
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub enum ResultHeader { pub enum ResultHeader {
Ok, Ok,

View file

@ -33,7 +33,7 @@ mod test {
pub struct EmailJobResult { pub struct EmailJobResult {
contents: String, contents: String,
} }
#[derive(Deserialize, Serialize, Clone, Debug)] #[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
pub enum EmailJobResultError { pub enum EmailJobResultError {
Errored, Errored,
} }
@ -134,17 +134,16 @@ mod test {
.unwrap(); .unwrap();
assert_eq!( assert_eq!(
result, result,
EmailJobResult { Ok(EmailJobResult {
contents: "something".to_string() contents: "something".to_string()
} })
) )
} }
#[test(tokio::test)] #[test(tokio::test)]
#[traced_test] #[traced_test]
async fn rpc_client_spam_multithread() { async fn rpc_client_spam_multithread() {
// let client = BunBunClient::new(env::var("AMQP_SERVER_URL").unwrap().as_str())
let mut client = BunBunClient::new(env::var("AMQP_SERVER_URL").unwrap().as_str())
.await .await
.unwrap(); .unwrap();
let client = Arc::new(Mutex::new(client)); let client = Arc::new(Mutex::new(client));