diff --git a/src/client.rs b/src/client.rs index 8e52d8d..a2134eb 100644 --- a/src/client.rs +++ b/src/client.rs @@ -49,7 +49,7 @@ impl Client { &self, data: T, options: BasicCallOptions, - ) -> Result, RpcClientError> + ) -> Result, ClientError> where T: Serialize + DeserializeOwned, { @@ -128,7 +128,7 @@ impl Client { match consumer.next().await { None => { tracing::error!("Received empty data after {:?}", now.elapsed()); - return Err(RpcClientError::NoReply); + return Err(ClientError::InvalidResponse); } Some(del) => match del { Err(error) => { @@ -138,7 +138,7 @@ impl Client { now.elapsed() ); // Idk if i should nack it? - return Err(RpcClientError::FailedDecode); + return Err(ClientError::FailedDecode); } Ok(del) => { tracing::debug!("Received response after {:?}", now.elapsed()); @@ -153,7 +153,7 @@ impl Client { Some(dur) => match timeout(dur, listen).await { Err(elapsed) => { tracing::warn!("RPC job has reached timeout after: {}", elapsed); - return Err(RpcClientError::TimeoutReached); + return Err(ClientError::TimeoutReached); } Ok(r) => match r { Err(error) => return Err(error), @@ -169,17 +169,17 @@ impl Client { tracing::error!( "Got a response with no headers, this might be an issue with version mismatch" ); - return Err(RpcClientError::InvalidResponse); + return Err(ClientError::InvalidResponse); } Some(headers) => match headers.inner().get("outcome") { None => { tracing::error!("Got a response with no outcome header"); - return Err(RpcClientError::InvalidResponse); + return Err(ClientError::InvalidResponse); } Some(res) => match res.as_long_string() { None => { tracing::error!("Got a response with no headers"); - return Err(RpcClientError::InvalidResponse); + return Err(ClientError::InvalidResponse); } Some(outcome) => { match serde_json::from_str::(outcome.to_string().as_str()) { @@ -189,7 +189,7 @@ impl Client { } Err(_) => { tracing::warn!("Received a result header but it's not a type that can be deserailized "); - return Err(RpcClientError::InvalidResponse); + return Err(ClientError::InvalidResponse); } } } @@ -202,35 +202,36 @@ impl Client { Ok(r) => r, Err(error) => { tracing::error!("Failed to decode response message to utf8 {error}"); - return Err(RpcClientError::FailedDecode); + return Err(ClientError::FailedDecode); } }; let _ = channel.close(0, "byebye").await; + // ack message let _ = del.ack(BasicAckOptions::default()).await; + match result_type { ResultHeader::Error => match serde_json::from_str::(utf8) { // get result header Err(_) => { tracing::error!("Failed to decode response message to E"); - return Err(RpcClientError::FailedDecode); + return Err(ClientError::FailedDecode); } Ok(res) => return Ok(Err(res)), }, - ResultHeader::Panic => return Err(RpcClientError::ServerPanicked), + ResultHeader::Panic => return Err(ClientError::ServerPanicked), ResultHeader::Ok => // get result { match serde_json::from_str::(utf8) { Err(_) => { tracing::error!("Failed to decode response message to R"); - return Err(RpcClientError::FailedDecode); + return Err(ClientError::FailedDecode); } Ok(res) => return Ok(Ok(res)), } } } - // ack message } /// Sends a basic Task to the queue @@ -306,27 +307,13 @@ impl BasicCallOptions { /// An error that the client returns #[derive(Debug)] -pub enum RpcClientError { - NoReply, +pub enum ClientError { FailedDecode, FailedToSend, InvalidResponse, ServerPanicked, TimeoutReached, } -/// An error for normal calls -#[derive(Debug)] -pub enum ClientError { - FailedToSend, -} - -impl Display for ClientError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::FailedToSend => write!(f, "Failed to send to queue"), - } - } -} /// A Client-side trait that needs to be implemented for a type in order for the client to know return types. ///