#[cfg(not(unix))] compile_error!("This module is supported only on Unix and Unix-like systems."); use std::collections::HashMap; use std::error::Error; use std::io::{BufReader, Read}; use std::path::{Path, PathBuf}; use std::str::FromStr; use std::sync::Arc; use std::thread; use crate::ferron_common::{ ErrorLogger, HyperRequest, HyperUpgraded, RequestData, ResponseData, ServerConfig, ServerModule, ServerModuleHandlers, SocketData, }; use crate::ferron_common::{HyperResponse, WithRuntime}; use crate::ferron_res::server_software::SERVER_SOFTWARE; use crate::ferron_util::ip_match::ip_match; use crate::ferron_util::match_hostname::match_hostname; use crate::ferron_util::match_location::match_location; use crate::ferron_util::preforked_process_pool::{ read_ipc_message, read_ipc_message_async, write_ipc_message, write_ipc_message_async, PreforkedProcessPool, }; use crate::ferron_util::wsgi_load_application::load_wsgi_application; use crate::ferron_util::wsgid_body_reader::WsgidBodyReader; use crate::ferron_util::wsgid_error_stream::WsgidErrorStream; use crate::ferron_util::wsgid_input_stream::WsgidInputStream; use crate::ferron_util::wsgid_message_structs::{ ProcessPoolToServerMessage, ServerToProcessPoolMessage, }; use crate::ferron_util::wsgid_structs::{WsgidApplicationLocationWrap, WsgidApplicationWrap}; use async_trait::async_trait; use futures_util::{StreamExt, TryStreamExt}; use hashlink::LinkedHashMap; use http::{HeaderMap, HeaderName, HeaderValue, StatusCode}; use http_body_util::{BodyExt, Empty, StreamBody}; use hyper::body::{Bytes, Frame}; use hyper::header; use hyper::Response; use hyper_tungstenite::HyperWebsocket; use interprocess::unnamed_pipe::{Recver, Sender}; use pyo3::exceptions::{PyAssertionError, PyException}; use pyo3::prelude::*; use pyo3::types::{PyBool, PyCFunction, PyDict, PyIterator, PyString, PyTuple}; //use postcard::{DeOptions, SerOptions}; use tokio::fs; use tokio::runtime::Handle; use tokio::sync::Mutex; struct ResponseHead { status: u16, headers: Option>>, is_set: bool, is_sent: bool, } impl ResponseHead { fn new() -> Self { Self { status: 200, headers: None, is_set: false, is_sent: false, } } } fn wsgi_pool_fn(tx: Sender, rx: Recver, wsgi_script_path: PathBuf) { let wsgi_application_result: Result, Box> = load_wsgi_application(wsgi_script_path.as_path(), false); let mut body_iterators = HashMap::new(); let mut application_id = 0; let mut wsgi_head = Arc::new(Mutex::new(ResponseHead::new())); let rx_mutex = Arc::new(Mutex::new(rx)); let tx_mutex = Arc::new(Mutex::new(tx)); loop { let received_raw_message = match read_ipc_message(&mut rx_mutex.blocking_lock()) { Ok(message) => message, Err(_) => break, }; let received_message = match postcard::from_bytes::(&received_raw_message) { Ok(message) => message, Err(_) => continue, }; if let Some(error) = (|| -> Result<(), Box> { let wsgi_application = wsgi_application_result .as_ref() .map_err(|x| anyhow::anyhow!(x.to_string()))?; if let Some(environment_variables) = received_message.environment_variables { wsgi_head = Arc::new(Mutex::new(ResponseHead::new())); let wsgi_head_clone = wsgi_head.clone(); let tx_mutex_clone = tx_mutex.clone(); let rx_mutex_clone = rx_mutex.clone(); let body_iterator = Python::with_gil(move |py| -> PyResult> { let start_response = PyCFunction::new_closure( py, None, None, move |args: &Bound<'_, PyTuple>, kwargs: Option<&Bound<'_, PyDict>>| -> PyResult<_> { let args_native = args.extract::<(String, Vec<(String, String)>)>()?; let exc_info = kwargs.map_or(Ok(None), |kwargs| { let exc_info = kwargs.get_item("exc_info"); if let Ok(Some(exc_info)) = exc_info { if exc_info.is_none() { Ok(None) } else { Ok(Some(exc_info)) } } else { exc_info } })?; let mut wsgi_head_locked = wsgi_head_clone.blocking_lock(); if let Some(exc_info) = exc_info { if wsgi_head_locked.is_sent { let exc_info_tuple = exc_info.downcast::()?; let exc_info_exception = exc_info_tuple .get_item(1)? .getattr("with_traceback")? .call((exc_info_tuple.get_item(2)?,), None)? .downcast::()? .clone(); Err(exc_info_exception)? } } else if wsgi_head_locked.is_set { Err(PyAssertionError::new_err("Headers already set"))? } let status_code_string_option = args_native.0.split(" ").next(); if let Some(status_code_string) = status_code_string_option { wsgi_head_locked.status = status_code_string .parse() .map_err(|e: std::num::ParseIntError| anyhow::anyhow!(e))?; } else { Err(anyhow::anyhow!("Can't extract status code"))?; } let mut header_map: LinkedHashMap> = LinkedHashMap::new(); for header in args_native.1 { let header_name = header.0.to_lowercase(); let header_value = header.1; if let Some(header_values) = header_map.get_mut(&header_name) { header_values.push(header_value); } else { header_map.insert(header_name, vec![header_value]); } } wsgi_head_locked.headers = Some(header_map); wsgi_head_locked.is_set = true; Ok(()) }, )?; let mut environment: HashMap> = HashMap::new(); let is_https = environment_variables.contains_key("HTTPS"); let content_length = if let Some(content_length) = environment_variables.get("CONTENT_LENGTH") { content_length.parse::().ok() } else { None }; for (environment_variable, environment_variable_value) in environment_variables { environment.insert( environment_variable, PyString::new(py, &environment_variable_value).into_any(), ); } environment.insert( "wsgi.version".to_string(), PyTuple::new(py, [1, 0])?.into_any(), ); environment.insert( "wsgi.url_scheme".to_string(), PyString::new(py, if is_https { "https" } else { "http" }).into_any(), ); environment.insert( "wsgi.input".to_string(), (if let Some(content_length) = content_length { WsgidInputStream::new( BufReader::new(WsgidBodyReader::new( tx_mutex_clone.clone(), rx_mutex_clone.clone(), )) .take(content_length), ) } else { WsgidInputStream::new(BufReader::new(WsgidBodyReader::new( tx_mutex_clone.clone(), rx_mutex_clone.clone(), ))) }) .into_pyobject(py)? .into_any(), ); environment.insert( "wsgi.errors".to_string(), WsgidErrorStream::new(tx_mutex_clone.clone()) .into_pyobject(py)? .into_any(), ); environment.insert( "wsgi.multithread".to_string(), PyBool::new(py, false).as_any().clone(), ); environment.insert( "wsgi.multiprocess".to_string(), PyBool::new(py, true).as_any().clone(), ); environment.insert( "wsgi.run_once".to_string(), PyBool::new(py, false).as_any().clone(), ); let body_unknown = wsgi_application.call(py, (environment, start_response), None)?; let body_iterator = body_unknown .downcast_bound::(py)? .clone() .unbind(); Ok(body_iterator) })?; let current_application_id = application_id; body_iterators.insert(current_application_id, Arc::new(body_iterator)); application_id += 1; write_ipc_message( &mut tx_mutex.blocking_lock(), &postcard::to_allocvec::(&ProcessPoolToServerMessage { application_id: Some(current_application_id), status_code: None, headers: None, body_chunk: None, error_log_line: None, error_message: None, requests_body_chunk: false, })?, )? } else if received_message.requests_body_chunk { if let Some(application_id) = received_message.application_id { if let Some(body_iterator_arc) = body_iterators.get(&application_id) { let wsgi_head_clone = wsgi_head.clone(); let body_iterator_arc_clone = body_iterator_arc.clone(); let body_chunk_result = Python::with_gil(|py| -> PyResult>> { let mut body_iterator_bound = body_iterator_arc_clone.bind(py).clone(); if let Some(body_chunk) = body_iterator_bound.next() { Ok(Some(body_chunk?.extract::>()?)) } else { Ok(None) } }); let body_chunk = (match body_chunk_result { Err(error) => Err(std::io::Error::other(error)), Ok(None) => Ok(None), Ok(Some(chunk)) => { let wsgi_head_locked = wsgi_head_clone.blocking_lock(); if !wsgi_head_locked.is_set { Err(std::io::Error::other( "The \"start_response\" function hasn't been called.", )) } else { Ok(Some(chunk)) } } })?; let status_code; let headers; let mut wsgi_head_locked = wsgi_head_clone.blocking_lock(); if wsgi_head_locked.is_sent { status_code = None; headers = None; } else { status_code = Some(wsgi_head_locked.status); headers = wsgi_head_locked.headers.take(); wsgi_head_locked.is_sent = true; } drop(wsgi_head_locked); if body_chunk.is_none() { body_iterators.remove(&application_id); } write_ipc_message( &mut tx_mutex.blocking_lock(), &postcard::to_allocvec::(&ProcessPoolToServerMessage { application_id: None, status_code, headers, body_chunk, error_log_line: None, error_message: None, requests_body_chunk: false, })?, )? } else { Err(anyhow::anyhow!("The WSGI request wasn't initialized"))? } } else { Err(anyhow::anyhow!("The WSGI request wasn't initialized"))? } } Ok(()) })() .err() { if write_ipc_message( &mut tx_mutex.blocking_lock(), &postcard::to_allocvec::(&ProcessPoolToServerMessage { application_id: None, status_code: None, headers: None, body_chunk: None, error_log_line: None, error_message: Some(error.to_string()), requests_body_chunk: false, }) .unwrap_or_default(), ) .is_err() { break; } } } } fn init_wsgi_process_pool( wsgi_script_path: PathBuf, ) -> Result> { let available_parallelism = thread::available_parallelism()?.get(); // Safety: The function depends on `nix::unistd::fork`, which is executed before any threads are spawned. // The forking function is safe to call for single-threaded applications. unsafe { PreforkedProcessPool::new(available_parallelism, move |tx, rx| { let wsgi_script_path_clone = wsgi_script_path.clone(); wsgi_pool_fn(tx, rx, wsgi_script_path_clone) }) } } pub fn server_module_init( config: &ServerConfig, ) -> Result, Box> { let mut global_wsgi_process_pool = None; let mut host_wsgi_process_pools = Vec::new(); if let Some(wsgi_process_pool_path) = config["global"]["wsgidApplicationPath"].as_str() { global_wsgi_process_pool = Some(Arc::new(init_wsgi_process_pool(PathBuf::from_str( wsgi_process_pool_path, )?)?)); } let global_wsgi_path = config["global"]["wsgidPath"] .as_str() .map(|s| s.to_string()); if let Some(hosts) = config["hosts"].as_vec() { for host_yaml in hosts.iter() { let domain = host_yaml["domain"].as_str().map(String::from); let ip = host_yaml["ip"].as_str().map(String::from); let mut locations = Vec::new(); if let Some(locations_yaml) = host_yaml["locations"].as_vec() { for location_yaml in locations_yaml.iter() { if let Some(path_str) = location_yaml["path"].as_str() { let path = String::from(path_str); if let Some(wsgi_process_pool_path) = location_yaml["wsgidApplicationPath"].as_str() { locations.push(WsgidApplicationLocationWrap::new( path, Arc::new(init_wsgi_process_pool(PathBuf::from_str( wsgi_process_pool_path, )?)?), location_yaml["wsgidPath"].as_str().map(|s| s.to_string()), )); } } } } if let Some(wsgi_process_pool_path) = host_yaml["wsgidApplicationPath"].as_str() { host_wsgi_process_pools.push(WsgidApplicationWrap::new( domain, ip, Some(Arc::new(init_wsgi_process_pool(PathBuf::from_str( wsgi_process_pool_path, )?)?)), host_yaml["wsgiPath"].as_str().map(|s| s.to_string()), locations, )); } else if !locations.is_empty() { host_wsgi_process_pools.push(WsgidApplicationWrap::new( domain, ip, None, host_yaml["wsgiPath"].as_str().map(|s| s.to_string()), locations, )); } } } Ok(Box::new(WsgidModule::new( global_wsgi_process_pool, global_wsgi_path, Arc::new(host_wsgi_process_pools), ))) } struct WsgidModule { global_wsgi_process_pool: Option>, global_wsgi_path: Option, host_wsgi_process_pools: Arc>, } impl WsgidModule { fn new( global_wsgi_process_pool: Option>, global_wsgi_path: Option, host_wsgi_process_pools: Arc>, ) -> Self { Self { global_wsgi_process_pool, global_wsgi_path, host_wsgi_process_pools, } } } impl ServerModule for WsgidModule { fn get_handlers(&self, handle: Handle) -> Box { Box::new(WsgidModuleHandlers { handle, global_wsgi_process_pool: self.global_wsgi_process_pool.clone(), global_wsgi_path: self.global_wsgi_path.clone(), host_wsgi_process_pools: self.host_wsgi_process_pools.clone(), }) } } struct WsgidModuleHandlers { handle: Handle, global_wsgi_process_pool: Option>, global_wsgi_path: Option, host_wsgi_process_pools: Arc>, } #[async_trait] impl ServerModuleHandlers for WsgidModuleHandlers { async fn request_handler( &mut self, request: RequestData, config: &ServerConfig, socket_data: &SocketData, error_logger: &ErrorLogger, ) -> Result> { WithRuntime::new(self.handle.clone(), async move { let hyper_request = request.get_hyper_request(); // Use .take() instead of .clone(), since the values in Options will only be used once. let mut wsgi_process_pool = self.global_wsgi_process_pool.take(); let mut wsgi_path = self.global_wsgi_path.take(); // Should have used a HashMap instead of iterating over an array for better performance... for host_wsgi_process_pool_wrap in self.host_wsgi_process_pools.iter() { if match_hostname( match &host_wsgi_process_pool_wrap.domain { Some(value) => Some(value as &str), None => None, }, match hyper_request.headers().get(header::HOST) { Some(value) => value.to_str().ok(), None => None, }, ) && match &host_wsgi_process_pool_wrap.ip { Some(value) => ip_match(value as &str, socket_data.remote_addr.ip()), None => true, } { wsgi_process_pool = host_wsgi_process_pool_wrap.wsgi_process_pool.clone(); wsgi_path = host_wsgi_process_pool_wrap.wsgi_path.clone(); if let Ok(path_decoded) = urlencoding::decode( request .get_original_url() .unwrap_or(request.get_hyper_request().uri()) .path(), ) { for location_wrap in host_wsgi_process_pool_wrap.locations.iter() { if match_location(&location_wrap.path, &path_decoded) { wsgi_process_pool = Some(location_wrap.wsgi_process_pool.clone()); wsgi_path = location_wrap.wsgi_path.clone(); break; } } } break; } } let request_path = hyper_request.uri().path(); let mut request_path_bytes = request_path.bytes(); if request_path_bytes.len() < 1 || request_path_bytes.nth(0) != Some(b'/') { return Ok( ResponseData::builder(request) .status(StatusCode::BAD_REQUEST) .build(), ); } if let Some(wsgi_process_pool) = wsgi_process_pool { let wsgi_path = wsgi_path.unwrap_or("/".to_string()); let mut canonical_wsgi_path: &str = &wsgi_path; if canonical_wsgi_path.bytes().last() == Some(b'/') { canonical_wsgi_path = &canonical_wsgi_path[..(canonical_wsgi_path.len() - 1)]; } let request_path_with_slashes = match request_path == canonical_wsgi_path { true => format!("{}/", request_path), false => request_path.to_string(), }; if let Some(stripped_request_path) = request_path_with_slashes.strip_prefix(canonical_wsgi_path) { let wwwroot_yaml = &config["wwwroot"]; let wwwroot = wwwroot_yaml.as_str().unwrap_or("/nonexistent"); let wwwroot_unknown = PathBuf::from(wwwroot); let wwwroot_pathbuf = match wwwroot_unknown.as_path().is_absolute() { true => wwwroot_unknown, false => match fs::canonicalize(&wwwroot_unknown).await { Ok(pathbuf) => pathbuf, Err(_) => wwwroot_unknown, }, }; let wwwroot = wwwroot_pathbuf.as_path(); let mut relative_path = &request_path[1..]; while relative_path.as_bytes().first().copied() == Some(b'/') { relative_path = &relative_path[1..]; } let decoded_relative_path = match urlencoding::decode(relative_path) { Ok(path) => path.to_string(), Err(_) => { return Ok( ResponseData::builder(request) .status(StatusCode::BAD_REQUEST) .build(), ); } }; let joined_pathbuf = wwwroot.join(decoded_relative_path); let execute_pathbuf = joined_pathbuf; let execute_path_info = stripped_request_path .strip_prefix("/") .map(|s| s.to_string()); return execute_wsgi_with_environment_variables( request, socket_data, error_logger, wwwroot, execute_pathbuf, execute_path_info, config["serverAdministratorEmail"].as_str(), wsgi_process_pool, ) .await; } } Ok(ResponseData::builder(request).build()) }) .await } async fn proxy_request_handler( &mut self, request: RequestData, _config: &ServerConfig, _socket_data: &SocketData, _error_logger: &ErrorLogger, ) -> Result> { Ok(ResponseData::builder(request).build()) } async fn response_modifying_handler( &mut self, response: HyperResponse, ) -> Result> { Ok(response) } async fn proxy_response_modifying_handler( &mut self, response: HyperResponse, ) -> Result> { Ok(response) } async fn connect_proxy_request_handler( &mut self, _upgraded_request: HyperUpgraded, _connect_address: &str, _config: &ServerConfig, _socket_data: &SocketData, _error_logger: &ErrorLogger, ) -> Result<(), Box> { Ok(()) } fn does_connect_proxy_requests(&mut self) -> bool { false } async fn websocket_request_handler( &mut self, _websocket: HyperWebsocket, _uri: &hyper::Uri, _config: &ServerConfig, _socket_data: &SocketData, _error_logger: &ErrorLogger, ) -> Result<(), Box> { Ok(()) } fn does_websocket_requests(&mut self, _config: &ServerConfig, _socket_data: &SocketData) -> bool { false } } struct ResponseHeadHyper { status: StatusCode, headers: Option, } impl ResponseHeadHyper { fn new() -> Self { Self { status: StatusCode::OK, headers: None, } } } #[allow(clippy::too_many_arguments)] async fn execute_wsgi_with_environment_variables( request: RequestData, socket_data: &SocketData, error_logger: &ErrorLogger, wwwroot: &Path, execute_pathbuf: PathBuf, path_info: Option, server_administrator_email: Option<&str>, wsgi_process_pool: Arc, ) -> Result> { let mut environment_variables: LinkedHashMap = LinkedHashMap::new(); let hyper_request = request.get_hyper_request(); let original_request_uri = request.get_original_url().unwrap_or(hyper_request.uri()); if let Some(auth_user) = request.get_auth_user() { if let Some(authorization) = hyper_request.headers().get(header::AUTHORIZATION) { let authorization_value = String::from_utf8_lossy(authorization.as_bytes()).to_string(); let mut authorization_value_split = authorization_value.split(" "); if let Some(authorization_type) = authorization_value_split.next() { environment_variables.insert("AUTH_TYPE".to_string(), authorization_type.to_string()); } } environment_variables.insert("REMOTE_USER".to_string(), auth_user.to_string()); } environment_variables.insert( "QUERY_STRING".to_string(), match hyper_request.uri().query() { Some(query) => query.to_string(), None => "".to_string(), }, ); environment_variables.insert("SERVER_SOFTWARE".to_string(), SERVER_SOFTWARE.to_string()); environment_variables.insert( "SERVER_PROTOCOL".to_string(), match hyper_request.version() { hyper::Version::HTTP_09 => "HTTP/0.9".to_string(), hyper::Version::HTTP_10 => "HTTP/1.0".to_string(), hyper::Version::HTTP_11 => "HTTP/1.1".to_string(), hyper::Version::HTTP_2 => "HTTP/2.0".to_string(), hyper::Version::HTTP_3 => "HTTP/3.0".to_string(), _ => "HTTP/Unknown".to_string(), }, ); environment_variables.insert( "SERVER_PORT".to_string(), socket_data.local_addr.port().to_string(), ); environment_variables.insert( "SERVER_ADDR".to_string(), socket_data.local_addr.ip().to_canonical().to_string(), ); if let Some(server_administrator_email) = server_administrator_email { environment_variables.insert( "SERVER_ADMIN".to_string(), server_administrator_email.to_string(), ); } if let Some(host) = hyper_request.headers().get(header::HOST) { environment_variables.insert( "SERVER_NAME".to_string(), String::from_utf8_lossy(host.as_bytes()).to_string(), ); } environment_variables.insert( "DOCUMENT_ROOT".to_string(), wwwroot.to_string_lossy().to_string(), ); environment_variables.insert( "PATH_INFO".to_string(), match &path_info { Some(path_info) => format!("/{}", path_info), None => "".to_string(), }, ); environment_variables.insert( "PATH_TRANSLATED".to_string(), match &path_info { Some(path_info) => { let mut path_translated = execute_pathbuf.clone(); path_translated.push(path_info); path_translated.to_string_lossy().to_string() } None => "".to_string(), }, ); environment_variables.insert( "REQUEST_METHOD".to_string(), hyper_request.method().to_string(), ); environment_variables.insert("GATEWAY_INTERFACE".to_string(), "CGI/1.1".to_string()); environment_variables.insert( "REQUEST_URI".to_string(), format!( "{}{}", original_request_uri.path(), match original_request_uri.query() { Some(query) => format!("?{}", query), None => String::from(""), } ), ); environment_variables.insert( "REMOTE_PORT".to_string(), socket_data.remote_addr.port().to_string(), ); environment_variables.insert( "REMOTE_ADDR".to_string(), socket_data.remote_addr.ip().to_canonical().to_string(), ); environment_variables.insert( "SCRIPT_FILENAME".to_string(), execute_pathbuf.to_string_lossy().to_string(), ); if let Ok(script_path) = execute_pathbuf.as_path().strip_prefix(wwwroot) { environment_variables.insert( "SCRIPT_NAME".to_string(), format!( "/{}", match cfg!(windows) { true => script_path.to_string_lossy().to_string().replace("\\", "/"), false => script_path.to_string_lossy().to_string(), } ), ); } if socket_data.encrypted { environment_variables.insert("HTTPS".to_string(), "ON".to_string()); } let mut content_length_set = false; for (header_name, header_value) in hyper_request.headers().iter() { let env_header_name = match *header_name { header::CONTENT_LENGTH => { content_length_set = true; "CONTENT_LENGTH".to_string() } header::CONTENT_TYPE => "CONTENT_TYPE".to_string(), _ => { let mut result = String::new(); result.push_str("HTTP_"); for c in header_name.as_str().to_uppercase().chars() { if c.is_alphanumeric() { result.push(c); } else { result.push('_'); } } result } }; if environment_variables.contains_key(&env_header_name) { let value = environment_variables.get_mut(&env_header_name); if let Some(value) = value { if env_header_name == "HTTP_COOKIE" { value.push_str("; "); } else { // See https://stackoverflow.com/a/1801191 value.push_str(", "); } value.push_str(String::from_utf8_lossy(header_value.as_bytes()).as_ref()); } else { environment_variables.insert( env_header_name, String::from_utf8_lossy(header_value.as_bytes()).to_string(), ); } } else { environment_variables.insert( env_header_name, String::from_utf8_lossy(header_value.as_bytes()).to_string(), ); } } if !content_length_set { environment_variables.insert("CONTENT_LENGTH".to_string(), "0".to_string()); } let (hyper_request, _, _) = request.into_parts(); execute_wsgi( hyper_request, error_logger, wsgi_process_pool, environment_variables, ) .await } async fn execute_wsgi( hyper_request: HyperRequest, error_logger: &ErrorLogger, wsgi_process_pool: Arc, environment_variables: LinkedHashMap, ) -> Result> { let ipc_mutex = wsgi_process_pool .obtain_process_with_init_async_ipc() .await?; let (_, body) = hyper_request.into_parts(); let mut body_stream = body.into_data_stream().map_err(std::io::Error::other); let application_id = { let (tx, rx) = &mut *ipc_mutex.lock().await; write_ipc_message_async( tx, &postcard::to_allocvec(&ServerToProcessPoolMessage { application_id: None, environment_variables: Some(environment_variables), body_chunk: None, body_error_message: None, requests_body_chunk: false, })?, ) .await?; let application_id; loop { let received_message = postcard::from_bytes::(&read_ipc_message_async(rx).await?)?; if let Some(error_message) = received_message.error_message { Err(anyhow::anyhow!(error_message))? } if let Some(application_id_obtained) = received_message.application_id { application_id = application_id_obtained; break; } if let Some(error_log_line) = received_message.error_log_line { error_logger.log(&error_log_line).await; } else if received_message.requests_body_chunk { let body_chunk; let body_error_message; match body_stream.next().await { None => { body_chunk = None; body_error_message = None; } Some(Err(err)) => { body_chunk = None; body_error_message = Some(err.to_string()); } Some(Ok(chunk)) => { body_chunk = Some(chunk.to_vec()); body_error_message = None; } }; write_ipc_message_async( tx, &postcard::to_allocvec(&ServerToProcessPoolMessage { application_id: None, environment_variables: None, body_chunk, body_error_message, requests_body_chunk: false, })?, ) .await?; } } application_id }; let wsgi_head = Arc::new(Mutex::new(ResponseHeadHyper::new())); let wsgi_head_clone = wsgi_head.clone(); let error_logger_arc = Arc::new(error_logger.clone()); let body_stream_mutex = Arc::new(Mutex::new(body_stream)); let mut response_stream = futures_util::stream::unfold(ipc_mutex, move |ipc_mutex| { let wsgi_head_clone = wsgi_head_clone.clone(); let error_logger_arc_clone = error_logger_arc.clone(); let body_stream_mutex_clone = body_stream_mutex.clone(); Box::pin(async move { let ipc_mutex_borrowed = &ipc_mutex; let chunk_result: Result, Box> = async { let (tx, rx) = &mut *ipc_mutex_borrowed.lock().await; write_ipc_message_async( tx, &postcard::to_allocvec(&ServerToProcessPoolMessage { application_id: Some(application_id), environment_variables: None, body_chunk: None, body_error_message: None, requests_body_chunk: true, })?, ) .await?; loop { let received_message = postcard::from_bytes::(&read_ipc_message_async(rx).await?)?; if let Some(error_message) = received_message.error_message { Err(anyhow::anyhow!(error_message))? } else if let Some(body_chunk) = received_message.body_chunk { if let Some(status_code) = received_message.status_code { let mut wsgi_head_locked = wsgi_head_clone.lock().await; wsgi_head_locked.status = StatusCode::from_u16(status_code)?; if let Some(headers) = received_message.headers { let mut header_map = HeaderMap::new(); for (key, value) in headers { for value in value { header_map.append( HeaderName::from_str(&key)?, HeaderValue::from_bytes(value.as_bytes())?, ); } } wsgi_head_locked.headers = Some(header_map); } } return Ok(Some(Bytes::from(body_chunk))); } else if let Some(error_log_line) = received_message.error_log_line { error_logger_arc_clone.log(&error_log_line).await; } else if received_message.requests_body_chunk { let body_chunk; let body_error_message; match body_stream_mutex_clone.lock().await.next().await { None => { body_chunk = None; body_error_message = None; } Some(Err(err)) => { body_chunk = None; body_error_message = Some(err.to_string()); } Some(Ok(chunk)) => { body_chunk = Some(chunk.to_vec()); body_error_message = None; } }; write_ipc_message_async( tx, &postcard::to_allocvec(&ServerToProcessPoolMessage { application_id: None, environment_variables: None, body_chunk, body_error_message, requests_body_chunk: false, })?, ) .await?; } else { return Ok(None); } } } .await; match chunk_result { Err(error) => Some((Err(std::io::Error::other(error.to_string())), ipc_mutex)), Ok(None) => None, Ok(Some(chunk)) => Some((Ok(chunk), ipc_mutex)), } }) }); let first_chunk = response_stream.next().await; let response_body = if let Some(Err(first_chunk_error)) = first_chunk { Err(first_chunk_error)? } else if let Some(Ok(first_chunk)) = first_chunk { let response_stream_first_item = futures_util::stream::once(async move { Ok(first_chunk) }); let response_stream_combined = response_stream_first_item.chain(response_stream); let stream_body = StreamBody::new(response_stream_combined.map_ok(Frame::data)); BodyExt::boxed(stream_body) } else { BodyExt::boxed(Empty::new().map_err(|e| match e {})) }; let mut wsgi_head_locked = wsgi_head.lock().await; let mut hyper_response = Response::new(response_body); *hyper_response.status_mut() = wsgi_head_locked.status; if let Some(headers) = wsgi_head_locked.headers.take() { *hyper_response.headers_mut() = headers; } Ok( ResponseData::builder_without_request() .response(hyper_response) .build(), ) }