"""Ingestion script: load documents, chunk them, and store ZhipuAI embeddings in Chroma."""
import os

# Identify this client to web servers contacted by WebBaseLoader.
os.environ["USER_AGENT"] = "MyRAGApp/1.0 (https://myapp.example.com; myemail@example.com)"

# Opt out of Chroma's anonymous usage telemetry.
os.environ["CHROMA_TELEMETRY_DISABLED"] = "true"

from zhipuai import ZhipuAI

from dotenv import load_dotenv, find_dotenv

# One loader per supported input format; dispatched by file extension below.
from langchain_community.document_loaders import (
    TextLoader, PythonLoader, CSVLoader, JSONLoader,
    Docx2txtLoader, UnstructuredPowerPointLoader,
    PyMuPDFLoader, UnstructuredMarkdownLoader,
    UnstructuredImageLoader, WebBaseLoader
)

# Load environment variables (e.g. ZHIPUAI_API_KEY) from a .env file if one exists.
_ = load_dotenv(find_dotenv())

# Raw ZhipuAI client; raises KeyError at import time if ZHIPUAI_API_KEY is unset.
client = ZhipuAI(api_key=os.environ["ZHIPUAI_API_KEY"])

# Project-local embedding wrapper around the ZhipuAI embedding API.
from zhipuEmbedding import ZhipuAiEmbeddings

from langchain_community.vectorstores import Chroma
|
def dataLoadToVectordb(texts):
    """Embed the given documents with ZhipuAI and add them to the on-disk
    Chroma collection, then report the collection size.

    Args:
        texts: list of LangChain Documents to embed and store.
    """
    store = Chroma.from_documents(
        documents=texts,
        embedding=ZhipuAiEmbeddings(),
        persist_directory='data_base/vector_db/chroma',
    )
    # NOTE(review): reads the private _collection attribute; confirm it still
    # exists on the installed langchain/chromadb versions.
    print(f"向量库中存储的数量:{store._collection.count()}")
    return
|
|
|
def get_file_paths(folder_path):
    """Walk folder_path recursively, load every supported file, split the
    documents into chunks, and store them in the Chroma vector DB in
    batches of 64.

    Args:
        folder_path: root directory (relative or absolute) to ingest.
    """
    current_dir = os.getcwd()
    abs_folder_path = os.path.abspath(folder_path)
    # Diagnostic output: where we are and whether the target exists.
    print(f"当前工作目录:{current_dir}")
    print(f"目标文件夹绝对路径:{abs_folder_path}")
    print(f"目标路径是否存在:{os.path.exists(abs_folder_path)}")
    print(f"目标路径是否是文件夹:{os.path.isdir(abs_folder_path)}")

    # Collect every file path under folder_path (recursively).
    file_paths = []
    for root, _dirs, files in os.walk(folder_path):
        for file in files:
            file_paths.append(os.path.join(root, file))
    print(file_paths[:3])

    # Load each file into LangChain Documents; unsupported formats are
    # reported and skipped inside splitDocuments.
    texts = []
    for file_path in file_paths:
        splitDocuments(file_path, texts)

    # Function-scoped import kept from the original to avoid changing
    # module-level side effects.
    from langchain_text_splitters import RecursiveCharacterTextSplitter

    CHUNK_SIZE = 500    # max characters per chunk
    OVERLAP_SIZE = 0    # no overlap between adjacent chunks

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=OVERLAP_SIZE,
    )
    docs = text_splitter.split_documents(texts)
    # Bug fix: report the chunk COUNT, not the entire document list.
    print(f"切分后的文件数量:{len(docs)}")

    # Embed and store in batches of 64 — presumably an embedding-API batch
    # limit; TODO confirm against the ZhipuAI embedding docs.
    BATCH_SIZE = 64
    for i in range(0, len(docs), BATCH_SIZE):
        dataLoadToVectordb(docs[i:i + BATCH_SIZE])
|
|
|
def splitDocuments(file_path, texts):
    """Load one file into LangChain Documents, appending them to texts.

    The loader is chosen from the file extension. Unsupported extensions
    (including xlsx/xls and epub, which were deliberately skipped) are
    reported and ignored.

    Args:
        file_path: path of the file to load; its extension selects the loader.
        texts: list mutated in place with the loaded Documents.

    Returns:
        None. Side effect only: extends texts (or prints a skip message).
    """
    file_type = file_path.split('.')[-1].lower()
    loader = None
    if file_type == 'pdf':
        loader = PyMuPDFLoader(file_path)
    elif file_type == 'md':
        loader = UnstructuredMarkdownLoader(file_path)
    elif file_type == 'txt':
        loader = TextLoader(file_path, encoding="utf-8")
    elif file_type == 'py':
        loader = PythonLoader(file_path)
    elif file_type == 'csv':
        loader = CSVLoader(file_path, encoding="utf-8")
    elif file_type == 'json':
        # Extracts only the ".content" field from each JSON document.
        loader = JSONLoader(file_path, jq_schema=".content", text_content=False)
    elif file_type == 'docx':
        loader = Docx2txtLoader(file_path)
    elif file_type in ['pptx', 'ppt']:
        loader = UnstructuredPowerPointLoader(file_path)
    elif file_type in ['png', 'jpg', 'jpeg']:
        loader = UnstructuredImageLoader(file_path)
    elif file_type == 'url':
        # NOTE(review): treats a local path ending in ".url" as a web URL —
        # confirm this is intentional.
        loader = WebBaseLoader(file_path)
    else:
        # Covers xlsx/xls, epub, and anything unrecognised — the original
        # handled all three cases with this exact same message and return,
        # so the duplicated branches are merged here.
        print(f"不支持的文件格式:{file_type} | 文件路径:{file_path}")
        return

    if loader is not None:
        texts.extend(loader.load())
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: ingest every file under data_base/data.
    get_file_paths("data_base/data")
|
|
|