# ragllm/zhipuEmbeddingsData.py
import os
os.environ["USER_AGENT"] = "MyRAGApp/1.0 (https://myapp.example.com; myemail@example.com)"
os.environ["CHROMA_TELEMETRY_DISABLED"] = "true"
from zhipuai import ZhipuAI
from dotenv import load_dotenv, find_dotenv
from langchain_community.document_loaders import (
TextLoader, PythonLoader, CSVLoader, JSONLoader,
Docx2txtLoader, UnstructuredPowerPointLoader,
PyMuPDFLoader, UnstructuredMarkdownLoader,
UnstructuredImageLoader, WebBaseLoader
)
_ = load_dotenv(find_dotenv())
client = ZhipuAI(api_key=os.environ["ZHIPUAI_API_KEY"])
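# Note: this ZhipuAI client is initialized here but never used directly below;
# embedding calls go through the ZhipuAiEmbeddings wrapper imported next.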
# Store document chunks in the vector store
from zhipuEmbedding import ZhipuAiEmbeddings
from langchain_community.vectorstores import Chroma
def dataLoadToVectordb(texts):
embedding = ZhipuAiEmbeddings()
persist_directory = 'data_base/vector_db/chroma'
vectordb = Chroma.from_documents(
documents=texts,
embedding=embedding,
persist_directory=persist_directory
)
print(f"向量库中存储的数量:{vectordb._collection.count()}")
return
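# Minimal retrieval sketch (illustrative, not part of the original pipeline):
# assuming the same persist directory and embedding wrapper, the persisted
# Chroma store can be reopened and queried later. The helper name and the
# default k are placeholders.
def queryVectordb(question, k=3):
    vectordb = Chroma(
        persist_directory='data_base/vector_db/chroma',
        embedding_function=ZhipuAiEmbeddings()
    )
    return vectordb.similarity_search(question, k=k)
# e.g. queryVectordb("What does this corpus cover?") would return the top-3 matching chunks.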
def get_file_paths(folder_path):
current_dir = os.getcwd()
abs_folder_path = os.path.abspath(folder_path)
print(f"当前工作目录:{current_dir}")
print(f"目标文件夹绝对路径:{abs_folder_path}")
print(f"目标路径是否存在:{os.path.exists(abs_folder_path)}")
print(f"目标路径是否是文件夹:{os.path.isdir(abs_folder_path)}")
# 1.获取所有文件
file_paths = []
for root, dirs, files in os.walk(folder_path):
for file in files:
file_path = os.path.join(root, file)
file_paths.append(file_path)
    print(f"Sample of collected file paths: {file_paths[:3]}")
    # Load every file and append its parsed documents to texts
texts = []
for file_path in file_paths:
splitDocuments(file_path, texts)
    # 2. Clean the data (a minimal sketch follows below)
    # Remove extra newlines, stray symbols, excess whitespace, etc.
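    # Cleaning sketch (an assumption, not the original implementation):
    # collapse runs of blank lines and trim surrounding whitespace in each
    # loaded document before splitting.
    import re
    for doc in texts:
        doc.page_content = re.sub(r'\n{3,}', '\n\n', doc.page_content).strip()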
    # 3. Split the documents into chunks
from langchain_text_splitters import RecursiveCharacterTextSplitter
    # Maximum length of a single text chunk in the knowledge base
    CHUNK_SIZE = 500
    # Overlap length between adjacent chunks
    OVERLAP_SIZE = 0
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=CHUNK_SIZE,
chunk_overlap=OVERLAP_SIZE
)
docs = text_splitter.split_documents(texts)
print(f"切分后的文件数量:{docs}")
#print(f"切分后的字符数(可以用来大致评估 token 数):{sum([len(doc.page_content) for doc in docs])}")
    # dataLoadToVectordb(docs)  # unbatched alternative: write everything in one call
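    # Write to the vector store in batches of 64 documents (assumption: this
    # keeps each embedding request within the ZhipuAI API's per-request limit).
    # Repeated Chroma.from_documents calls that share a persist_directory
    # should add to the same persisted collection.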
    for i in range(0, len(docs), 64):
        batch = docs[i : i + 64]
        # batch = [doc for doc in batch if doc.page_content.strip()]  # optional: skip empty chunks
        dataLoadToVectordb(batch)
def splitDocuments(file_path, texts):
file_type = file_path.split('.')[-1].lower()
loader = None
    if file_type == 'pdf':
        loader = PyMuPDFLoader(file_path)  # preferred PDF loader (fast and robust)
    elif file_type == 'md':
        loader = UnstructuredMarkdownLoader(file_path)  # Markdown
    elif file_type == 'txt':
        loader = TextLoader(file_path, encoding="utf-8")  # plain text
    elif file_type == 'py':
        loader = PythonLoader(file_path)  # Python source
    elif file_type == 'csv':
        loader = CSVLoader(file_path, encoding="utf-8")  # tabular data
    elif file_type == 'json':
        loader = JSONLoader(file_path, jq_schema=".content", text_content=False)  # JSON (expects a top-level "content" field)
    elif file_type == 'docx':
        loader = Docx2txtLoader(file_path)  # Word (docx)
    elif file_type in ['xlsx', 'xls']:
        # Excel is not handled yet; no loader is wired up for it here.
        print(f"Unsupported file format: {file_type} | path: {file_path}")
        return
    elif file_type in ['pptx', 'ppt']:
        loader = UnstructuredPowerPointLoader(file_path)  # PowerPoint (old and new formats)
    elif file_type in ['png', 'jpg', 'jpeg']:
        loader = UnstructuredImageLoader(file_path)  # images (text extracted via OCR)
    elif file_type == 'url':
        loader = WebBaseLoader(file_path)  # web page (note: the path string is treated as the URL)
    elif file_type == 'epub':
        # EPUB is not handled yet; no loader is wired up for it here.
        print(f"Unsupported file format: {file_type} | path: {file_path}")
        return
    else:
        print(f"Unsupported file format: {file_type} | path: {file_path}")
        return
if loader is not None:
texts.extend(loader.load())
if __name__ == "__main__":
get_file_paths("data_base/data")