跳到主要內容
Open In ColabOpen on GitHub

Airbyte CDK (已停用)

注意:AirbyteCDKLoader 已停用。請改用 AirbyteLoader

Airbyte 是一個資料整合平台,適用於從 API、資料庫和檔案到資料倉儲和資料湖的 ELT 管道。它擁有最大的 ELT 連接器目錄,可連接到資料倉儲和資料庫。

許多來源連接器是使用 Airbyte CDK 實作的。此載入器允許執行任何這些連接器,並將資料以文件形式傳回。

安裝

首先,您需要安裝 airbyte-cdk python 套件。

%pip install --upgrade --quiet  airbyte-cdk

然後,從 Airbyte Github 儲存庫 安裝現有的連接器,或使用 Airbyte CDK 建立您自己的連接器。

例如,若要安裝 Github 連接器,請執行

%pip install --upgrade --quiet  "source_github@git+https://github.com/airbytehq/airbyte.git@master#subdirectory=airbyte-integrations/connectors/source-github"

某些來源也以常規套件形式發佈在 PyPI 上

範例

現在您可以根據匯入的來源建立 AirbyteCDKLoader。它採用傳遞至連接器的 config 物件。您也必須依名稱 (stream_name) 選取您要從中檢索記錄的串流。查看連接器文件頁面和規格定義,以取得有關 config 物件和可用串流的更多資訊。對於 Github 連接器,這些是

from langchain_community.document_loaders.airbyte import AirbyteCDKLoader
from source_github.source import SourceGithub # plug in your own source here

config = {
# your github configuration
"credentials": {"api_url": "api.github.com", "personal_access_token": "<token>"},
"repository": "<repo>",
"start_date": "<date from which to start retrieving records from in ISO format, e.g. 2020-10-20T00:00:00Z>",
}

issues_loader = AirbyteCDKLoader(
source_class=SourceGithub, config=config, stream_name="issues"
)
API 參考:AirbyteCDKLoader

現在您可以像平常一樣載入文件

docs = issues_loader.load()

由於 load 傳回清單,因此它將封鎖直到所有文件都載入完成。若要更好地控制此程序,您也可以使用 lazy_load 方法,該方法會改為傳回迭代器

docs_iterator = issues_loader.lazy_load()

請記住,預設情況下,頁面內容為空,而 metadata 物件包含記錄中的所有資訊。若要以不同的方式建立文件,請在建立載入器時傳入 record_handler 函數

from langchain_core.documents import Document


def handle_record(record, id):
return Document(
page_content=record.data["title"] + "\n" + (record.data["body"] or ""),
metadata=record.data,
)


issues_loader = AirbyteCDKLoader(
source_class=SourceGithub,
config=config,
stream_name="issues",
record_handler=handle_record,
)

docs = issues_loader.load()
API 參考:Document

增量載入

某些串流允許增量載入,這表示來源會追蹤已同步的記錄,並且不會再次載入它們。這對於具有大量資料且經常更新的來源很有用。

若要利用此功能,請儲存載入器的 last_state 屬性,並在再次建立載入器時傳入它。這將確保僅載入新記錄。

last_state = issues_loader.last_state  # store safely

incremental_issue_loader = AirbyteCDKLoader(
source_class=SourceGithub, config=config, stream_name="issues", state=last_state
)

new_docs = incremental_issue_loader.load()

此頁面是否對您有幫助?