diff --git a/.forgejo/workflows/build_docker_images.yaml b/.forgejo/workflows/build_docker_images.yaml index c7ced59..8a3ccd9 100644 --- a/.forgejo/workflows/build_docker_images.yaml +++ b/.forgejo/workflows/build_docker_images.yaml @@ -40,9 +40,6 @@ jobs: # add kvm support extra_nix_config: | system-features = nixos-test benchmark big-parallel kvm - - - - name: Build, import, tag and push producer container run: nix build .#producer-container && docker image load --input result && docker image tag producer:latest git.4o1x5.dev/4o1x5/producer:latest && docker image push git.4o1x5.dev/4o1x5/producer:latest diff --git a/Cargo.lock b/Cargo.lock index bcdc62c..194be18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -737,6 +737,7 @@ dependencies = [ "derive_builder", "dotenv", "env_logger", + "futures", "futures-lite 2.3.0", "lapin", "log", @@ -2201,6 +2202,7 @@ dependencies = [ "derive_builder", "dotenv", "env_logger", + "futures", "futures-lite 2.3.0", "lapin", "log", diff --git a/consumer/Cargo.toml b/consumer/Cargo.toml index 07dd06d..7f70e09 100644 --- a/consumer/Cargo.toml +++ b/consumer/Cargo.toml @@ -20,6 +20,7 @@ dotenv = "0.15.0" lapin = "2.5.0" uuid = { version = "1.10.0", features = ["v4"] } futures-lite = "2.3.0" +futures = "0.3.30" [dependencies.common] path = "../common" diff --git a/consumer/src/main.rs b/consumer/src/main.rs index 11287db..94674f9 100644 --- a/consumer/src/main.rs +++ b/consumer/src/main.rs @@ -1,62 +1,94 @@ -use std::str::from_utf8; - use common::EmailJob; -use futures_lite::stream::StreamExt; -use lapin::{ - options::*, - publisher_confirm::Confirmation, - types::{FieldTable, ShortString}, - BasicProperties, Connection, ConnectionProperties, Result, -}; -use log::*; +use futures::StreamExt; +use lapin::{options::*, types::FieldTable, BasicProperties, Connection, ConnectionProperties}; +use std::fmt::Display; +use std::{convert::TryInto, str::from_utf8}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +#[derive(Debug)] +enum Error { + CannotDecodeArg, + MissingReplyTo, + MissingCorrelationId, +} + +impl std::error::Error for Error {} + +impl Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + Error::CannotDecodeArg => write!(f, "Cannot decode argument"), + Error::MissingReplyTo => write!(f, "Missing 'reply to' property"), + Error::MissingCorrelationId => write!(f, "Missing 'correlation id' property"), + } + } +} + #[tokio::main] -async fn main() -> Result<()> { +async fn main() -> Result<(), Box> { tracing_subscriber::registry() .with(tracing_subscriber::EnvFilter::new( std::env::var("RUST_LOG").unwrap_or_else(|_| "debug".into()), )) .with(tracing_subscriber::fmt::layer()) .init(); - // Open connection. + let addr = "amqp://127.0.0.1:5672"; + let conn = Connection::connect(addr, ConnectionProperties::default()).await?; + let channel = conn.create_channel().await?; - let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into()); - let conn = Connection::connect(&addr, ConnectionProperties::default()) - .await - .unwrap(); + channel + .queue_declare( + "rpc_queue", + QueueDeclareOptions::default(), + FieldTable::default(), + ) + .await?; - info!("CONNECTED"); - let channel_b = conn.create_channel().await.unwrap(); + channel.basic_qos(1, BasicQosOptions::default()).await?; - // declare a consumer - let mut consumer = channel_b + let mut consumer = channel .basic_consume( - "email", - "", + "rpc_queue", + "rpc_server", BasicConsumeOptions::default(), FieldTable::default(), ) .await?; - // watch for incoming messages while let Some(delivery) = consumer.next().await { - let delivery = delivery.expect("error in consumer"); - let reply = delivery.properties.correlation_id(); - log::info!("Reply id: {:?}", reply); - let data = serde_json::from_str::(from_utf8(&delivery.data).unwrap()).unwrap(); - channel_b - .basic_publish( - "", - delivery.properties.reply_to().clone().unwrap().as_str(), - BasicPublishOptions::default(), - format!("Sent email to {}", data.to()).as_bytes(), - BasicProperties::default(), - ) - .await - .unwrap(); - delivery.ack(BasicAckOptions::default()).await.unwrap(); - info!("Received emailjob {:?} and replied back", data); + if let Ok(delivery) = delivery { + log::debug!("Received a request"); + + let data = + serde_json::from_str::(from_utf8(&delivery.data).unwrap()).unwrap(); + + let routing_key = delivery + .properties + .reply_to() + .as_ref() + .ok_or(Error::MissingReplyTo)? + .as_str(); + + let correlation_id = delivery + .properties + .correlation_id() + .clone() + .ok_or(Error::MissingCorrelationId)?; + + channel + .basic_publish( + "", + routing_key, + BasicPublishOptions::default(), + &format!("Sent email to {}", data.to()).as_bytes(), + BasicProperties::default().with_correlation_id(correlation_id), + ) + .await?; + + channel + .basic_ack(delivery.delivery_tag, BasicAckOptions::default()) + .await?; + } } Ok(()) diff --git a/producer/Cargo.toml b/producer/Cargo.toml index daeb567..3040a3b 100644 --- a/producer/Cargo.toml +++ b/producer/Cargo.toml @@ -20,6 +20,7 @@ dotenv = "0.15.0" lapin = "2.5.0" uuid = { version = "1.10.0", features = ["v4"] } futures-lite = "2.3.0" +futures = "0.3.30" [dependencies.common] path = "../common" diff --git a/producer/src/main.rs b/producer/src/main.rs index 6044285..d5310ff 100644 --- a/producer/src/main.rs +++ b/producer/src/main.rs @@ -1,97 +1,133 @@ -use std::{str::from_utf8, thread, time::Duration}; - use common::EmailJobBuilder; -use futures_lite::StreamExt; +use futures::StreamExt; use lapin::{ - options::*, - publisher_confirm::Confirmation, - types::{FieldTable, ShortString}, - BasicProperties, Connection, ConnectionProperties, Result, + options::*, types::FieldTable, types::ShortString, BasicProperties, Channel, Connection, + ConnectionProperties, Consumer, Queue, }; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +use serde::{de::DeserializeOwned, Serialize}; +use std::fmt::Display; +use std::thread; +use std::time::Duration; +use std::{convert::TryInto, str::from_utf8}; use uuid::Uuid; + +#[derive(Debug)] +enum Error { + CannotDecodeReply, + NoReply, +} + +impl std::error::Error for Error {} + +impl Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + Error::CannotDecodeReply => write!(f, "Cannot decode reply"), + Error::NoReply => write!(f, "No reply arrived"), + } + } +} + +struct RpcClient { + conn: Connection, + channel: Channel, + callback_queue: Queue, + consumer: Consumer, + correlation_id: ShortString, +} + +impl RpcClient { + async fn new() -> Result { + let addr = "amqp://127.0.0.1:5672"; + let conn = Connection::connect(addr, ConnectionProperties::default()).await?; + let channel = conn.create_channel().await?; + let callback_queue = channel + .queue_declare( + "", + QueueDeclareOptions { + exclusive: true, + ..Default::default() + }, + FieldTable::default(), + ) + .await?; + + let consumer = channel + .basic_consume( + callback_queue.name().as_str(), + "rpc_client", + BasicConsumeOptions { + no_ack: true, + ..Default::default() + }, + FieldTable::default(), + ) + .await?; + + let correlation_id = Uuid::new_v4().to_string().into(); + + Ok(Self { + conn, + channel, + callback_queue, + consumer, + correlation_id, + }) + } + + async fn call(&mut self, data: T) -> Result> + where + T: Serialize + DeserializeOwned, + { + self.channel + .basic_publish( + "", + "rpc_queue", + BasicPublishOptions::default(), + serde_json::to_string(&data).unwrap().as_bytes(), + BasicProperties::default() + .with_reply_to(self.callback_queue.name().clone()) + .with_correlation_id(self.correlation_id.clone()), + ) + .await? + .await?; + + while let Some(delivery) = self.consumer.next().await { + if let Ok(delivery) = delivery { + if delivery.properties.correlation_id().as_ref() == Some(&self.correlation_id) { + return Ok(from_utf8(&delivery.data).unwrap().to_string()); + } + } + } + + Err(Box::new(Error::NoReply)) + } + + async fn close(&self) -> Result<(), lapin::Error> { + self.conn.close(0, "").await + } +} + +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; #[tokio::main] -async fn main() { +async fn main() -> Result<(), Box> { tracing_subscriber::registry() .with(tracing_subscriber::EnvFilter::new( std::env::var("RUST_LOG").unwrap_or_else(|_| "debug".into()), )) .with(tracing_subscriber::fmt::layer()) .init(); - let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into()); - let conn = Connection::connect(&addr, ConnectionProperties::default()) - .await - .unwrap(); - let channel_a = conn.create_channel().await.unwrap(); - let channel_b = conn.create_channel().await.unwrap(); + let mut fibonacci_rpc = RpcClient::new().await?; + println!(" [x] Requesting fib(30)"); let email = EmailJobBuilder::default() .to("someone@example.com".into()) - .content("test".into()) + .content("Hello there".into()) .build() .unwrap(); - let queue = channel_a - .queue_declare( - "email", - QueueDeclareOptions::default(), - FieldTable::default(), - ) - .await - .unwrap(); - let mut consumer = channel_a - .basic_consume( - "email", - "", - BasicConsumeOptions::default(), - FieldTable::default(), - ) - .await - .unwrap(); - - let mut request = Uuid::new_v4(); loop { - let confirm = channel_b - .basic_publish( - "", - "email", - BasicPublishOptions::default(), - serde_json::to_string(&email).unwrap().as_bytes(), - BasicProperties::default() - .with_correlation_id(request.to_string().into()) - .with_reply_to(consumer.queue()), - ) - .await - .unwrap() - .await - .unwrap(); - while let Some(delivery) = consumer.next().await { - if let Ok(delivery) = delivery { - if delivery - .properties - .correlation_id() - .as_ref() - .unwrap_or(&ShortString::from("asd")) - .eq(&ShortString::from(request.to_string())) - { - log::info!( - "Received data from consumer: {:?}", - from_utf8(&delivery.data) - ); - break; - } else { - log::info!( - "Received data from consumer: {:?}", - from_utf8(&delivery.data) - ); - log::error!("Received invalid delivery"); - break; - } - } - } - - request = Uuid::new_v4(); - log::info!("Sent request"); - + let response = fibonacci_rpc.call(email.clone()).await?; + log::info!("Got back fib: {}", response); thread::sleep(Duration::from_secs(1)) } }