跳到主要內容
Open In ColabOpen on GitHub

如何並行調用可執行物件

先決條件

本指南假設您熟悉以下概念

RunnableParallel 原始物件本質上是一個字典,其值為可執行物件(或可以強制轉換為可執行物件的事物,例如函數)。它並行執行其所有值,並且每個值都使用 RunnableParallel 的整體輸入來調用。最終傳回值是一個字典,其中包含每個值在其適當鍵下的結果。

使用 RunnableParallels 格式化

RunnableParallels 對於並行化操作很有用,但對於操作一個 Runnable 的輸出以匹配序列中下一個 Runnable 的輸入格式也很有用。您可以使用它們來分割或分叉鏈,以便多個組件可以並行處理輸入。稍後,其他組件可以加入或合併結果以合成最終回應。此類型的鏈會建立如下所示的計算圖

     Input
/ \
/ \
Branch1 Branch2
\ /
\ /
Combine

在下面,提示的輸入預期為具有鍵 "context""question" 的映射。使用者輸入只是問題。因此,我們需要使用我們的檢索器取得上下文,並在 "question" 鍵下傳遞使用者輸入。

from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

vectorstore = FAISS.from_texts(
["harrison worked at kensho"], embedding=OpenAIEmbeddings()
)
retriever = vectorstore.as_retriever()
template = """Answer the question based only on the following context:
{context}

Question: {question}
"""

# The prompt expects input with keys for "context" and "question"
prompt = ChatPromptTemplate.from_template(template)

model = ChatOpenAI()

retrieval_chain = (
{"context": retriever, "question": RunnablePassthrough()}
| prompt
| model
| StrOutputParser()
)

retrieval_chain.invoke("where did harrison work?")
'Harrison worked at Kensho.'
提示

請注意,在將 RunnableParallel 與另一個 Runnable 組合時,我們甚至不需要將字典包裝在 RunnableParallel 類別中 — 類型轉換會為我們處理。在鏈的上下文中,這些是等效的

{"context": retriever, "question": RunnablePassthrough()}
RunnableParallel({"context": retriever, "question": RunnablePassthrough()})
RunnableParallel(context=retriever, question=RunnablePassthrough())

請參閱關於 強制轉換 的章節以了解更多資訊。

使用 itemgetter 作為簡寫

請注意,您可以將 Python 的 itemgetter 用作簡寫,以便在與 RunnableParallel 組合時從映射中提取資料。您可以在 Python 文件中找到有關 itemgetter 的更多資訊。

在下面的範例中,我們使用 itemgetter 從映射中提取特定鍵

from operator import itemgetter

from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

vectorstore = FAISS.from_texts(
["harrison worked at kensho"], embedding=OpenAIEmbeddings()
)
retriever = vectorstore.as_retriever()

template = """Answer the question based only on the following context:
{context}

Question: {question}

Answer in the following language: {language}
"""
prompt = ChatPromptTemplate.from_template(template)

chain = (
{
"context": itemgetter("question") | retriever,
"question": itemgetter("question"),
"language": itemgetter("language"),
}
| prompt
| model
| StrOutputParser()
)

chain.invoke({"question": "where did harrison work", "language": "italian"})
'Harrison ha lavorato a Kensho.'

並行化步驟

RunnableParallels 使您可以輕鬆地並行執行多個 Runnable,並將這些 Runnable 的輸出作為映射傳回。

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel
from langchain_openai import ChatOpenAI

model = ChatOpenAI()
joke_chain = ChatPromptTemplate.from_template("tell me a joke about {topic}") | model
poem_chain = (
ChatPromptTemplate.from_template("write a 2-line poem about {topic}") | model
)

map_chain = RunnableParallel(joke=joke_chain, poem=poem_chain)

map_chain.invoke({"topic": "bear"})
{'joke': AIMessage(content="Why don't bears like fast food? Because they can't catch it!", response_metadata={'token_usage': {'completion_tokens': 15, 'prompt_tokens': 13, 'total_tokens': 28}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': 'fp_d9767fc5b9', 'finish_reason': 'stop', 'logprobs': None}, id='run-fe024170-c251-4b7a-bfd4-64a3737c67f2-0'),
'poem': AIMessage(content='In the quiet of the forest, the bear roams free\nMajestic and wild, a sight to see.', response_metadata={'token_usage': {'completion_tokens': 24, 'prompt_tokens': 15, 'total_tokens': 39}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': 'fp_c2295e73ad', 'finish_reason': 'stop', 'logprobs': None}, id='run-2707913e-a743-4101-b6ec-840df4568a76-0')}

並行性

RunnableParallel 也可用於並行執行獨立進程,因為映射中的每個 Runnable 都是並行執行的。例如,我們可以看到我們先前的 joke_chainpoem_chainmap_chain 都具有大約相同的執行時間,即使 map_chain 執行了其他兩個。

%%timeit

joke_chain.invoke({"topic": "bear"})
610 ms ± 64 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit

poem_chain.invoke({"topic": "bear"})
599 ms ± 73.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit

map_chain.invoke({"topic": "bear"})
643 ms ± 77.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

後續步驟

您現在知道使用 RunnableParallel 格式化和並行化鏈步驟的一些方法。

若要了解更多資訊,請參閱本節中關於可執行物件的其他操作指南。


此頁面是否對您有幫助?