use std::sync::Arc;

use interprocess::unnamed_pipe::Sender;
use pyo3::prelude::*;
use tokio::sync::Mutex;

use crate::ferron_util::preforked_process_pool::write_ipc_message;
use crate::ferron_util::wsgid_message_structs::ProcessPoolToServerMessage;

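/// Write-only, file-like object exposed to Python that forwards error log
/// lines to the server process over the IPC pipe.
#[pyclass]
pub struct WsgidErrorStream {
  ipc_tx: Arc<Mutex<Sender>>,
}

impl WsgidErrorStream {
  /// Creates a stream that writes to the shared IPC sender `ipc_tx`.
  pub fn new(ipc_tx: Arc<Mutex<Sender>>) -> Self {
    Self { ipc_tx }
  }
}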

#[pymethods]
impl WsgidErrorStream {
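  /// Sends `data` to the server process as one error log message.
  fn write(&self, data: &str) -> PyResult<usize> {
    write_ipc_message(
      // `blocking_lock` panics if called within an asynchronous execution context
      &mut self.ipc_tx.blocking_lock(),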
      &postcard::to_allocvec::<ProcessPoolToServerMessage>(&ProcessPoolToServerMessage {
        application_id: None,
        status_code: None,
        headers: None,
        body_chunk: None,
        error_log_line: Some(data.to_string()),
        error_message: None,
        requests_body_chunk: false,
      })
      .map_err(|e| anyhow::anyhow!(e.to_string()))?,
    )?;
    // Python text streams report the number of characters written, not UTF-8 bytes
    Ok(data.chars().count())
  }

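  /// Sends each element of `lines` to the server process as its own error log message.
  fn writelines(&self, lines: Vec<String>) -> PyResult<()> {
    for line in lines {
      // Each `write_ipc_message` call sends one message, so every line is logged separately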
      write_ipc_message(
        &mut self.ipc_tx.blocking_lock(),
        &postcard::to_allocvec::<ProcessPoolToServerMessage>(&ProcessPoolToServerMessage {
          application_id: None,
          status_code: None,
          headers: None,
          body_chunk: None,
          error_log_line: Some(line),
          error_message: None,
          requests_body_chunk: false,
        })
        .map_err(|e| anyhow::anyhow!(e.to_string()))?,
      )?;
    }
    Ok(())
  }

  /// No-op kept for file-like compatibility.
  fn flush(&self) -> PyResult<()> {
    // This stream keeps no buffer of its own, so there is nothing to flush
    Ok(())
  }
}

#[cfg(test)]
mod tests {
  use super::*;
  use crate::ferron_util::preforked_process_pool::read_ipc_message;

  #[test]
  fn test_write_sends_correct_data() {
    let (tx, mut rx) = interprocess::unnamed_pipe::pipe().unwrap();
    let stream = WsgidErrorStream::new(Arc::new(Mutex::new(tx)));
    let input = "error log line";
    let len = stream.write(input).unwrap();
    assert_eq!(len, input.len());

    let received = read_ipc_message(&mut rx).unwrap();
    let msg: ProcessPoolToServerMessage = postcard::from_bytes(&received).unwrap();
    assert_eq!(msg.error_log_line, Some(input.to_string()));
  }

  #[test]
  fn test_writelines_sends_each_line() {
    let (tx, mut rx) = interprocess::unnamed_pipe::pipe().unwrap();
    let stream = WsgidErrorStream::new(Arc::new(Mutex::new(tx)));
    let lines: Vec<String> = vec!["line one".into(), "line two".into()];
    stream.writelines(lines.clone()).unwrap();

    for line in lines {
      let received = read_ipc_message(&mut rx).unwrap();
      let msg: ProcessPoolToServerMessage = postcard::from_bytes(&received).unwrap();
      assert_eq!(msg.error_log_line, Some(line));
    }
  }
}