| """ |
| Multilayer Prefabricated Vertical Drain (PVD) Consolidation Analysis |
| Calculates settlement vs time using finite difference method |
| """ |
|
|
| import numpy as np |
| import matplotlib.pyplot as plt |
| import yaml |
| import argparse |
| import os |
| from dataclasses import dataclass |
| from typing import List, Tuple, Dict, Any |
|
|
|
|
| @dataclass |
| class SoilLayer: |
| """Soil layer properties""" |
|
|
| thickness: float |
| Cv: float |
| Ch: float |
| RR: float |
| CR: float |
| sigma_ini: float |
| sigma_p: float |
| kh: float |
| ks: float |
|
|
|
|
| @dataclass |
| class PVDProperties: |
| """PVD installation properties""" |
|
|
| dw: float |
| ds: float |
| De: float |
| L_drain: float |
| qw: float |
|
|
|
|
| class PVDConsolidation: |
| """ |
| Multilayer PVD consolidation analysis using finite difference method |
| """ |
|
|
| def __init__( |
| self, |
| soil_layers: List[SoilLayer], |
| pvd: PVDProperties, |
| surcharge: float, |
| dt: float = 0.01, |
| ): |
| """ |
| Initialize PVD consolidation analysis |
| |
| Parameters: |
| ----------- |
| soil_layers : List[SoilLayer] |
| List of soil layers from top to bottom |
| pvd : PVDProperties |
| PVD installation properties |
| surcharge : float |
| Applied surcharge load (kPa) |
| dt : float |
| Time step for finite difference (years) |
| """ |
| self.layers = soil_layers |
| self.pvd = pvd |
| self.surcharge = surcharge |
| self.dt = dt |
|
|
| |
| self.total_thickness = sum(layer.thickness for layer in soil_layers) |
|
|
| |
| self._setup_mesh() |
|
|
| def _setup_mesh(self, nodes_per_meter: int = 10): |
| """Setup finite difference mesh""" |
| self.nodes_per_meter = nodes_per_meter |
|
|
| |
| self.z_coords = [] |
| self.layer_indices = [] |
| self.Cv_profile = [] |
| self.Ch_profile = [] |
| self.kh_profile = [] |
| self.ks_profile = [] |
|
|
| z = 0 |
| for i, layer in enumerate(self.layers): |
| n_nodes = max(int(layer.thickness * nodes_per_meter), 2) |
| z_layer = np.linspace(z, z + layer.thickness, n_nodes, endpoint=False) |
|
|
| self.z_coords.extend(z_layer) |
| self.layer_indices.extend([i] * len(z_layer)) |
| self.Cv_profile.extend([layer.Cv] * len(z_layer)) |
| self.Ch_profile.extend([layer.Ch] * len(z_layer)) |
| self.kh_profile.extend([layer.kh] * len(z_layer)) |
| self.ks_profile.extend([layer.ks] * len(z_layer)) |
|
|
| z += layer.thickness |
|
|
| |
| self.z_coords.append(self.total_thickness) |
| self.layer_indices.append(len(self.layers) - 1) |
| self.Cv_profile.append(self.layers[-1].Cv) |
| self.Ch_profile.append(self.layers[-1].Ch) |
| self.kh_profile.append(self.layers[-1].kh) |
| self.ks_profile.append(self.layers[-1].ks) |
|
|
| self.z_coords = np.array(self.z_coords) |
| self.layer_indices = np.array(self.layer_indices) |
| self.Cv_profile = np.array(self.Cv_profile) |
| self.Ch_profile = np.array(self.Ch_profile) |
| self.kh_profile = np.array(self.kh_profile) |
| self.ks_profile = np.array(self.ks_profile) |
|
|
| self.n_nodes = len(self.z_coords) |
| self.dz = np.diff(self.z_coords) |
|
|
| def calculate_pvd_factors_layer(self, layer_idx: int) -> Tuple[float, float, float]: |
| """ |
| Calculate PVD influence factors for a specific layer |
| |
| Parameters: |
| ----------- |
| layer_idx : int |
| Layer index |
| |
| Returns: |
| -------- |
| Fn, Fs, Fr : float |
| Geometric, smear, and well resistance factors for the layer |
| """ |
| layer = self.layers[layer_idx] |
|
|
| |
| n = self.pvd.De / self.pvd.dw |
| Fn = (n**2 / (n**2 - 1)) * np.log(n) - 3 / 4 + 1 / n**2 |
|
|
| |
| s = self.pvd.ds / self.pvd.dw |
| Fs = ((layer.kh / layer.ks) - 1) * np.log(s) |
|
|
| |
| L = self.pvd.L_drain |
| if self.pvd.qw > 1e10: |
| Fr = 0.0 |
| else: |
| Fr = (np.pi * L**2 * layer.kh) / (8 * self.pvd.qw) |
|
|
| return Fn, Fs, Fr |
|
|
| def calculate_pvd_factors(self) -> Tuple[float, float, float]: |
| """ |
| Calculate PVD influence factors using weighted average permeabilities |
| |
| Returns: |
| -------- |
| Fn, Fs, Fr : float |
| Geometric, smear, and well resistance factors |
| """ |
| |
| kh_avg = np.average(self.kh_profile, weights=np.ones(len(self.kh_profile))) |
| ks_avg = np.average(self.ks_profile, weights=np.ones(len(self.ks_profile))) |
|
|
| |
| n = self.pvd.De / self.pvd.dw |
|
|
| |
| Fn = (n**2 / (n**2 - 1)) * np.log(n) - 3 / 4 + 1 / n**2 |
|
|
| |
| s = self.pvd.ds / self.pvd.dw |
| Fs = ((kh_avg / ks_avg) - 1) * np.log(s) |
|
|
| |
| |
| |
| L = self.pvd.L_drain |
| if self.pvd.qw > 1e10: |
| Fr = 0.0 |
| else: |
| |
| |
| Fr = (np.pi * L**2 * kh_avg) / (8 * self.pvd.qw) |
|
|
| return Fn, Fs, Fr |
|
|
| def calculate_Uh(self, t: float) -> np.ndarray: |
| """ |
| Calculate degree of consolidation in horizontal direction |
| using finite difference method with layer-specific PVD factors |
| Only applies to layers within drain length L |
| |
| Parameters: |
| ----------- |
| t : float |
| Time (years) |
| |
| Returns: |
| -------- |
| Uh : ndarray |
| Degree of horizontal consolidation at each node |
| (0 for nodes beyond drain length) |
| """ |
| |
| F_total_layers = [] |
| for layer_idx in range(len(self.layers)): |
| Fn, Fs, Fr = self.calculate_pvd_factors_layer(layer_idx) |
| F_total_layers.append(Fn + Fs + Fr) |
|
|
| |
| u = np.ones(self.n_nodes) |
| u_history = [u.copy()] |
|
|
| |
| n_steps = int(t / self.dt) |
|
|
| for step in range(n_steps): |
| u_new = u.copy() |
|
|
| |
| |
| |
|
|
| for i in range(1, self.n_nodes - 1): |
| |
| depth = self.z_coords[i] |
|
|
| if depth <= self.pvd.L_drain: |
| |
| Ch = self.Ch_profile[i] |
| layer_idx = self.layer_indices[i] |
| F_total = F_total_layers[layer_idx] |
|
|
| |
| |
| Th = Ch * self.dt / (self.pvd.De**2 / 4) |
|
|
| |
| decay_rate = 8 * Th / F_total |
| u_new[i] = u[i] * np.exp(-decay_rate) |
| else: |
| |
| |
| pass |
|
|
| |
| u_new[0] = 0 |
| u_new[-1] = 0 |
|
|
| u = u_new |
|
|
| if step % max(1, n_steps // 100) == 0: |
| u_history.append(u.copy()) |
|
|
| |
| Uh = 1 - u |
|
|
| return Uh |
|
|
| def calculate_Uv(self, t: float) -> np.ndarray: |
| """ |
| Calculate degree of consolidation in vertical direction |
| |
| Parameters: |
| ----------- |
| t : float |
| Time (years) |
| |
| Returns: |
| -------- |
| Uv : ndarray |
| Degree of vertical consolidation at each node |
| """ |
| Uv = np.zeros(self.n_nodes) |
|
|
| for i in range(self.n_nodes): |
| z = self.z_coords[i] |
| Cv = self.Cv_profile[i] |
| H = self.total_thickness |
|
|
| |
| Tv = Cv * t / H**2 |
|
|
| |
| if Tv < 0.217: |
| Uv[i] = np.sqrt(4 * Tv / np.pi) |
| else: |
| Uv[i] = 1 - (8 / np.pi**2) * np.exp(-(np.pi**2) * Tv / 4) |
|
|
| Uv[i] = min(Uv[i], 1.0) |
|
|
| return Uv |
|
|
| def calculate_total_U(self, t: float) -> np.ndarray: |
| """ |
| Calculate total degree of consolidation (combined vertical and horizontal) |
| |
| For nodes within drain length: U = 1 - (1 - Uh)(1 - Uv) |
| For nodes beyond drain length: U = Uv (only vertical consolidation) |
| |
| Parameters: |
| ----------- |
| t : float |
| Time (years) |
| |
| Returns: |
| -------- |
| U : ndarray |
| Total degree of consolidation at each node |
| """ |
| Uh = self.calculate_Uh(t) |
| Uv = self.calculate_Uv(t) |
|
|
| U = np.zeros(self.n_nodes) |
|
|
| for i in range(self.n_nodes): |
| depth = self.z_coords[i] |
|
|
| if depth <= self.pvd.L_drain: |
| |
| U[i] = 1 - (1 - Uh[i]) * (1 - Uv[i]) |
| else: |
| |
| U[i] = Uv[i] |
|
|
| return U |
|
|
| def calculate_settlement(self, t: float) -> Tuple[float, np.ndarray]: |
| """ |
| Calculate total settlement at time t |
| |
| Parameters: |
| ----------- |
| t : float |
| Time (years) |
| |
| Returns: |
| -------- |
| total_settlement : float |
| Total settlement (m) |
| layer_settlements : ndarray |
| Settlement for each layer (m) |
| """ |
| U = self.calculate_total_U(t) |
|
|
| layer_settlements = np.zeros(len(self.layers)) |
|
|
| for i, layer in enumerate(self.layers): |
| |
| layer_mask = self.layer_indices == i |
| U_layer = np.mean(U[layer_mask]) |
|
|
| |
| sigma_final = layer.sigma_ini + self.surcharge |
|
|
| |
| if sigma_final <= layer.sigma_p: |
| |
| Sc_final = ( |
| layer.RR * np.log10(sigma_final / layer.sigma_ini) * layer.thickness |
| ) |
| else: |
| if layer.sigma_ini >= layer.sigma_p: |
| |
| Sc_final = ( |
| layer.CR |
| * np.log10(sigma_final / layer.sigma_ini) |
| * layer.thickness |
| ) |
| else: |
| |
| Sc_recomp = ( |
| layer.RR |
| * np.log10(layer.sigma_p / layer.sigma_ini) |
| * layer.thickness |
| ) |
| Sc_virgin = ( |
| layer.CR |
| * np.log10(sigma_final / layer.sigma_p) |
| * layer.thickness |
| ) |
| Sc_final = Sc_recomp + Sc_virgin |
|
|
| |
| layer_settlements[i] = U_layer * Sc_final |
|
|
| total_settlement = np.sum(layer_settlements) |
|
|
| return total_settlement, layer_settlements |
|
|
| def settlement_vs_time( |
| self, t_max: float, n_points: int = 100 |
| ) -> Tuple[np.ndarray, np.ndarray]: |
| """ |
| Calculate settlement vs time curve |
| |
| Parameters: |
| ----------- |
| t_max : float |
| Maximum time (years) |
| n_points : int |
| Number of time points |
| |
| Returns: |
| -------- |
| time : ndarray |
| Time array (years) |
| settlement : ndarray |
| Settlement array (m) |
| """ |
| time = np.linspace(0, t_max, n_points) |
| settlement = np.zeros(n_points) |
|
|
| for i, t in enumerate(time): |
| if t == 0: |
| settlement[i] = 0 |
| else: |
| settlement[i], _ = self.calculate_settlement(t) |
|
|
| return time, settlement |
|
|
| def plot_settlement_vs_time( |
| self, t_max: float, n_points: int = 100, save_path: str = None |
| ): |
| """ |
| Plot settlement vs time curve |
| |
| Parameters: |
| ----------- |
| t_max : float |
| Maximum time (years) |
| n_points : int |
| Number of time points |
| save_path : str, optional |
| Path to save the plot |
| """ |
| time, settlement = self.settlement_vs_time(t_max, n_points) |
|
|
| |
| settlement_mm = settlement * 1000 |
|
|
| plt.figure(figsize=(10, 6)) |
| plt.plot(time, settlement_mm, "b-", linewidth=2) |
| plt.xlabel("Time (years)", fontsize=12) |
| plt.ylabel("Settlement (mm)", fontsize=12) |
| plt.title( |
| "Settlement vs Time - PVD Consolidation", fontsize=14, fontweight="bold" |
| ) |
| plt.grid(True, alpha=0.3) |
| plt.tight_layout() |
|
|
| if save_path: |
| plt.savefig(save_path, dpi=300, bbox_inches="tight") |
|
|
| plt.show() |
|
|
| return time, settlement_mm |
|
|
| def plot_degree_of_consolidation(self, t: float, save_path: str = None): |
| """ |
| Plot degree of consolidation profile at time t |
| |
| Parameters: |
| ----------- |
| t : float |
| Time (years) |
| save_path : str, optional |
| Path to save the plot |
| """ |
| Uh = self.calculate_Uh(t) |
| Uv = self.calculate_Uv(t) |
| U = self.calculate_total_U(t) |
|
|
| fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6)) |
|
|
| |
| ax1.plot(Uh * 100, self.z_coords, "r-", linewidth=2, label="Horizontal (Uh)") |
| ax1.plot(Uv * 100, self.z_coords, "b-", linewidth=2, label="Vertical (Uv)") |
| ax1.plot(U * 100, self.z_coords, "g-", linewidth=2, label="Total (U)") |
|
|
| |
| ax1.axhline( |
| y=self.pvd.L_drain, |
| color="orange", |
| linestyle="--", |
| linewidth=1.5, |
| alpha=0.7, |
| label=f"Drain Length = {self.pvd.L_drain:.1f} m", |
| ) |
|
|
| ax1.set_xlabel("Degree of Consolidation (%)", fontsize=12) |
| ax1.set_ylabel("Depth (m)", fontsize=12) |
| ax1.set_title( |
| f"Consolidation Profile at t = {t:.2f} years", |
| fontsize=12, |
| fontweight="bold", |
| ) |
| ax1.invert_yaxis() |
| ax1.grid(True, alpha=0.3) |
| ax1.legend() |
|
|
| |
| u = 1 - U |
| ax2.plot(u * self.surcharge, self.z_coords, "k-", linewidth=2) |
| ax2.set_xlabel("Excess Pore Pressure (kPa)", fontsize=12) |
| ax2.set_ylabel("Depth (m)", fontsize=12) |
| ax2.set_title( |
| f"Excess Pore Pressure at t = {t:.2f} years", fontsize=12, fontweight="bold" |
| ) |
| ax2.invert_yaxis() |
| ax2.grid(True, alpha=0.3) |
|
|
| plt.tight_layout() |
|
|
| if save_path: |
| plt.savefig(save_path, dpi=300, bbox_inches="tight") |
|
|
| plt.show() |
|
|
| def get_summary_report(self, t_check: List[float]) -> str: |
| """ |
| Generate summary report for specified time points |
| |
| Parameters: |
| ----------- |
| t_check : List[float] |
| Time points to check (years) |
| |
| Returns: |
| -------- |
| report : str |
| Summary report |
| """ |
| Fn, Fs, Fr = self.calculate_pvd_factors() |
|
|
| report = "=" * 70 + "\n" |
| report += "PVD CONSOLIDATION ANALYSIS SUMMARY\n" |
| report += "=" * 70 + "\n\n" |
|
|
| report += "PVD PARAMETERS:\n" |
| report += f" Equivalent drain diameter (dw): {self.pvd.dw:.3f} m\n" |
| report += f" Smear zone diameter (ds): {self.pvd.ds:.3f} m\n" |
| report += f" Unit cell diameter (De): {self.pvd.De:.3f} m\n" |
| report += f" Drain length (L): {self.pvd.L_drain:.2f} m\n" |
| report += ( |
| f" Drain spacing ratio (n = De/dw): {self.pvd.De / self.pvd.dw:.2f}\n" |
| ) |
| report += f" Geometric factor (Fn): {Fn:.4f}\n" |
| report += f" Smear factor (Fs - avg): {Fs:.4f}\n" |
| report += f" Well resistance (Fr - avg): {Fr:.4f}\n" |
| report += f" Total resistance (F - avg): {Fn + Fs + Fr:.4f}\n\n" |
|
|
| report += "SOIL PROFILE:\n" |
| cumulative_depth = 0 |
| for i, layer in enumerate(self.layers): |
| depth_top = cumulative_depth |
| depth_bottom = cumulative_depth + layer.thickness |
|
|
| |
| if depth_bottom <= self.pvd.L_drain: |
| drain_status = "Full PVD effect (Uh + Uv)" |
| elif depth_top >= self.pvd.L_drain: |
| drain_status = "No PVD effect (Uv only)" |
| else: |
| drain_status = "Partial PVD effect" |
|
|
| report += f" Layer {i + 1} ({depth_top:.1f}m - {depth_bottom:.1f}m): {drain_status}\n" |
| report += f" Thickness: {layer.thickness:.2f} m\n" |
| report += f" Ch: {layer.Ch:.4f} m²/year\n" |
| report += f" Cv: {layer.Cv:.4f} m²/year\n" |
| report += f" kh: {layer.kh:.4f} m/year\n" |
| report += f" ks: {layer.ks:.4f} m/year\n" |
| report += f" RR: {layer.RR:.4f}\n" |
| report += f" CR: {layer.CR:.4f}\n" |
| report += f" σ'ini: {layer.sigma_ini:.1f} kPa\n" |
| report += f" σ'p: {layer.sigma_p:.1f} kPa\n\n" |
|
|
| cumulative_depth = depth_bottom |
|
|
| report += f"Applied surcharge: {self.surcharge:.1f} kPa\n\n" |
| report += "=" * 70 + "\n" |
| report += "SETTLEMENT vs TIME:\n" |
| report += "=" * 70 + "\n" |
| report += f"{'Time (years)':<15} {'Settlement (mm)':<20} {'U (%)':<15}\n" |
| report += "-" * 70 + "\n" |
|
|
| for t in t_check: |
| settlement, _ = self.calculate_settlement(t) |
| U = self.calculate_total_U(t) |
| U_avg = np.mean(U) |
| report += f"{t:<15.2f} {settlement * 1000:<20.2f} {U_avg * 100:<15.1f}\n" |
|
|
| report += "=" * 70 + "\n" |
|
|
| return report |
|
|
|
|
| def example_usage(): |
| """Example usage of PVD consolidation analysis""" |
|
|
| |
| layers = [ |
| SoilLayer( |
| thickness=5.0, |
| Cv=0.5, |
| Ch=1.5, |
| RR=0.05, |
| CR=0.30, |
| sigma_ini=50.0, |
| sigma_p=80.0, |
| kh=2.0, |
| ks=1.0, |
| ), |
| SoilLayer( |
| thickness=8.0, |
| Cv=0.3, |
| Ch=1.0, |
| RR=0.04, |
| CR=0.35, |
| sigma_ini=90.0, |
| sigma_p=90.0, |
| kh=1.5, |
| ks=0.75, |
| ), |
| SoilLayer( |
| thickness=7.0, |
| Cv=0.4, |
| Ch=1.2, |
| RR=0.045, |
| CR=0.32, |
| sigma_ini=140.0, |
| sigma_p=150.0, |
| kh=1.8, |
| ks=0.9, |
| ), |
| ] |
|
|
| |
| pvd = PVDProperties( |
| dw=0.05, |
| ds=0.15, |
| De=1.5, |
| L_drain=20.0, |
| qw=100.0, |
| ) |
|
|
| |
| surcharge = 100.0 |
|
|
| |
| analysis = PVDConsolidation(layers, pvd, surcharge, dt=0.01) |
|
|
| |
| t_check = [0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0] |
| print(analysis.get_summary_report(t_check)) |
|
|
| |
| print("\nGenerating settlement vs time plot...") |
| time, settlement = analysis.plot_settlement_vs_time( |
| t_max=20.0, n_points=200, save_path="settlement_vs_time.png" |
| ) |
|
|
| |
| print("Generating consolidation profile plots...") |
| for t in [0.5, 2.0, 10.0]: |
| analysis.plot_degree_of_consolidation( |
| t, save_path=f"consolidation_profile_t{t:.1f}.png" |
| ) |
|
|
| print("\nAnalysis complete!") |
|
|
|
|
| def load_yaml_data(yaml_file: str) -> Dict[str, Any]: |
| """ |
| Load PVD analysis data from YAML file |
| |
| Parameters: |
| ----------- |
| yaml_file : str |
| Path to YAML file |
| |
| Returns: |
| -------- |
| data : dict |
| Dictionary containing analysis parameters |
| """ |
| with open(yaml_file, "r") as f: |
| data = yaml.safe_load(f) |
| return data |
|
|
|
|
| def create_analysis_from_yaml(yaml_file: str) -> PVDConsolidation: |
| """ |
| Create PVDConsolidation object from YAML file |
| |
| Parameters: |
| ----------- |
| yaml_file : str |
| Path to YAML file |
| |
| Returns: |
| -------- |
| analysis : PVDConsolidation |
| PVD consolidation analysis object |
| """ |
| data = load_yaml_data(yaml_file) |
|
|
| |
| layers = [] |
| for layer_data in data["soil_layers"]: |
| layer = SoilLayer( |
| thickness=layer_data["thickness"], |
| Cv=layer_data["Cv"], |
| Ch=layer_data["Ch"], |
| RR=layer_data["RR"], |
| CR=layer_data["CR"], |
| sigma_ini=layer_data["sigma_ini"], |
| sigma_p=layer_data["sigma_p"], |
| kh=layer_data["kh"], |
| ks=layer_data["ks"], |
| ) |
| layers.append(layer) |
|
|
| |
| pvd_data = data["pvd"] |
| pvd = PVDProperties( |
| dw=pvd_data["dw"], |
| ds=pvd_data["ds"], |
| De=pvd_data["De"], |
| L_drain=pvd_data["L_drain"], |
| qw=pvd_data["qw"], |
| ) |
|
|
| |
| surcharge = data["analysis"]["surcharge"] |
| dt = data["analysis"].get("dt", 0.01) |
|
|
| |
| analysis = PVDConsolidation(layers, pvd, surcharge, dt) |
|
|
| return analysis, data |
|
|
|
|
| def run_analysis_from_yaml(yaml_file: str, output_dir: str = None): |
| """ |
| Run complete PVD analysis from YAML file |
| |
| Parameters: |
| ----------- |
| yaml_file : str |
| Path to YAML input file |
| output_dir : str, optional |
| Directory to save output files |
| """ |
| print(f"Loading data from: {yaml_file}") |
| analysis, data = create_analysis_from_yaml(yaml_file) |
|
|
| |
| if output_dir is None: |
| output_dir = os.path.dirname(yaml_file) or "." |
| os.makedirs(output_dir, exist_ok=True) |
|
|
| |
| analysis_params = data["analysis"] |
| t_max = analysis_params.get("t_max", 20.0) |
| n_points = analysis_params.get("n_points", 200) |
| t_check = analysis_params.get("t_check", [0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0]) |
| t_profiles = analysis_params.get("t_profiles", [0.5, 2.0, 10.0]) |
|
|
| |
| print("\n" + "=" * 70) |
| print("RUNNING PVD CONSOLIDATION ANALYSIS") |
| print("=" * 70 + "\n") |
|
|
| report = analysis.get_summary_report(t_check) |
| print(report) |
|
|
| |
| report_file = os.path.join(output_dir, "pvd_analysis_report.txt") |
| with open(report_file, "w") as f: |
| f.write(report) |
| print(f"\nReport saved to: {report_file}") |
|
|
| |
| print("\nGenerating settlement vs time plot...") |
| settlement_plot = os.path.join(output_dir, "settlement_vs_time.png") |
| time, settlement = analysis.plot_settlement_vs_time( |
| t_max=t_max, n_points=n_points, save_path=settlement_plot |
| ) |
| print(f"Plot saved to: {settlement_plot}") |
|
|
| |
| csv_file = os.path.join(output_dir, "settlement_data.csv") |
| np.savetxt( |
| csv_file, |
| np.column_stack((time, settlement * 1000)), |
| delimiter=",", |
| header="Time (years),Settlement (mm)", |
| comments="", |
| ) |
| print(f"Data saved to: {csv_file}") |
|
|
| |
| print("\nGenerating consolidation profile plots...") |
| for t in t_profiles: |
| profile_plot = os.path.join(output_dir, f"consolidation_profile_t{t:.1f}y.png") |
| analysis.plot_degree_of_consolidation(t, save_path=profile_plot) |
| print(f"Profile at t={t:.1f} years saved to: {profile_plot}") |
|
|
| print("\n" + "=" * 70) |
| print("ANALYSIS COMPLETE!") |
| print("=" * 70) |
|
|
|
|
| def main(): |
| """Main CLI interface""" |
| parser = argparse.ArgumentParser( |
| description="PVD Consolidation Analysis - Settlement vs Time Calculator", |
| formatter_class=argparse.RawDescriptionHelpFormatter, |
| epilog=""" |
| Examples: |
| # Run analysis from YAML file |
| python pvd_consolidation.py --data input.yaml |
| |
| # Specify output directory |
| python pvd_consolidation.py --data input.yaml --output results/ |
| |
| # Run example analysis |
| python pvd_consolidation.py --example |
| """, |
| ) |
|
|
| parser.add_argument("--data", type=str, help="Path to YAML input file") |
| parser.add_argument( |
| "--output", |
| type=str, |
| default=None, |
| help="Output directory for results (default: same as input file)", |
| ) |
| parser.add_argument( |
| "--example", |
| action="store_true", |
| help="Run example analysis with default parameters", |
| ) |
|
|
| args = parser.parse_args() |
|
|
| if args.example: |
| print("Running example analysis...") |
| example_usage() |
| elif args.data: |
| if not os.path.exists(args.data): |
| print(f"Error: Input file '{args.data}' not found!") |
| return |
| run_analysis_from_yaml(args.data, args.output) |
| else: |
| parser.print_help() |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|