跳到主要內容
Open In ColabOpen on GitHub

如何串流 Runnable

先決條件

本指南假設您熟悉以下概念

串流對於使基於 LLM 的應用程式對終端使用者感覺反應靈敏至關重要。

重要的 LangChain 基礎元件,例如聊天模型輸出解析器提示檢索器代理,皆實作 LangChain Runnable 介面

此介面提供兩種通用方法來串流內容

  1. 同步 stream 和非同步 astream:串流的預設實作,用於串流來自鏈的最終輸出
  2. 非同步 astream_events 和非同步 astream_log:這些方法提供了一種串流來自鏈的中間步驟最終輸出的方式。

讓我們看看這兩種方法,並嘗試了解如何使用它們。

資訊

有關 LangChain 中串流技術的更高級概述,請參閱概念指南的此章節

使用 Stream

所有 Runnable 物件都實作一個名為 stream 的同步方法和一個名為 astream 的非同步變體。

這些方法旨在以區塊串流最終輸出,並在每個區塊可用時立即產生。

只有當程式中的所有步驟都知道如何處理輸入串流時,串流才有可能實現;即一次處理一個輸入區塊,並產生相應的輸出區塊。

此處理的複雜性可能有所不同,從像發出 LLM 產生的 Token 這樣簡單的任務,到在整個 JSON 完成之前串流 JSON 結果的部分內容等更具挑戰性的任務。

開始探索串流的最佳位置是 LLM 應用程式中最重要的一個組件——LLM 本身!

LLM 和聊天模型

大型語言模型及其聊天變體是基於 LLM 的應用程式的主要瓶頸。

大型語言模型可能需要數秒才能產生對查詢的完整回應。這遠慢於應用程式對終端使用者感覺反應靈敏的 ~200-300 毫秒閾值。

使應用程式感覺更靈敏的關鍵策略是顯示中間進度;即,逐個 Token 串流模型的輸出。

我們將展示使用聊天模型進行串流的範例。從以下選項中選擇一個

pip install -qU "langchain[openai]"
import getpass
import os

if not os.environ.get("OPENAI_API_KEY"):
os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter API key for OpenAI: ")

from langchain.chat_models import init_chat_model

model = init_chat_model("gpt-4o-mini", model_provider="openai")

讓我們從同步 stream API 開始

chunks = []
for chunk in model.stream("what color is the sky?"):
chunks.append(chunk)
print(chunk.content, end="|", flush=True)
The| sky| appears| blue| during| the| day|.|

或者,如果您在非同步環境中工作,您可以考慮使用非同步 astream API

chunks = []
async for chunk in model.astream("what color is the sky?"):
chunks.append(chunk)
print(chunk.content, end="|", flush=True)
The| sky| appears| blue| during| the| day|.|

讓我們檢查其中一個區塊

chunks[0]
AIMessageChunk(content='The', id='run-b36bea64-5511-4d7a-b6a3-a07b3db0c8e7')

我們取回了一些稱為 AIMessageChunk 的東西。此區塊代表 AIMessage 的一部分。

訊息區塊在設計上是累加的——可以簡單地將它們加起來以獲得目前的回應狀態!

chunks[0] + chunks[1] + chunks[2] + chunks[3] + chunks[4]
AIMessageChunk(content='The sky appears blue during', id='run-b36bea64-5511-4d7a-b6a3-a07b3db0c8e7')

幾乎所有 LLM 應用程式都涉及比僅調用語言模型更多的步驟。

讓我們使用 LangChain 運算式語言 (LCEL) 建構一個簡單的鏈,將提示、模型和解析器結合在一起,並驗證串流是否有效。

我們將使用StrOutputParser來解析模型的輸出。這是一個簡單的解析器,可從 AIMessageChunk 中提取 content 欄位,從而為我們提供模型傳回的 token

提示

LCEL 是一種宣告式方式,透過鏈接不同的 LangChain 基礎元件來指定「程式」。使用 LCEL 建立的鏈受益於 streamastream 的自動實作,從而允許串流最終輸出。實際上,使用 LCEL 建立的鏈實作了整個標準 Runnable 介面。

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
parser = StrOutputParser()
chain = prompt | model | parser

async for chunk in chain.astream({"topic": "parrot"}):
print(chunk, end="|", flush=True)
Here|'s| a| joke| about| a| par|rot|:|

A man| goes| to| a| pet| shop| to| buy| a| par|rot|.| The| shop| owner| shows| him| two| stunning| pa|rr|ots| with| beautiful| pl|um|age|.|

"|There|'s| a| talking| par|rot| an|d a| non|-|talking| par|rot|,"| the| owner| says|.| "|The| talking| par|rot| costs| $|100|,| an|d the| non|-|talking| par|rot| is| $|20|."|

The| man| says|,| "|I|'ll| take| the| non|-|talking| par|rot| at| $|20|."|

He| pays| an|d leaves| with| the| par|rot|.| As| he|'s| walking| down| the| street|,| the| par|rot| looks| up| at| him| an|d says|,| "|You| know|,| you| really| are| a| stupi|d man|!"|

The| man| is| stun|ne|d an|d looks| at| the| par|rot| in| dis|bel|ief|.| The| par|rot| continues|,| "|Yes|,| you| got| r|ippe|d off| big| time|!| I| can| talk| just| as| well| as| that| other| par|rot|,| an|d you| only| pai|d $|20| |for| me|!"|

請注意,即使我們在鏈的末端使用了 parser,我們仍然獲得了串流輸出。parser 單獨對每個串流區塊進行操作。許多LCEL 基礎元件也支援這種轉換樣式的直通串流,這在建構應用程式時非常方便。

自訂函數可以設計為傳回產生器,產生器能夠對串流進行操作。

某些 Runnable,例如提示範本聊天模型,無法處理個別區塊,而是聚合所有先前的步驟。此類 Runnable 可能會中斷串流過程。

注意

LangChain 運算式語言允許您將鏈的建構與其使用模式(例如,同步/非同步、批次/串流等)分開。如果這與您正在建構的內容無關,您也可以透過個別呼叫每個組件上的 invokebatchstream,將結果分配給變數,然後在下游適當地使用它們,從而依賴標準的命令式程式設計方法。

使用輸入串流

如果您想從輸出中串流 JSON,因為它是產生的?

如果您要依賴 json.loads 來解析部分 JSON,則解析將會失敗,因為部分 JSON 不是有效的 JSON。

您可能會完全不知所措,並聲稱無法串流 JSON。

嗯,事實證明有一種方法可以做到——解析器需要對輸入串流進行操作,並嘗試將部分 JSON「自動完成」為有效狀態。

讓我們看看這樣的解析器在運作中的情況,以了解這意味著什麼。

from langchain_core.output_parsers import JsonOutputParser

chain = (
model | JsonOutputParser()
) # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models
async for text in chain.astream(
"output a list of the countries france, spain and japan and their populations in JSON format. "
'Use a dict with an outer key of "countries" which contains a list of countries. '
"Each country should have the key `name` and `population`"
):
print(text, flush=True)
API 參考:JsonOutputParser
{}
{'countries': []}
{'countries': [{}]}
{'countries': [{'name': ''}]}
{'countries': [{'name': 'France'}]}
{'countries': [{'name': 'France', 'population': 67}]}
{'countries': [{'name': 'France', 'population': 67413}]}
{'countries': [{'name': 'France', 'population': 67413000}]}
{'countries': [{'name': 'France', 'population': 67413000}, {}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain'}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': 'Japan'}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': 'Japan', 'population': 125}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': 'Japan', 'population': 125584}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': 'Japan', 'population': 125584000}]}

現在,讓我們中斷串流。我們將使用先前的範例,並在末尾附加一個提取函數,該函數從最終 JSON 中提取國家/地區名稱。

警告

鏈中任何對最終輸入而不是對輸入串流進行操作的步驟,都可能透過 streamastream 中斷串流功能。

提示

稍後,我們將討論 astream_events API,該 API 串流來自中間步驟的結果。即使鏈包含僅對最終輸入進行操作的步驟,此 API 也將串流來自中間步驟的結果。

from langchain_core.output_parsers import (
JsonOutputParser,
)


# A function that operates on finalized inputs
# rather than on an input_stream
def _extract_country_names(inputs):
"""A function that does not operates on input streams and breaks streaming."""
if not isinstance(inputs, dict):
return ""

if "countries" not in inputs:
return ""

countries = inputs["countries"]

if not isinstance(countries, list):
return ""

country_names = [
country.get("name") for country in countries if isinstance(country, dict)
]
return country_names


chain = model | JsonOutputParser() | _extract_country_names

async for text in chain.astream(
"output a list of the countries france, spain and japan and their populations in JSON format. "
'Use a dict with an outer key of "countries" which contains a list of countries. '
"Each country should have the key `name` and `population`"
):
print(text, end="|", flush=True)
API 參考:JsonOutputParser
['France', 'Spain', 'Japan']|

產生器函數

讓我們使用可以對輸入串流進行操作的產生器函數來修復串流。

提示

產生器函數(使用 yield 的函數)允許編寫對輸入串流進行操作的程式碼

from langchain_core.output_parsers import JsonOutputParser


async def _extract_country_names_streaming(input_stream):
"""A function that operates on input streams."""
country_names_so_far = set()

async for input in input_stream:
if not isinstance(input, dict):
continue

if "countries" not in input:
continue

countries = input["countries"]

if not isinstance(countries, list):
continue

for country in countries:
name = country.get("name")
if not name:
continue
if name not in country_names_so_far:
yield name
country_names_so_far.add(name)


chain = model | JsonOutputParser() | _extract_country_names_streaming

async for text in chain.astream(
"output a list of the countries france, spain and japan and their populations in JSON format. "
'Use a dict with an outer key of "countries" which contains a list of countries. '
"Each country should have the key `name` and `population`",
):
print(text, end="|", flush=True)
API 參考:JsonOutputParser
France|Spain|Japan|
注意

由於上面的程式碼依賴 JSON 自動完成,您可能會看到國家/地區名稱的部分名稱(例如,SpSpain),這不是人們希望的提取結果!

我們專注於串流概念,而不一定是鏈的結果。

非串流組件

某些內建組件(如檢索器)不提供任何 streaming。如果我們嘗試 stream 它們會發生什麼事?🤨

from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import OpenAIEmbeddings

template = """Answer the question based only on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

vectorstore = FAISS.from_texts(
["harrison worked at kensho", "harrison likes spicy food"],
embedding=OpenAIEmbeddings(),
)
retriever = vectorstore.as_retriever()

chunks = [chunk for chunk in retriever.stream("where did harrison work?")]
chunks
[[Document(page_content='harrison worked at kensho'),
Document(page_content='harrison likes spicy food')]]

Stream 僅產生該組件的最終結果。

這還可以接受 🥹!並非所有組件都必須實作串流——在某些情況下,串流是不必要的、困難的或根本沒有意義。

提示

使用非串流組件建構的 LCEL 鏈,在許多情況下仍然能夠串流,部分輸出的串流在鏈中的最後一個非串流步驟之後開始。

retrieval_chain = (
{
"context": retriever.with_config(run_name="Docs"),
"question": RunnablePassthrough(),
}
| prompt
| model
| StrOutputParser()
)
for chunk in retrieval_chain.stream(
"Where did harrison work? " "Write 3 made up sentences about this place."
):
print(chunk, end="|", flush=True)
Base|d on| the| given| context|,| Harrison| worke|d at| K|ens|ho|.|

Here| are| |3| |made| up| sentences| about| this| place|:|

1|.| K|ens|ho| was| a| cutting|-|edge| technology| company| known| for| its| innovative| solutions| in| artificial| intelligence| an|d data| analytics|.|

2|.| The| modern| office| space| at| K|ens|ho| feature|d open| floor| plans|,| collaborative| work|sp|aces|,| an|d a| vib|rant| atmosphere| that| fos|tere|d creativity| an|d team|work|.|

3|.| With| its| prime| location| in| the| heart| of| the| city|,| K|ens|ho| attracte|d top| talent| from| aroun|d the| worl|d,| creating| a| diverse| an|d dynamic| work| environment|.|

現在我們已經了解了 streamastream 的運作方式,讓我們冒險進入串流事件的世界 🏞️

使用串流事件

事件串流是一個 beta API。此 API 可能會根據回饋進行少量變更。

注意

本指南示範了 V2 API,並且需要 langchain-core >= 0.2。對於與舊版 LangChain 相容的 V1 API,請參閱此處

import langchain_core

langchain_core.__version__

為了使 astream_events API 正常運作

  • 在整個程式碼中盡可能使用 async(例如,非同步工具等)
  • 如果定義自訂函數/Runnable,則傳播回呼
  • 每當在沒有 LCEL 的情況下使用 Runnable 時,請務必在 LLM 上呼叫 .astream() 而不是 .ainvoke,以強制 LLM 串流 Token。
  • 如果任何事情未按預期運作,請告訴我們!:)

事件參考

以下是一個參考表,顯示了各種 Runnable 物件可能發出的一些事件。

注意

當正確實作串流時,Runnable 的輸入將在輸入串流完全耗用後才知道。這表示 inputs 通常僅包含在 end 事件中,而不是包含在 start 事件中。

事件名稱區塊輸入輸出
on_chat_model_start[模型名稱]{"messages": [[SystemMessage, HumanMessage]]}
on_chat_model_stream[模型名稱]AIMessageChunk(content="hello")
on_chat_model_end[模型名稱]{"messages": [[SystemMessage, HumanMessage]]}AIMessageChunk(content="hello world")
on_llm_start[模型名稱]{'input': 'hello'}
on_llm_stream[模型名稱]'Hello'
on_llm_end[模型名稱]'Hello human!'
on_chain_startformat_docs
on_chain_streamformat_docs"hello world!, goodbye world!"
on_chain_endformat_docs[Document(...)]"hello world!, goodbye world!"
on_tool_startsome_tool{"x": 1, "y": "2"}
on_tool_endsome_tool{"x": 1, "y": "2"}
on_retriever_start[檢索器名稱]{"query": "hello"}
on_retriever_end[檢索器名稱]{"query": "hello"}[Document(...), ..]
on_prompt_start[範本名稱]{"question": "hello"}
on_prompt_end[範本名稱]{"question": "hello"}ChatPromptValue(messages: [SystemMessage, ...])

聊天模型

讓我們從查看聊天模型產生的事件開始。

events = []
async for event in model.astream_events("hello"):
events.append(event)
注意

對於 langchain-core<0.3.37,請明確設定 version kwarg(例如,model.astream_events("hello", version="v2"))。

讓我們看看一些開始事件和一些結束事件。

events[:3]
[{'event': 'on_chat_model_start',
'data': {'input': 'hello'},
'name': 'ChatAnthropic',
'tags': [],
'run_id': 'b18d016d-8b9b-49e7-a555-44db498fcf66',
'metadata': {'ls_provider': 'anthropic',
'ls_model_name': 'claude-3-sonnet-20240229',
'ls_model_type': 'chat',
'ls_temperature': 0.0,
'ls_max_tokens': 1024},
'parent_ids': []},
{'event': 'on_chat_model_stream',
'run_id': 'b18d016d-8b9b-49e7-a555-44db498fcf66',
'name': 'ChatAnthropic',
'tags': [],
'metadata': {'ls_provider': 'anthropic',
'ls_model_name': 'claude-3-sonnet-20240229',
'ls_model_type': 'chat',
'ls_temperature': 0.0,
'ls_max_tokens': 1024},
'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run-b18d016d-8b9b-49e7-a555-44db498fcf66', usage_metadata={'input_tokens': 8, 'output_tokens': 4, 'total_tokens': 12, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})},
'parent_ids': []},
{'event': 'on_chat_model_stream',
'run_id': 'b18d016d-8b9b-49e7-a555-44db498fcf66',
'name': 'ChatAnthropic',
'tags': [],
'metadata': {'ls_provider': 'anthropic',
'ls_model_name': 'claude-3-sonnet-20240229',
'ls_model_type': 'chat',
'ls_temperature': 0.0,
'ls_max_tokens': 1024},
'data': {'chunk': AIMessageChunk(content='Hello! How can', additional_kwargs={}, response_metadata={}, id='run-b18d016d-8b9b-49e7-a555-44db498fcf66')},
'parent_ids': []}]
events[-2:]
[{'event': 'on_chat_model_stream',
'run_id': 'b18d016d-8b9b-49e7-a555-44db498fcf66',
'name': 'ChatAnthropic',
'tags': [],
'metadata': {'ls_provider': 'anthropic',
'ls_model_name': 'claude-3-sonnet-20240229',
'ls_model_type': 'chat',
'ls_temperature': 0.0,
'ls_max_tokens': 1024},
'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={'stop_reason': 'end_turn', 'stop_sequence': None}, id='run-b18d016d-8b9b-49e7-a555-44db498fcf66', usage_metadata={'input_tokens': 0, 'output_tokens': 12, 'total_tokens': 12, 'input_token_details': {}})},
'parent_ids': []},
{'event': 'on_chat_model_end',
'data': {'output': AIMessageChunk(content='Hello! How can I assist you today?', additional_kwargs={}, response_metadata={'stop_reason': 'end_turn', 'stop_sequence': None}, id='run-b18d016d-8b9b-49e7-a555-44db498fcf66', usage_metadata={'input_tokens': 8, 'output_tokens': 16, 'total_tokens': 24, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})},
'run_id': 'b18d016d-8b9b-49e7-a555-44db498fcf66',
'name': 'ChatAnthropic',
'tags': [],
'metadata': {'ls_provider': 'anthropic',
'ls_model_name': 'claude-3-sonnet-20240229',
'ls_model_type': 'chat',
'ls_temperature': 0.0,
'ls_max_tokens': 1024},
'parent_ids': []}]

讓我們重新審視解析串流 JSON 的範例鏈,以探索串流事件 API。

chain = (
model | JsonOutputParser()
) # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models

events = [
event
async for event in chain.astream_events(
"output a list of the countries france, spain and japan and their populations in JSON format. "
'Use a dict with an outer key of "countries" which contains a list of countries. '
"Each country should have the key `name` and `population`",
)
]

如果您查看前幾個事件,您會注意到有 3 個不同的開始事件,而不是 2 個開始事件。

這三個開始事件對應於

  1. 鏈(模型 + 解析器)
  2. 模型
  3. 解析器
events[:3]
[{'event': 'on_chain_start',
'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'},
'name': 'RunnableSequence',
'tags': [],
'run_id': '4765006b-16e2-4b1d-a523-edd9fd64cb92',
'metadata': {}},
{'event': 'on_chat_model_start',
'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`')]]}},
'name': 'ChatAnthropic',
'tags': ['seq:step:1'],
'run_id': '0320c234-7b52-4a14-ae4e-5f100949e589',
'metadata': {}},
{'event': 'on_chat_model_stream',
'data': {'chunk': AIMessageChunk(content='{', id='run-0320c234-7b52-4a14-ae4e-5f100949e589')},
'run_id': '0320c234-7b52-4a14-ae4e-5f100949e589',
'name': 'ChatAnthropic',
'tags': ['seq:step:1'],
'metadata': {}}]

您認為如果您查看最後 3 個事件會看到什麼?中間的事件呢?

讓我們使用此 API 來輸出模型和解析器的串流事件。我們忽略了開始事件、結束事件和來自鏈的事件。

num_events = 0

async for event in chain.astream_events(
"output a list of the countries france, spain and japan and their populations in JSON format. "
'Use a dict with an outer key of "countries" which contains a list of countries. '
"Each country should have the key `name` and `population`",
):
kind = event["event"]
if kind == "on_chat_model_stream":
print(
f"Chat model chunk: {repr(event['data']['chunk'].content)}",
flush=True,
)
if kind == "on_parser_stream":
print(f"Parser chunk: {event['data']['chunk']}", flush=True)
num_events += 1
if num_events > 30:
# Truncate the output
print("...")
break
Chat model chunk: ''
Chat model chunk: '{'
Parser chunk: {}
Chat model chunk: '\n "countries'
Chat model chunk: '": [\n '
Parser chunk: {'countries': []}
Chat model chunk: '{\n "'
Parser chunk: {'countries': [{}]}
Chat model chunk: 'name": "France'
Parser chunk: {'countries': [{'name': 'France'}]}
Chat model chunk: '",\n "'
Chat model chunk: 'population": 67'
Parser chunk: {'countries': [{'name': 'France', 'population': 67}]}
Chat model chunk: '413'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413}]}
Chat model chunk: '000\n },'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413000}]}
Chat model chunk: '\n {'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413000}, {}]}
Chat model chunk: '\n "name":'
...

由於模型和解析器都支援串流,因此我們即時看到了來自這兩個組件的串流事件!有點酷,不是嗎?🦜

篩選事件

由於此 API 產生了如此多的事件,因此能夠篩選事件非常有用。

您可以依組件 name、組件 tags 或組件 type 進行篩選。

依名稱

chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
{"run_name": "my_parser"}
)

max_events = 0
async for event in chain.astream_events(
"output a list of the countries france, spain and japan and their populations in JSON format. "
'Use a dict with an outer key of "countries" which contains a list of countries. '
"Each country should have the key `name` and `population`",
include_names=["my_parser"],
):
print(event)
max_events += 1
if max_events > 10:
# Truncate output
print("...")
break
{'event': 'on_parser_start', 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'my_parser', 'tags': ['seq:step:2'], 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'metadata': {}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': []}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{}]}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France'}]}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67}]}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67413}]}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67413000}]}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67413000}, {}]}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain'}]}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47}]}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
...

依類型

chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
{"run_name": "my_parser"}
)

max_events = 0
async for event in chain.astream_events(
'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
include_types=["chat_model"],
):
print(event)
max_events += 1
if max_events > 10:
# Truncate output
print("...")
break
{'event': 'on_chat_model_start', 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'model', 'tags': ['seq:step:1'], 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c', usage_metadata={'input_tokens': 56, 'output_tokens': 1, 'total_tokens': 57, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='{', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\n "countries', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='": [\n ', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='{\n "', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='name": "France', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='",\n "', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='population": 67', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='413', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='000\n },', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
...

依標籤

注意

標籤由給定 Runnable 的子組件繼承。

如果您使用標籤進行篩選,請確保這是您想要的。

chain = (model | JsonOutputParser()).with_config({"tags": ["my_chain"]})

max_events = 0
async for event in chain.astream_events(
'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
include_tags=["my_chain"],
):
print(event)
max_events += 1
if max_events > 10:
# Truncate output
print("...")
break
{'event': 'on_chain_start', 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'RunnableSequence', 'tags': ['my_chain'], 'run_id': '58d1302e-36ce-4df7-a3cb-47cb73d57e44', 'metadata': {}, 'parent_ids': []}
{'event': 'on_chat_model_start', 'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`', additional_kwargs={}, response_metadata={})]]}}, 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'run_id': '8222e8a1-d978-4f30-87fc-b2dba838774b', 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['58d1302e-36ce-4df7-a3cb-47cb73d57e44']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run-8222e8a1-d978-4f30-87fc-b2dba838774b', usage_metadata={'input_tokens': 56, 'output_tokens': 1, 'total_tokens': 57, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})}, 'run_id': '8222e8a1-d978-4f30-87fc-b2dba838774b', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['58d1302e-36ce-4df7-a3cb-47cb73d57e44']}
{'event': 'on_parser_start', 'data': {}, 'name': 'JsonOutputParser', 'tags': ['seq:step:2', 'my_chain'], 'run_id': '75604c84-e1e6-494a-8b2a-950f45d932e8', 'metadata': {}, 'parent_ids': ['58d1302e-36ce-4df7-a3cb-47cb73d57e44']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='{', additional_kwargs={}, response_metadata={}, id='run-8222e8a1-d978-4f30-87fc-b2dba838774b')}, 'run_id': '8222e8a1-d978-4f30-87fc-b2dba838774b', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['58d1302e-36ce-4df7-a3cb-47cb73d57e44']}
{'event': 'on_parser_stream', 'run_id': '75604c84-e1e6-494a-8b2a-950f45d932e8', 'name': 'JsonOutputParser', 'tags': ['seq:step:2', 'my_chain'], 'metadata': {}, 'data': {'chunk': {}}, 'parent_ids': ['58d1302e-36ce-4df7-a3cb-47cb73d57e44']}
{'event': 'on_chain_stream', 'run_id': '58d1302e-36ce-4df7-a3cb-47cb73d57e44', 'name': 'RunnableSequence', 'tags': ['my_chain'], 'metadata': {}, 'data': {'chunk': {}}, 'parent_ids': []}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\n "countries', additional_kwargs={}, response_metadata={}, id='run-8222e8a1-d978-4f30-87fc-b2dba838774b')}, 'run_id': '8222e8a1-d978-4f30-87fc-b2dba838774b', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['58d1302e-36ce-4df7-a3cb-47cb73d57e44']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='": [\n ', additional_kwargs={}, response_metadata={}, id='run-8222e8a1-d978-4f30-87fc-b2dba838774b')}, 'run_id': '8222e8a1-d978-4f30-87fc-b2dba838774b', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['58d1302e-36ce-4df7-a3cb-47cb73d57e44']}
{'event': 'on_parser_stream', 'run_id': '75604c84-e1e6-494a-8b2a-950f45d932e8', 'name': 'JsonOutputParser', 'tags': ['seq:step:2', 'my_chain'], 'metadata': {}, 'data': {'chunk': {'countries': []}}, 'parent_ids': ['58d1302e-36ce-4df7-a3cb-47cb73d57e44']}
{'event': 'on_chain_stream', 'run_id': '58d1302e-36ce-4df7-a3cb-47cb73d57e44', 'name': 'RunnableSequence', 'tags': ['my_chain'], 'metadata': {}, 'data': {'chunk': {'countries': []}}, 'parent_ids': []}
...

非串流組件

還記得某些組件由於不對輸入串流進行操作而無法良好串流嗎?

雖然此類組件在使用 astream 時可能會中斷最終輸出的串流,但 astream_events 仍將產生來自支援串流的中間步驟的串流事件!

# Function that does not support streaming.
# It operates on the finalizes inputs rather than
# operating on the input stream.
def _extract_country_names(inputs):
"""A function that does not operates on input streams and breaks streaming."""
if not isinstance(inputs, dict):
return ""

if "countries" not in inputs:
return ""

countries = inputs["countries"]

if not isinstance(countries, list):
return ""

country_names = [
country.get("name") for country in countries if isinstance(country, dict)
]
return country_names


chain = (
model | JsonOutputParser() | _extract_country_names
) # This parser only works with OpenAI right now

正如預期的那樣,astream API 無法正常運作,因為 _extract_country_names 不對串流進行操作。

async for chunk in chain.astream(
"output a list of the countries france, spain and japan and their populations in JSON format. "
'Use a dict with an outer key of "countries" which contains a list of countries. '
"Each country should have the key `name` and `population`",
):
print(chunk, flush=True)
['France', 'Spain', 'Japan']

現在,讓我們確認使用 astream_events 我們仍然可以看到來自模型和解析器的串流輸出。

num_events = 0

async for event in chain.astream_events(
"output a list of the countries france, spain and japan and their populations in JSON format. "
'Use a dict with an outer key of "countries" which contains a list of countries. '
"Each country should have the key `name` and `population`",
):
kind = event["event"]
if kind == "on_chat_model_stream":
print(
f"Chat model chunk: {repr(event['data']['chunk'].content)}",
flush=True,
)
if kind == "on_parser_stream":
print(f"Parser chunk: {event['data']['chunk']}", flush=True)
num_events += 1
if num_events > 30:
# Truncate the output
print("...")
break
Chat model chunk: ''
Chat model chunk: '{'
Parser chunk: {}
Chat model chunk: '\n "countries'
Chat model chunk: '": [\n '
Parser chunk: {'countries': []}
Chat model chunk: '{\n "'
Parser chunk: {'countries': [{}]}
Chat model chunk: 'name": "France'
Parser chunk: {'countries': [{'name': 'France'}]}
Chat model chunk: '",\n "'
Chat model chunk: 'population": 67'
Parser chunk: {'countries': [{'name': 'France', 'population': 67}]}
Chat model chunk: '413'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413}]}
Chat model chunk: '000\n },'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413000}]}
Chat model chunk: '\n {'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413000}, {}]}
Chat model chunk: '\n "name":'
Chat model chunk: ' "Spain",'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain'}]}
Chat model chunk: '\n "population":'
Chat model chunk: ' 47'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47}]}
Chat model chunk: '351'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351}]}
...

傳播回呼

注意

如果您在工具內部使用調用 Runnable,則需要將回呼傳播到 Runnable;否則,將不會產生串流事件。

注意

當使用 RunnableLambdas@chain 裝飾器時,回呼會在幕後自動傳播。

from langchain_core.runnables import RunnableLambda
from langchain_core.tools import tool


def reverse_word(word: str):
return word[::-1]


reverse_word = RunnableLambda(reverse_word)


@tool
def bad_tool(word: str):
"""Custom tool that doesn't propagate callbacks."""
return reverse_word.invoke(word)


async for event in bad_tool.astream_events("hello"):
print(event)
API 參考:RunnableLambda | tool
{'event': 'on_tool_start', 'data': {'input': 'hello'}, 'name': 'bad_tool', 'tags': [], 'run_id': 'ea900472-a8f7-425d-b627-facdef936ee8', 'metadata': {}}
{'event': 'on_chain_start', 'data': {'input': 'hello'}, 'name': 'reverse_word', 'tags': [], 'run_id': '77b01284-0515-48f4-8d7c-eb27c1882f86', 'metadata': {}}
{'event': 'on_chain_end', 'data': {'output': 'olleh', 'input': 'hello'}, 'run_id': '77b01284-0515-48f4-8d7c-eb27c1882f86', 'name': 'reverse_word', 'tags': [], 'metadata': {}}
{'event': 'on_tool_end', 'data': {'output': 'olleh'}, 'run_id': 'ea900472-a8f7-425d-b627-facdef936ee8', 'name': 'bad_tool', 'tags': [], 'metadata': {}}

以下是正確傳播回呼的重新實作。您會注意到,現在我們也從 reverse_word Runnable 取得事件。

@tool
def correct_tool(word: str, callbacks):
"""A tool that correctly propagates callbacks."""
return reverse_word.invoke(word, {"callbacks": callbacks})


async for event in correct_tool.astream_events("hello"):
print(event)
{'event': 'on_tool_start', 'data': {'input': 'hello'}, 'name': 'correct_tool', 'tags': [], 'run_id': 'd5ea83b9-9278-49cc-9f1d-aa302d671040', 'metadata': {}}
{'event': 'on_chain_start', 'data': {'input': 'hello'}, 'name': 'reverse_word', 'tags': [], 'run_id': '44dafbf4-2f87-412b-ae0e-9f71713810df', 'metadata': {}}
{'event': 'on_chain_end', 'data': {'output': 'olleh', 'input': 'hello'}, 'run_id': '44dafbf4-2f87-412b-ae0e-9f71713810df', 'name': 'reverse_word', 'tags': [], 'metadata': {}}
{'event': 'on_tool_end', 'data': {'output': 'olleh'}, 'run_id': 'd5ea83b9-9278-49cc-9f1d-aa302d671040', 'name': 'correct_tool', 'tags': [], 'metadata': {}}

如果您從 Runnable Lambda 或 @chains 內部調用 Runnable,則回呼將代表您自動傳遞。

from langchain_core.runnables import RunnableLambda


async def reverse_and_double(word: str):
return await reverse_word.ainvoke(word) * 2


reverse_and_double = RunnableLambda(reverse_and_double)

await reverse_and_double.ainvoke("1234")

async for event in reverse_and_double.astream_events("1234"):
print(event)
API 參考:RunnableLambda
{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_and_double', 'tags': [], 'run_id': '03b0e6a1-3e60-42fc-8373-1e7829198d80', 'metadata': {}}
{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_word', 'tags': [], 'run_id': '5cf26fc8-840b-4642-98ed-623dda28707a', 'metadata': {}}
{'event': 'on_chain_end', 'data': {'output': '4321', 'input': '1234'}, 'run_id': '5cf26fc8-840b-4642-98ed-623dda28707a', 'name': 'reverse_word', 'tags': [], 'metadata': {}}
{'event': 'on_chain_stream', 'data': {'chunk': '43214321'}, 'run_id': '03b0e6a1-3e60-42fc-8373-1e7829198d80', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}}
{'event': 'on_chain_end', 'data': {'output': '43214321'}, 'run_id': '03b0e6a1-3e60-42fc-8373-1e7829198d80', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}}

以及 @chain 裝飾器

from langchain_core.runnables import chain


@chain
async def reverse_and_double(word: str):
return await reverse_word.ainvoke(word) * 2


await reverse_and_double.ainvoke("1234")

async for event in reverse_and_double.astream_events("1234"):
print(event)
API 參考:chain
{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_and_double', 'tags': [], 'run_id': '1bfcaedc-f4aa-4d8e-beee-9bba6ef17008', 'metadata': {}}
{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_word', 'tags': [], 'run_id': '64fc99f0-5d7d-442b-b4f5-4537129f67d1', 'metadata': {}}
{'event': 'on_chain_end', 'data': {'output': '4321', 'input': '1234'}, 'run_id': '64fc99f0-5d7d-442b-b4f5-4537129f67d1', 'name': 'reverse_word', 'tags': [], 'metadata': {}}
{'event': 'on_chain_stream', 'data': {'chunk': '43214321'}, 'run_id': '1bfcaedc-f4aa-4d8e-beee-9bba6ef17008', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}}
{'event': 'on_chain_end', 'data': {'output': '43214321'}, 'run_id': '1bfcaedc-f4aa-4d8e-beee-9bba6ef17008', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}}

後續步驟

現在您已經學習了一些使用 LangChain 串流最終輸出和內部步驟的方法。

若要了解更多資訊,請查看本章節中的其他操作指南,或Langchain 運算式語言的概念指南


此頁面是否有幫助?