use std::sync::Arc; use interprocess::unnamed_pipe::Sender; use pyo3::prelude::*; use tokio::sync::Mutex; use crate::ferron_util::preforked_process_pool::write_ipc_message; use crate::ferron_util::wsgid_message_structs::ProcessPoolToServerMessage; #[pyclass] pub struct WsgidErrorStream { ipc_tx: Arc>, } impl WsgidErrorStream { pub fn new(ipc_tx: Arc>) -> Self { Self { ipc_tx } } } #[pymethods] impl WsgidErrorStream { fn write(&self, data: &str) -> PyResult { write_ipc_message( &mut self.ipc_tx.blocking_lock(), &postcard::to_allocvec::(&ProcessPoolToServerMessage { application_id: None, status_code: None, headers: None, body_chunk: None, error_log_line: Some(data.to_string()), error_message: None, requests_body_chunk: false, }) .map_err(|e| anyhow::anyhow!(e.to_string()))?, )?; Ok(data.len()) } fn writelines(&self, lines: Vec) -> PyResult<()> { for line in lines { // Each `write_ipc_message` call prints a separate line write_ipc_message( &mut self.ipc_tx.blocking_lock(), &postcard::to_allocvec::(&ProcessPoolToServerMessage { application_id: None, status_code: None, headers: None, body_chunk: None, error_log_line: Some(line), error_message: None, requests_body_chunk: false, }) .map_err(|e| anyhow::anyhow!(e.to_string()))?, )?; } Ok(()) } fn flush(&self) -> PyResult<()> { // This is a no-op function Ok(()) } } #[cfg(test)] mod tests { use super::*; use crate::ferron_util::preforked_process_pool::read_ipc_message; #[test] fn test_write_sends_correct_data() { let (tx, mut rx) = interprocess::unnamed_pipe::pipe().unwrap(); let stream = WsgidErrorStream::new(Arc::new(Mutex::new(tx))); let input = "error log line"; let len = stream.write(input).unwrap(); assert_eq!(len, input.len()); let received = read_ipc_message(&mut rx).unwrap(); let msg: ProcessPoolToServerMessage = postcard::from_bytes(&received).unwrap(); assert_eq!(msg.error_log_line, Some(input.to_string())); } #[test] fn test_writelines_sends_each_line() { let (tx, mut rx) = interprocess::unnamed_pipe::pipe().unwrap(); let stream = WsgidErrorStream::new(Arc::new(Mutex::new(tx))); let lines = vec!["line one".into(), "line two".into()]; stream.writelines(lines.clone()).unwrap(); for line in lines { let received = read_ipc_message(&mut rx).unwrap(); let msg: ProcessPoolToServerMessage = postcard::from_bytes(&received).unwrap(); assert_eq!(msg.error_log_line, Some(line.clone())); } } }