consumer: stupid wrapper for job to send back data

2005 authored 2024-08-26 10:29:13 +02:00
parent c4ab5917f3
commit 8a2cc61c0d


@@ -8,11 +8,11 @@ use common::EmailJob;
 use futures::{io, StreamExt};
 use lapin::{
     options::*,
-    protocol::channel,
+    protocol::{basic::Consume, channel},
     types::{FieldTable, ShortString},
     BasicProperties, Channel, Connection, ConnectionProperties, Consumer,
 };
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
 use std::{
     convert::TryInto,
     ops::Deref,
@@ -91,6 +91,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         log::debug!("Spawning worker");
         WorkerBuilder::new(&format!("scraper"))
             .layer(TraceLayer::new())
+            .data(channel.clone())
             .backend(storage.clone())
             .build_fn(handle_job)
     })
@@ -99,22 +100,47 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         .unwrap()
     };
-    let handler = handle_incoming(consumer, channel, storage.clone());
+    let handler = handle_incoming(consumer.clone(), channel.clone(), storage.clone());
     tokio::join!(monitor, handler);
     Ok(())
 }
-async fn handle_job(job: EmailJob) -> String {
-    log::info!("Sent email to {} with task id:", job.to(),);
-    format!("Sent email to {}", job.to());
+async fn handle_job(job: WrapperJob, channel: Data<Channel>) -> String {
+    let email_job = job.job;
+    format!("Sent email to {}", email_job.to());
+    channel
+        .basic_publish(
+            "",
+            &job.routing_key,
+            BasicPublishOptions::default(),
+            "Ok".as_bytes(),
+            BasicProperties::default().with_correlation_id(job.correlation_id),
+        )
+        .await
+        .unwrap();
+    channel
+        .basic_ack(job.delivery_tag, BasicAckOptions::default())
+        .await
+        .unwrap();
     "asd".to_string()
 }
+#[derive(Debug, Serialize, Deserialize)]
+struct WrapperJob {
+    pub job: EmailJob,
+    pub correlation_id: ShortString,
+    pub routing_key: String,
+    pub delivery_tag: u64,
+}
 async fn handle_incoming(
     mut consumer: Consumer,
     channel: Channel,
-    mut storage: RedisStorage<EmailJob>,
+    mut storage: RedisStorage<WrapperJob>,
 ) {
     log::info!("Listening for incoming messages");
     while let Some(delivery) = consumer.next().await {
@@ -139,17 +165,21 @@ async fn handle_incoming(
             .ok_or(Error::MissingCorrelationId)
             .unwrap();
-        match storage.push(data).await {
+        match storage
+            .push(WrapperJob {
+                correlation_id: correlation_id,
+                job: data,
+                routing_key: routing_key.to_string(),
+                delivery_tag: delivery.delivery_tag,
+            })
+            .await
+        {
             Ok(task_id) => {
-                log::debug!(
-                    "Job has been sent to the background worker with id: {}",
-                    task_id
-                );
+                log::debug!("Job has been sent to the background worker with id: {task_id}",);
             }
             Err(error) => {
                 log::error!(
-                    "Failed to send job to background worker: {}, rejecting in queue",
-                    error
+                    "Failed to send job to background worker: {error}, rejecting in queue",
                 );
                 channel
                     .basic_nack(delivery.delivery_tag, BasicNackOptions::default())