// FastCGI handler code inspired by SVR.JS's GreenRhombus mod, translated from JavaScript to Rust. // Based on the "cgi" and "scgi" module use std::env; use std::error::Error; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; use crate::ferron_common::{ ErrorLogger, HyperRequest, HyperResponse, RequestData, ResponseData, ServerConfig, ServerModule, ServerModuleHandlers, SocketData, }; use crate::ferron_common::{HyperUpgraded, WithRuntime}; use async_trait::async_trait; use futures_util::future::Either; use futures_util::TryStreamExt; use hashlink::LinkedHashMap; use http_body_util::{BodyExt, StreamBody}; use httparse::EMPTY_HEADER; use hyper::body::{Bytes, Frame}; use hyper::{header, Response, StatusCode}; use hyper_tungstenite::HyperWebsocket; use tokio::fs; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use tokio::net::TcpStream; use tokio::runtime::Handle; use tokio::sync::RwLock; use tokio_util::codec::{FramedRead, FramedWrite}; use tokio_util::io::{ReaderStream, SinkWriter, StreamReader}; use crate::ferron_res::server_software::SERVER_SOFTWARE; use crate::ferron_util::cgi_response::CgiResponse; use crate::ferron_util::copy_move::Copier; use crate::ferron_util::fcgi_decoder::{FcgiDecodedData, FcgiDecoder}; use crate::ferron_util::fcgi_encoder::FcgiEncoder; use crate::ferron_util::fcgi_name_value_pair::construct_fastcgi_name_value_pair; use crate::ferron_util::fcgi_record::construct_fastcgi_record; use crate::ferron_util::read_to_end_move::ReadToEndFuture; use crate::ferron_util::split_stream_by_map::SplitStreamByMapExt; use crate::ferron_util::ttl_cache::TtlCache; pub fn server_module_init( _config: &ServerConfig, ) -> Result, Box> { let cache = Arc::new(RwLock::new(TtlCache::new(Duration::from_millis(100)))); Ok(Box::new(FcgiModule::new(cache))) } #[allow(clippy::type_complexity)] struct FcgiModule { path_cache: Arc, Option)>>>, } impl FcgiModule { #[allow(clippy::type_complexity)] fn new(path_cache: Arc, Option)>>>) -> Self { Self { path_cache } } } impl ServerModule for FcgiModule { fn get_handlers(&self, handle: Handle) -> Box { Box::new(FcgiModuleHandlers { path_cache: self.path_cache.clone(), handle, }) } } #[allow(clippy::type_complexity)] struct FcgiModuleHandlers { handle: Handle, path_cache: Arc, Option)>>>, } #[async_trait] impl ServerModuleHandlers for FcgiModuleHandlers { async fn request_handler( &mut self, request: RequestData, config: &ServerConfig, socket_data: &SocketData, error_logger: &ErrorLogger, ) -> Result> { WithRuntime::new(self.handle.clone(), async move { let mut fastcgi_script_exts = Vec::new(); let fastcgi_script_exts_yaml = &config["fcgiScriptExtensions"]; if let Some(fastcgi_script_exts_obtained) = fastcgi_script_exts_yaml.as_vec() { for fastcgi_script_ext_yaml in fastcgi_script_exts_obtained.iter() { if let Some(fastcgi_script_ext) = fastcgi_script_ext_yaml.as_str() { fastcgi_script_exts.push(fastcgi_script_ext); } } } let mut fastcgi_to = "tcp://localhost:4000/"; let fastcgi_to_yaml = &config["fcgiTo"]; if let Some(fastcgi_to_obtained) = fastcgi_to_yaml.as_str() { fastcgi_to = fastcgi_to_obtained; } let mut fastcgi_path = None; if let Some(fastcgi_path_obtained) = config["fcgiPath"].as_str() { fastcgi_path = Some(fastcgi_path_obtained.to_string()); } let hyper_request = request.get_hyper_request(); let request_path = hyper_request.uri().path(); let mut request_path_bytes = request_path.bytes(); if request_path_bytes.len() < 1 || request_path_bytes.nth(0) != Some(b'/') { return Ok( ResponseData::builder(request) .status(StatusCode::BAD_REQUEST) .build(), ); } let mut execute_pathbuf = None; let mut execute_path_info = None; let mut wwwroot_detected = None; if let Some(fastcgi_path) = fastcgi_path { let mut canonical_fastcgi_path: &str = &fastcgi_path; if canonical_fastcgi_path.bytes().last() == Some(b'/') { canonical_fastcgi_path = &canonical_fastcgi_path[..(canonical_fastcgi_path.len() - 1)]; } let request_path_with_slashes = match request_path == canonical_fastcgi_path { true => format!("{}/", request_path), false => request_path.to_string(), }; if let Some(stripped_request_path) = request_path_with_slashes.strip_prefix(canonical_fastcgi_path) { let wwwroot_yaml = &config["wwwroot"]; let wwwroot = wwwroot_yaml.as_str().unwrap_or("/nonexistent"); let wwwroot_unknown = PathBuf::from(wwwroot); let wwwroot_pathbuf = match wwwroot_unknown.as_path().is_absolute() { true => wwwroot_unknown, false => match fs::canonicalize(&wwwroot_unknown).await { Ok(pathbuf) => pathbuf, Err(_) => wwwroot_unknown, }, }; wwwroot_detected = Some(wwwroot_pathbuf.clone()); let wwwroot = wwwroot_pathbuf.as_path(); let mut relative_path = &request_path[1..]; while relative_path.as_bytes().first().copied() == Some(b'/') { relative_path = &relative_path[1..]; } let decoded_relative_path = match urlencoding::decode(relative_path) { Ok(path) => path.to_string(), Err(_) => { return Ok( ResponseData::builder(request) .status(StatusCode::BAD_REQUEST) .build(), ); } }; let joined_pathbuf = wwwroot.join(decoded_relative_path); execute_pathbuf = Some(joined_pathbuf); execute_path_info = stripped_request_path .strip_prefix("/") .map(|s| s.to_string()); } } if execute_pathbuf.is_none() { if let Some(wwwroot) = config["wwwroot"].as_str() { let cache_key = format!( "{}{}{}", match config["ip"].as_str() { Some(ip) => format!("{}-", ip), None => String::from(""), }, match config["domain"].as_str() { Some(domain) => format!("{}-", domain), None => String::from(""), }, request_path ); let wwwroot_unknown = PathBuf::from(wwwroot); let wwwroot_pathbuf = match wwwroot_unknown.as_path().is_absolute() { true => wwwroot_unknown, false => match fs::canonicalize(&wwwroot_unknown).await { Ok(pathbuf) => pathbuf, Err(_) => wwwroot_unknown, }, }; wwwroot_detected = Some(wwwroot_pathbuf.clone()); let wwwroot = wwwroot_pathbuf.as_path(); let read_rwlock = self.path_cache.read().await; let (execute_pathbuf_got, execute_path_info_got) = match read_rwlock.get(&cache_key) { Some(data) => { drop(read_rwlock); data } None => { drop(read_rwlock); let mut relative_path = &request_path[1..]; while relative_path.as_bytes().first().copied() == Some(b'/') { relative_path = &relative_path[1..]; } let decoded_relative_path = match urlencoding::decode(relative_path) { Ok(path) => path.to_string(), Err(_) => { return Ok( ResponseData::builder(request) .status(StatusCode::BAD_REQUEST) .build(), ); } }; let joined_pathbuf = wwwroot.join(decoded_relative_path); let mut execute_pathbuf: Option = None; let mut execute_path_info: Option = None; match fs::metadata(&joined_pathbuf).await { Ok(metadata) => { if metadata.is_file() { let contained_extension = joined_pathbuf .extension() .map(|a| format!(".{}", a.to_string_lossy())); if let Some(contained_extension) = contained_extension { if fastcgi_script_exts.contains(&(&contained_extension as &str)) { execute_pathbuf = Some(joined_pathbuf); } } } else if metadata.is_dir() { let indexes = vec!["index.php", "index.cgi"]; for index in indexes { let temp_joined_pathbuf = joined_pathbuf.join(index); match fs::metadata(&temp_joined_pathbuf).await { Ok(temp_metadata) => { if temp_metadata.is_file() { let contained_extension = temp_joined_pathbuf .extension() .map(|a| format!(".{}", a.to_string_lossy())); if let Some(contained_extension) = contained_extension { if fastcgi_script_exts.contains(&(&contained_extension as &str)) { execute_pathbuf = Some(temp_joined_pathbuf); break; } } } } Err(_) => continue, }; } } } Err(err) => { if err.kind() == tokio::io::ErrorKind::NotADirectory { // TODO: find a file let mut temp_pathbuf = joined_pathbuf.clone(); loop { if !temp_pathbuf.pop() { break; } match fs::metadata(&temp_pathbuf).await { Ok(metadata) => { if metadata.is_file() { let temp_path = temp_pathbuf.as_path(); if !temp_path.starts_with(wwwroot) { // Traversed above the webroot, so ignore that. break; } let path_info = match joined_pathbuf.as_path().strip_prefix(temp_path) { Ok(path) => { let path = path.to_string_lossy().to_string(); Some(match cfg!(windows) { true => path.replace("\\", "/"), false => path, }) } Err(_) => None, }; let mut request_path_normalized = match cfg!(windows) { true => request_path.to_lowercase(), false => request_path.to_string(), }; while request_path_normalized.contains("//") { request_path_normalized = request_path_normalized.replace("//", "/"); } if request_path_normalized == "/cgi-bin" || request_path_normalized.starts_with("/cgi-bin/") { execute_pathbuf = Some(temp_pathbuf); execute_path_info = path_info; break; } else { let contained_extension = temp_pathbuf .extension() .map(|a| format!(".{}", a.to_string_lossy())); if let Some(contained_extension) = contained_extension { if fastcgi_script_exts.contains(&(&contained_extension as &str)) { execute_pathbuf = Some(temp_pathbuf); execute_path_info = path_info; break; } } } } else { break; } } Err(err) => match err.kind() { tokio::io::ErrorKind::NotADirectory => (), _ => break, }, }; } } } }; let data = (execute_pathbuf, execute_path_info); let mut write_rwlock = self.path_cache.write().await; write_rwlock.cleanup(); write_rwlock.insert(cache_key, data.clone()); drop(write_rwlock); data } }; execute_pathbuf = execute_pathbuf_got; execute_path_info = execute_path_info_got; } } if let Some(execute_pathbuf) = execute_pathbuf { if let Some(wwwroot_detected) = wwwroot_detected { return execute_fastcgi_with_environment_variables( request, socket_data, error_logger, wwwroot_detected.as_path(), execute_pathbuf, execute_path_info, config["serverAdministratorEmail"].as_str(), fastcgi_to, ) .await; } } Ok(ResponseData::builder(request).build()) }) .await } async fn proxy_request_handler( &mut self, request: RequestData, _config: &ServerConfig, _socket_data: &SocketData, _error_logger: &ErrorLogger, ) -> Result> { Ok(ResponseData::builder(request).build()) } async fn response_modifying_handler( &mut self, response: HyperResponse, ) -> Result> { Ok(response) } async fn proxy_response_modifying_handler( &mut self, response: HyperResponse, ) -> Result> { Ok(response) } async fn connect_proxy_request_handler( &mut self, _upgraded_request: HyperUpgraded, _connect_address: &str, _config: &ServerConfig, _socket_data: &SocketData, _error_logger: &ErrorLogger, ) -> Result<(), Box> { Ok(()) } fn does_connect_proxy_requests(&mut self) -> bool { false } async fn websocket_request_handler( &mut self, _websocket: HyperWebsocket, _uri: &hyper::Uri, _config: &ServerConfig, _socket_data: &SocketData, _error_logger: &ErrorLogger, ) -> Result<(), Box> { Ok(()) } fn does_websocket_requests(&mut self, _config: &ServerConfig, _socket_data: &SocketData) -> bool { false } } #[allow(clippy::too_many_arguments)] async fn execute_fastcgi_with_environment_variables( request: RequestData, socket_data: &SocketData, error_logger: &ErrorLogger, wwwroot: &Path, execute_pathbuf: PathBuf, path_info: Option, server_administrator_email: Option<&str>, fastcgi_to: &str, ) -> Result> { let mut environment_variables: LinkedHashMap = LinkedHashMap::new(); let hyper_request = request.get_hyper_request(); let original_request_uri = request.get_original_url().unwrap_or(hyper_request.uri()); if let Some(auth_user) = request.get_auth_user() { if let Some(authorization) = hyper_request.headers().get(header::AUTHORIZATION) { let authorization_value = String::from_utf8_lossy(authorization.as_bytes()).to_string(); let mut authorization_value_split = authorization_value.split(" "); if let Some(authorization_type) = authorization_value_split.next() { environment_variables.insert("AUTH_TYPE".to_string(), authorization_type.to_string()); } } environment_variables.insert("REMOTE_USER".to_string(), auth_user.to_string()); } environment_variables.insert( "QUERY_STRING".to_string(), match hyper_request.uri().query() { Some(query) => query.to_string(), None => "".to_string(), }, ); environment_variables.insert("SERVER_SOFTWARE".to_string(), SERVER_SOFTWARE.to_string()); environment_variables.insert( "SERVER_PROTOCOL".to_string(), match hyper_request.version() { hyper::Version::HTTP_09 => "HTTP/0.9".to_string(), hyper::Version::HTTP_10 => "HTTP/1.0".to_string(), hyper::Version::HTTP_11 => "HTTP/1.1".to_string(), hyper::Version::HTTP_2 => "HTTP/2.0".to_string(), hyper::Version::HTTP_3 => "HTTP/3.0".to_string(), _ => "HTTP/Unknown".to_string(), }, ); environment_variables.insert( "SERVER_PORT".to_string(), socket_data.local_addr.port().to_string(), ); environment_variables.insert( "SERVER_ADDR".to_string(), socket_data.local_addr.ip().to_canonical().to_string(), ); if let Some(server_administrator_email) = server_administrator_email { environment_variables.insert( "SERVER_ADMIN".to_string(), server_administrator_email.to_string(), ); } if let Some(host) = hyper_request.headers().get(header::HOST) { environment_variables.insert( "SERVER_NAME".to_string(), String::from_utf8_lossy(host.as_bytes()).to_string(), ); } environment_variables.insert( "DOCUMENT_ROOT".to_string(), wwwroot.to_string_lossy().to_string(), ); environment_variables.insert( "PATH_INFO".to_string(), match &path_info { Some(path_info) => format!("/{}", path_info), None => "".to_string(), }, ); environment_variables.insert( "PATH_TRANSLATED".to_string(), match &path_info { Some(path_info) => { let mut path_translated = execute_pathbuf.clone(); path_translated.push(path_info); path_translated.to_string_lossy().to_string() } None => "".to_string(), }, ); environment_variables.insert( "REQUEST_METHOD".to_string(), hyper_request.method().to_string(), ); environment_variables.insert("GATEWAY_INTERFACE".to_string(), "CGI/1.1".to_string()); environment_variables.insert( "REQUEST_URI".to_string(), format!( "{}{}", original_request_uri.path(), match original_request_uri.query() { Some(query) => format!("?{}", query), None => String::from(""), } ), ); environment_variables.insert( "REMOTE_PORT".to_string(), socket_data.remote_addr.port().to_string(), ); environment_variables.insert( "REMOTE_ADDR".to_string(), socket_data.remote_addr.ip().to_canonical().to_string(), ); environment_variables.insert( "SCRIPT_FILENAME".to_string(), execute_pathbuf.to_string_lossy().to_string(), ); if let Ok(script_path) = execute_pathbuf.as_path().strip_prefix(wwwroot) { environment_variables.insert( "SCRIPT_NAME".to_string(), format!( "/{}", match cfg!(windows) { true => script_path.to_string_lossy().to_string().replace("\\", "/"), false => script_path.to_string_lossy().to_string(), } ), ); } if socket_data.encrypted { environment_variables.insert("HTTPS".to_string(), "ON".to_string()); } let mut content_length_set = false; for (header_name, header_value) in hyper_request.headers().iter() { let env_header_name = match *header_name { header::CONTENT_LENGTH => { content_length_set = true; "CONTENT_LENGTH".to_string() } header::CONTENT_TYPE => "CONTENT_TYPE".to_string(), _ => { let mut result = String::new(); result.push_str("HTTP_"); for c in header_name.as_str().to_uppercase().chars() { if c.is_alphanumeric() { result.push(c); } else { result.push('_'); } } result } }; if environment_variables.contains_key(&env_header_name) { let value = environment_variables.get_mut(&env_header_name); if let Some(value) = value { if env_header_name == "HTTP_COOKIE" { value.push_str("; "); } else { // See https://stackoverflow.com/a/1801191 value.push_str(", "); } value.push_str(String::from_utf8_lossy(header_value.as_bytes()).as_ref()); } else { environment_variables.insert( env_header_name, String::from_utf8_lossy(header_value.as_bytes()).to_string(), ); } } else { environment_variables.insert( env_header_name, String::from_utf8_lossy(header_value.as_bytes()).to_string(), ); } } if !content_length_set { environment_variables.insert("CONTENT_LENGTH".to_string(), "0".to_string()); } let (hyper_request, _, _) = request.into_parts(); execute_fastcgi( hyper_request, error_logger, fastcgi_to, environment_variables, ) .await } async fn execute_fastcgi( hyper_request: HyperRequest, error_logger: &ErrorLogger, fastcgi_to: &str, mut environment_variables: LinkedHashMap, ) -> Result> { let (_, body) = hyper_request.into_parts(); // Insert other environment variables for (key, value) in env::vars_os() { let key_string = key.to_string_lossy().to_string(); let value_string = value.to_string_lossy().to_string(); environment_variables .entry(key_string) .or_insert(value_string); } let fastcgi_to_fixed = if let Some(stripped) = fastcgi_to.strip_prefix("unix:///") { // hyper::Uri fails to parse a string if there is an empty authority, so add an "ignore" authority to Unix socket URLs &format!("unix://ignore/{}", stripped) } else { fastcgi_to }; let fastcgi_to_url = fastcgi_to_fixed.parse::()?; let scheme_str = fastcgi_to_url.scheme_str(); let (socket_reader, mut socket_writer) = match scheme_str { Some("tcp") => { let host = match fastcgi_to_url.host() { Some(host) => host, None => Err(anyhow::anyhow!("The FastCGI URL doesn't include the host"))?, }; let port = match fastcgi_to_url.port_u16() { Some(port) => port, None => Err(anyhow::anyhow!("The FastCGI URL doesn't include the port"))?, }; let addr = format!("{}:{}", host, port); match connect_tcp(&addr).await { Ok(data) => data, Err(err) => match err.kind() { tokio::io::ErrorKind::ConnectionRefused | tokio::io::ErrorKind::NotFound | tokio::io::ErrorKind::HostUnreachable => { error_logger .log(&format!("Service unavailable: {}", err)) .await; return Ok( ResponseData::builder_without_request() .status(StatusCode::SERVICE_UNAVAILABLE) .build(), ); } _ => Err(err)?, }, } } Some("unix") => { let path = fastcgi_to_url.path(); match connect_unix(path).await { Ok(data) => data, Err(err) => match err.kind() { tokio::io::ErrorKind::ConnectionRefused | tokio::io::ErrorKind::NotFound | tokio::io::ErrorKind::HostUnreachable => { error_logger .log(&format!("Service unavailable: {}", err)) .await; return Ok( ResponseData::builder_without_request() .status(StatusCode::SERVICE_UNAVAILABLE) .build(), ); } _ => Err(err)?, }, } } _ => Err(anyhow::anyhow!( "Only HTTP and HTTPS reverse proxy URLs are supported." ))?, }; // Construct and send BEGIN_REQUEST record // Use the responder role and don't use keep-alive let begin_request_packet = construct_fastcgi_record(1, 1, &[0, 1, 0, 0, 0, 0, 0, 0]); socket_writer.write_all(&begin_request_packet).await?; // Construct and send PARAMS records let mut environment_variables_to_wrap = Vec::new(); for (key, value) in environment_variables.iter() { let mut environment_variable = construct_fastcgi_name_value_pair(key.as_bytes(), value.as_bytes()); environment_variables_to_wrap.append(&mut environment_variable); } if !environment_variables_to_wrap.is_empty() { let mut offset = 0; while offset < environment_variables_to_wrap.len() { let chunk_size = std::cmp::min(65536, environment_variables_to_wrap.len() - offset); let chunk = &environment_variables_to_wrap[offset..offset + chunk_size]; // Record type 4 means PARAMS let params_packet = construct_fastcgi_record(4, 1, chunk); socket_writer.write_all(¶ms_packet).await?; offset += chunk_size; } } let params_packet_terminating = construct_fastcgi_record(4, 1, &[]); socket_writer.write_all(¶ms_packet_terminating).await?; let cgi_stdin_reader = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other)); // Emulated standard input, standard output, and standard error type EitherStream = Either, Result>; let stdin = SinkWriter::new(FramedWrite::new(socket_writer, FcgiEncoder::new())); let stdout_and_stderr = FramedRead::new(socket_reader, FcgiDecoder::new()); let (stdout_stream, stderr_stream) = stdout_and_stderr.split_by_map(|item| match item { Ok(FcgiDecodedData::Stdout(bytes)) => EitherStream::Left(Ok(bytes)), Ok(FcgiDecodedData::Stderr(bytes)) => EitherStream::Right(Ok(bytes)), Err(err) => EitherStream::Left(Err(err)), }); let stdout = StreamReader::new(stdout_stream); let stderr = StreamReader::new(stderr_stream); let mut cgi_response = CgiResponse::new(stdout); let stdin_copy_future = Copier::with_zero_packet_writing(cgi_stdin_reader, stdin).copy(); let mut stdin_copy_future_pinned = Box::pin(stdin_copy_future); let stderr_read_future = ReadToEndFuture::new(stderr); let mut stderr_read_future_pinned = Box::pin(stderr_read_future); let mut headers = [EMPTY_HEADER; 128]; let mut early_stdin_copied = false; // Needed to wrap this in another scope to prevent errors with multiple mutable borrows. { let mut head_obtained = false; let stdout_parse_future = cgi_response.get_head(); tokio::pin!(stdout_parse_future); // Cannot use a loop with tokio::select, since stdin_copy_future_pinned being constantly ready will make the web server stop responding to HTTP requests tokio::select! { biased; result = &mut stdin_copy_future_pinned => { early_stdin_copied = true; result?; }, obtained_head = &mut stdout_parse_future => { let obtained_head = obtained_head?; if !obtained_head.is_empty() { httparse::parse_headers(obtained_head, &mut headers)?; } head_obtained = true; }, result = &mut stderr_read_future_pinned => { let stderr_vec = result?; let stderr_string = String::from_utf8_lossy(stderr_vec.as_slice()).to_string(); if !stderr_string.is_empty() { error_logger .log(&format!("There were CGI errors: {}", stderr_string)) .await; } return Ok( ResponseData::builder_without_request() .status(StatusCode::INTERNAL_SERVER_ERROR) .build(), ); }, } if !head_obtained { // Kept it same as in the tokio::select macro tokio::select! { biased; result = &mut stderr_read_future_pinned => { let stderr_vec = result?; let stderr_string = String::from_utf8_lossy(stderr_vec.as_slice()).to_string(); if !stderr_string.is_empty() { error_logger .log(&format!("There were FastCGI errors: {}", stderr_string)) .await; } return Ok( ResponseData::builder_without_request() .status(StatusCode::INTERNAL_SERVER_ERROR) .build(), ); }, obtained_head = &mut stdout_parse_future => { let obtained_head = obtained_head?; if !obtained_head.is_empty() { httparse::parse_headers(obtained_head, &mut headers)?; } } } } } let mut response_builder = Response::builder(); let mut status_code = 200; for header in headers { if header == EMPTY_HEADER { break; } let mut is_status_header = false; match &header.name.to_lowercase() as &str { "location" => { if !(300..=399).contains(&status_code) { status_code = 302; } } "status" => { is_status_header = true; let header_value_cow = String::from_utf8_lossy(header.value); let mut split_status = header_value_cow.split(" "); let first_part = split_status.next(); if let Some(first_part) = first_part { if first_part.starts_with("HTTP/") { let second_part = split_status.next(); if let Some(second_part) = second_part { if let Ok(parsed_status_code) = second_part.parse::() { status_code = parsed_status_code; } } } else if let Ok(parsed_status_code) = first_part.parse::() { status_code = parsed_status_code; } } } _ => (), } if !is_status_header { response_builder = response_builder.header(header.name, header.value); } } response_builder = response_builder.status(status_code); let reader_stream = ReaderStream::new(cgi_response); let stream_body = StreamBody::new(reader_stream.map_ok(Frame::data)); let boxed_body = stream_body.boxed(); let response = response_builder.body(boxed_body)?; let error_logger = error_logger.clone(); Ok( ResponseData::builder_without_request() .response(response) .parallel_fn(async move { let mut stdin_copied = early_stdin_copied; if !stdin_copied { tokio::select! { biased; _ = &mut stdin_copy_future_pinned => { stdin_copied = true; }, result = &mut stderr_read_future_pinned => { let stderr_vec = result.unwrap_or(vec![]); let stderr_string = String::from_utf8_lossy(stderr_vec.as_slice()).to_string(); if !stderr_string.is_empty() { error_logger .log(&format!("There were FastCGI errors: {}", stderr_string)) .await; } }, } } if stdin_copied { let stderr_vec = stderr_read_future_pinned.await.unwrap_or(vec![]); let stderr_string = String::from_utf8_lossy(stderr_vec.as_slice()).to_string(); if !stderr_string.is_empty() { error_logger .log(&format!("There were FastCGI errors: {}", stderr_string)) .await; } } else { stdin_copy_future_pinned.await.unwrap_or_default(); } }) .build(), ) } async fn connect_tcp( addr: &str, ) -> Result< ( Box, Box, ), tokio::io::Error, > { let socket = TcpStream::connect(addr).await?; socket.set_nodelay(true)?; let (socket_reader_set, socket_writer_set) = tokio::io::split(socket); Ok((Box::new(socket_reader_set), Box::new(socket_writer_set))) } #[allow(dead_code)] #[cfg(unix)] async fn connect_unix( path: &str, ) -> Result< ( Box, Box, ), tokio::io::Error, > { use tokio::net::UnixStream; let socket = UnixStream::connect(path).await?; let (socket_reader_set, socket_writer_set) = tokio::io::split(socket); Ok((Box::new(socket_reader_set), Box::new(socket_writer_set))) } #[allow(dead_code)] #[cfg(not(unix))] async fn connect_unix( _path: &str, ) -> Result< ( Box, Box, ), tokio::io::Error, > { Err(tokio::io::Error::new( tokio::io::ErrorKind::Unsupported, "Unix sockets are not supports on non-Unix platforms.", )) }