| import glob |
| import os |
| from langchain_community.document_loaders import PyMuPDFLoader, TextLoader, CSVLoader |
| from langchain.text_splitter import CharacterTextSplitter |
| from langchain.docstore.document import Document |
| from sentence_transformers import SentenceTransformer |
| from langchain_pinecone import PineconeVectorStore |
| from pinecone.grpc import PineconeGRPC as Pinecone |
| from pinecone import ServerlessSpec |
| import time |
| from langchain_community.embeddings import SentenceTransformerEmbeddings |
|
|
# Load environment variables (PINECONE_API_KEY, INDEX_NAME) from a local .env
# file so the Pinecone setup below can read them via os.environ / os.getenv.
from dotenv import load_dotenv
load_dotenv()
|
|
|
|
| |
def come_data(splits):
    """Re-wrap splitter output as fresh Documents, keeping only 'source' metadata.

    Parameters
    ----------
    splits : list[Document]
        Chunks produced by a text splitter; each exposes ``page_content`` and
        ``metadata['source']``.

    Returns
    -------
    list[Document]
        New Document objects with the same page content and metadata reduced
        to ``{'source': ...}`` (all other metadata keys are dropped).
    """
    # Comprehension replaces the original `for i in range(len(splits))`
    # index-based loop; behavior is identical.
    return [
        Document(
            page_content=split.page_content,
            metadata={'source': split.metadata['source']},
        )
        for split in splits
    ]
|
|
|
|
|
|
|
|
|
|
| |
def flatten_list(lst):
    """Recursively flatten arbitrarily nested lists into one flat list.

    A non-list argument is treated as a leaf and returned as a
    single-element list.
    """
    if not isinstance(lst, list):
        return [lst]
    flat = []
    for element in lst:
        flat.extend(flatten_list(element))
    return flat
|
|
|
|
| |
def all_files(path):
    """Load every .txt/.csv/.pdf under *path*, split into chunks, and index
    the chunks into a freshly (re)created Pinecone vector store.

    Parameters
    ----------
    path : str
        Root directory that is scanned recursively for supported files.

    Returns
    -------
    tuple
        ``(Vectorstore, flattened_list)`` — the populated
        :class:`PineconeVectorStore` and the flat list of indexed Documents.

    Side effects
    ------------
    Deletes and recreates the Pinecone index named by the ``INDEX_NAME``
    environment variable; requires ``PINECONE_API_KEY`` to be set.
    """
    print(f'RAG์ ๋ค์ด๊ฐ ๋ชจ๋ ๋ฐ์ดํฐ๋ {path}์ ๋ด์์ฃผ์ธ์.\n\n\n')

    # Extension -> loader dispatch table replaces the if/elif chain and the
    # boolean "matched" flag of the original. Lowercasing the extension also
    # accepts .TXT/.PDF etc. (backward-compatible generalization).
    loaders = {
        '.txt': TextLoader,
        '.csv': CSVLoader,
        '.pdf': PyMuPDFLoader,
    }

    # The splitter is pure configuration, so build it once instead of once
    # per file (the original recreated it inside the loop).
    text_splitter = CharacterTextSplitter.from_tiktoken_encoder(
        separator=".",
        chunk_size=500,
        chunk_overlap=0,
    )

    f_docs = []
    for file in glob.glob(path + '/**', recursive=True):
        loader_cls = loaders.get(os.path.splitext(file)[1].lower())
        if loader_cls is None:
            continue  # directory or unsupported file type
        document = loader_cls(file).load()

        # basename handles both '/' and '\' separators, unlike the original
        # file.split('/')[-1].
        name = os.path.basename(file)
        print(name + ' split ์งํ ์ค')
        splits = text_splitter.split_documents(document)
        docs = come_data(splits)
        f_docs.append(docs)
        print(name + ' split ์งํ ์๋ฃ. \n' + name + ' split ๊ฐฏ์ : ' + str(len(docs)))

    # Flatten once, after the loop. The original computed this inside the
    # loop body, leaving `flattened_list` unbound (NameError) when no
    # supported file was found; here it is always bound (possibly []).
    # The flattened docs are what gets written into the vector DB below.
    flattened_list = flatten_list(f_docs)

    # Korean sentence-embedding model; its 768-dim output must match the
    # `dimension=768` of the index created below.
    embedding_model = SentenceTransformerEmbeddings(
        model_name='BM-K/KoSimCSE-roberta-multitask',
        model_kwargs={"trust_remote_code": True},
    )

    # os.environ[...] (not .get) so a missing API key fails fast with KeyError.
    api_key = os.environ['PINECONE_API_KEY']
    pc = Pinecone(api_key=api_key)

    index_name = os.getenv('INDEX_NAME')
    print('Vector DB ์ด๊ธฐํ. Index_name = ' + str(index_name))
    spec = ServerlessSpec(cloud='aws', region='us-east-1')

    # Drop any existing index with the same name so ingestion starts clean.
    existing_names = [idx.name for idx in pc.list_indexes().indexes]
    if index_name in existing_names:
        pc.delete_index(index_name)
        print('๊ธฐ์กด ์ธ๋ฑ์ค ์ญ์ ์๋ฃ')
        time.sleep(3)  # give Pinecone a moment to finish the deletion

    pc.create_index(
        index_name,
        dimension=768,
        metric='cosine',
        spec=spec,
    )

    print('Vector DB ๋ค์ด๊ฐ๋ ์ค. Index_name = ' + str(index_name))

    # Embed every chunk and upsert into the freshly created index.
    Vectorstore = PineconeVectorStore.from_documents(
        documents=flattened_list,
        index_name=index_name,
        embedding=embedding_model,
    )

    print('์ ์ฅ ์๋ฃ')
    return Vectorstore, flattened_list