| import os |
| from MemoryEngine import MemoryEngine |
|
|
| class MemoryBatchParser: |
| def __init__(self, segment_length=2000): |
| self.engine = MemoryEngine() |
| self.segment_length = segment_length |
|
|
| def _chunk_text(self, full_text): |
| |
| return [full_text[i:i+self.segment_length] for i in range(0, len(full_text), self.segment_length)] |
|
|
| def ingest_txt(self, filepath, default_structure="General > Ingested", agent_id="batch_parser"): |
| if not os.path.exists(filepath): |
| print("File not found.") |
| return |
|
|
| with open(filepath, "r", encoding="utf-8") as f: |
| content = f.read() |
|
|
| segments = self._chunk_text(content) |
|
|
| for i, seg in enumerate(segments): |
| self.engine.insert_segment( |
| raw_text=seg.strip(), |
| concepts="", |
| terms="", |
| structure=default_structure, |
| datapoints="", |
| comparisons="", |
| applications="", |
| agent_id=agent_id |
| ) |
| print(f"[{i+1}/{len(segments)}] Segment processed.") |
|
|
| |
| if __name__ == "__main__": |
| parser = MemoryBatchParser() |
| parser.ingest_txt("example_upload.txt") |
|
|