Airbyte CDK (已停用)
注意:AirbyteCDKLoader
已停用。請改用 AirbyteLoader
。
Airbyte 是一個資料整合平台,適用於從 API、資料庫和檔案到資料倉儲和資料湖的 ELT 管道。它擁有最大的 ELT 連接器目錄,可連接到資料倉儲和資料庫。
許多來源連接器是使用 Airbyte CDK 實作的。此載入器允許執行任何這些連接器,並將資料以文件形式傳回。
安裝
首先,您需要安裝 airbyte-cdk
python 套件。
%pip install --upgrade --quiet airbyte-cdk
然後,從 Airbyte Github 儲存庫 安裝現有的連接器,或使用 Airbyte CDK 建立您自己的連接器。
例如,若要安裝 Github 連接器,請執行
%pip install --upgrade --quiet "source_github@git+https://github.com/airbytehq/airbyte.git@master#subdirectory=airbyte-integrations/connectors/source-github"
某些來源也以常規套件形式發佈在 PyPI 上
範例
現在您可以根據匯入的來源建立 AirbyteCDKLoader
。它採用傳遞至連接器的 config
物件。您也必須依名稱 (stream_name
) 選取您要從中檢索記錄的串流。查看連接器文件頁面和規格定義,以取得有關 config 物件和可用串流的更多資訊。對於 Github 連接器,這些是
- https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-github/source_github/spec.json.
- https://docs.airbyte.com/integrations/sources/github/
from langchain_community.document_loaders.airbyte import AirbyteCDKLoader
from source_github.source import SourceGithub # plug in your own source here
config = {
# your github configuration
"credentials": {"api_url": "api.github.com", "personal_access_token": "<token>"},
"repository": "<repo>",
"start_date": "<date from which to start retrieving records from in ISO format, e.g. 2020-10-20T00:00:00Z>",
}
issues_loader = AirbyteCDKLoader(
source_class=SourceGithub, config=config, stream_name="issues"
)
API 參考:AirbyteCDKLoader
現在您可以像平常一樣載入文件
docs = issues_loader.load()
由於 load
傳回清單,因此它將封鎖直到所有文件都載入完成。若要更好地控制此程序,您也可以使用 lazy_load
方法,該方法會改為傳回迭代器
docs_iterator = issues_loader.lazy_load()
請記住,預設情況下,頁面內容為空,而 metadata 物件包含記錄中的所有資訊。若要以不同的方式建立文件,請在建立載入器時傳入 record_handler 函數
from langchain_core.documents import Document
def handle_record(record, id):
return Document(
page_content=record.data["title"] + "\n" + (record.data["body"] or ""),
metadata=record.data,
)
issues_loader = AirbyteCDKLoader(
source_class=SourceGithub,
config=config,
stream_name="issues",
record_handler=handle_record,
)
docs = issues_loader.load()
API 參考:Document
增量載入
某些串流允許增量載入,這表示來源會追蹤已同步的記錄,並且不會再次載入它們。這對於具有大量資料且經常更新的來源很有用。
若要利用此功能,請儲存載入器的 last_state
屬性,並在再次建立載入器時傳入它。這將確保僅載入新記錄。
last_state = issues_loader.last_state # store safely
incremental_issue_loader = AirbyteCDKLoader(
source_class=SourceGithub, config=config, stream_name="issues", state=last_state
)
new_docs = incremental_issue_loader.load()