Data Ingestion
When you're handling a large volume of context data, you may want to consider spinning up a dedicated process for data ingestion.

Some signs that it's time to move to a custom data ingestion flow:
- Your embedding model is hitting API rate limits
- The database underlying your Langchain VectorStore needs rate limiting
- You feel the need to add custom pacing/throttling logic to your Python code (a minimal sketch of this follows below)
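For reference, here is a minimal sketch of the kind of pacing/throttling logic that last point refers to. The names (`MAX_CONCURRENCY`, `REQUESTS_PER_SECOND`, `throttled`) are illustrative assumptions, not part of GPTR:

```python
import asyncio

# Illustrative limits -- tune them to your embedding provider's rate limits
MAX_CONCURRENCY = 5
REQUESTS_PER_SECOND = 2

semaphore = asyncio.Semaphore(MAX_CONCURRENCY)

async def throttled(coro_fn, *args, **kwargs):
    """Run a coroutine under a concurrency cap, then pause to respect the rate limit."""
    async with semaphore:
        result = await coro_fn(*args, **kwargs)
        await asyncio.sleep(1 / REQUESTS_PER_SECOND)
        return result
```

If you find yourself writing helpers like this around your embedding or database calls, a dedicated ingestion process is usually the cleaner option.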
As mentioned in our YouTube tutorial series, GPTR uses Langchain Documents and Langchain VectorStores under the hood.

These are two excellent abstractions that make the GPTR architecture highly configurable.

The current research flow, whether you're generating a report from web results or local documents, follows these steps:
Step 1: Transform your content (web results or local documents) into Langchain Documents
Step 2: Insert your Langchain Documents into a Langchain VectorStore
Step 3: Pass your Langchain Vectorstore into your GPTR report ([more on that here](https://docs.gptr.com.cn/docs/gpt-researcher/context/vector-stores) and below)
Code examples are below.

Assuming your .env variables look like this:

```bash
OPENAI_API_KEY={Your OpenAI API Key here}
TAVILY_API_KEY={Your Tavily API Key here}
PGVECTOR_CONNECTION_STRING=postgresql://username:password...
```
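If you keep these in a .env file, a common way to load them (assuming the python-dotenv package; your setup may load them differently) is:

```python
import os

from dotenv import load_dotenv

# Load the variables from .env into the process environment
load_dotenv()

pgvector_connection_string = os.environ["PGVECTOR_CONNECTION_STRING"]
```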
Below is a custom data ingestion flow you can use to ingest data into a Langchain VectorStore. See the full working example here. In this example we use a Postgres VectorStore to embed the contents of a Github branch, but you can use any supported Langchain VectorStore.

Note that when you create your Langchain Documents, you should include source and title fields in the metadata so that GPTR can leverage your documents seamlessly. In the example below, we split the documents list into chunks of 100 and insert one chunk into the vector store at a time.
Step 1: Transform your content into Langchain Documents
```python
import base64
import os
from datetime import datetime
from uuid import uuid4

from langchain_core.documents import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter

async def transform_to_langchain_docs(self, directory_structure):
    documents = []
    splitter = RecursiveCharacterTextSplitter(chunk_size=200, chunk_overlap=30)
    run_timestamp = datetime.utcnow().strftime('%Y%m%d%H%M%S')
    for file_name in directory_structure:
        if not file_name.endswith('/'):
            try:
                content = self.repo.get_contents(file_name, ref=self.branch_name)
                try:
                    decoded_content = base64.b64decode(content.content).decode()
                except Exception as e:
                    print(f"Error decoding content: {e}")
                    print("the problematic file_name is", file_name)
                    continue
                print("file_name", file_name)
                print("content", decoded_content)
                # Split each document into smaller chunks
                chunks = splitter.split_text(decoded_content)
                # Build a Document with metadata for each chunk
                for index, chunk in enumerate(chunks):
                    metadata = {
                        "id": f"{run_timestamp}_{uuid4()}",  # Generate a unique UUID for each document
                        "source": file_name,
                        "title": file_name,
                        "extension": os.path.splitext(file_name)[1],
                        "file_path": file_name
                    }
                    document = Document(
                        page_content=chunk,
                        metadata=metadata
                    )
                    documents.append(document)
            except Exception as e:
                print(f"Error saving to vector store: {e}")
                return None
    await self.save_to_vector_store(documents)
```
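The method above assumes a surrounding class that provides `self.repo`, `self.branch_name`, and a `directory_structure` listing the file paths in the branch. As a rough sketch of how that could be wired up with PyGithub (the full working example may do this differently; the class and method names here are illustrative):

```python
from github import Github

class GithubIngestor:
    def __init__(self, token: str, repo_name: str, branch_name: str):
        self.branch_name = branch_name
        self.repo = Github(token).get_repo(repo_name)  # e.g. "owner/repository"

    def get_directory_structure(self):
        # List every file (blob) path in the branch via the recursive git tree
        tree = self.repo.get_git_tree(self.branch_name, recursive=True).tree
        return [element.path for element in tree if element.type == "blob"]
```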
Step 2: Insert your Langchain Documents into a Langchain VectorStore
```python
import os

from langchain_postgres import PGVector
from sqlalchemy.ext.asyncio import create_async_engine
from langchain_community.embeddings import OpenAIEmbeddings

async def save_to_vector_store(self, documents):
    # The documents are already Document objects, so we don't need to convert them
    embeddings = OpenAIEmbeddings()
    # self.vector_store = FAISS.from_documents(documents, embeddings)
    pgvector_connection_string = os.environ["PGVECTOR_CONNECTION_STRING"]
    collection_name = "my_docs"
    vector_store = PGVector(
        embeddings=embeddings,
        collection_name=collection_name,
        connection=pgvector_connection_string,
        use_jsonb=True
    )
    # for faiss
    # self.vector_store = vector_store.add_documents(documents, ids=[doc.metadata["id"] for doc in documents])

    # Split the documents list into chunks of 100
    for i in range(0, len(documents), 100):
        chunk = documents[i:i+100]
        # Insert the chunk into the vector store
        vector_store.add_documents(chunk, ids=[doc.metadata["id"] for doc in chunk])
```
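Once the documents are inserted, you can sanity-check the ingestion with a quick similarity search against the same store (the query string here is just an example):

```python
results = vector_store.similarity_search("How does the data ingestion flow work?", k=5)
for doc in results:
    print(doc.metadata["source"], doc.page_content[:100])
```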
Step 3: Pass your Langchain VectorStore into your GPTR report
```python
from gpt_researcher import GPTResearcher

# PGVector's async mode needs the psycopg3 driver in the connection string
async_connection_string = pgvector_connection_string.replace("postgresql://", "postgresql+psycopg://")

# Initialize the async engine with the psycopg3 driver
async_engine = create_async_engine(
    async_connection_string,
    echo=True
)

async_vector_store = PGVector(
    embeddings=embeddings,
    collection_name=collection_name,
    connection=async_engine,
    use_jsonb=True
)

researcher = GPTResearcher(
    query=query,
    report_type="research_report",
    report_source="langchain_vectorstore",
    vector_store=async_vector_store,
)

await researcher.conduct_research()
report = await researcher.write_report()
```
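Since `conduct_research` and `write_report` are coroutines, the snippet above needs to run inside an event loop. A minimal runner sketch, with an illustrative query value:

```python
import asyncio

async def main():
    query = "What does this repository do?"  # illustrative query
    researcher = GPTResearcher(
        query=query,
        report_type="research_report",
        report_source="langchain_vectorstore",
        vector_store=async_vector_store,
    )
    await researcher.conduct_research()
    report = await researcher.write_report()
    print(report)

asyncio.run(main())
```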