From c4ab5917f37114b901c0445e929329f81335ee06 Mon Sep 17 00:00:00 2001 From: 4o1x5 <4o1x5@4o1x5.dev> Date: Sun, 25 Aug 2024 10:53:19 +0200 Subject: [PATCH] apalis implementation: now any messages incoming are sent to apalis to be processeds but I still haven't found a way to ack the message once the worker is done with the job --- Cargo.lock | 10 ++++ consumer/.env | 2 + consumer/Cargo.toml | 1 + consumer/src/main.rs | 121 +++++++++++++++++++++++++++++++++---------- producer/src/main.rs | 6 ++- 5 files changed, 112 insertions(+), 28 deletions(-) create mode 100644 consumer/.env diff --git a/Cargo.lock b/Cargo.lock index 194be18..2fe8983 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -737,6 +737,7 @@ dependencies = [ "derive_builder", "dotenv", "env_logger", + "envy", "futures", "futures-lite 2.3.0", "lapin", @@ -1026,6 +1027,15 @@ dependencies = [ "log", ] +[[package]] +name = "envy" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f47e0157f2cb54f5ae1bd371b30a2ae4311e1c028f575cd4e81de7353215965" +dependencies = [ + "serde", +] + [[package]] name = "equivalent" version = "1.0.1" diff --git a/consumer/.env b/consumer/.env new file mode 100644 index 0000000..06bc7cf --- /dev/null +++ b/consumer/.env @@ -0,0 +1,2 @@ +RABBITMQ_URL="amqp://127.0.0.1:5672" +REDIS_URL=redis://:password@localhost/ \ No newline at end of file diff --git a/consumer/Cargo.toml b/consumer/Cargo.toml index 7f70e09..4552115 100644 --- a/consumer/Cargo.toml +++ b/consumer/Cargo.toml @@ -21,6 +21,7 @@ lapin = "2.5.0" uuid = { version = "1.10.0", features = ["v4"] } futures-lite = "2.3.0" futures = "0.3.30" +envy = "0.4.2" [dependencies.common] path = "../common" diff --git a/consumer/src/main.rs b/consumer/src/main.rs index 94674f9..9bde381 100644 --- a/consumer/src/main.rs +++ b/consumer/src/main.rs @@ -1,10 +1,26 @@ +use apalis::{ + layers::tracing::TraceLayer, + prelude::{Data, Monitor, Storage, TaskId, WorkerBuilder, WorkerFactoryFn}, + utils::TokioExecutor, +}; +use apalis_redis::RedisStorage; use common::EmailJob; -use futures::StreamExt; -use lapin::{options::*, types::FieldTable, BasicProperties, Connection, ConnectionProperties}; -use std::fmt::Display; -use std::{convert::TryInto, str::from_utf8}; +use futures::{io, StreamExt}; +use lapin::{ + options::*, + protocol::channel, + types::{FieldTable, ShortString}, + BasicProperties, Channel, Connection, ConnectionProperties, Consumer, +}; +use serde::Deserialize; +use std::{ + convert::TryInto, + ops::Deref, + str::from_utf8, + sync::{atomic, Arc}, +}; +use std::{fmt::Display, thread, time::Duration}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; - #[derive(Debug)] enum Error { CannotDecodeArg, @@ -24,21 +40,32 @@ impl Display for Error { } } +#[derive(Debug, Deserialize)] +pub struct Configuration { + redis_url: String, + rabbitmq_url: String, +} + #[tokio::main] async fn main() -> Result<(), Box> { + // Get .env variables + let _ = dotenv::dotenv().ok(); + // Enable tracing tracing_subscriber::registry() .with(tracing_subscriber::EnvFilter::new( std::env::var("RUST_LOG").unwrap_or_else(|_| "debug".into()), )) .with(tracing_subscriber::fmt::layer()) .init(); - let addr = "amqp://127.0.0.1:5672"; - let conn = Connection::connect(addr, ConnectionProperties::default()).await?; + + let c = envy::from_env::().expect("Invalid env vars"); + let conn = Connection::connect(&c.rabbitmq_url, ConnectionProperties::default()).await?; let channel = conn.create_channel().await?; + // Declare queue for email jobs channel .queue_declare( - "rpc_queue", + "email_queue", QueueDeclareOptions::default(), FieldTable::default(), ) @@ -48,16 +75,51 @@ async fn main() -> Result<(), Box> { let mut consumer = channel .basic_consume( - "rpc_queue", + "email_queue", "rpc_server", BasicConsumeOptions::default(), FieldTable::default(), ) .await?; + let conn = apalis_redis::connect(c.redis_url.clone()).await.unwrap(); + let storage = apalis_redis::RedisStorage::new(conn); + + let monitor = async { + Monitor::::new() + .register_with_count(4, { + log::debug!("Spawning worker"); + WorkerBuilder::new(&format!("scraper")) + .layer(TraceLayer::new()) + .backend(storage.clone()) + .build_fn(handle_job) + }) + .run() + .await + .unwrap() + }; + + let handler = handle_incoming(consumer, channel, storage.clone()); + + tokio::join!(monitor, handler); + Ok(()) +} + +async fn handle_job(job: EmailJob) -> String { + log::info!("Sent email to {} with task id:", job.to(),); + format!("Sent email to {}", job.to()); + "asd".to_string() +} + +async fn handle_incoming( + mut consumer: Consumer, + channel: Channel, + mut storage: RedisStorage, +) { + log::info!("Listening for incoming messages"); while let Some(delivery) = consumer.next().await { if let Ok(delivery) = delivery { - log::debug!("Received a request"); + log::debug!("Message received"); let data = serde_json::from_str::(from_utf8(&delivery.data).unwrap()).unwrap(); @@ -66,30 +128,37 @@ async fn main() -> Result<(), Box> { .properties .reply_to() .as_ref() - .ok_or(Error::MissingReplyTo)? + .ok_or(Error::MissingReplyTo) + .unwrap() .as_str(); let correlation_id = delivery .properties .correlation_id() .clone() - .ok_or(Error::MissingCorrelationId)?; + .ok_or(Error::MissingCorrelationId) + .unwrap(); - channel - .basic_publish( - "", - routing_key, - BasicPublishOptions::default(), - &format!("Sent email to {}", data.to()).as_bytes(), - BasicProperties::default().with_correlation_id(correlation_id), - ) - .await?; + match storage.push(data).await { + Ok(task_id) => { + log::debug!( + "Job has been sent to the background worker with id: {}", + task_id + ); + } + Err(error) => { + log::error!( + "Failed to send job to background worker: {}, rejecting in queue", + error + ); + channel + .basic_nack(delivery.delivery_tag, BasicNackOptions::default()) + .await + .unwrap(); - channel - .basic_ack(delivery.delivery_tag, BasicAckOptions::default()) - .await?; + return; + } + } } } - - Ok(()) } diff --git a/producer/src/main.rs b/producer/src/main.rs index d5310ff..568c1ac 100644 --- a/producer/src/main.rs +++ b/producer/src/main.rs @@ -1,3 +1,4 @@ +// Original example taken from https://github.com/rabbitmq/rabbitmq-tutorials/tree/main/rust-lapin/src/bin use common::EmailJobBuilder; use futures::StreamExt; use lapin::{ @@ -82,7 +83,7 @@ impl RpcClient { self.channel .basic_publish( "", - "rpc_queue", + "email_queue", BasicPublishOptions::default(), serde_json::to_string(&data).unwrap().as_bytes(), BasicProperties::default() @@ -117,8 +118,8 @@ async fn main() -> Result<(), Box> { )) .with(tracing_subscriber::fmt::layer()) .init(); + let mut fibonacci_rpc = RpcClient::new().await?; - println!(" [x] Requesting fib(30)"); let email = EmailJobBuilder::default() .to("someone@example.com".into()) @@ -126,6 +127,7 @@ async fn main() -> Result<(), Box> { .build() .unwrap(); loop { + log::info!("Sending message"); let response = fibonacci_rpc.call(email.clone()).await?; log::info!("Got back fib: {}", response); thread::sleep(Duration::from_secs(1))