apalis implementation: now any messages incoming are sent to apalis to be processeds but I still haven't found a way to ack the message once the worker is done with the job
All checks were successful
CD / docker (push) Successful in 12m47s

This commit is contained in:
2005 2024-08-25 10:53:19 +02:00
parent 5224e1751a
commit c4ab5917f3
5 changed files with 112 additions and 28 deletions

10
Cargo.lock generated
View file

@ -737,6 +737,7 @@ dependencies = [
"derive_builder", "derive_builder",
"dotenv", "dotenv",
"env_logger", "env_logger",
"envy",
"futures", "futures",
"futures-lite 2.3.0", "futures-lite 2.3.0",
"lapin", "lapin",
@ -1026,6 +1027,15 @@ dependencies = [
"log", "log",
] ]
[[package]]
name = "envy"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f47e0157f2cb54f5ae1bd371b30a2ae4311e1c028f575cd4e81de7353215965"
dependencies = [
"serde",
]
[[package]] [[package]]
name = "equivalent" name = "equivalent"
version = "1.0.1" version = "1.0.1"

2
consumer/.env Normal file
View file

@ -0,0 +1,2 @@
RABBITMQ_URL="amqp://127.0.0.1:5672"
REDIS_URL=redis://:password@localhost/

View file

@ -21,6 +21,7 @@ lapin = "2.5.0"
uuid = { version = "1.10.0", features = ["v4"] } uuid = { version = "1.10.0", features = ["v4"] }
futures-lite = "2.3.0" futures-lite = "2.3.0"
futures = "0.3.30" futures = "0.3.30"
envy = "0.4.2"
[dependencies.common] [dependencies.common]
path = "../common" path = "../common"

View file

@ -1,10 +1,26 @@
use apalis::{
layers::tracing::TraceLayer,
prelude::{Data, Monitor, Storage, TaskId, WorkerBuilder, WorkerFactoryFn},
utils::TokioExecutor,
};
use apalis_redis::RedisStorage;
use common::EmailJob; use common::EmailJob;
use futures::StreamExt; use futures::{io, StreamExt};
use lapin::{options::*, types::FieldTable, BasicProperties, Connection, ConnectionProperties}; use lapin::{
use std::fmt::Display; options::*,
use std::{convert::TryInto, str::from_utf8}; protocol::channel,
types::{FieldTable, ShortString},
BasicProperties, Channel, Connection, ConnectionProperties, Consumer,
};
use serde::Deserialize;
use std::{
convert::TryInto,
ops::Deref,
str::from_utf8,
sync::{atomic, Arc},
};
use std::{fmt::Display, thread, time::Duration};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
#[derive(Debug)] #[derive(Debug)]
enum Error { enum Error {
CannotDecodeArg, CannotDecodeArg,
@ -24,21 +40,32 @@ impl Display for Error {
} }
} }
#[derive(Debug, Deserialize)]
pub struct Configuration {
redis_url: String,
rabbitmq_url: String,
}
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Get .env variables
let _ = dotenv::dotenv().ok();
// Enable tracing
tracing_subscriber::registry() tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new( .with(tracing_subscriber::EnvFilter::new(
std::env::var("RUST_LOG").unwrap_or_else(|_| "debug".into()), std::env::var("RUST_LOG").unwrap_or_else(|_| "debug".into()),
)) ))
.with(tracing_subscriber::fmt::layer()) .with(tracing_subscriber::fmt::layer())
.init(); .init();
let addr = "amqp://127.0.0.1:5672";
let conn = Connection::connect(addr, ConnectionProperties::default()).await?; let c = envy::from_env::<Configuration>().expect("Invalid env vars");
let conn = Connection::connect(&c.rabbitmq_url, ConnectionProperties::default()).await?;
let channel = conn.create_channel().await?; let channel = conn.create_channel().await?;
// Declare queue for email jobs
channel channel
.queue_declare( .queue_declare(
"rpc_queue", "email_queue",
QueueDeclareOptions::default(), QueueDeclareOptions::default(),
FieldTable::default(), FieldTable::default(),
) )
@ -48,16 +75,51 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut consumer = channel let mut consumer = channel
.basic_consume( .basic_consume(
"rpc_queue", "email_queue",
"rpc_server", "rpc_server",
BasicConsumeOptions::default(), BasicConsumeOptions::default(),
FieldTable::default(), FieldTable::default(),
) )
.await?; .await?;
let conn = apalis_redis::connect(c.redis_url.clone()).await.unwrap();
let storage = apalis_redis::RedisStorage::new(conn);
let monitor = async {
Monitor::<TokioExecutor>::new()
.register_with_count(4, {
log::debug!("Spawning worker");
WorkerBuilder::new(&format!("scraper"))
.layer(TraceLayer::new())
.backend(storage.clone())
.build_fn(handle_job)
})
.run()
.await
.unwrap()
};
let handler = handle_incoming(consumer, channel, storage.clone());
tokio::join!(monitor, handler);
Ok(())
}
async fn handle_job(job: EmailJob) -> String {
log::info!("Sent email to {} with task id:", job.to(),);
format!("Sent email to {}", job.to());
"asd".to_string()
}
async fn handle_incoming(
mut consumer: Consumer,
channel: Channel,
mut storage: RedisStorage<EmailJob>,
) {
log::info!("Listening for incoming messages");
while let Some(delivery) = consumer.next().await { while let Some(delivery) = consumer.next().await {
if let Ok(delivery) = delivery { if let Ok(delivery) = delivery {
log::debug!("Received a request"); log::debug!("Message received");
let data = let data =
serde_json::from_str::<EmailJob>(from_utf8(&delivery.data).unwrap()).unwrap(); serde_json::from_str::<EmailJob>(from_utf8(&delivery.data).unwrap()).unwrap();
@ -66,30 +128,37 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.properties .properties
.reply_to() .reply_to()
.as_ref() .as_ref()
.ok_or(Error::MissingReplyTo)? .ok_or(Error::MissingReplyTo)
.unwrap()
.as_str(); .as_str();
let correlation_id = delivery let correlation_id = delivery
.properties .properties
.correlation_id() .correlation_id()
.clone() .clone()
.ok_or(Error::MissingCorrelationId)?; .ok_or(Error::MissingCorrelationId)
.unwrap();
channel match storage.push(data).await {
.basic_publish( Ok(task_id) => {
"", log::debug!(
routing_key, "Job has been sent to the background worker with id: {}",
BasicPublishOptions::default(), task_id
&format!("Sent email to {}", data.to()).as_bytes(), );
BasicProperties::default().with_correlation_id(correlation_id), }
) Err(error) => {
.await?; log::error!(
"Failed to send job to background worker: {}, rejecting in queue",
error
);
channel
.basic_nack(delivery.delivery_tag, BasicNackOptions::default())
.await
.unwrap();
channel return;
.basic_ack(delivery.delivery_tag, BasicAckOptions::default()) }
.await?; }
} }
} }
Ok(())
} }

View file

@ -1,3 +1,4 @@
// Original example taken from https://github.com/rabbitmq/rabbitmq-tutorials/tree/main/rust-lapin/src/bin
use common::EmailJobBuilder; use common::EmailJobBuilder;
use futures::StreamExt; use futures::StreamExt;
use lapin::{ use lapin::{
@ -82,7 +83,7 @@ impl RpcClient {
self.channel self.channel
.basic_publish( .basic_publish(
"", "",
"rpc_queue", "email_queue",
BasicPublishOptions::default(), BasicPublishOptions::default(),
serde_json::to_string(&data).unwrap().as_bytes(), serde_json::to_string(&data).unwrap().as_bytes(),
BasicProperties::default() BasicProperties::default()
@ -117,8 +118,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
)) ))
.with(tracing_subscriber::fmt::layer()) .with(tracing_subscriber::fmt::layer())
.init(); .init();
let mut fibonacci_rpc = RpcClient::new().await?; let mut fibonacci_rpc = RpcClient::new().await?;
println!(" [x] Requesting fib(30)");
let email = EmailJobBuilder::default() let email = EmailJobBuilder::default()
.to("someone@example.com".into()) .to("someone@example.com".into())
@ -126,6 +127,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.build() .build()
.unwrap(); .unwrap();
loop { loop {
log::info!("Sending message");
let response = fibonacci_rpc.call(email.clone()).await?; let response = fibonacci_rpc.call(email.clone()).await?;
log::info!("Got back fib: {}", response); log::info!("Got back fib: {}", response);
thread::sleep(Duration::from_secs(1)) thread::sleep(Duration::from_secs(1))