victorgeek's picture
Upload folder using huggingface_hub
9552aa0 verified
#[cfg(not(unix))]
compile_error!("This module is supported only on Unix and Unix-like systems.");
use std::collections::HashMap;
use std::error::Error;
use std::io::{BufReader, Read};
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::Arc;
use std::thread;
use crate::ferron_common::{
ErrorLogger, HyperRequest, HyperUpgraded, RequestData, ResponseData, ServerConfig, ServerModule,
ServerModuleHandlers, SocketData,
};
use crate::ferron_common::{HyperResponse, WithRuntime};
use crate::ferron_res::server_software::SERVER_SOFTWARE;
use crate::ferron_util::ip_match::ip_match;
use crate::ferron_util::match_hostname::match_hostname;
use crate::ferron_util::match_location::match_location;
use crate::ferron_util::preforked_process_pool::{
read_ipc_message, read_ipc_message_async, write_ipc_message, write_ipc_message_async,
PreforkedProcessPool,
};
use crate::ferron_util::wsgi_load_application::load_wsgi_application;
use crate::ferron_util::wsgid_body_reader::WsgidBodyReader;
use crate::ferron_util::wsgid_error_stream::WsgidErrorStream;
use crate::ferron_util::wsgid_input_stream::WsgidInputStream;
use crate::ferron_util::wsgid_message_structs::{
ProcessPoolToServerMessage, ServerToProcessPoolMessage,
};
use crate::ferron_util::wsgid_structs::{WsgidApplicationLocationWrap, WsgidApplicationWrap};
use async_trait::async_trait;
use futures_util::{StreamExt, TryStreamExt};
use hashlink::LinkedHashMap;
use http::{HeaderMap, HeaderName, HeaderValue, StatusCode};
use http_body_util::{BodyExt, Empty, StreamBody};
use hyper::body::{Bytes, Frame};
use hyper::header;
use hyper::Response;
use hyper_tungstenite::HyperWebsocket;
use interprocess::unnamed_pipe::{Recver, Sender};
use pyo3::exceptions::{PyAssertionError, PyException};
use pyo3::prelude::*;
use pyo3::types::{PyBool, PyCFunction, PyDict, PyIterator, PyString, PyTuple};
//use postcard::{DeOptions, SerOptions};
use tokio::fs;
use tokio::runtime::Handle;
use tokio::sync::Mutex;
struct ResponseHead {
status: u16,
headers: Option<LinkedHashMap<String, Vec<String>>>,
is_set: bool,
is_sent: bool,
}
impl ResponseHead {
fn new() -> Self {
Self {
status: 200,
headers: None,
is_set: false,
is_sent: false,
}
}
}
fn wsgi_pool_fn(tx: Sender, rx: Recver, wsgi_script_path: PathBuf) {
let wsgi_application_result: Result<Py<PyAny>, Box<dyn Error + Send + Sync>> =
load_wsgi_application(wsgi_script_path.as_path(), false);
let mut body_iterators = HashMap::new();
let mut application_id = 0;
let mut wsgi_head = Arc::new(Mutex::new(ResponseHead::new()));
let rx_mutex = Arc::new(Mutex::new(rx));
let tx_mutex = Arc::new(Mutex::new(tx));
loop {
let received_raw_message = match read_ipc_message(&mut rx_mutex.blocking_lock()) {
Ok(message) => message,
Err(_) => break,
};
let received_message =
match postcard::from_bytes::<ServerToProcessPoolMessage>(&received_raw_message) {
Ok(message) => message,
Err(_) => continue,
};
if let Some(error) = (|| -> Result<(), Box<dyn Error + Send + Sync>> {
let wsgi_application = wsgi_application_result
.as_ref()
.map_err(|x| anyhow::anyhow!(x.to_string()))?;
if let Some(environment_variables) = received_message.environment_variables {
wsgi_head = Arc::new(Mutex::new(ResponseHead::new()));
let wsgi_head_clone = wsgi_head.clone();
let tx_mutex_clone = tx_mutex.clone();
let rx_mutex_clone = rx_mutex.clone();
let body_iterator = Python::with_gil(move |py| -> PyResult<Py<PyIterator>> {
let start_response = PyCFunction::new_closure(
py,
None,
None,
move |args: &Bound<'_, PyTuple>, kwargs: Option<&Bound<'_, PyDict>>| -> PyResult<_> {
let args_native = args.extract::<(String, Vec<(String, String)>)>()?;
let exc_info = kwargs.map_or(Ok(None), |kwargs| {
let exc_info = kwargs.get_item("exc_info");
if let Ok(Some(exc_info)) = exc_info {
if exc_info.is_none() {
Ok(None)
} else {
Ok(Some(exc_info))
}
} else {
exc_info
}
})?;
let mut wsgi_head_locked = wsgi_head_clone.blocking_lock();
if let Some(exc_info) = exc_info {
if wsgi_head_locked.is_sent {
let exc_info_tuple = exc_info.downcast::<PyTuple>()?;
let exc_info_exception = exc_info_tuple
.get_item(1)?
.getattr("with_traceback")?
.call((exc_info_tuple.get_item(2)?,), None)?
.downcast::<PyException>()?
.clone();
Err(exc_info_exception)?
}
} else if wsgi_head_locked.is_set {
Err(PyAssertionError::new_err("Headers already set"))?
}
let status_code_string_option = args_native.0.split(" ").next();
if let Some(status_code_string) = status_code_string_option {
wsgi_head_locked.status = status_code_string
.parse()
.map_err(|e: std::num::ParseIntError| anyhow::anyhow!(e))?;
} else {
Err(anyhow::anyhow!("Can't extract status code"))?;
}
let mut header_map: LinkedHashMap<String, Vec<String>> = LinkedHashMap::new();
for header in args_native.1 {
let header_name = header.0.to_lowercase();
let header_value = header.1;
if let Some(header_values) = header_map.get_mut(&header_name) {
header_values.push(header_value);
} else {
header_map.insert(header_name, vec![header_value]);
}
}
wsgi_head_locked.headers = Some(header_map);
wsgi_head_locked.is_set = true;
Ok(())
},
)?;
let mut environment: HashMap<String, Bound<'_, PyAny>> = HashMap::new();
let is_https = environment_variables.contains_key("HTTPS");
let content_length =
if let Some(content_length) = environment_variables.get("CONTENT_LENGTH") {
content_length.parse::<u64>().ok()
} else {
None
};
for (environment_variable, environment_variable_value) in environment_variables {
environment.insert(
environment_variable,
PyString::new(py, &environment_variable_value).into_any(),
);
}
environment.insert(
"wsgi.version".to_string(),
PyTuple::new(py, [1, 0])?.into_any(),
);
environment.insert(
"wsgi.url_scheme".to_string(),
PyString::new(py, if is_https { "https" } else { "http" }).into_any(),
);
environment.insert(
"wsgi.input".to_string(),
(if let Some(content_length) = content_length {
WsgidInputStream::new(
BufReader::new(WsgidBodyReader::new(
tx_mutex_clone.clone(),
rx_mutex_clone.clone(),
))
.take(content_length),
)
} else {
WsgidInputStream::new(BufReader::new(WsgidBodyReader::new(
tx_mutex_clone.clone(),
rx_mutex_clone.clone(),
)))
})
.into_pyobject(py)?
.into_any(),
);
environment.insert(
"wsgi.errors".to_string(),
WsgidErrorStream::new(tx_mutex_clone.clone())
.into_pyobject(py)?
.into_any(),
);
environment.insert(
"wsgi.multithread".to_string(),
PyBool::new(py, false).as_any().clone(),
);
environment.insert(
"wsgi.multiprocess".to_string(),
PyBool::new(py, true).as_any().clone(),
);
environment.insert(
"wsgi.run_once".to_string(),
PyBool::new(py, false).as_any().clone(),
);
let body_unknown = wsgi_application.call(py, (environment, start_response), None)?;
let body_iterator = body_unknown
.downcast_bound::<PyIterator>(py)?
.clone()
.unbind();
Ok(body_iterator)
})?;
let current_application_id = application_id;
body_iterators.insert(current_application_id, Arc::new(body_iterator));
application_id += 1;
write_ipc_message(
&mut tx_mutex.blocking_lock(),
&postcard::to_allocvec::<ProcessPoolToServerMessage>(&ProcessPoolToServerMessage {
application_id: Some(current_application_id),
status_code: None,
headers: None,
body_chunk: None,
error_log_line: None,
error_message: None,
requests_body_chunk: false,
})?,
)?
} else if received_message.requests_body_chunk {
if let Some(application_id) = received_message.application_id {
if let Some(body_iterator_arc) = body_iterators.get(&application_id) {
let wsgi_head_clone = wsgi_head.clone();
let body_iterator_arc_clone = body_iterator_arc.clone();
let body_chunk_result = Python::with_gil(|py| -> PyResult<Option<Vec<u8>>> {
let mut body_iterator_bound = body_iterator_arc_clone.bind(py).clone();
if let Some(body_chunk) = body_iterator_bound.next() {
Ok(Some(body_chunk?.extract::<Vec<u8>>()?))
} else {
Ok(None)
}
});
let body_chunk = (match body_chunk_result {
Err(error) => Err(std::io::Error::other(error)),
Ok(None) => Ok(None),
Ok(Some(chunk)) => {
let wsgi_head_locked = wsgi_head_clone.blocking_lock();
if !wsgi_head_locked.is_set {
Err(std::io::Error::other(
"The \"start_response\" function hasn't been called.",
))
} else {
Ok(Some(chunk))
}
}
})?;
let status_code;
let headers;
let mut wsgi_head_locked = wsgi_head_clone.blocking_lock();
if wsgi_head_locked.is_sent {
status_code = None;
headers = None;
} else {
status_code = Some(wsgi_head_locked.status);
headers = wsgi_head_locked.headers.take();
wsgi_head_locked.is_sent = true;
}
drop(wsgi_head_locked);
if body_chunk.is_none() {
body_iterators.remove(&application_id);
}
write_ipc_message(
&mut tx_mutex.blocking_lock(),
&postcard::to_allocvec::<ProcessPoolToServerMessage>(&ProcessPoolToServerMessage {
application_id: None,
status_code,
headers,
body_chunk,
error_log_line: None,
error_message: None,
requests_body_chunk: false,
})?,
)?
} else {
Err(anyhow::anyhow!("The WSGI request wasn't initialized"))?
}
} else {
Err(anyhow::anyhow!("The WSGI request wasn't initialized"))?
}
}
Ok(())
})()
.err()
{
if write_ipc_message(
&mut tx_mutex.blocking_lock(),
&postcard::to_allocvec::<ProcessPoolToServerMessage>(&ProcessPoolToServerMessage {
application_id: None,
status_code: None,
headers: None,
body_chunk: None,
error_log_line: None,
error_message: Some(error.to_string()),
requests_body_chunk: false,
})
.unwrap_or_default(),
)
.is_err()
{
break;
}
}
}
}
fn init_wsgi_process_pool(
wsgi_script_path: PathBuf,
) -> Result<PreforkedProcessPool, Box<dyn Error + Send + Sync>> {
let available_parallelism = thread::available_parallelism()?.get();
// Safety: The function depends on `nix::unistd::fork`, which is executed before any threads are spawned.
// The forking function is safe to call for single-threaded applications.
unsafe {
PreforkedProcessPool::new(available_parallelism, move |tx, rx| {
let wsgi_script_path_clone = wsgi_script_path.clone();
wsgi_pool_fn(tx, rx, wsgi_script_path_clone)
})
}
}
pub fn server_module_init(
config: &ServerConfig,
) -> Result<Box<dyn ServerModule + Send + Sync>, Box<dyn Error + Send + Sync>> {
let mut global_wsgi_process_pool = None;
let mut host_wsgi_process_pools = Vec::new();
if let Some(wsgi_process_pool_path) = config["global"]["wsgidApplicationPath"].as_str() {
global_wsgi_process_pool = Some(Arc::new(init_wsgi_process_pool(PathBuf::from_str(
wsgi_process_pool_path,
)?)?));
}
let global_wsgi_path = config["global"]["wsgidPath"]
.as_str()
.map(|s| s.to_string());
if let Some(hosts) = config["hosts"].as_vec() {
for host_yaml in hosts.iter() {
let domain = host_yaml["domain"].as_str().map(String::from);
let ip = host_yaml["ip"].as_str().map(String::from);
let mut locations = Vec::new();
if let Some(locations_yaml) = host_yaml["locations"].as_vec() {
for location_yaml in locations_yaml.iter() {
if let Some(path_str) = location_yaml["path"].as_str() {
let path = String::from(path_str);
if let Some(wsgi_process_pool_path) = location_yaml["wsgidApplicationPath"].as_str() {
locations.push(WsgidApplicationLocationWrap::new(
path,
Arc::new(init_wsgi_process_pool(PathBuf::from_str(
wsgi_process_pool_path,
)?)?),
location_yaml["wsgidPath"].as_str().map(|s| s.to_string()),
));
}
}
}
}
if let Some(wsgi_process_pool_path) = host_yaml["wsgidApplicationPath"].as_str() {
host_wsgi_process_pools.push(WsgidApplicationWrap::new(
domain,
ip,
Some(Arc::new(init_wsgi_process_pool(PathBuf::from_str(
wsgi_process_pool_path,
)?)?)),
host_yaml["wsgiPath"].as_str().map(|s| s.to_string()),
locations,
));
} else if !locations.is_empty() {
host_wsgi_process_pools.push(WsgidApplicationWrap::new(
domain,
ip,
None,
host_yaml["wsgiPath"].as_str().map(|s| s.to_string()),
locations,
));
}
}
}
Ok(Box::new(WsgidModule::new(
global_wsgi_process_pool,
global_wsgi_path,
Arc::new(host_wsgi_process_pools),
)))
}
struct WsgidModule {
global_wsgi_process_pool: Option<Arc<PreforkedProcessPool>>,
global_wsgi_path: Option<String>,
host_wsgi_process_pools: Arc<Vec<WsgidApplicationWrap>>,
}
impl WsgidModule {
fn new(
global_wsgi_process_pool: Option<Arc<PreforkedProcessPool>>,
global_wsgi_path: Option<String>,
host_wsgi_process_pools: Arc<Vec<WsgidApplicationWrap>>,
) -> Self {
Self {
global_wsgi_process_pool,
global_wsgi_path,
host_wsgi_process_pools,
}
}
}
impl ServerModule for WsgidModule {
fn get_handlers(&self, handle: Handle) -> Box<dyn ServerModuleHandlers + Send> {
Box::new(WsgidModuleHandlers {
handle,
global_wsgi_process_pool: self.global_wsgi_process_pool.clone(),
global_wsgi_path: self.global_wsgi_path.clone(),
host_wsgi_process_pools: self.host_wsgi_process_pools.clone(),
})
}
}
struct WsgidModuleHandlers {
handle: Handle,
global_wsgi_process_pool: Option<Arc<PreforkedProcessPool>>,
global_wsgi_path: Option<String>,
host_wsgi_process_pools: Arc<Vec<WsgidApplicationWrap>>,
}
#[async_trait]
impl ServerModuleHandlers for WsgidModuleHandlers {
async fn request_handler(
&mut self,
request: RequestData,
config: &ServerConfig,
socket_data: &SocketData,
error_logger: &ErrorLogger,
) -> Result<ResponseData, Box<dyn Error + Send + Sync>> {
WithRuntime::new(self.handle.clone(), async move {
let hyper_request = request.get_hyper_request();
// Use .take() instead of .clone(), since the values in Options will only be used once.
let mut wsgi_process_pool = self.global_wsgi_process_pool.take();
let mut wsgi_path = self.global_wsgi_path.take();
// Should have used a HashMap instead of iterating over an array for better performance...
for host_wsgi_process_pool_wrap in self.host_wsgi_process_pools.iter() {
if match_hostname(
match &host_wsgi_process_pool_wrap.domain {
Some(value) => Some(value as &str),
None => None,
},
match hyper_request.headers().get(header::HOST) {
Some(value) => value.to_str().ok(),
None => None,
},
) && match &host_wsgi_process_pool_wrap.ip {
Some(value) => ip_match(value as &str, socket_data.remote_addr.ip()),
None => true,
} {
wsgi_process_pool = host_wsgi_process_pool_wrap.wsgi_process_pool.clone();
wsgi_path = host_wsgi_process_pool_wrap.wsgi_path.clone();
if let Ok(path_decoded) = urlencoding::decode(
request
.get_original_url()
.unwrap_or(request.get_hyper_request().uri())
.path(),
) {
for location_wrap in host_wsgi_process_pool_wrap.locations.iter() {
if match_location(&location_wrap.path, &path_decoded) {
wsgi_process_pool = Some(location_wrap.wsgi_process_pool.clone());
wsgi_path = location_wrap.wsgi_path.clone();
break;
}
}
}
break;
}
}
let request_path = hyper_request.uri().path();
let mut request_path_bytes = request_path.bytes();
if request_path_bytes.len() < 1 || request_path_bytes.nth(0) != Some(b'/') {
return Ok(
ResponseData::builder(request)
.status(StatusCode::BAD_REQUEST)
.build(),
);
}
if let Some(wsgi_process_pool) = wsgi_process_pool {
let wsgi_path = wsgi_path.unwrap_or("/".to_string());
let mut canonical_wsgi_path: &str = &wsgi_path;
if canonical_wsgi_path.bytes().last() == Some(b'/') {
canonical_wsgi_path = &canonical_wsgi_path[..(canonical_wsgi_path.len() - 1)];
}
let request_path_with_slashes = match request_path == canonical_wsgi_path {
true => format!("{}/", request_path),
false => request_path.to_string(),
};
if let Some(stripped_request_path) =
request_path_with_slashes.strip_prefix(canonical_wsgi_path)
{
let wwwroot_yaml = &config["wwwroot"];
let wwwroot = wwwroot_yaml.as_str().unwrap_or("/nonexistent");
let wwwroot_unknown = PathBuf::from(wwwroot);
let wwwroot_pathbuf = match wwwroot_unknown.as_path().is_absolute() {
true => wwwroot_unknown,
false => match fs::canonicalize(&wwwroot_unknown).await {
Ok(pathbuf) => pathbuf,
Err(_) => wwwroot_unknown,
},
};
let wwwroot = wwwroot_pathbuf.as_path();
let mut relative_path = &request_path[1..];
while relative_path.as_bytes().first().copied() == Some(b'/') {
relative_path = &relative_path[1..];
}
let decoded_relative_path = match urlencoding::decode(relative_path) {
Ok(path) => path.to_string(),
Err(_) => {
return Ok(
ResponseData::builder(request)
.status(StatusCode::BAD_REQUEST)
.build(),
);
}
};
let joined_pathbuf = wwwroot.join(decoded_relative_path);
let execute_pathbuf = joined_pathbuf;
let execute_path_info = stripped_request_path
.strip_prefix("/")
.map(|s| s.to_string());
return execute_wsgi_with_environment_variables(
request,
socket_data,
error_logger,
wwwroot,
execute_pathbuf,
execute_path_info,
config["serverAdministratorEmail"].as_str(),
wsgi_process_pool,
)
.await;
}
}
Ok(ResponseData::builder(request).build())
})
.await
}
async fn proxy_request_handler(
&mut self,
request: RequestData,
_config: &ServerConfig,
_socket_data: &SocketData,
_error_logger: &ErrorLogger,
) -> Result<ResponseData, Box<dyn Error + Send + Sync>> {
Ok(ResponseData::builder(request).build())
}
async fn response_modifying_handler(
&mut self,
response: HyperResponse,
) -> Result<HyperResponse, Box<dyn Error + Send + Sync>> {
Ok(response)
}
async fn proxy_response_modifying_handler(
&mut self,
response: HyperResponse,
) -> Result<HyperResponse, Box<dyn Error + Send + Sync>> {
Ok(response)
}
async fn connect_proxy_request_handler(
&mut self,
_upgraded_request: HyperUpgraded,
_connect_address: &str,
_config: &ServerConfig,
_socket_data: &SocketData,
_error_logger: &ErrorLogger,
) -> Result<(), Box<dyn Error + Send + Sync>> {
Ok(())
}
fn does_connect_proxy_requests(&mut self) -> bool {
false
}
async fn websocket_request_handler(
&mut self,
_websocket: HyperWebsocket,
_uri: &hyper::Uri,
_config: &ServerConfig,
_socket_data: &SocketData,
_error_logger: &ErrorLogger,
) -> Result<(), Box<dyn Error + Send + Sync>> {
Ok(())
}
fn does_websocket_requests(&mut self, _config: &ServerConfig, _socket_data: &SocketData) -> bool {
false
}
}
struct ResponseHeadHyper {
status: StatusCode,
headers: Option<HeaderMap>,
}
impl ResponseHeadHyper {
fn new() -> Self {
Self {
status: StatusCode::OK,
headers: None,
}
}
}
#[allow(clippy::too_many_arguments)]
async fn execute_wsgi_with_environment_variables(
request: RequestData,
socket_data: &SocketData,
error_logger: &ErrorLogger,
wwwroot: &Path,
execute_pathbuf: PathBuf,
path_info: Option<String>,
server_administrator_email: Option<&str>,
wsgi_process_pool: Arc<PreforkedProcessPool>,
) -> Result<ResponseData, Box<dyn Error + Send + Sync>> {
let mut environment_variables: LinkedHashMap<String, String> = LinkedHashMap::new();
let hyper_request = request.get_hyper_request();
let original_request_uri = request.get_original_url().unwrap_or(hyper_request.uri());
if let Some(auth_user) = request.get_auth_user() {
if let Some(authorization) = hyper_request.headers().get(header::AUTHORIZATION) {
let authorization_value = String::from_utf8_lossy(authorization.as_bytes()).to_string();
let mut authorization_value_split = authorization_value.split(" ");
if let Some(authorization_type) = authorization_value_split.next() {
environment_variables.insert("AUTH_TYPE".to_string(), authorization_type.to_string());
}
}
environment_variables.insert("REMOTE_USER".to_string(), auth_user.to_string());
}
environment_variables.insert(
"QUERY_STRING".to_string(),
match hyper_request.uri().query() {
Some(query) => query.to_string(),
None => "".to_string(),
},
);
environment_variables.insert("SERVER_SOFTWARE".to_string(), SERVER_SOFTWARE.to_string());
environment_variables.insert(
"SERVER_PROTOCOL".to_string(),
match hyper_request.version() {
hyper::Version::HTTP_09 => "HTTP/0.9".to_string(),
hyper::Version::HTTP_10 => "HTTP/1.0".to_string(),
hyper::Version::HTTP_11 => "HTTP/1.1".to_string(),
hyper::Version::HTTP_2 => "HTTP/2.0".to_string(),
hyper::Version::HTTP_3 => "HTTP/3.0".to_string(),
_ => "HTTP/Unknown".to_string(),
},
);
environment_variables.insert(
"SERVER_PORT".to_string(),
socket_data.local_addr.port().to_string(),
);
environment_variables.insert(
"SERVER_ADDR".to_string(),
socket_data.local_addr.ip().to_canonical().to_string(),
);
if let Some(server_administrator_email) = server_administrator_email {
environment_variables.insert(
"SERVER_ADMIN".to_string(),
server_administrator_email.to_string(),
);
}
if let Some(host) = hyper_request.headers().get(header::HOST) {
environment_variables.insert(
"SERVER_NAME".to_string(),
String::from_utf8_lossy(host.as_bytes()).to_string(),
);
}
environment_variables.insert(
"DOCUMENT_ROOT".to_string(),
wwwroot.to_string_lossy().to_string(),
);
environment_variables.insert(
"PATH_INFO".to_string(),
match &path_info {
Some(path_info) => format!("/{}", path_info),
None => "".to_string(),
},
);
environment_variables.insert(
"PATH_TRANSLATED".to_string(),
match &path_info {
Some(path_info) => {
let mut path_translated = execute_pathbuf.clone();
path_translated.push(path_info);
path_translated.to_string_lossy().to_string()
}
None => "".to_string(),
},
);
environment_variables.insert(
"REQUEST_METHOD".to_string(),
hyper_request.method().to_string(),
);
environment_variables.insert("GATEWAY_INTERFACE".to_string(), "CGI/1.1".to_string());
environment_variables.insert(
"REQUEST_URI".to_string(),
format!(
"{}{}",
original_request_uri.path(),
match original_request_uri.query() {
Some(query) => format!("?{}", query),
None => String::from(""),
}
),
);
environment_variables.insert(
"REMOTE_PORT".to_string(),
socket_data.remote_addr.port().to_string(),
);
environment_variables.insert(
"REMOTE_ADDR".to_string(),
socket_data.remote_addr.ip().to_canonical().to_string(),
);
environment_variables.insert(
"SCRIPT_FILENAME".to_string(),
execute_pathbuf.to_string_lossy().to_string(),
);
if let Ok(script_path) = execute_pathbuf.as_path().strip_prefix(wwwroot) {
environment_variables.insert(
"SCRIPT_NAME".to_string(),
format!(
"/{}",
match cfg!(windows) {
true => script_path.to_string_lossy().to_string().replace("\\", "/"),
false => script_path.to_string_lossy().to_string(),
}
),
);
}
if socket_data.encrypted {
environment_variables.insert("HTTPS".to_string(), "ON".to_string());
}
let mut content_length_set = false;
for (header_name, header_value) in hyper_request.headers().iter() {
let env_header_name = match *header_name {
header::CONTENT_LENGTH => {
content_length_set = true;
"CONTENT_LENGTH".to_string()
}
header::CONTENT_TYPE => "CONTENT_TYPE".to_string(),
_ => {
let mut result = String::new();
result.push_str("HTTP_");
for c in header_name.as_str().to_uppercase().chars() {
if c.is_alphanumeric() {
result.push(c);
} else {
result.push('_');
}
}
result
}
};
if environment_variables.contains_key(&env_header_name) {
let value = environment_variables.get_mut(&env_header_name);
if let Some(value) = value {
if env_header_name == "HTTP_COOKIE" {
value.push_str("; ");
} else {
// See https://stackoverflow.com/a/1801191
value.push_str(", ");
}
value.push_str(String::from_utf8_lossy(header_value.as_bytes()).as_ref());
} else {
environment_variables.insert(
env_header_name,
String::from_utf8_lossy(header_value.as_bytes()).to_string(),
);
}
} else {
environment_variables.insert(
env_header_name,
String::from_utf8_lossy(header_value.as_bytes()).to_string(),
);
}
}
if !content_length_set {
environment_variables.insert("CONTENT_LENGTH".to_string(), "0".to_string());
}
let (hyper_request, _, _) = request.into_parts();
execute_wsgi(
hyper_request,
error_logger,
wsgi_process_pool,
environment_variables,
)
.await
}
async fn execute_wsgi(
hyper_request: HyperRequest,
error_logger: &ErrorLogger,
wsgi_process_pool: Arc<PreforkedProcessPool>,
environment_variables: LinkedHashMap<String, String>,
) -> Result<ResponseData, Box<dyn Error + Send + Sync>> {
let ipc_mutex = wsgi_process_pool
.obtain_process_with_init_async_ipc()
.await?;
let (_, body) = hyper_request.into_parts();
let mut body_stream = body.into_data_stream().map_err(std::io::Error::other);
let application_id = {
let (tx, rx) = &mut *ipc_mutex.lock().await;
write_ipc_message_async(
tx,
&postcard::to_allocvec(&ServerToProcessPoolMessage {
application_id: None,
environment_variables: Some(environment_variables),
body_chunk: None,
body_error_message: None,
requests_body_chunk: false,
})?,
)
.await?;
let application_id;
loop {
let received_message =
postcard::from_bytes::<ProcessPoolToServerMessage>(&read_ipc_message_async(rx).await?)?;
if let Some(error_message) = received_message.error_message {
Err(anyhow::anyhow!(error_message))?
}
if let Some(application_id_obtained) = received_message.application_id {
application_id = application_id_obtained;
break;
}
if let Some(error_log_line) = received_message.error_log_line {
error_logger.log(&error_log_line).await;
} else if received_message.requests_body_chunk {
let body_chunk;
let body_error_message;
match body_stream.next().await {
None => {
body_chunk = None;
body_error_message = None;
}
Some(Err(err)) => {
body_chunk = None;
body_error_message = Some(err.to_string());
}
Some(Ok(chunk)) => {
body_chunk = Some(chunk.to_vec());
body_error_message = None;
}
};
write_ipc_message_async(
tx,
&postcard::to_allocvec(&ServerToProcessPoolMessage {
application_id: None,
environment_variables: None,
body_chunk,
body_error_message,
requests_body_chunk: false,
})?,
)
.await?;
}
}
application_id
};
let wsgi_head = Arc::new(Mutex::new(ResponseHeadHyper::new()));
let wsgi_head_clone = wsgi_head.clone();
let error_logger_arc = Arc::new(error_logger.clone());
let body_stream_mutex = Arc::new(Mutex::new(body_stream));
let mut response_stream = futures_util::stream::unfold(ipc_mutex, move |ipc_mutex| {
let wsgi_head_clone = wsgi_head_clone.clone();
let error_logger_arc_clone = error_logger_arc.clone();
let body_stream_mutex_clone = body_stream_mutex.clone();
Box::pin(async move {
let ipc_mutex_borrowed = &ipc_mutex;
let chunk_result: Result<Option<Bytes>, Box<dyn Error + Send + Sync>> = async {
let (tx, rx) = &mut *ipc_mutex_borrowed.lock().await;
write_ipc_message_async(
tx,
&postcard::to_allocvec(&ServerToProcessPoolMessage {
application_id: Some(application_id),
environment_variables: None,
body_chunk: None,
body_error_message: None,
requests_body_chunk: true,
})?,
)
.await?;
loop {
let received_message =
postcard::from_bytes::<ProcessPoolToServerMessage>(&read_ipc_message_async(rx).await?)?;
if let Some(error_message) = received_message.error_message {
Err(anyhow::anyhow!(error_message))?
} else if let Some(body_chunk) = received_message.body_chunk {
if let Some(status_code) = received_message.status_code {
let mut wsgi_head_locked = wsgi_head_clone.lock().await;
wsgi_head_locked.status = StatusCode::from_u16(status_code)?;
if let Some(headers) = received_message.headers {
let mut header_map = HeaderMap::new();
for (key, value) in headers {
for value in value {
header_map.append(
HeaderName::from_str(&key)?,
HeaderValue::from_bytes(value.as_bytes())?,
);
}
}
wsgi_head_locked.headers = Some(header_map);
}
}
return Ok(Some(Bytes::from(body_chunk)));
} else if let Some(error_log_line) = received_message.error_log_line {
error_logger_arc_clone.log(&error_log_line).await;
} else if received_message.requests_body_chunk {
let body_chunk;
let body_error_message;
match body_stream_mutex_clone.lock().await.next().await {
None => {
body_chunk = None;
body_error_message = None;
}
Some(Err(err)) => {
body_chunk = None;
body_error_message = Some(err.to_string());
}
Some(Ok(chunk)) => {
body_chunk = Some(chunk.to_vec());
body_error_message = None;
}
};
write_ipc_message_async(
tx,
&postcard::to_allocvec(&ServerToProcessPoolMessage {
application_id: None,
environment_variables: None,
body_chunk,
body_error_message,
requests_body_chunk: false,
})?,
)
.await?;
} else {
return Ok(None);
}
}
}
.await;
match chunk_result {
Err(error) => Some((Err(std::io::Error::other(error.to_string())), ipc_mutex)),
Ok(None) => None,
Ok(Some(chunk)) => Some((Ok(chunk), ipc_mutex)),
}
})
});
let first_chunk = response_stream.next().await;
let response_body = if let Some(Err(first_chunk_error)) = first_chunk {
Err(first_chunk_error)?
} else if let Some(Ok(first_chunk)) = first_chunk {
let response_stream_first_item = futures_util::stream::once(async move { Ok(first_chunk) });
let response_stream_combined = response_stream_first_item.chain(response_stream);
let stream_body = StreamBody::new(response_stream_combined.map_ok(Frame::data));
BodyExt::boxed(stream_body)
} else {
BodyExt::boxed(Empty::new().map_err(|e| match e {}))
};
let mut wsgi_head_locked = wsgi_head.lock().await;
let mut hyper_response = Response::new(response_body);
*hyper_response.status_mut() = wsgi_head_locked.status;
if let Some(headers) = wsgi_head_locked.headers.take() {
*hyper_response.headers_mut() = headers;
}
Ok(
ResponseData::builder_without_request()
.response(hyper_response)
.build(),
)
}