import os import uvicorn from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware import json import datetime from openai import OpenAI from langchain_milvus import Milvus, BM25BuiltInFunction from vector import OpenAIEmbeddings from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain_core.stores import InMemoryStore from langchain_classic.retrievers.parent_document_retriever import ParentDocumentRetriever from dotenv import load_dotenv # 加载 .env 文件中的环境变量, 隐藏 API Keys load_dotenv() os.environ["TOKENIZERS_PARALLELISM"] = "false" app = FastAPI() # ============================================================ # OpenAI LLM 客户端封装 (替代讲义中的 DeepSeek) # ============================================================ def create_openai_client(): """创建 OpenAI 客户端""" client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) return client def generate_openai_answer(client, prompt): """使用 OpenAI 生成回复""" response = client.chat.completions.create( model="gpt-4o-mini", messages=[ {"role": "user", "content": prompt} ], temperature=0.7, ) return response.choices[0].message.content # 允许所有域的请求 app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # 创建 Embedding 模型 embedding_model = OpenAIEmbeddings() print("创建 Embedding 模型成功......") # 设置默认的 Milvus 数据库文件路径 URI = "./milvus_agent.db" URI1 = "./pdf_agent.db" # 创建 Milvus 连接 milvus_vectorstore = Milvus( embedding_function=embedding_model, builtin_function=BM25BuiltInFunction(), vector_field=["dense", "sparse"], index_params=[ { "metric_type": "IP", "index_type": "IVF_FLAT", }, { "metric_type": "BM25", "index_type": "SPARSE_INVERTED_INDEX" } ], connection_args={"uri": URI}, ) retriever = milvus_vectorstore.as_retriever() print("创建 Milvus 连接成功......") docstore = InMemoryStore() # 文本分割器 child_splitter = RecursiveCharacterTextSplitter( chunk_size=200, chunk_overlap=50, length_function=len, separators=["\n\n", "\n", "。", "!", "?", ";", ",", " ", ""] ) parent_splitter = RecursiveCharacterTextSplitter( chunk_size=1000, chunk_overlap=200 ) pdf_vectorstore = Milvus( embedding_function=embedding_model, builtin_function=BM25BuiltInFunction(), vector_field=["dense", "sparse"], index_params=[ { "metric_type": "IP", "index_type": "IVF_FLAT", }, { "metric_type": "BM25", "index_type": "SPARSE_INVERTED_INDEX" } ], connection_args={"uri": URI1}, consistency_level="Bounded", drop_old=False, ) # 设置父子文档检索器 parent_retriever = ParentDocumentRetriever( vectorstore=pdf_vectorstore, docstore=docstore, child_splitter=child_splitter, parent_splitter=parent_splitter, ) print("创建 Parent Milvus 连接成功......") # 创建大语言模型, 采用 OpenAI client_llm = create_openai_client() print("创建 OpenAI LLM 成功......") def format_docs(docs): return "\n\n".join(doc.page_content for doc in docs) @app.post("/") async def chatbot(request: Request): global milvus_vectorstore, retriever json_post_raw = await request.json() json_post = json.dumps(json_post_raw) json_post_list = json.loads(json_post) query = json_post_list.get('question') # 1: Milvus 召回 & 排序 # 在集合中搜索问题并检索语义 top-10 匹配项, 而且已经配置了 reranker 的处理, 采用RRF算法 recall_rerank_milvus = milvus_vectorstore.similarity_search( query, k=10, ranker_type="rrf", ranker_params={"k": 100} ) if recall_rerank_milvus: # 检索结果存放在列表中 context = [r.page_content for r in recall_rerank_milvus] context = format_docs(recall_rerank_milvus) else: context = "" # 2: PDF 文档的 Milvus 召回 # 父文档检索器按照query进行召回 res = "" retrieved_docs = parent_retriever.invoke(query) if retrieved_docs is not None and len(retrieved_docs) >= 1: res = retrieved_docs[0].page_content print("PDF res: ", res) context = context + "\n" + res # 为LLM定义系统和用户提示, 这个提示是由从Milvus检索到的文档组装而成的. SYSTEM_PROMPT = """ System: 你是一个非常得力的医学助手, 你可以通过从数据库中检索出的信息找到问题的答案. """ USER_PROMPT = f""" User: 利用介于之间的从数据库中检索出的信息来回答问题, 具体的问题介于之间. 如果提供的信息为空, 则按照你的经验知识来给出尽可能严谨准确的回答, 不知道的时候坦诚的承认不了解, 不要编造不真实的信息. {context} {query} """ # 3. 使用 OpenAI 最新版本模型, 根据提示生成回复 response = generate_openai_answer(client_llm, SYSTEM_PROMPT + USER_PROMPT.format(context, query)) now = datetime.datetime.now() time = now.strftime("%Y-%m-%d %H:%M:%S") answer = { "response": response, "status": 200, "time": time } return answer if __name__ == '__main__': # 主函数中直接启动fastapi服务 uvicorn.run(app, host='0.0.0.0', port=8103, workers=1)