Grove/Transport/
IPCTransport.rs1#![allow(non_snake_case, non_camel_case_types, non_upper_case_globals)]
2use std::{
8 path::{Path, PathBuf},
9 sync::Arc,
10};
11
12use async_trait::async_trait;
13use tokio::sync::RwLock;
14
15use crate::{
16 Transport::{
17 Strategy::{TransportStats, TransportStrategy, TransportType},
18 TransportConfig,
19 },
20 dev_log,
21};
22
23#[derive(Clone, Debug)]
25pub struct IPCTransport {
26 SocketPath:Option<PathBuf>,
28
29 #[allow(dead_code)]
31 PipeName:Option<String>,
32
33 #[allow(dead_code)]
35 Configuration:TransportConfig,
36
37 Connected:Arc<RwLock<bool>>,
39
40 Statistics:Arc<RwLock<TransportStats>>,
42}
43
44impl IPCTransport {
45 pub fn New() -> anyhow::Result<Self> {
47 #[cfg(unix)]
48 {
49 let SocketPath = Self::DefaultSocketPath();
50
51 Ok(Self {
52 SocketPath:Some(SocketPath),
53 PipeName:None,
54 Configuration:TransportConfig::default(),
55 Connected:Arc::new(RwLock::new(false)),
56 Statistics:Arc::new(RwLock::new(TransportStats::default())),
57 })
58 }
59
60 #[cfg(windows)]
61 {
62 Ok(Self {
63 SocketPath:None,
64 PipeName:Some(r"\\.\pipe\grove-ipc".to_string()),
65 Configuration:TransportConfig::default(),
66 Connected:Arc::new(RwLock::new(false)),
67 Statistics:Arc::new(RwLock::new(TransportStats::default())),
68 })
69 }
70
71 #[cfg(not(any(unix, windows)))]
72 {
73 Err(anyhow::anyhow!("IPC transport not supported on this platform"))
74 }
75 }
76
77 pub fn WithSocketPath<P:AsRef<Path>>(SocketPath:P) -> anyhow::Result<Self> {
79 #[cfg(unix)]
80 {
81 Ok(Self {
82 SocketPath:Some(SocketPath.as_ref().to_path_buf()),
83 PipeName:None,
84 Configuration:TransportConfig::default(),
85 Connected:Arc::new(RwLock::new(false)),
86 Statistics:Arc::new(RwLock::new(TransportStats::default())),
87 })
88 }
89
90 #[cfg(not(unix))]
91 {
92 Err(anyhow::anyhow!("Unix sockets not supported on this platform"))
93 }
94 }
95
96 #[cfg(unix)]
98 fn DefaultSocketPath() -> PathBuf {
99 let mut Path = std::env::temp_dir();
100
101 Path.push("grove-ipc.sock");
102
103 Path
104 }
105
106 #[cfg(unix)]
108 pub fn GetSocketPath(&self) -> Option<&Path> { self.SocketPath.as_deref() }
109
110 pub async fn GetStatistics(&self) -> TransportStats { self.Statistics.read().await.clone() }
112
113 #[cfg(unix)]
115 async fn CleanupSocket(&self) -> anyhow::Result<()> {
116 if let Some(SocketPath) = &self.SocketPath {
117 if SocketPath.exists() {
118 tokio::fs::remove_file(SocketPath)
119 .await
120 .map_err(|E| anyhow::anyhow!("Failed to remove socket: {}", E))?;
121
122 dev_log!("transport", "Removed existing socket: {:?}", SocketPath);
123 }
124 }
125
126 Ok(())
127 }
128}
129
130#[async_trait]
131impl TransportStrategy for IPCTransport {
132 type Error = IPCTransportError;
133
134 async fn connect(&self) -> Result<(), Self::Error> {
135 dev_log!("transport", "Connecting to IPC transport");
136
137 #[cfg(unix)]
138 {
139 self.CleanupSocket()
140 .await
141 .map_err(|E| IPCTransportError::ConnectionFailed(E.to_string()))?;
142
143 *self.Connected.write().await = true;
144 dev_log!("transport", "IPC connection established: {:?}", self.SocketPath);
145 }
146
147 #[cfg(windows)]
148 {
149 *self.Connected.write().await = true;
150 dev_log!("transport", "IPC connection established via named pipe");
151 }
152
153 #[cfg(not(any(unix, windows)))]
154 {
155 return Err(IPCTransportError::NotSupported);
156 }
157
158 Ok(())
159 }
160
161 async fn send(&self, request:&[u8]) -> Result<Vec<u8>, Self::Error> {
162 if !self.is_connected() {
163 return Err(IPCTransportError::NotConnected);
164 }
165
166 dev_log!("transport", "Sending IPC request ({} bytes)", request.len());
167
168 let Response:Vec<u8> = vec![];
169
170 let mut Stats = self.Statistics.write().await;
171
172 Stats.record_sent(request.len() as u64, 0);
173
174 Stats.record_received(Response.len() as u64);
175
176 Ok(Response)
177 }
178
179 async fn send_no_response(&self, data:&[u8]) -> Result<(), Self::Error> {
180 if !self.is_connected() {
181 return Err(IPCTransportError::NotConnected);
182 }
183
184 dev_log!("transport", "Sending IPC notification ({} bytes)", data.len());
185
186 let mut Stats = self.Statistics.write().await;
187
188 Stats.record_sent(data.len() as u64, 0);
189
190 Ok(())
191 }
192
193 async fn close(&self) -> Result<(), Self::Error> {
194 dev_log!("transport", "Closing IPC connection");
195
196 *self.Connected.write().await = false;
197
198 #[cfg(unix)]
199 {
200 if let Some(SocketPath) = &self.SocketPath {
201 if SocketPath.exists() {
202 tokio::fs::remove_file(SocketPath).await.map_err(|E| {
203 dev_log!("transport", "warn: failed to remove socket: {}", E);
204 IPCTransportError::CleanupFailed(E.to_string())
205 })?;
206 }
207 }
208 }
209
210 dev_log!("transport", "IPC connection closed");
211
212 Ok(())
213 }
214
215 fn is_connected(&self) -> bool { *self.Connected.blocking_read() }
216
217 fn transport_type(&self) -> TransportType { TransportType::IPC }
218}
219
220#[derive(Debug, thiserror::Error)]
222pub enum IPCTransportError {
223 #[error("Connection failed: {0}")]
225 ConnectionFailed(String),
226
227 #[error("Send failed: {0}")]
229 SendFailed(String),
230
231 #[error("Receive failed: {0}")]
233 ReceiveFailed(String),
234
235 #[error("Not connected")]
237 NotConnected,
238
239 #[error("IPC not supported on this platform")]
241 NotSupported,
242
243 #[error("Cleanup failed: {0}")]
245 CleanupFailed(String),
246
247 #[error("Socket error: {0}")]
249 SocketError(String),
250
251 #[error("Timeout")]
253 Timeout,
254}
255
256#[cfg(test)]
257mod tests {
258
259 use super::*;
260
261 #[test]
262 fn TestIPCTransportCreation() {
263 #[cfg(any(unix, windows))]
264 {
265 let Result = IPCTransport::New();
266
267 assert!(Result.is_ok());
268 }
269 }
270
271 #[cfg(unix)]
272 #[test]
273 fn TestIPCTransportWithSocketPath() {
274 let Result = IPCTransport::WithSocketPath(Path::new("/tmp/test.sock"));
275
276 assert!(Result.is_ok());
277
278 let Transport = Result.unwrap();
279
280 assert_eq!(Transport.GetSocketPath(), Some(Path::new("/tmp/test.sock")));
281 }
282
283 #[tokio::test]
284 async fn TestIPCTransportNotConnected() {
285 #[cfg(any(unix, windows))]
286 {
287 let Transport = IPCTransport::New().unwrap();
288
289 assert!(!Transport.is_connected());
290 }
291 }
292}