Airbyte Salesforce (已停用)
注意:此連接器專用載入器已停用。請改用 AirbyteLoader
。
Airbyte 是一個資料整合平台,適用於從 API、資料庫和檔案到資料倉儲和資料湖的 ELT 管道。它擁有最大的 ELT 連接器目錄,可連接到資料倉儲和資料庫。
此載入器將 Salesforce 連接器作為文件載入器公開,讓您可以將各種 Salesforce 物件載入為文件。
安裝
首先,您需要安裝 airbyte-source-salesforce
python 套件。
%pip install --upgrade --quiet airbyte-source-salesforce
範例
請查看 Airbyte 文件頁面,以了解如何設定讀取器的詳細資訊。config 物件應遵循的 JSON 結構描述可以在 Github 上找到:https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml。
一般形狀如下所示
{
"client_id": "<oauth client id>",
"client_secret": "<oauth client secret>",
"refresh_token": "<oauth refresh token>",
"start_date": "<date from which to start retrieving records from in ISO format, e.g. 2020-10-20T00:00:00Z>",
"is_sandbox": False, # set to True if you're using a sandbox environment
"streams_criteria": [ # Array of filters for salesforce objects that should be loadable
{"criteria": "exacts", "value": "Account"}, # Exact name of salesforce object
{"criteria": "starts with", "value": "Asset"}, # Prefix of the name
# Other allowed criteria: ends with, contains, starts not with, ends not with, not contains, not exacts
],
}
預設情況下,所有欄位都以中繼資料的形式儲存在文件中,而文字則設定為空字串。透過轉換讀取器傳回的文件來建構文件的文字。
from langchain_community.document_loaders.airbyte import AirbyteSalesforceLoader
config = {
# your salesforce configuration
}
loader = AirbyteSalesforceLoader(
config=config, stream_name="asset"
) # check the documentation linked above for a list of all streams
現在您可以透過一般方式載入文件
docs = loader.load()
由於 load
會傳回列表,因此它會封鎖直到所有文件都載入完成。為了更好地控制此過程,您也可以使用 lazy_load
方法,該方法會改為傳回迭代器
docs_iterator = loader.lazy_load()
請記住,預設情況下,頁面內容為空,而中繼資料物件包含記錄中的所有資訊。若要以不同的方式建立文件,請在建立載入器時傳入 record_handler 函數
from langchain_core.documents import Document
def handle_record(record, id):
return Document(page_content=record.data["title"], metadata=record.data)
loader = AirbyteSalesforceLoader(
config=config, record_handler=handle_record, stream_name="asset"
)
docs = loader.load()
增量載入
某些串流允許增量載入,這表示來源會追蹤已同步的記錄,並且不會再次載入它們。這對於具有大量資料且頻繁更新的來源非常有用。
為了利用這一點,請儲存載入器的 last_state
屬性,並在再次建立載入器時傳入它。這將確保僅載入新記錄。
last_state = loader.last_state # store safely
incremental_loader = AirbyteSalesforceLoader(
config=config, stream_name="asset", state=last_state
)
new_docs = incremental_loader.load()