mekosotto (Claude Sonnet 4.6) committed
Commit c70b852 · 1 Parent(s): 99af1d9

feat(core): extract write_parquet() helper for §6 storage contract

Files changed (2)
  1. src/core/storage.py +36 -0
  2. tests/core/test_storage.py +61 -0
src/core/storage.py ADDED
@@ -0,0 +1,36 @@
+ """Deterministic Parquet I/O for `data/processed/` outputs.
+
+ Implements AGENTS.md §6 storage convention: pyarrow engine, snappy compression,
+ index suppressed. Combined with `src.core.determinism.pin_threads`, this writes
+ byte-identical Parquet files across runs.
+ """
+ from __future__ import annotations
+
+ from pathlib import Path
+
+ import pandas as pd
+
+
+ def write_parquet(df: pd.DataFrame, output_path: Path) -> None:
+     """Write `df` to `output_path` as deterministic, snappy-compressed Parquet.
+
+     Creates parent directories as needed. Overwrites any existing file at
+     `output_path`. Raises `IsADirectoryError` if `output_path` resolves to an
+     existing directory (caller passed a directory by mistake).
+
+     Args:
+         df: DataFrame to persist. Dtypes preserved (uint8 stays uint8, etc.).
+         output_path: Destination file path (parent directories auto-created).
+
+     Raises:
+         IsADirectoryError: if `output_path` is an existing directory.
+     """
+     output_path = Path(output_path)
+     output_path.parent.mkdir(parents=True, exist_ok=True)
+     if output_path.is_dir():
+         raise IsADirectoryError(
+             f"output_path must be a file, got a directory: {output_path}"
+         )
+     df.to_parquet(
+         output_path, index=False, engine="pyarrow", compression="snappy",
+     )
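For context, a caller would typically pin thread counts before writing so the snappy output stays byte-identical across runs, as the module docstring notes. A minimal sketch of such a caller (the `persist_features` name and the no-argument `pin_threads()` signature are illustrative assumptions, not part of this commit):

# Hypothetical caller sketch; only write_parquet is defined in this commit.
from pathlib import Path

import pandas as pd

from src.core import storage
from src.core.determinism import pin_threads  # referenced in the module docstring; signature assumed

def persist_features(df: pd.DataFrame) -> None:
    pin_threads()  # assumed no-arg helper that pins worker/BLAS thread counts for determinism
    storage.write_parquet(df, Path("data/processed/features.parquet"))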
tests/core/test_storage.py ADDED
@@ -0,0 +1,61 @@
+ """Tests for src.core.storage."""
+ from __future__ import annotations
+
+ import hashlib
+ from pathlib import Path
+
+ import pandas as pd
+ import pytest
+
+ from src.core import storage
+
+
+ def _md5(path: Path) -> str:
+     return hashlib.md5(path.read_bytes()).hexdigest()
+
+
+ class TestWriteParquet:
+     def test_writes_parquet_at_path(self, tmp_path: Path):
+         df = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})
+         out = tmp_path / "out.parquet"
+         storage.write_parquet(df, out)
+         round_trip = pd.read_parquet(out)
+         pd.testing.assert_frame_equal(round_trip, df)
+
+     def test_creates_parent_directories(self, tmp_path: Path):
+         df = pd.DataFrame({"a": [1]})
+         out = tmp_path / "deep" / "nested" / "out.parquet"
+         storage.write_parquet(df, out)
+         assert out.exists()
+
+     def test_overwrites_existing_file(self, tmp_path: Path):
+         out = tmp_path / "out.parquet"
+         storage.write_parquet(pd.DataFrame({"a": [1]}), out)
+         storage.write_parquet(pd.DataFrame({"a": [2]}), out)
+         assert pd.read_parquet(out)["a"].tolist() == [2]
+
+     def test_raises_if_path_is_directory(self, tmp_path: Path):
+         (tmp_path / "out.parquet").mkdir()
+         with pytest.raises(IsADirectoryError):
+             storage.write_parquet(pd.DataFrame({"a": [1]}), tmp_path / "out.parquet")
+
+     def test_byte_deterministic_on_repeat(self, tmp_path: Path):
+         df = pd.DataFrame({"a": list(range(100)), "b": list(range(100, 200))})
+         a, b = tmp_path / "a.parquet", tmp_path / "b.parquet"
+         storage.write_parquet(df, a)
+         storage.write_parquet(df, b)
+         assert _md5(a) == _md5(b)
+
+     def test_preserves_uint8_dtype(self, tmp_path: Path):
+         """BBB fingerprints are uint8; writing must not silently widen."""
+         df = pd.DataFrame({"fp_0": pd.Series([0, 1], dtype="uint8")})
+         out = tmp_path / "out.parquet"
+         storage.write_parquet(df, out)
+         assert pd.read_parquet(out)["fp_0"].dtype == "uint8"
+
+     def test_index_not_persisted(self, tmp_path: Path):
+         """index=False must be the default; a round trip should reset to RangeIndex."""
+         df = pd.DataFrame({"a": [1, 2]}, index=["foo", "bar"])
+         out = tmp_path / "out.parquet"
+         storage.write_parquet(df, out)
+         assert list(pd.read_parquet(out).index) == [0, 1]
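Assuming pytest is configured at the repository root (its configuration is not part of this commit), the new suite can be exercised in isolation with a standard invocation such as `pytest tests/core/test_storage.py -q`.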