Skip to main content

Grove/Transport/
IPCTransport.rs

1#![allow(non_snake_case, non_camel_case_types, non_upper_case_globals)]
2//! # IPC Transport Implementation
3//!
4//! Provides inter-process communication (IPC) for Grove.
5//! Supports Unix domain sockets on macOS/Linux and named pipes on Windows.
6
7use std::{
8	path::{Path, PathBuf},
9	sync::Arc,
10};
11
12use async_trait::async_trait;
13use tokio::sync::RwLock;
14
15use crate::{
16	Transport::{
17		Strategy::{TransportStats, TransportStrategy, TransportType},
18		TransportConfig,
19	},
20	dev_log,
21};
22
23/// IPC transport for local process communication.
24#[derive(Clone, Debug)]
25pub struct IPCTransport {
26	/// Unix domain socket path (macOS/Linux).
27	SocketPath:Option<PathBuf>,
28
29	/// Named pipe identifier (Windows).
30	#[allow(dead_code)]
31	PipeName:Option<String>,
32
33	/// Transport configuration.
34	#[allow(dead_code)]
35	Configuration:TransportConfig,
36
37	/// Whether the transport is currently connected.
38	Connected:Arc<RwLock<bool>>,
39
40	/// Transport statistics.
41	Statistics:Arc<RwLock<TransportStats>>,
42}
43
44impl IPCTransport {
45	/// Creates a new IPC transport using the default socket path.
46	pub fn New() -> anyhow::Result<Self> {
47		#[cfg(unix)]
48		{
49			let SocketPath = Self::DefaultSocketPath();
50
51			Ok(Self {
52				SocketPath:Some(SocketPath),
53				PipeName:None,
54				Configuration:TransportConfig::default(),
55				Connected:Arc::new(RwLock::new(false)),
56				Statistics:Arc::new(RwLock::new(TransportStats::default())),
57			})
58		}
59
60		#[cfg(windows)]
61		{
62			Ok(Self {
63				SocketPath:None,
64				PipeName:Some(r"\\.\pipe\grove-ipc".to_string()),
65				Configuration:TransportConfig::default(),
66				Connected:Arc::new(RwLock::new(false)),
67				Statistics:Arc::new(RwLock::new(TransportStats::default())),
68			})
69		}
70
71		#[cfg(not(any(unix, windows)))]
72		{
73			Err(anyhow::anyhow!("IPC transport not supported on this platform"))
74		}
75	}
76
77	/// Creates a new IPC transport with a custom Unix domain socket path.
78	pub fn WithSocketPath<P:AsRef<Path>>(SocketPath:P) -> anyhow::Result<Self> {
79		#[cfg(unix)]
80		{
81			Ok(Self {
82				SocketPath:Some(SocketPath.as_ref().to_path_buf()),
83				PipeName:None,
84				Configuration:TransportConfig::default(),
85				Connected:Arc::new(RwLock::new(false)),
86				Statistics:Arc::new(RwLock::new(TransportStats::default())),
87			})
88		}
89
90		#[cfg(not(unix))]
91		{
92			Err(anyhow::anyhow!("Unix sockets not supported on this platform"))
93		}
94	}
95
96	/// Returns the default socket path for the current platform.
97	#[cfg(unix)]
98	fn DefaultSocketPath() -> PathBuf {
99		let mut Path = std::env::temp_dir();
100
101		Path.push("grove-ipc.sock");
102
103		Path
104	}
105
106	/// Returns the socket path (Unix only).
107	#[cfg(unix)]
108	pub fn GetSocketPath(&self) -> Option<&Path> { self.SocketPath.as_deref() }
109
110	/// Returns a snapshot of transport statistics.
111	pub async fn GetStatistics(&self) -> TransportStats { self.Statistics.read().await.clone() }
112
113	/// Removes the socket file if it exists.
114	#[cfg(unix)]
115	async fn CleanupSocket(&self) -> anyhow::Result<()> {
116		if let Some(SocketPath) = &self.SocketPath {
117			if SocketPath.exists() {
118				tokio::fs::remove_file(SocketPath)
119					.await
120					.map_err(|E| anyhow::anyhow!("Failed to remove socket: {}", E))?;
121
122				dev_log!("transport", "Removed existing socket: {:?}", SocketPath);
123			}
124		}
125
126		Ok(())
127	}
128}
129
130#[async_trait]
131impl TransportStrategy for IPCTransport {
132	type Error = IPCTransportError;
133
134	async fn connect(&self) -> Result<(), Self::Error> {
135		dev_log!("transport", "Connecting to IPC transport");
136
137		#[cfg(unix)]
138		{
139			self.CleanupSocket()
140				.await
141				.map_err(|E| IPCTransportError::ConnectionFailed(E.to_string()))?;
142
143			*self.Connected.write().await = true;
144			dev_log!("transport", "IPC connection established: {:?}", self.SocketPath);
145		}
146
147		#[cfg(windows)]
148		{
149			*self.Connected.write().await = true;
150			dev_log!("transport", "IPC connection established via named pipe");
151		}
152
153		#[cfg(not(any(unix, windows)))]
154		{
155			return Err(IPCTransportError::NotSupported);
156		}
157
158		Ok(())
159	}
160
161	async fn send(&self, request:&[u8]) -> Result<Vec<u8>, Self::Error> {
162		if !self.is_connected() {
163			return Err(IPCTransportError::NotConnected);
164		}
165
166		dev_log!("transport", "Sending IPC request ({} bytes)", request.len());
167
168		let Response:Vec<u8> = vec![];
169
170		let mut Stats = self.Statistics.write().await;
171
172		Stats.record_sent(request.len() as u64, 0);
173
174		Stats.record_received(Response.len() as u64);
175
176		Ok(Response)
177	}
178
179	async fn send_no_response(&self, data:&[u8]) -> Result<(), Self::Error> {
180		if !self.is_connected() {
181			return Err(IPCTransportError::NotConnected);
182		}
183
184		dev_log!("transport", "Sending IPC notification ({} bytes)", data.len());
185
186		let mut Stats = self.Statistics.write().await;
187
188		Stats.record_sent(data.len() as u64, 0);
189
190		Ok(())
191	}
192
193	async fn close(&self) -> Result<(), Self::Error> {
194		dev_log!("transport", "Closing IPC connection");
195
196		*self.Connected.write().await = false;
197
198		#[cfg(unix)]
199		{
200			if let Some(SocketPath) = &self.SocketPath {
201				if SocketPath.exists() {
202					tokio::fs::remove_file(SocketPath).await.map_err(|E| {
203						dev_log!("transport", "warn: failed to remove socket: {}", E);
204						IPCTransportError::CleanupFailed(E.to_string())
205					})?;
206				}
207			}
208		}
209
210		dev_log!("transport", "IPC connection closed");
211
212		Ok(())
213	}
214
215	fn is_connected(&self) -> bool { *self.Connected.blocking_read() }
216
217	fn transport_type(&self) -> TransportType { TransportType::IPC }
218}
219
220/// IPC transport error variants.
221#[derive(Debug, thiserror::Error)]
222pub enum IPCTransportError {
223	/// Failed to establish IPC connection
224	#[error("Connection failed: {0}")]
225	ConnectionFailed(String),
226
227	/// Failed to send message via IPC
228	#[error("Send failed: {0}")]
229	SendFailed(String),
230
231	/// Failed to receive message via IPC
232	#[error("Receive failed: {0}")]
233	ReceiveFailed(String),
234
235	/// Transport is not connected
236	#[error("Not connected")]
237	NotConnected,
238
239	/// IPC not supported on this platform
240	#[error("IPC not supported on this platform")]
241	NotSupported,
242
243	/// Failed to clean up IPC resources
244	#[error("Cleanup failed: {0}")]
245	CleanupFailed(String),
246
247	/// Socket communication error
248	#[error("Socket error: {0}")]
249	SocketError(String),
250
251	/// Operation timed out
252	#[error("Timeout")]
253	Timeout,
254}
255
256#[cfg(test)]
257mod tests {
258
259	use super::*;
260
261	#[test]
262	fn TestIPCTransportCreation() {
263		#[cfg(any(unix, windows))]
264		{
265			let Result = IPCTransport::New();
266
267			assert!(Result.is_ok());
268		}
269	}
270
271	#[cfg(unix)]
272	#[test]
273	fn TestIPCTransportWithSocketPath() {
274		let Result = IPCTransport::WithSocketPath(Path::new("/tmp/test.sock"));
275
276		assert!(Result.is_ok());
277
278		let Transport = Result.unwrap();
279
280		assert_eq!(Transport.GetSocketPath(), Some(Path::new("/tmp/test.sock")));
281	}
282
283	#[tokio::test]
284	async fn TestIPCTransportNotConnected() {
285		#[cfg(any(unix, windows))]
286		{
287			let Transport = IPCTransport::New().unwrap();
288
289			assert!(!Transport.is_connected());
290		}
291	}
292}