Skip to main content

Grove/Transport/
gRPCTransport.rs

1#![allow(non_snake_case, non_camel_case_types, non_upper_case_globals)]
2//! # gRPC Transport Implementation
3//!
4//! Provides gRPC-based communication for Grove.
5//! Connects to Mountain or other gRPC services.
6
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use tokio::sync::RwLock;
11use tonic::transport::{Channel, Endpoint};
12
13use crate::{
14	Transport::{
15		Strategy::{TransportStats, TransportStrategy, TransportType},
16		TransportConfig,
17	},
18	dev_log,
19};
20
/// gRPC transport for communication with Mountain and other gRPC services.
///
/// Every piece of mutable state is held behind an `Arc<RwLock<..>>`, so the
/// values produced by the derived `Clone` share one channel, one connection
/// flag and one statistics record with the value they were cloned from.
#[derive(Clone, Debug)]
pub struct gRPCTransport {
	/// Connection endpoint address, stored as supplied to the constructor.
	Endpoint:String,

	/// gRPC channel (lazily connected): starts as `None`, is set by
	/// `connect` and reset to `None` by `close`.
	Channel:Arc<RwLock<Option<Channel>>>,

	/// Transport configuration; read when the tonic endpoint is built.
	Configuration:TransportConfig,

	/// Whether the transport is currently connected. Written by `connect`
	/// (`true`) and `close` (`false`).
	Connected:Arc<RwLock<bool>>,

	/// Transport statistics, updated by the send operations.
	Statistics:Arc<RwLock<TransportStats>>,
}
39
40impl gRPCTransport {
41	/// Creates a new gRPC transport with the given address.
42	pub fn New(Address:&str) -> anyhow::Result<Self> {
43		Ok(Self {
44			Endpoint:Address.to_string(),
45			Channel:Arc::new(RwLock::new(None)),
46			Configuration:TransportConfig::default(),
47			Connected:Arc::new(RwLock::new(false)),
48			Statistics:Arc::new(RwLock::new(TransportStats::default())),
49		})
50	}
51
52	/// Creates a new gRPC transport with custom configuration.
53	pub fn WithConfiguration(Address:&str, Configuration:TransportConfig) -> anyhow::Result<Self> {
54		Ok(Self {
55			Endpoint:Address.to_string(),
56			Channel:Arc::new(RwLock::new(None)),
57			Configuration,
58			Connected:Arc::new(RwLock::new(false)),
59			Statistics:Arc::new(RwLock::new(TransportStats::default())),
60		})
61	}
62
63	/// Returns the connection endpoint address.
64	pub fn Address(&self) -> &str { &self.Endpoint }
65
66	/// Returns the active gRPC channel.
67	pub async fn GetChannel(&self) -> anyhow::Result<Channel> {
68		self.Channel
69			.read()
70			.await
71			.as_ref()
72			.cloned()
73			.ok_or_else(|| anyhow::anyhow!("gRPC channel not connected"))
74	}
75
76	/// Returns a snapshot of transport statistics.
77	pub async fn Statistics(&self) -> TransportStats { self.Statistics.read().await.clone() }
78
79	/// Builds an endpoint from the address string.
80	fn BuildEndpoint(&self) -> anyhow::Result<Endpoint> {
81		let EndpointValue = Endpoint::from_shared(self.Endpoint.clone())?
82			.timeout(self.Configuration.ConnectionTimeout)
83			.connect_timeout(self.Configuration.ConnectionTimeout)
84			.tcp_keepalive(Some(self.Configuration.KeepaliveInterval));
85
86		Ok(EndpointValue)
87	}
88}
89
90#[async_trait]
91impl TransportStrategy for gRPCTransport {
92	type Error = gRPCTransportError;
93
94	async fn connect(&self) -> Result<(), Self::Error> {
95		dev_log!("grpc", "Connecting to gRPC endpoint: {}", self.Endpoint);
96
97		let EndpointValue = self
98			.BuildEndpoint()
99			.map_err(|E| gRPCTransportError::ConnectionFailed(E.to_string()))?;
100
101		let ChannelValue = EndpointValue
102			.connect()
103			.await
104			.map_err(|E| gRPCTransportError::ConnectionFailed(E.to_string()))?;
105
106		*self.Channel.write().await = Some(ChannelValue);
107		*self.Connected.write().await = true;
108
109		dev_log!("grpc", "gRPC connection established: {}", self.Endpoint);
110
111		Ok(())
112	}
113
114	async fn send(&self, request:&[u8]) -> Result<Vec<u8>, Self::Error> {
115		let Start = std::time::Instant::now();
116
117		if !self.is_connected() {
118			return Err(gRPCTransportError::NotConnected);
119		}
120
121		dev_log!("grpc", "Sending gRPC request ({} bytes)", request.len());
122
123		let Response:Vec<u8> = vec![];
124
125		let LatencyMicroseconds = Start.elapsed().as_micros() as u64;
126
127		let mut Stats = self.Statistics.write().await;
128
129		Stats.record_sent(request.len() as u64, LatencyMicroseconds);
130
131		Stats.record_received(Response.len() as u64);
132
133		dev_log!("grpc", "gRPC request completed in {}µs", LatencyMicroseconds);
134
135		Ok(Response)
136	}
137
138	async fn send_no_response(&self, data:&[u8]) -> Result<(), Self::Error> {
139		if !self.is_connected() {
140			return Err(gRPCTransportError::NotConnected);
141		}
142
143		dev_log!("grpc", "Sending gRPC notification ({} bytes)", data.len());
144
145		let mut Stats = self.Statistics.write().await;
146
147		Stats.record_sent(data.len() as u64, 0);
148
149		Ok(())
150	}
151
152	async fn close(&self) -> Result<(), Self::Error> {
153		dev_log!("grpc", "Closing gRPC connection: {}", self.Endpoint);
154
155		*self.Channel.write().await = None;
156		*self.Connected.write().await = false;
157		dev_log!("grpc", "gRPC connection closed: {}", self.Endpoint);
158
159		Ok(())
160	}
161
162	fn is_connected(&self) -> bool { *self.Connected.blocking_read() }
163
164	fn transport_type(&self) -> TransportType { TransportType::gRPC }
165}
166
/// gRPC transport error variants.
///
/// `Display` messages come from the `#[error(..)]` attributes via
/// `thiserror`; payload strings carry the rendered underlying error.
#[derive(Debug, thiserror::Error)]
pub enum gRPCTransportError {
	/// Failed to establish connection to gRPC server (also produced when the
	/// endpoint cannot be built, and from `tonic::transport::Error`).
	#[error("Connection failed: {0}")]
	ConnectionFailed(String),

	/// Failed to send message to gRPC server
	#[error("Send failed: {0}")]
	SendFailed(String),

	/// Failed to receive message from gRPC server
	#[error("Receive failed: {0}")]
	ReceiveFailed(String),

	/// Transport is not connected (returned by the send operations before
	/// `connect` or after `close`).
	#[error("Not connected")]
	NotConnected,

	/// Operation timed out
	#[error("Timeout")]
	Timeout,

	/// Generic gRPC error (also produced from `tonic::Status`).
	#[error("gRPC error: {0}")]
	Error(String),
}
194
195impl From<tonic::transport::Error> for gRPCTransportError {
196	fn from(Error:tonic::transport::Error) -> Self { gRPCTransportError::ConnectionFailed(Error.to_string()) }
197}
198
199impl From<tonic::Status> for gRPCTransportError {
200	fn from(Status:tonic::Status) -> Self { gRPCTransportError::Error(Status.to_string()) }
201}
202
#[cfg(test)]
mod tests {

	use super::*;

	/// Construction performs no I/O and must keep the address verbatim.
	#[test]
	fn TestgRPCTransportCreation() {
		let Result = gRPCTransport::New("127.0.0.1:50050");

		assert!(Result.is_ok());

		let Transport = Result.unwrap();

		assert_eq!(Transport.Address(), "127.0.0.1:50050");
	}

	/// A freshly constructed transport reports itself as disconnected.
	///
	/// Deliberately a plain `#[test]`: `is_connected` is a synchronous query,
	/// so it is exercised outside of any async runtime.
	#[test]
	fn TestgRPCTransportNotConnected() {
		let Transport = gRPCTransport::New("127.0.0.1:50050").unwrap();

		assert!(!Transport.is_connected());
	}
}