stole example from rabbitmq-tutorials
All checks were successful
CD / docker (push) Successful in 12m41s

This commit is contained in:
2005 2024-08-25 09:13:36 +02:00
parent f42a9adafc
commit 5224e1751a
6 changed files with 188 additions and 119 deletions

View file

@ -40,9 +40,6 @@ jobs:
# add kvm support # add kvm support
extra_nix_config: | extra_nix_config: |
system-features = nixos-test benchmark big-parallel kvm system-features = nixos-test benchmark big-parallel kvm
- -
name: Build, import, tag and push producer container name: Build, import, tag and push producer container
run: nix build .#producer-container && docker image load --input result && docker image tag producer:latest git.4o1x5.dev/4o1x5/producer:latest && docker image push git.4o1x5.dev/4o1x5/producer:latest run: nix build .#producer-container && docker image load --input result && docker image tag producer:latest git.4o1x5.dev/4o1x5/producer:latest && docker image push git.4o1x5.dev/4o1x5/producer:latest

2
Cargo.lock generated
View file

@ -737,6 +737,7 @@ dependencies = [
"derive_builder", "derive_builder",
"dotenv", "dotenv",
"env_logger", "env_logger",
"futures",
"futures-lite 2.3.0", "futures-lite 2.3.0",
"lapin", "lapin",
"log", "log",
@ -2201,6 +2202,7 @@ dependencies = [
"derive_builder", "derive_builder",
"dotenv", "dotenv",
"env_logger", "env_logger",
"futures",
"futures-lite 2.3.0", "futures-lite 2.3.0",
"lapin", "lapin",
"log", "log",

View file

@ -20,6 +20,7 @@ dotenv = "0.15.0"
lapin = "2.5.0" lapin = "2.5.0"
uuid = { version = "1.10.0", features = ["v4"] } uuid = { version = "1.10.0", features = ["v4"] }
futures-lite = "2.3.0" futures-lite = "2.3.0"
futures = "0.3.30"
[dependencies.common] [dependencies.common]
path = "../common" path = "../common"

View file

@ -1,62 +1,94 @@
use std::str::from_utf8;
use common::EmailJob; use common::EmailJob;
use futures_lite::stream::StreamExt; use futures::StreamExt;
use lapin::{ use lapin::{options::*, types::FieldTable, BasicProperties, Connection, ConnectionProperties};
options::*, use std::fmt::Display;
publisher_confirm::Confirmation, use std::{convert::TryInto, str::from_utf8};
types::{FieldTable, ShortString},
BasicProperties, Connection, ConnectionProperties, Result,
};
use log::*;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
#[derive(Debug)]
enum Error {
CannotDecodeArg,
MissingReplyTo,
MissingCorrelationId,
}
impl std::error::Error for Error {}
impl Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Error::CannotDecodeArg => write!(f, "Cannot decode argument"),
Error::MissingReplyTo => write!(f, "Missing 'reply to' property"),
Error::MissingCorrelationId => write!(f, "Missing 'correlation id' property"),
}
}
}
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::registry() tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new( .with(tracing_subscriber::EnvFilter::new(
std::env::var("RUST_LOG").unwrap_or_else(|_| "debug".into()), std::env::var("RUST_LOG").unwrap_or_else(|_| "debug".into()),
)) ))
.with(tracing_subscriber::fmt::layer()) .with(tracing_subscriber::fmt::layer())
.init(); .init();
// Open connection. let addr = "amqp://127.0.0.1:5672";
let conn = Connection::connect(addr, ConnectionProperties::default()).await?;
let channel = conn.create_channel().await?;
let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into()); channel
let conn = Connection::connect(&addr, ConnectionProperties::default()) .queue_declare(
.await "rpc_queue",
.unwrap(); QueueDeclareOptions::default(),
FieldTable::default(),
)
.await?;
info!("CONNECTED"); channel.basic_qos(1, BasicQosOptions::default()).await?;
let channel_b = conn.create_channel().await.unwrap();
// declare a consumer let mut consumer = channel
let mut consumer = channel_b
.basic_consume( .basic_consume(
"email", "rpc_queue",
"", "rpc_server",
BasicConsumeOptions::default(), BasicConsumeOptions::default(),
FieldTable::default(), FieldTable::default(),
) )
.await?; .await?;
// watch for incoming messages
while let Some(delivery) = consumer.next().await { while let Some(delivery) = consumer.next().await {
let delivery = delivery.expect("error in consumer"); if let Ok(delivery) = delivery {
let reply = delivery.properties.correlation_id(); log::debug!("Received a request");
log::info!("Reply id: {:?}", reply);
let data = serde_json::from_str::<EmailJob>(from_utf8(&delivery.data).unwrap()).unwrap(); let data =
channel_b serde_json::from_str::<EmailJob>(from_utf8(&delivery.data).unwrap()).unwrap();
.basic_publish(
"", let routing_key = delivery
delivery.properties.reply_to().clone().unwrap().as_str(), .properties
BasicPublishOptions::default(), .reply_to()
format!("Sent email to {}", data.to()).as_bytes(), .as_ref()
BasicProperties::default(), .ok_or(Error::MissingReplyTo)?
) .as_str();
.await
.unwrap(); let correlation_id = delivery
delivery.ack(BasicAckOptions::default()).await.unwrap(); .properties
info!("Received emailjob {:?} and replied back", data); .correlation_id()
.clone()
.ok_or(Error::MissingCorrelationId)?;
channel
.basic_publish(
"",
routing_key,
BasicPublishOptions::default(),
&format!("Sent email to {}", data.to()).as_bytes(),
BasicProperties::default().with_correlation_id(correlation_id),
)
.await?;
channel
.basic_ack(delivery.delivery_tag, BasicAckOptions::default())
.await?;
}
} }
Ok(()) Ok(())

View file

@ -20,6 +20,7 @@ dotenv = "0.15.0"
lapin = "2.5.0" lapin = "2.5.0"
uuid = { version = "1.10.0", features = ["v4"] } uuid = { version = "1.10.0", features = ["v4"] }
futures-lite = "2.3.0" futures-lite = "2.3.0"
futures = "0.3.30"
[dependencies.common] [dependencies.common]
path = "../common" path = "../common"

View file

@ -1,97 +1,133 @@
use std::{str::from_utf8, thread, time::Duration};
use common::EmailJobBuilder; use common::EmailJobBuilder;
use futures_lite::StreamExt; use futures::StreamExt;
use lapin::{ use lapin::{
options::*, options::*, types::FieldTable, types::ShortString, BasicProperties, Channel, Connection,
publisher_confirm::Confirmation, ConnectionProperties, Consumer, Queue,
types::{FieldTable, ShortString},
BasicProperties, Connection, ConnectionProperties, Result,
}; };
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use serde::{de::DeserializeOwned, Serialize};
use std::fmt::Display;
use std::thread;
use std::time::Duration;
use std::{convert::TryInto, str::from_utf8};
use uuid::Uuid; use uuid::Uuid;
#[derive(Debug)]
enum Error {
CannotDecodeReply,
NoReply,
}
impl std::error::Error for Error {}
impl Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Error::CannotDecodeReply => write!(f, "Cannot decode reply"),
Error::NoReply => write!(f, "No reply arrived"),
}
}
}
struct RpcClient {
conn: Connection,
channel: Channel,
callback_queue: Queue,
consumer: Consumer,
correlation_id: ShortString,
}
impl RpcClient {
async fn new() -> Result<Self, lapin::Error> {
let addr = "amqp://127.0.0.1:5672";
let conn = Connection::connect(addr, ConnectionProperties::default()).await?;
let channel = conn.create_channel().await?;
let callback_queue = channel
.queue_declare(
"",
QueueDeclareOptions {
exclusive: true,
..Default::default()
},
FieldTable::default(),
)
.await?;
let consumer = channel
.basic_consume(
callback_queue.name().as_str(),
"rpc_client",
BasicConsumeOptions {
no_ack: true,
..Default::default()
},
FieldTable::default(),
)
.await?;
let correlation_id = Uuid::new_v4().to_string().into();
Ok(Self {
conn,
channel,
callback_queue,
consumer,
correlation_id,
})
}
async fn call<T>(&mut self, data: T) -> Result<String, Box<dyn std::error::Error>>
where
T: Serialize + DeserializeOwned,
{
self.channel
.basic_publish(
"",
"rpc_queue",
BasicPublishOptions::default(),
serde_json::to_string(&data).unwrap().as_bytes(),
BasicProperties::default()
.with_reply_to(self.callback_queue.name().clone())
.with_correlation_id(self.correlation_id.clone()),
)
.await?
.await?;
while let Some(delivery) = self.consumer.next().await {
if let Ok(delivery) = delivery {
if delivery.properties.correlation_id().as_ref() == Some(&self.correlation_id) {
return Ok(from_utf8(&delivery.data).unwrap().to_string());
}
}
}
Err(Box::new(Error::NoReply))
}
async fn close(&self) -> Result<(), lapin::Error> {
self.conn.close(0, "").await
}
}
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
#[tokio::main] #[tokio::main]
async fn main() { async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::registry() tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new( .with(tracing_subscriber::EnvFilter::new(
std::env::var("RUST_LOG").unwrap_or_else(|_| "debug".into()), std::env::var("RUST_LOG").unwrap_or_else(|_| "debug".into()),
)) ))
.with(tracing_subscriber::fmt::layer()) .with(tracing_subscriber::fmt::layer())
.init(); .init();
let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into()); let mut fibonacci_rpc = RpcClient::new().await?;
let conn = Connection::connect(&addr, ConnectionProperties::default()) println!(" [x] Requesting fib(30)");
.await
.unwrap();
let channel_a = conn.create_channel().await.unwrap();
let channel_b = conn.create_channel().await.unwrap();
let email = EmailJobBuilder::default() let email = EmailJobBuilder::default()
.to("someone@example.com".into()) .to("someone@example.com".into())
.content("test".into()) .content("Hello there".into())
.build() .build()
.unwrap(); .unwrap();
let queue = channel_a
.queue_declare(
"email",
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await
.unwrap();
let mut consumer = channel_a
.basic_consume(
"email",
"",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.unwrap();
let mut request = Uuid::new_v4();
loop { loop {
let confirm = channel_b let response = fibonacci_rpc.call(email.clone()).await?;
.basic_publish( log::info!("Got back fib: {}", response);
"",
"email",
BasicPublishOptions::default(),
serde_json::to_string(&email).unwrap().as_bytes(),
BasicProperties::default()
.with_correlation_id(request.to_string().into())
.with_reply_to(consumer.queue()),
)
.await
.unwrap()
.await
.unwrap();
while let Some(delivery) = consumer.next().await {
if let Ok(delivery) = delivery {
if delivery
.properties
.correlation_id()
.as_ref()
.unwrap_or(&ShortString::from("asd"))
.eq(&ShortString::from(request.to_string()))
{
log::info!(
"Received data from consumer: {:?}",
from_utf8(&delivery.data)
);
break;
} else {
log::info!(
"Received data from consumer: {:?}",
from_utf8(&delivery.data)
);
log::error!("Received invalid delivery");
break;
}
}
}
request = Uuid::new_v4();
log::info!("Sent request");
thread::sleep(Duration::from_secs(1)) thread::sleep(Duration::from_secs(1))
} }
} }