diff --git a/consumer/src/main.rs b/consumer/src/main.rs index 9bde381..d127a7b 100644 --- a/consumer/src/main.rs +++ b/consumer/src/main.rs @@ -8,11 +8,11 @@ use common::EmailJob; use futures::{io, StreamExt}; use lapin::{ options::*, - protocol::channel, + protocol::{basic::Consume, channel}, types::{FieldTable, ShortString}, BasicProperties, Channel, Connection, ConnectionProperties, Consumer, }; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::{ convert::TryInto, ops::Deref, @@ -91,6 +91,7 @@ async fn main() -> Result<(), Box> { log::debug!("Spawning worker"); WorkerBuilder::new(&format!("scraper")) .layer(TraceLayer::new()) + .data(channel.clone()) .backend(storage.clone()) .build_fn(handle_job) }) @@ -99,22 +100,47 @@ async fn main() -> Result<(), Box> { .unwrap() }; - let handler = handle_incoming(consumer, channel, storage.clone()); + let handler = handle_incoming(consumer.clone(), channel.clone(), storage.clone()); tokio::join!(monitor, handler); Ok(()) } -async fn handle_job(job: EmailJob) -> String { - log::info!("Sent email to {} with task id:", job.to(),); - format!("Sent email to {}", job.to()); +async fn handle_job(job: WrapperJob, channel: Data) -> String { + let email_job = job.job; + format!("Sent email to {}", email_job.to()); + + channel + .basic_publish( + "", + &job.routing_key, + BasicPublishOptions::default(), + "Ok".as_bytes(), + BasicProperties::default().with_correlation_id(job.correlation_id), + ) + .await + .unwrap(); + + channel + .basic_ack(job.delivery_tag, BasicAckOptions::default()) + .await + .unwrap(); + "asd".to_string() } +#[derive(Debug, Serialize, Deserialize)] +struct WrapperJob { + pub job: EmailJob, + pub correlation_id: ShortString, + pub routing_key: String, + pub delivery_tag: u64, +} + async fn handle_incoming( mut consumer: Consumer, channel: Channel, - mut storage: RedisStorage, + mut storage: RedisStorage, ) { log::info!("Listening for incoming messages"); while let Some(delivery) = consumer.next().await { @@ -139,17 +165,21 @@ async fn handle_incoming( .ok_or(Error::MissingCorrelationId) .unwrap(); - match storage.push(data).await { + match storage + .push(WrapperJob { + correlation_id: correlation_id, + job: data, + routing_key: routing_key.to_string(), + delivery_tag: delivery.delivery_tag, + }) + .await + { Ok(task_id) => { - log::debug!( - "Job has been sent to the background worker with id: {}", - task_id - ); + log::debug!("Job has been sent to the background worker with id: {task_id}",); } Err(error) => { log::error!( - "Failed to send job to background worker: {}, rejecting in queue", - error + "Failed to send job to background worker: {error}, rejecting in queue", ); channel .basic_nack(delivery.delivery_tag, BasicNackOptions::default())