Compare commits

..

No commits in common. "d6c41713b926fe60d8e36e878d0699afec9c37d2" and "4012f84886aa023d78facdab3ffe2f8fc43be1ff" have entirely different histories.

3 changed files with 51 additions and 100 deletions

View file

@ -43,7 +43,7 @@ impl Client {
///
/// Arguments
/// * `data` The job that will be sent to the queue, must implement Deserialize and Serialize
/// * `options` BasicCallOptions, used to control the timeout and message version
/// * `queue_name` The name of the queue to be sent to
// TODO if the queue is nonexistent return error
pub async fn rpc_call<T: RPCClientTask + Send + Debug>(
&self,
@ -235,10 +235,6 @@ impl Client {
}
/// Sends a basic Task to the queue
///
/// Arguments
/// * `data` The job that will be sent to the queue, must implement Deserialize and Serialize
/// * `options` BasicCallOptions, used to control the timeout and message version
pub async fn call<T>(&self, data: T, options: BasicCallOptions) -> Result<(), ClientError>
where
T: Serialize + DeserializeOwned,

View file

@ -1,7 +1,6 @@
// TODO clean up code...
use futures::{
future::{join_all, BoxFuture},
StreamExt, TryStreamExt,
StreamExt,
};
use lapin::{
options::{
@ -30,7 +29,7 @@ mod test;
pub struct Worker {
channel: Channel,
/// A consumer for each rpc handler
rpc_consumers: Vec<ListenerConfig>,
rpc_consumers: Vec<Consumer>,
rpc_handlers: Vec<
Arc<
dyn Fn(
@ -41,7 +40,7 @@ pub struct Worker {
>,
>,
/// A consumer for each non-rpc handler
consumers: Vec<ListenerConfig>,
consumers: Vec<Consumer>,
handlers: Vec<
Arc<
dyn Fn(
@ -60,7 +59,6 @@ pub struct TlsConfig {
client_cert_and_key: String,
client_cert_and_key_password: String,
}
impl TlsConfig {
/// Create a custom TLS config
pub fn new(
@ -78,6 +76,7 @@ impl TlsConfig {
#[derive(Debug)]
/// A worker configuration
/// Enable tls here
pub struct WorkerConfig {
tls: Option<OwnedTLSConfig>,
}
@ -141,8 +140,6 @@ impl ListenerConfig {
self.consumer_tag = consumer_tag.into();
self
}
/// Set the message version (eg queue_name-v1.0.0)
pub fn set_message_version(mut self, version: impl Into<String>) -> Self {
self.message_version = version.into();
self
@ -189,6 +186,8 @@ impl Worker {
.unwrap(),
};
channel
// TODO set qos for channel
}
/// Add a non-rpc listener to the worker object
@ -196,19 +195,28 @@ impl Worker {
/// # Arguments
/// * `state` - An Arc of the state object that will be passed to the listener
/// * `listener_config` - An Arc of the state object that will be passed to the listener
/// ```
/// use bunbun_worker::{Worker, ListenerConfig, WorkerConfig};
/// let server = Worker::new("amqp://localhost:5672", Workerconfig::default()).await;
/// server.add_non_rpc_consumer::<MyTask>(ListenerConfig::default("service-jobname").set_message_version("v2.0.0") ));
/// server.start_all_listeners().await;
/// ```
pub fn add_non_rpc_consumer<J: Task + 'static + Send>(
pub async fn add_non_rpc_consumer<J: Task + 'static + Send>(
&mut self,
state: Arc<J::State>,
listener_config: ListenerConfig,
) where
<J as Task>::State: std::marker::Send + Sync,
{
let consumer = self
.channel
.basic_consume(
format!(
"{}-{}",
listener_config.queue_name, listener_config.message_version
)
.as_str(),
&listener_config.consumer_tag,
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.expect("basic_consume error");
let handler: Arc<
dyn Fn(
lapin::message::Delivery,
@ -253,7 +261,7 @@ impl Worker {
});
self.handlers.push(handler);
self.consumers.push(listener_config);
self.consumers.push(consumer);
}
/// Add an rpc job listener to the worker object
/// Make sure the type you pass in implements RPCTask
@ -265,9 +273,8 @@ impl Worker {
/// # Examples
///
/// ```
/// use bunbun_worker::{Worker, ListenerConfig, WorkerConfig};
/// let server = Worker::new("amqp://localhost:5672", Workerconfig::default()).await;
/// server.add_rpc_consumer::<MyRPCTask>(ListenerConfig::default("service-jobname").set_message_version("v2.0.0") ));
/// let server = BunBunWorker::new("amqp://localhost:5672", None).await;
/// server.add_rpc_consumer::<MyRPCTask>(ListenerConfig::default("service-jobname-v1.0.0") )).await;
/// server.start_all_listeners().await;
/// ```
pub async fn add_rpc_consumer<J: RPCTask + 'static + Send>(
@ -279,6 +286,22 @@ impl Worker {
<J as RPCTask>::Result: std::marker::Send + Sync,
<J as RPCTask>::ErroredResult: std::marker::Send + Sync,
{
let consumer = create_consumer(
self.channel.clone(),
format!(
"{}-{}",
listener_config.queue_name, listener_config.message_version
)
.as_str(),
&listener_config.consumer_tag,
listener_config.prefetch_count,
)
.await
.map_err(|e| {
tracing::error!("Failed to create consumer: {}", e);
})
.expect("Failed to create consumer");
let channel = self.channel.clone();
let handler: Arc<
dyn Fn(
@ -365,53 +388,21 @@ impl Worker {
});
self.rpc_handlers.push(handler);
self.rpc_consumers.push(listener_config);
self.rpc_consumers.push(consumer);
}
/// Start all the listeners added to the worker object
// TODO implement reconnect
// TODO better error handling
pub async fn start_all_listeners(&self) -> Result<(), String> {
pub async fn start_all_listeners(&self) {
let mut listeners = vec![];
// Start all the non-rpc listeners
for (handler, consumer_config) in self.handlers.iter().zip(self.consumers.iter()) {
// Clone channel
let mut channel = self.channel.clone();
// Set prefetch count
set_consumer_qos(&mut channel, consumer_config.prefetch_count)
.await
.map_err(|e| {
tracing::error!("Failed to set qos: {}", e);
"Failed to set qos".to_string()
})?;
// Create a consumer with modified channel
let consumer = channel
.basic_consume(
format!(
"{}-{}",
consumer_config.queue_name, consumer_config.message_version
)
.as_str(),
&consumer_config.consumer_tag,
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.map_err(|e| {
tracing::error!("Failed to start consumer: {}", e);
"Failed to start consumer".to_string()
})?;
for (handler, consumer) in self.handlers.iter().zip(self.consumers.iter()) {
let consumer = consumer.clone();
let handler = Arc::clone(handler);
tracing::info!(
"Started listening for incoming messages on queue: {} | Non-rpc",
consumer.queue().as_str()
);
listeners.push(tokio::spawn(async move {
consumer
.for_each_concurrent(None, move |delivery| {
@ -431,33 +422,8 @@ impl Worker {
}));
}
for (handler, consumer_config) in self.rpc_handlers.iter().zip(self.rpc_consumers.iter()) {
let mut channel = self.channel.clone();
// Set prefetch count
set_consumer_qos(&mut channel, consumer_config.prefetch_count)
.await
.map_err(|e| {
tracing::error!("Failed to set qos: {}", e);
"Failed to set qos".to_string()
})?;
// Create a consumer with modified channel
let consumer = channel
.basic_consume(
format!(
"{}-{}",
consumer_config.queue_name, consumer_config.message_version
)
.as_str(),
&consumer_config.consumer_tag,
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.map_err(|e| {
tracing::error!("Failed to start consumer: {}", e);
"Failed to start consumer".to_string()
})?;
for (handler, consumer) in self.rpc_handlers.iter().zip(self.rpc_consumers.iter()) {
let consumer = consumer.clone();
let handler = Arc::clone(handler);
tracing::debug!(
@ -482,9 +448,7 @@ impl Worker {
.await;
}));
}
join_all(listeners).await;
Ok(())
}
}
@ -606,8 +570,6 @@ pub trait Task: Sized + Debug + DeserializeOwned {
Ok(job)
}
// TODO Attribute-Based Extraction
/// The method that will be run by the worker
fn run(self, state: Arc<Self::State>) -> BoxFuture<'static, Result<(), ()>>;
/// A function to display the task
@ -740,9 +702,3 @@ async fn create_consumer(
)
.await
}
async fn set_consumer_qos(channel: &mut Channel, prefetch_count: u16) -> Result<(), lapin::Error> {
channel
.basic_qos(prefetch_count, BasicQosOptions::default())
.await
}

View file

@ -3,7 +3,7 @@ mod test {
use std::{env, os::unix::thread, sync::Arc, time::Duration};
use futures::{future::join_all, FutureExt};
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use test_log::test;
use tokio::{
@ -52,14 +52,13 @@ mod test {
state: Arc<Self::State>,
) -> futures::prelude::future::BoxFuture<'static, Result<Self::Result, Self::ErroredResult>>
{
async move {
Box::pin(async move {
tracing::info!("Sent email to {}", self.send_to);
tokio::time::sleep(Duration::from_secs(2)).await;
return Ok(EmailJobResult {
contents: self.contents.clone(),
});
}
.boxed()
})
}
async fn before_job(self) -> Self
where