跳到主要內容
Open on GitHub

串流

串流對於增強建構於 LLM 之上的應用程式的響應能力至關重要。透過逐步顯示輸出,即使在完整響應準備就緒之前,串流也能顯著改善使用者體驗 (UX),尤其是在處理 LLM 的延遲時。

概觀

LLM 生成完整響應通常會產生幾秒鐘的延遲,這在具有多個模型呼叫的複雜應用程式中變得更加明顯。幸運的是,LLM 會迭代生成響應,允許在產生中間結果時顯示這些結果。透過串流這些中間輸出,LangChain 可以在 LLM 驅動的應用程式中實現更流暢的 UX,並在其設計核心中提供對串流的內建支援。

在本指南中,我們將討論 LLM 應用程式中的串流,並探索 LangChain 的串流 API 如何促進應用程式中各種組件的即時輸出。

在 LLM 應用程式中串流什麼

在涉及 LLM 的應用程式中,可以串流幾種類型的資料,以透過減少感知延遲和提高透明度來改善使用者體驗。這些包括

1. 串流 LLM 輸出

最常見且關鍵的串流資料是 LLM 本身產生的輸出。LLM 通常需要時間才能生成完整響應,透過即時串流輸出,使用者可以在產生部分結果時看到這些結果。這提供了即時回饋,並有助於減少使用者的等待時間。

2. 串流管線或工作流程進度

除了僅串流 LLM 輸出外,串流更複雜的工作流程或管線的進度也很有用,讓使用者了解應用程式的整體進度。這可能包括

  • 在 LangGraph 工作流程中: 使用 LangGraph,工作流程由代表各種步驟的節點和邊緣組成。此處的串流涉及追蹤 圖形狀態 的變更,因為個別 節點 請求更新。這允許更精細地監控工作流程中目前處於活動狀態的節點,即時更新工作流程在不同階段的進度狀態。

  • 在 LCEL 管線中:LCEL 管線串流更新涉及捕獲個別 子可執行物件 的進度。例如,當管線的不同步驟或組件執行時,您可以串流目前正在執行的子可執行物件,從而即時了解整個管線的進度。

串流管線或工作流程進度對於向使用者清楚呈現應用程式在執行過程中所處的位置至關重要。

3. 串流自訂資料

在某些情況下,您可能需要串流 自訂資料,這些資料超出管線或工作流程結構提供的資訊。此自訂資訊會注入到工作流程中的特定步驟中,無論該步驟是工具還是 LangGraph 節點。例如,您可以串流有關工具即時執行操作的更新,或 LangGraph 節點的進度。此精細資料直接從步驟內部發出,可更詳細地了解工作流程的執行情況,並且在需要更高可見性的複雜流程中特別有用。

串流 API

LangChain 有兩個主要的 API 用於即時串流輸出。實作 Runnable 介面 的任何組件都支援這些 API,包括 LLM編譯的 LangGraph 圖形,以及使用 LCEL 生成的任何 Runnable。

  1. 同步 stream 和非同步 astream:用於串流來自個別 Runnable(例如,聊天模型)的輸出,因為它們是產生的,或串流使用 LangGraph 建立的任何工作流程。
  2. 僅限非同步 astream_events:使用此 API 取得完全使用 LCEL 建構的 LLM 應用程式的自訂事件和中間輸出。請注意,此 API 可用,但在使用 LangGraph 時不需要。
注意

此外,還有一個 舊版 非同步 astream_log API。不建議新專案使用此 API,它比其他串流 API 更複雜且功能較少。

stream()astream()

stream() 方法傳回一個迭代器,該迭代器會同步產生輸出區塊,因為它們是產生的。您可以使用 for 迴圈即時處理每個區塊。例如,當使用 LLM 時,這允許以增量方式串流輸出,因為它是生成的,從而減少使用者的等待時間。

stream()astream() 方法產生的區塊類型取決於正在串流的組件。例如,當從 LLM 串流時,每個組件都將是一個 AIMessageChunk;但是,對於其他組件,區塊可能會有所不同。

stream() 方法傳回一個迭代器,該迭代器會產生這些區塊,因為它們是產生的。例如,

for chunk in component.stream(some_input):
# IMPORTANT: Keep the processing of each chunk as efficient as possible.
# While you're processing the current chunk, the upstream component is
# waiting to produce the next one. For example, if working with LangGraph,
# graph execution is paused while the current chunk is being processed.
# In extreme cases, this could even result in timeouts (e.g., when llm outputs are
# streamed from an API that has a timeout).
print(chunk)

非同步版本 astream() 的工作方式類似,但專為非阻塞工作流程而設計。您可以在非同步程式碼中使用它來實現相同的即時串流行為。

與聊天模型一起使用

當將 stream()astream() 與聊天模型一起使用時,輸出會以 AIMessageChunks 的形式串流,因為它是 LLM 生成的。這允許您以增量方式呈現或處理 LLM 的輸出,因為它是產生的,這在互動式應用程式或介面中特別有用。

與 LangGraph 一起使用

LangGraph 編譯的圖形是 Runnable,並支援標準串流 API。

當將 streamastream 方法與 LangGraph 一起使用時,您可以選擇 一個或多個 串流模式,這些模式允許您控制串流的輸出類型。可用的串流模式為

  • "values":針對每個步驟發出 狀態 的所有值。
  • "updates":在每個步驟之後,僅發出節點名稱和節點傳回的更新。
  • "debug":針對每個步驟發出偵錯事件。
  • "messages":發出 LLM 訊息 逐個 Token
  • "custom":發出使用 LangGraph 的 StreamWriter 寫入的自訂輸出。

如需更多資訊,請參閱

與 LCEL 一起使用

如果您使用 LangChain 的運算式語言 (LCEL) 組成多個 Runnable,stream()astream() 方法會依慣例串流鏈中最後一個步驟的輸出。這允許以增量方式串流最終處理的結果。LCEL 嘗試最佳化管線中的串流延遲,以便盡快取得最後一個步驟的串流結果。

astream_events

提示

使用 astream_events API 存取完全使用 LCEL 建構的 LLM 應用程式的自訂資料和中間輸出。

雖然此 API 也可用於 LangGraph,但在使用 LangGraph 時通常不需要,因為 streamastream 方法為 LangGraph 圖形提供全面的串流功能。

對於使用 LCEL 建構的鏈,.stream() 方法僅串流鏈中最後一個步驟的輸出。這對於某些應用程式可能已足夠,但當您建立多個 LLM 呼叫在一起的更複雜鏈時,您可能想要使用鏈的中間值以及最終輸出。例如,當建構基於文件的聊天應用程式時,您可能想要傳回來源以及最終生成結果。

有一些方法可以 使用回呼,或者透過以這樣的方式建構您的鏈,使其使用類似鏈式 .assign() 呼叫的方式將中間值傳遞到結尾,但 LangChain 也包含一個 .astream_events() 方法,該方法結合了回呼的彈性和 .stream() 的人體工學。呼叫時,它會傳回一個迭代器,該迭代器會產生 各種事件類型,您可以根據專案的需求篩選和處理這些事件。

以下是一個小範例,僅列印包含串流聊天模型輸出的事件

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_anthropic import ChatAnthropic

model = ChatAnthropic(model="claude-3-sonnet-20240229")

prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
parser = StrOutputParser()
chain = prompt | model | parser

async for event in chain.astream_events({"topic": "parrot"}):
kind = event["event"]
if kind == "on_chat_model_stream":
print(event, end="|", flush=True)

您可以大致將其視為回呼事件的迭代器(儘管格式不同) - 並且您幾乎可以在所有 LangChain 組件上使用它!

請參閱 本指南,以取得有關如何使用 .astream_events() 的更詳細資訊,包括列出可用事件的表格。

將自訂資料寫入串流

若要將自訂資料寫入串流,您需要根據您正在使用的組件選擇以下方法之一

  1. LangGraph 的 StreamWriter 可用於寫入自訂資料,這些資料將在使用 LangGraph 時透過 streamastream API 浮現。重要 這是 LangGraph 功能,因此在使用純 LCEL 時不可用。請參閱 如何串流自訂資料 以取得更多資訊。
  2. dispatch_events / adispatch_events 可用於寫入自訂資料,這些資料將透過 astream_events API 浮現。請參閱 如何分派自訂回呼事件 以取得更多資訊。

「自動串流」聊天模型

LangChain 透過在某些情況下自動啟用串流模式來簡化從 聊天模型 的串流,即使您沒有明確呼叫串流方法。當您使用非串流 invoke 方法但仍想要串流整個應用程式(包括來自聊天模型的中間結果)時,這特別有用。

運作方式

當您在聊天模型上呼叫 invoke(或 ainvoke)方法時,如果 LangChain 偵測到您正在嘗試串流整個應用程式,它將自動切換到串流模式。

在幕後,它會讓 invoke(或 ainvoke)使用 stream(或 astream)方法來產生其輸出。調用的結果將與使用 invoke 的程式碼相同;但是,當串流聊天模型時,LangChain 將負責在 LangChain 的 回呼系統 中調用 on_llm_new_token 事件。這些回呼事件允許 LangGraph stream/astreamastream_events 即時浮現聊天模型的輸出。

範例

def node(state):
...
# The code below uses the invoke method, but LangChain will
# automatically switch to streaming mode
# when it detects that the overall
# application is being streamed.
ai_message = model.invoke(state["messages"])
...

for chunk in compiled_graph.stream(..., mode="messages"):
...

非同步程式設計

LangChain 提供許多方法的同步(sync)和非同步(async)版本。非同步方法通常以 "a" 為字首(例如,ainvokeastream)。在編寫非同步程式碼時,務必始終如一地使用這些非同步方法,以確保非阻塞行為和最佳效能。

如果串流資料未即時顯示,請確保您為工作流程使用了正確的非同步方法。

請查看 LangChain 中的非同步程式設計指南,以取得有關使用 LangChain 編寫非同步程式碼的更多資訊。

請參閱以下操作指南,以取得 LangChain 中串流的特定範例

若要將自訂資料寫入串流,請參閱以下資源


此頁面是否對您有幫助?