| import json |
| from fs_s3fs import S3FS |
| from src.libs.logger import logger |
| from src.libs.s3fs import get_s3_credentials |
| from phi.vectordb.pgvector import PgVector2 |
| from phi.knowledge.json import JSONKnowledgeBase |
| from src.databases.postgres import sqlalchemy_engine |
|
|
class JSONKnowledgeBaseExtended(JSONKnowledgeBase):
    """JSON knowledge base backed by an S3-compatible bucket via fs-s3fs.

    Extends phi's ``JSONKnowledgeBase`` so documents are read from an S3
    bucket, and adds a helper to write JSON payloads back into the bucket
    under a ``/json-data/`` prefix.
    """

    # Filesystem handle onto the S3 bucket; assigned in __init__.
    s3fs: S3FS = None

    def __init__(
        self,
        s3_bucket_name,
        vector_db,
        s3_access_key_id,
        s3_secret_access_key,
        s3_endpoint_url,
        s3_region,
    ):
        """Wire the knowledge base to a vector store and an S3 bucket.

        Args:
            s3_bucket_name: Bucket holding the JSON documents; also used as
                the knowledge-base ``path`` so phi reads from the bucket root.
            vector_db: Vector store (e.g. PgVector2) used for embeddings.
            s3_access_key_id / s3_secret_access_key: Bucket credentials.
            s3_endpoint_url: Endpoint for S3-compatible providers.
            s3_region: Bucket region.
        """
        super().__init__(path=s3_bucket_name, vector_db=vector_db, bucket_name=s3_bucket_name)

        self.s3fs = S3FS(
            bucket_name=s3_bucket_name,
            aws_access_key_id=s3_access_key_id,
            aws_secret_access_key=s3_secret_access_key,
            endpoint_url=s3_endpoint_url,
            region=s3_region,
        )

    def load_knowledge_base(self, recreate: bool = False):
        """Load (or rebuild, when ``recreate`` is True) the vector store from the bucket's JSON files."""
        # BUG FIX: the original called the module-level `json_knowledge_base`
        # global instead of `self`, so a second instance would load the wrong
        # object (and the global might not exist yet at call time).
        self.load(recreate=recreate)

    def store_json_data_in_s3(self, json_data, file_path):
        """Serialize ``json_data`` and write it under ``/json-data/`` in the bucket.

        Args:
            json_data: Any JSON-serializable object.
            file_path: Destination path within the prefix; a leading slash
                is tolerated.

        Returns:
            True after the object has been written.
        """
        # Normalize to a single canonical prefix regardless of leading slashes.
        file_path = f"/json-data/{file_path.lstrip('/')}"

        logger.info(f"Storing JSON data in S3 bucket: {self.s3fs._bucket_name} at path: {file_path}")

        # BUG FIX: the original opened f"/{file_path}", yielding a
        # double-slash key ("//json-data/..."), and never closed the handle.
        # Use a context manager so the S3 upload is flushed and closed.
        with self.s3fs.open(file_path, mode='w') as fh:
            fh.write(json.dumps(json_data, indent=2))
        return True
|
|
|
|
| |
# Module-level wiring: build the shared knowledge-base singleton from the
# configured S3 credentials and a pgvector-backed vector store.
_s3_credentials = get_s3_credentials()  # typo fixed: was `_s3_credendtials`
_json_knowledge_base_arguments = {
    "vector_db": PgVector2(
        collection="json_documents",
        db_engine=sqlalchemy_engine,
    ),
    # Expected to supply the s3_* keyword arguments of
    # JSONKnowledgeBaseExtended.__init__ — TODO confirm against
    # get_s3_credentials().
    **_s3_credentials,
}

# Shared instance imported by the rest of the application.
json_knowledge_base = JSONKnowledgeBaseExtended(
    **_json_knowledge_base_arguments
)
|
|