This commit is contained in:
Barna Máté 2024-08-24 13:33:11 +02:00
commit 8045480513
17 changed files with 4127 additions and 0 deletions

24
.config/hakari.toml Normal file
View file

@ -0,0 +1,24 @@
# This file contains settings for `cargo hakari`.
# See https://docs.rs/cargo-hakari/latest/cargo_hakari/config for a full list of options.
hakari-package = "my-workspace-hack"
# Format version for hakari's output. Version 4 requires cargo-hakari 0.9.22 or above.
dep-format-version = "4"
# Setting workspace.resolver = "2" in the root Cargo.toml is HIGHLY recommended.
# Hakari works much better with the new feature resolver.
# For more about the new feature resolver, see:
# https://blog.rust-lang.org/2021/03/25/Rust-1.51.0.html#cargos-new-feature-resolver
resolver = "2"
# Add triples corresponding to platforms commonly used by developers here.
# https://doc.rust-lang.org/rustc/platform-support.html
platforms = [
# "x86_64-unknown-linux-gnu",
# "x86_64-apple-darwin",
# "x86_64-pc-windows-msvc",
]
# Write out exact versions rather than a semver range. (Defaults to false.)
# exact-versions = true

2
.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
/target
result*

3551
Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

12
Cargo.toml Normal file
View file

@ -0,0 +1,12 @@
[workspace]
resolver = "2"
members = [ "common", "consumer",
"producer",
]
[workspace.package]
version = "0.1.0"
edition = "2021"
[workspace.metadata.crane]
name = "producer-consumer"

1
common/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
/target

10
common/Cargo.toml Normal file
View file

@ -0,0 +1,10 @@
[package]
name = "common"
version.workspace = true
edition.workspace = true
[dependencies]
serde = {version="1.0.203", features=["derive"]}
serde_json = "1.0.117"
derive-getters = "0.4.0"
derive_builder = "0.20.0"

15
common/src/lib.rs Normal file
View file

@ -0,0 +1,15 @@
use derive_builder::Builder;
use derive_getters::Getters;
use serde::{Deserialize, Serialize};
#[derive(Builder, Getters, Serialize, Deserialize, Debug, Clone)]
pub struct EmailJob {
to: String,
content: String,
}
#[derive(Builder, Getters, Serialize, Deserialize, Debug, Clone)]
pub struct SMSJob {
number: String,
content: String,
}

1
consumer/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
/target

26
consumer/Cargo.toml Normal file
View file

@ -0,0 +1,26 @@
[package]
name = "consumer"
version.workspace = true
edition.workspace = true
[dependencies]
amiquip = "0.4.2"
serde = {version="1.0.203", features=["derive"]}
serde_json = "1.0.117"
tokio = { version = "1", features = ["full"] }
tokio-util = "0.7.11"
apalis = { version = "0.6.0-rc.3", features = ["layers", "sentry", ] }
apalis-redis = { version = "0.6.0-rc.3" }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
env_logger = "0.11.3"
log = "0.4.21"
derive-getters = "0.4.0"
derive_builder = "0.20.0"
dotenv = "0.15.0"
lapin = "2.5.0"
uuid = { version = "1.10.0", features = ["v4"] }
futures-lite = "2.3.0"
[dependencies.common]
path = "../common"

73
consumer/src/main.rs Normal file
View file

@ -0,0 +1,73 @@
// Port of https://www.rabbitmq.com/tutorials/tutorial-six-python.html. Start this
// example in one shell, then the rpc_client example in another.
use amiquip::{
AmqpProperties, Connection, ConsumerMessage, ConsumerOptions, Exchange, Publish,
QueueDeclareOptions, Result,
};
fn fib(n: u64) -> u64 {
match n {
0 => 0,
1 => 1,
n => fib(n - 1) + fib(n - 2),
}
}
fn main() -> Result<()> {
env_logger::init();
// Open connection.
let mut connection = Connection::insecure_open("amqp://guest:guest@localhost:5672")?;
// Open a channel - None says let the library choose the channel ID.
let channel = connection.open_channel(None)?;
// Get a handle to the default direct exchange.
let exchange = Exchange::direct(&channel);
// Declare the queue that will receive RPC requests.
let queue = channel.queue_declare("rpc_queue", QueueDeclareOptions::default())?;
// Start a consumer.
let consumer = queue.consume(ConsumerOptions::default())?;
println!("Awaiting RPC requests");
for (i, message) in consumer.receiver().iter().enumerate() {
match message {
ConsumerMessage::Delivery(delivery) => {
let body = String::from_utf8_lossy(&delivery.body);
println!("({:>3}) fib({})", i, body);
let (reply_to, corr_id) = match (
delivery.properties.reply_to(),
delivery.properties.correlation_id(),
) {
(Some(r), Some(c)) => (r.clone(), c.clone()),
_ => {
println!("received delivery without reply_to or correlation_id");
consumer.ack(delivery)?;
continue;
}
};
let response = match body.parse() {
Ok(n) => format!("{}", fib(n)),
Err(_) => "invalid input".to_string(),
};
exchange.publish(Publish::with_properties(
response.as_bytes(),
reply_to,
AmqpProperties::default().with_correlation_id(corr_id),
))?;
consumer.ack(delivery)?;
}
other => {
println!("Consumer ended: {:?}", other);
break;
}
}
}
connection.close()
}

11
deny.toml Normal file
View file

@ -0,0 +1,11 @@
[bans]
multiple-versions = 'allow'
[licenses]
private = { ignore = true }
allow = [
"Apache-2.0",
"BSD-3-Clause",
"MIT",
"Unicode-DFS-2016",
]

7
docker-compose.yml Normal file
View file

@ -0,0 +1,7 @@
services:
rabbitmq:
image: rabbitmq:3-management-alpine
container_name: "rabbitmq"
ports:
- 5672:5672
- 15672:15672

121
flake.lock Normal file
View file

@ -0,0 +1,121 @@
{
"nodes": {
"advisory-db": {
"flake": false,
"locked": {
"lastModified": 1724421788,
"narHash": "sha256-kKWEQL4x6FFhDc7xRtoAlgB57DjigoqBerO2PG/IgS4=",
"owner": "rustsec",
"repo": "advisory-db",
"rev": "1bc15cb78d189f47729ae15bf58f62ad91b45b9c",
"type": "github"
},
"original": {
"owner": "rustsec",
"repo": "advisory-db",
"type": "github"
}
},
"crane": {
"inputs": {
"nixpkgs": [
"nixpkgs"
]
},
"locked": {
"lastModified": 1724377159,
"narHash": "sha256-ixjje1JO8ucKT41hs6n2NCde1Vc0+Zc2p2gUbJpCsMw=",
"owner": "ipetkov",
"repo": "crane",
"rev": "3e47b7a86c19142bd3675da49d6acef488b4dac1",
"type": "github"
},
"original": {
"owner": "ipetkov",
"repo": "crane",
"type": "github"
}
},
"fenix": {
"inputs": {
"nixpkgs": [
"nixpkgs"
],
"rust-analyzer-src": []
},
"locked": {
"lastModified": 1724394478,
"narHash": "sha256-JSiv2uwI4UJo/4pxt4255BfzWCiEberJmBC6pCxnzas=",
"owner": "nix-community",
"repo": "fenix",
"rev": "2e7bcdc4ef73aa56abd2f970e5a8b9c0cc87e614",
"type": "github"
},
"original": {
"owner": "nix-community",
"repo": "fenix",
"type": "github"
}
},
"flake-utils": {
"inputs": {
"systems": "systems"
},
"locked": {
"lastModified": 1710146030,
"narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1724363052,
"narHash": "sha256-Nf/iQWamRVAwAPFccQMfm5Qcf+rLLnU1rWG3f9orDVE=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "5de1564aed415bf9d0f281461babc2d101dd49ff",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixpkgs-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"advisory-db": "advisory-db",
"crane": "crane",
"fenix": "fenix",
"flake-utils": "flake-utils",
"nixpkgs": "nixpkgs"
}
},
"systems": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
}
},
"root": "root",
"version": 7
}

192
flake.nix Normal file
View file

@ -0,0 +1,192 @@
{
description = "Producer and consumer in rust using rabbitmq";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable";
crane = {
url = "github:ipetkov/crane";
inputs.nixpkgs.follows = "nixpkgs";
};
fenix = {
url = "github:nix-community/fenix";
inputs.nixpkgs.follows = "nixpkgs";
inputs.rust-analyzer-src.follows = "";
};
flake-utils.url = "github:numtide/flake-utils";
advisory-db = {
url = "github:rustsec/advisory-db";
flake = false;
};
};
outputs = { self, nixpkgs, crane, fenix, flake-utils, advisory-db, ... }:
flake-utils.lib.eachDefaultSystem (system:
let
pkgs = nixpkgs.legacyPackages.${system};
inherit (pkgs) lib;
craneLib = crane.mkLib pkgs;
src = craneLib.cleanCargoSource ./.;
# Common arguments can be set here to avoid repeating them later
commonArgs = {
inherit src;
strictDeps = true;
buildInputs = [
# Add additional build inputs here
] ++ lib.optionals pkgs.stdenv.isDarwin [
# Additional darwin specific inputs can be set here
pkgs.libiconv
];
# Additional environment variables can be set directly
# MY_CUSTOM_VAR = "some value";
};
craneLibLLvmTools = craneLib.overrideToolchain
(fenix.packages.${system}.complete.withComponents [
"cargo"
"llvm-tools"
"rustc"
]);
# Build *just* the cargo dependencies (of the entire workspace),
# so we can reuse all of that work (e.g. via cachix) when running in CI
# It is *highly* recommended to use something like cargo-hakari to avoid
# cache misses when building individual top-level-crates
cargoArtifacts = craneLib.buildDepsOnly commonArgs;
individualCrateArgs = commonArgs // {
inherit cargoArtifacts;
inherit (craneLib.crateNameFromCargoToml { inherit src; }) version;
# NB: we disable tests since we'll run them all via cargo-nextest
doCheck = false;
};
fileSetForCrate = crate: lib.fileset.toSource {
root = ./.;
fileset = lib.fileset.unions [
./Cargo.toml
./Cargo.lock
./producer
./consumer
crate
];
};
# Build the top-level crates of the workspace as individual derivations.
# This allows consumers to only depend on (and build) only what they need.
# Though it is possible to build the entire workspace as a single derivation,
# so this is left up to you on how to organize things
producer = craneLib.buildPackage (individualCrateArgs // {
pname = "producer";
cargoExtraArgs = "-p producer";
src = fileSetForCrate ./producer;
});
consumer = craneLib.buildPackage (individualCrateArgs // {
pname = "consumer";
cargoExtraArgs = "-p consumer";
src = fileSetForCrate ./consumer;
});
in
{
checks = {
# Build the crates as part of `nix flake check` for convenience
inherit producer consumer;
# Run clippy (and deny all warnings) on the workspace source,
# again, reusing the dependency artifacts from above.
#
# Note that this is done as a separate derivation so that
# we can block the CI if there are issues here, but not
# prevent downstream consumers from building our crate by itself.
producer-consumer-clippy = craneLib.cargoClippy (commonArgs // {
inherit cargoArtifacts;
cargoClippyExtraArgs = "--all-targets -- --deny warnings";
});
producer-consumer-doc = craneLib.cargoDoc (commonArgs // {
inherit cargoArtifacts;
});
# Check formatting
producer-consumer-fmt = craneLib.cargoFmt {
inherit src;
};
# Audit dependencies
producer-consumer-audit = craneLib.cargoAudit {
inherit src advisory-db;
};
# Audit licenses
producer-consumer-deny = craneLib.cargoDeny {
inherit src;
};
# Run tests with cargo-nextest
# Consider setting `doCheck = false` on other crate derivations
# if you do not want the tests to run twice
producer-consumer-nextest = craneLib.cargoNextest (commonArgs // {
inherit cargoArtifacts;
partitions = 1;
partitionType = "count";
});
# Ensure that cargo-hakari is up to date
producer-consumer-hakari = craneLib.mkCargoDerivation {
inherit src;
pname = "producer-consumer-hakari";
cargoArtifacts = null;
doInstallCargoArtifacts = false;
buildPhaseCargoCommand = ''
cargo hakari generate --diff # workspace-hack Cargo.toml is up-to-date
cargo hakari manage-deps --dry-run # all workspace crates depend on workspace-hack
cargo hakari verify
'';
nativeBuildInputs = [
pkgs.cargo-hakari
];
};
};
packages = {
inherit consumer producer;
} // lib.optionalAttrs (!pkgs.stdenv.isDarwin) {
producer-consumer-llvm-coverage = craneLibLLvmTools.cargoLlvmCov (commonArgs // {
inherit cargoArtifacts;
});
};
apps = {
producer = flake-utils.lib.mkApp {
drv = producer;
};
consumer = flake-utils.lib.mkApp {
drv = consumer;
};
};
devShells.default = craneLib.devShell {
# Inherit inputs from checks.
checks = self.checks.${system};
LIBCLANG_PATH = "${pkgs.llvmPackages_17.libclang.lib}/lib";
RUST_SRC_PATH = "${pkgs.rust.packages.stable.rustPlatform.rustLibSrc}";
packages = with pkgs; [
openssl
pkg-config
libiconv
cargo-hakari
cargo-nextest # testing
];
};
});
}

1
producer/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
/target

27
producer/Cargo.toml Normal file
View file

@ -0,0 +1,27 @@
[package]
name = "producer"
version.workspace = true
edition.workspace = true
[dependencies]
amiquip = "0.4.2"
serde = {version="1.0.203", features=["derive"]}
serde_json = "1.0.117"
tokio = { version = "1", features = ["full"] }
tokio-util = "0.7.11"
apalis = { version = "0.6.0-rc.3", features = ["layers", "sentry", ] }
apalis-redis = { version = "0.6.0-rc.3" }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
env_logger = "0.11.3"
log = "0.4.21"
derive-getters = "0.4.0"
derive_builder = "0.20.0"
dotenv = "0.15.0"
lapin = "2.5.0"
uuid = { version = "1.10.0", features = ["v4"] }
futures-lite = "2.3.0"
[dependencies.common]
path = "../common"

53
producer/src/main.rs Normal file
View file

@ -0,0 +1,53 @@
use std::{thread, time::Duration};
use common::EmailJobBuilder;
use lapin::{
options::*, publisher_confirm::Confirmation, types::FieldTable, BasicProperties, Connection,
ConnectionProperties, Result,
};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use uuid::Uuid;
#[tokio::main]
async fn main() {
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new(
std::env::var("RUST_LOG").unwrap_or_else(|_| "debug".into()),
))
.with(tracing_subscriber::fmt::layer())
.init();
let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
let conn = Connection::connect(&addr, ConnectionProperties::default())
.await
.unwrap();
let channel_a = conn.create_channel().await.unwrap();
let email = EmailJobBuilder::default()
.to("someone@example.com".into())
.content("test".into())
.build()
.unwrap();
loop {
let request = Uuid::new_v4();
let confirm = channel_a
.basic_publish(
"",
"email",
BasicPublishOptions::default(),
serde_json::to_string(&email).unwrap().as_bytes(),
BasicProperties::default().with_correlation_id(request.to_string().into()),
)
.await
.unwrap()
.await
.unwrap();
log::info!(
"Got response for :{} \n {:?}",
request,
confirm.take_message()
);
thread::sleep(Duration::from_millis(250))
}
}