如何使用 LangChain 索引 API
在這裡,我們將看看使用 LangChain 索引 API 的基本索引工作流程。
索引 API 可讓您將來自任何來源的文件載入並保持同步到向量儲存庫中。具體來說,它有助於
- 避免將重複內容寫入向量儲存庫
- 避免重新寫入未更改的內容
- 避免重新計算未更改內容的嵌入
所有這些都應該為您節省時間和金錢,並改善您的向量搜尋結果。
至關重要的是,即使文件相對於原始來源文件經歷了多個轉換步驟(例如,透過文字分塊),索引 API 仍然可以工作。
運作方式
LangChain 索引使用記錄管理器 (RecordManager
) 來追蹤文件寫入向量儲存庫的情況。
在索引內容時,會為每個文件計算雜湊值,並將以下資訊儲存在記錄管理器中
- 文件雜湊值(頁面內容和中繼資料的雜湊值)
- 寫入時間
- 來源 ID — 每個文件都應在其元資料中包含資訊,以便我們確定此文件的最終來源
刪除模式
將文件索引到向量儲存庫時,向量儲存庫中某些現有文件可能需要刪除。在某些情況下,您可能想要移除任何與正在索引的新文件來自相同來源的現有文件。在其他情況下,您可能想要完全刪除所有現有文件。索引 API 刪除模式可讓您選擇所需的行為
清理模式 | 重複資料刪除內容 | 可平行化 | 清理已刪除的來源文件 | 清理來源文件和/或衍生文件的變更 | 清理時間 |
---|---|---|---|---|---|
無 | ✅ | ✅ | ❌ | ❌ | - |
增量 | ✅ | ✅ | ❌ | ✅ | 持續 |
完整 | ✅ | ❌ | ✅ | ✅ | 索引結束時 |
Scoped_Full | ✅ | ✅ | ❌ | ✅ | 索引結束時 |
None
不會執行任何自動清理,允許使用者手動清理舊內容。
incremental
、full
和 scoped_full
提供以下自動清理
- 如果來源文件或衍生文件的內容已更改,則所有 3 種模式都會清理(刪除)內容的先前版本。
- 如果來源文件已刪除(表示它未包含在目前正在索引的文件中),則
full
清理模式會從向量儲存庫中正確刪除它,但incremental
和scoped_full
模式則不會。
當內容發生變更時(例如,來源 PDF 檔案已修訂),在索引期間會有一段時間,新舊版本都可能傳回給使用者。這發生在新內容寫入之後,但在舊版本刪除之前。
incremental
索引可最大限度地縮短此時間段,因為它可以持續執行清理,並在寫入時進行清理。full
和scoped_full
模式會在所有批次都寫入後執行清理。
需求
- 請勿與已獨立於索引 API 預先填入內容的儲存庫一起使用,因為記錄管理器不會知道先前已插入記錄。
- 僅適用於支援以下功能的 LangChain
vectorstore
:- 依 ID 新增文件(具有
ids
參數的add_documents
方法) - 依 ID 刪除(具有
ids
參數的delete
方法)
- 依 ID 新增文件(具有
相容的向量儲存庫:Aerospike
、AnalyticDB
、AstraDB
、AwaDB
、AzureCosmosDBNoSqlVectorSearch
、AzureCosmosDBVectorSearch
、AzureSearch
、Bagel
、Cassandra
、Chroma
、CouchbaseVectorStore
、DashVector
、DatabricksVectorSearch
、DeepLake
、Dingo
、ElasticVectorSearch
、ElasticsearchStore
、FAISS
、HanaDB
、Milvus
、MongoDBAtlasVectorSearch
、MyScale
、OpenSearchVectorSearch
、PGVector
、Pinecone
、Qdrant
、Redis
、Rockset
、ScaNN
、SingleStoreDB
、SupabaseVectorStore
、SurrealDBStore
、TimescaleVector
、Vald
、VDMS
、Vearch
、VespaStore
、Weaviate
、Yellowbrick
、ZepVectorStore
、TencentVectorDB
、OpenSearchVectorSearch
。
注意
記錄管理器依賴基於時間的機制來確定可以清理哪些內容(當使用 full
或 incremental
或 scoped_full
清理模式時)。
如果兩個任務背靠背執行,並且第一個任務在時鐘時間改變之前完成,則第二個任務可能無法清理內容。
在實際設定中,這不太可能成為問題,原因如下
- RecordManager 使用更高的時間戳記解析度。
- 資料需要在第一個和第二個任務執行之間發生變更,如果任務之間的時間間隔很小,這變得不太可能。
- 索引任務通常需要幾毫秒以上的時間。
快速入門
from langchain.indexes import SQLRecordManager, index
from langchain_core.documents import Document
from langchain_elasticsearch import ElasticsearchStore
from langchain_openai import OpenAIEmbeddings
初始化向量儲存庫並設定嵌入
collection_name = "test_index"
embedding = OpenAIEmbeddings()
vectorstore = ElasticsearchStore(
es_url="https://127.0.0.1:9200", index_name="test_index", embedding=embedding
)
使用適當的命名空間初始化記錄管理器。
建議: 使用一個命名空間,該命名空間同時考慮向量儲存庫和向量儲存庫中的集合名稱;例如,「redis/my_docs」、「chromadb/my_docs」或「postgres/my_docs」。
namespace = f"elasticsearch/{collection_name}"
record_manager = SQLRecordManager(
namespace, db_url="sqlite:///record_manager_cache.sql"
)
在使用記錄管理器之前建立結構描述。
record_manager.create_schema()
讓我們索引一些測試文件
doc1 = Document(page_content="kitty", metadata={"source": "kitty.txt"})
doc2 = Document(page_content="doggy", metadata={"source": "doggy.txt"})
索引到空的向量儲存庫中
def _clear():
"""Hacky helper method to clear content. See the `full` mode section to to understand why it works."""
index([], record_manager, vectorstore, cleanup="full", source_id_key="source")
None
刪除模式
此模式不執行舊版本內容的自動清理;但是,它仍然會處理內容重複資料刪除。
_clear()
index(
[doc1, doc1, doc1, doc1, doc1],
record_manager,
vectorstore,
cleanup=None,
source_id_key="source",
)
{'num_added': 1, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
_clear()
index([doc1, doc2], record_manager, vectorstore, cleanup=None, source_id_key="source")
{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
第二次所有內容都將被跳過
index([doc1, doc2], record_manager, vectorstore, cleanup=None, source_id_key="source")
{'num_added': 0, 'num_updated': 0, 'num_skipped': 2, 'num_deleted': 0}
"incremental"
刪除模式
_clear()
index(
[doc1, doc2],
record_manager,
vectorstore,
cleanup="incremental",
source_id_key="source",
)
{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
再次索引應導致兩個文件都被跳過——也跳過了嵌入操作!
index(
[doc1, doc2],
record_manager,
vectorstore,
cleanup="incremental",
source_id_key="source",
)
{'num_added': 0, 'num_updated': 0, 'num_skipped': 2, 'num_deleted': 0}
如果我們在增量索引模式下不提供任何文件,則不會有任何變更。
index([], record_manager, vectorstore, cleanup="incremental", source_id_key="source")
{'num_added': 0, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
如果我們變更文件,則新版本將被寫入,並且所有共享相同來源的舊版本都將被刪除。
changed_doc_2 = Document(page_content="puppy", metadata={"source": "doggy.txt"})
index(
[changed_doc_2],
record_manager,
vectorstore,
cleanup="incremental",
source_id_key="source",
)
{'num_added': 1, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 1}
"full"
刪除模式
在 full
模式下,使用者應傳遞應索引到索引函數中的完整內容全集。
任何未傳遞到索引函數中且存在於向量儲存庫中的文件都將被刪除!
此行為對於處理來源文件的刪除很有用。
_clear()
all_docs = [doc1, doc2]
index(all_docs, record_manager, vectorstore, cleanup="full", source_id_key="source")
{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
假設有人刪除了第一個文件
del all_docs[0]
all_docs
[Document(page_content='doggy', metadata={'source': 'doggy.txt'})]
使用完整模式也會清理已刪除的內容。
index(all_docs, record_manager, vectorstore, cleanup="full", source_id_key="source")
{'num_added': 0, 'num_updated': 0, 'num_skipped': 1, 'num_deleted': 1}
來源
中繼資料屬性包含一個名為 source
的欄位。此來源應指向與給定文件關聯的最終出處。
例如,如果這些文件代表某些父文件的區塊,則兩個文件的 source
應該相同,並且參考父文件。
一般來說,應始終指定 source
。僅在您永遠不打算使用 incremental
模式,並且由於某些原因無法正確指定 source
欄位時,才使用 None
。
from langchain_text_splitters import CharacterTextSplitter
doc1 = Document(
page_content="kitty kitty kitty kitty kitty", metadata={"source": "kitty.txt"}
)
doc2 = Document(page_content="doggy doggy the doggy", metadata={"source": "doggy.txt"})
new_docs = CharacterTextSplitter(
separator="t", keep_separator=True, chunk_size=12, chunk_overlap=2
).split_documents([doc1, doc2])
new_docs
[Document(page_content='kitty kit', metadata={'source': 'kitty.txt'}),
Document(page_content='tty kitty ki', metadata={'source': 'kitty.txt'}),
Document(page_content='tty kitty', metadata={'source': 'kitty.txt'}),
Document(page_content='doggy doggy', metadata={'source': 'doggy.txt'}),
Document(page_content='the doggy', metadata={'source': 'doggy.txt'})]
_clear()
index(
new_docs,
record_manager,
vectorstore,
cleanup="incremental",
source_id_key="source",
)
{'num_added': 5, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
changed_doggy_docs = [
Document(page_content="woof woof", metadata={"source": "doggy.txt"}),
Document(page_content="woof woof woof", metadata={"source": "doggy.txt"}),
]
這應刪除與 doggy.txt
來源關聯的舊版本文件,並將它們替換為新版本。
index(
changed_doggy_docs,
record_manager,
vectorstore,
cleanup="incremental",
source_id_key="source",
)
{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 2}
vectorstore.similarity_search("dog", k=30)
[Document(page_content='woof woof', metadata={'source': 'doggy.txt'}),
Document(page_content='woof woof woof', metadata={'source': 'doggy.txt'}),
Document(page_content='tty kitty', metadata={'source': 'kitty.txt'}),
Document(page_content='tty kitty ki', metadata={'source': 'kitty.txt'}),
Document(page_content='kitty kit', metadata={'source': 'kitty.txt'})]
與載入器一起使用
索引可以接受文件的可迭代物件,也可以接受任何載入器。
注意: 載入器必須正確設定來源鍵。
from langchain_core.document_loaders import BaseLoader
class MyCustomLoader(BaseLoader):
def lazy_load(self):
text_splitter = CharacterTextSplitter(
separator="t", keep_separator=True, chunk_size=12, chunk_overlap=2
)
docs = [
Document(page_content="woof woof", metadata={"source": "doggy.txt"}),
Document(page_content="woof woof woof", metadata={"source": "doggy.txt"}),
]
yield from text_splitter.split_documents(docs)
def load(self):
return list(self.lazy_load())
_clear()
loader = MyCustomLoader()
loader.load()
[Document(page_content='woof woof', metadata={'source': 'doggy.txt'}),
Document(page_content='woof woof woof', metadata={'source': 'doggy.txt'})]
index(loader, record_manager, vectorstore, cleanup="full", source_id_key="source")
{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
vectorstore.similarity_search("dog", k=30)
[Document(page_content='woof woof', metadata={'source': 'doggy.txt'}),
Document(page_content='woof woof woof', metadata={'source': 'doggy.txt'})]