如何分派自訂回呼事件
本指南假設您熟悉以下概念
- 回呼
- 自訂回呼處理器
- Astream Events API,
astream_events
方法將顯示自訂回呼事件。
在某些情況下,您可能想要從 Runnable 內部分派自訂回呼事件,以便可以在自訂回呼處理器中或透過 Astream Events API 顯示該事件。
例如,如果您有一個包含多個步驟的長時間運行的工具,您可以在步驟之間分派自訂事件,並使用這些自訂事件來監控進度。您也可以向應用程式的最終使用者顯示這些自訂事件,以向他們展示目前任務的進度。
若要分派自訂事件,您需要決定事件的兩個屬性:name
和 data
。
屬性 | 類型 | 描述 |
---|---|---|
name | str | 使用者定義的事件名稱。 |
data | Any | 與事件關聯的資料。這可以是任何內容,但我們建議使其為 JSON 可序列化。 |
- 分派自訂回呼事件需要
langchain-core>=0.2.15
。 - 自訂回呼事件只能從現有的
Runnable
內部分派。 - 如果使用
astream_events
,您必須使用version='v2'
才能看到自訂事件。 - LangSmith 尚不支援傳送或呈現自訂回呼事件。
如果您在 python<=3.10 中執行非同步程式碼,LangChain 無法自動傳播配置,包括 astream_events() 所需的回呼到子 runnable。這是您可能無法看到從自訂 runnable 或工具發出事件的常見原因。
如果您正在執行 python<=3.10,您將需要在非同步環境中手動將 RunnableConfig
物件傳播到子 runnable。有關如何手動傳播配置的範例,請參閱下方 bar
RunnableLambda 的實作。
如果您正在執行 python>=3.11,則 RunnableConfig
將自動傳播到非同步環境中的子 runnable。但是,如果您的程式碼可能在其他 Python 版本中執行,手動傳播 RunnableConfig
仍然是一個好主意。
Astream Events API
使用自訂事件最有效的方式是透過 Astream Events API。
我們可以使用 async
adispatch_custom_event
API 在非同步設定中發出自訂事件。
若要透過 astream events API 查看自訂事件,您需要使用較新的 v2
API 版本的 astream_events
。
from langchain_core.callbacks.manager import (
adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda
from langchain_core.runnables.config import RunnableConfig
@RunnableLambda
async def foo(x: str) -> str:
await adispatch_custom_event("event1", {"x": x})
await adispatch_custom_event("event2", 5)
return x
async for event in foo.astream_events("hello world", version="v2"):
print(event)
{'event': 'on_chain_start', 'data': {'input': 'hello world'}, 'name': 'foo', 'tags': [], 'run_id': 'f354ffe8-4c22-4881-890a-c1cad038a9a6', 'metadata': {}, 'parent_ids': []}
{'event': 'on_custom_event', 'run_id': 'f354ffe8-4c22-4881-890a-c1cad038a9a6', 'name': 'event1', 'tags': [], 'metadata': {}, 'data': {'x': 'hello world'}, 'parent_ids': []}
{'event': 'on_custom_event', 'run_id': 'f354ffe8-4c22-4881-890a-c1cad038a9a6', 'name': 'event2', 'tags': [], 'metadata': {}, 'data': 5, 'parent_ids': []}
{'event': 'on_chain_stream', 'run_id': 'f354ffe8-4c22-4881-890a-c1cad038a9a6', 'name': 'foo', 'tags': [], 'metadata': {}, 'data': {'chunk': 'hello world'}, 'parent_ids': []}
{'event': 'on_chain_end', 'data': {'output': 'hello world'}, 'run_id': 'f354ffe8-4c22-4881-890a-c1cad038a9a6', 'name': 'foo', 'tags': [], 'metadata': {}, 'parent_ids': []}
在 python <= 3.10 中,您必須手動傳播配置!
from langchain_core.callbacks.manager import (
adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda
from langchain_core.runnables.config import RunnableConfig
@RunnableLambda
async def bar(x: str, config: RunnableConfig) -> str:
"""An example that shows how to manually propagate config.
You must do this if you're running python<=3.10.
"""
await adispatch_custom_event("event1", {"x": x}, config=config)
await adispatch_custom_event("event2", 5, config=config)
return x
async for event in bar.astream_events("hello world", version="v2"):
print(event)
{'event': 'on_chain_start', 'data': {'input': 'hello world'}, 'name': 'bar', 'tags': [], 'run_id': 'c787b09d-698a-41b9-8290-92aaa656f3e7', 'metadata': {}, 'parent_ids': []}
{'event': 'on_custom_event', 'run_id': 'c787b09d-698a-41b9-8290-92aaa656f3e7', 'name': 'event1', 'tags': [], 'metadata': {}, 'data': {'x': 'hello world'}, 'parent_ids': []}
{'event': 'on_custom_event', 'run_id': 'c787b09d-698a-41b9-8290-92aaa656f3e7', 'name': 'event2', 'tags': [], 'metadata': {}, 'data': 5, 'parent_ids': []}
{'event': 'on_chain_stream', 'run_id': 'c787b09d-698a-41b9-8290-92aaa656f3e7', 'name': 'bar', 'tags': [], 'metadata': {}, 'data': {'chunk': 'hello world'}, 'parent_ids': []}
{'event': 'on_chain_end', 'data': {'output': 'hello world'}, 'run_id': 'c787b09d-698a-41b9-8290-92aaa656f3e7', 'name': 'bar', 'tags': [], 'metadata': {}, 'parent_ids': []}
非同步回呼處理器
您也可以透過非同步回呼處理器使用分派的事件。
from typing import Any, Dict, List, Optional
from uuid import UUID
from langchain_core.callbacks import AsyncCallbackHandler
from langchain_core.callbacks.manager import (
adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda
from langchain_core.runnables.config import RunnableConfig
class AsyncCustomCallbackHandler(AsyncCallbackHandler):
async def on_custom_event(
self,
name: str,
data: Any,
*,
run_id: UUID,
tags: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> None:
print(
f"Received event {name} with data: {data}, with tags: {tags}, with metadata: {metadata} and run_id: {run_id}"
)
@RunnableLambda
async def bar(x: str, config: RunnableConfig) -> str:
"""An example that shows how to manually propagate config.
You must do this if you're running python<=3.10.
"""
await adispatch_custom_event("event1", {"x": x}, config=config)
await adispatch_custom_event("event2", 5, config=config)
return x
async_handler = AsyncCustomCallbackHandler()
await foo.ainvoke(1, {"callbacks": [async_handler], "tags": ["foo", "bar"]})
Received event event1 with data: {'x': 1}, with tags: ['foo', 'bar'], with metadata: {} and run_id: a62b84be-7afd-4829-9947-7165df1f37d9
Received event event2 with data: 5, with tags: ['foo', 'bar'], with metadata: {} and run_id: a62b84be-7afd-4829-9947-7165df1f37d9
1
同步回呼處理器
讓我們看看如何在同步環境中使用 dispatch_custom_event
發出自訂事件。
您必須從現有的 Runnable
內部呼叫 dispatch_custom_event
。
from typing import Any, Dict, List, Optional
from uuid import UUID
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.callbacks.manager import (
dispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda
from langchain_core.runnables.config import RunnableConfig
class CustomHandler(BaseCallbackHandler):
def on_custom_event(
self,
name: str,
data: Any,
*,
run_id: UUID,
tags: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> None:
print(
f"Received event {name} with data: {data}, with tags: {tags}, with metadata: {metadata} and run_id: {run_id}"
)
@RunnableLambda
def foo(x: int, config: RunnableConfig) -> int:
dispatch_custom_event("event1", {"x": x})
dispatch_custom_event("event2", {"x": x})
return x
handler = CustomHandler()
foo.invoke(1, {"callbacks": [handler], "tags": ["foo", "bar"]})
Received event event1 with data: {'x': 1}, with tags: ['foo', 'bar'], with metadata: {} and run_id: 27b5ce33-dc26-4b34-92dd-08a89cb22268
Received event event2 with data: {'x': 1}, with tags: ['foo', 'bar'], with metadata: {} and run_id: 27b5ce33-dc26-4b34-92dd-08a89cb22268
1
後續步驟
您已經了解如何發出自訂事件,您可以查看更深入的 astream events 指南,這是利用自訂事件最簡單的方式。