如何執行自訂函數
本指南假設您熟悉以下概念
您可以將任意函數用作可執行物件。這對於格式化或當您需要其他 LangChain 組件未提供的功能時非常有用,並且用作可執行物件的自訂函數稱為 RunnableLambdas
。
請注意,這些函數的所有輸入都需要是單一參數。如果您有一個接受多個參數的函數,您應該編寫一個包裝器,該包裝器接受單一字典輸入並將其解包為多個參數。
本指南將涵蓋
- 如何使用
RunnableLambda
建構子和便捷的@chain
裝飾器從自訂函數顯式建立可執行物件 - 在鏈中使用時,將自訂函數強制轉換為可執行物件
- 如何在您的自訂函數中接受和使用運行元數據
- 如何透過讓自訂函數返回生成器來進行串流
使用建構子
下面,我們使用 RunnableLambda
建構子顯式包裝我們的自訂邏輯
%pip install -qU langchain langchain_openai
import os
from getpass import getpass
if "OPENAI_API_KEY" not in os.environ:
os.environ["OPENAI_API_KEY"] = getpass()
from operator import itemgetter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI
def length_function(text):
return len(text)
def _multiple_length_function(text1, text2):
return len(text1) * len(text2)
def multiple_length_function(_dict):
return _multiple_length_function(_dict["text1"], _dict["text2"])
model = ChatOpenAI()
prompt = ChatPromptTemplate.from_template("what is {a} + {b}")
chain = (
{
"a": itemgetter("foo") | RunnableLambda(length_function),
"b": {"text1": itemgetter("foo"), "text2": itemgetter("bar")}
| RunnableLambda(multiple_length_function),
}
| prompt
| model
)
chain.invoke({"foo": "bar", "bar": "gah"})
AIMessage(content='3 + 9 equals 12.', response_metadata={'token_usage': {'completion_tokens': 8, 'prompt_tokens': 14, 'total_tokens': 22}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': 'fp_c2295e73ad', 'finish_reason': 'stop', 'logprobs': None}, id='run-73728de3-e483-49e3-ad54-51bd9570e71a-0')
便捷的 @chain
裝飾器
您還可以透過添加 @chain
裝飾器將任意函數轉換為鏈。這在功能上等同於將函數包裝在如上所示的 RunnableLambda
建構子中。這是一個範例
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import chain
prompt1 = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
prompt2 = ChatPromptTemplate.from_template("What is the subject of this joke: {joke}")
@chain
def custom_chain(text):
prompt_val1 = prompt1.invoke({"topic": text})
output1 = ChatOpenAI().invoke(prompt_val1)
parsed_output1 = StrOutputParser().invoke(output1)
chain2 = prompt2 | ChatOpenAI() | StrOutputParser()
return chain2.invoke({"joke": parsed_output1})
custom_chain.invoke("bears")
'The subject of the joke is the bear and his girlfriend.'
上面,@chain
裝飾器用於將 custom_chain
轉換為可執行物件,我們使用 .invoke()
方法調用它。
如果您正在使用 LangSmith 進行追蹤,您應該在其中看到 custom_chain
追蹤,以及嵌套在下方的對 OpenAI 的調用。
鏈中的自動強制轉換
當在鏈中使用管道運算符 (|
) 中的自訂函數時,您可以省略 RunnableLambda
或 @chain
建構子並依賴強制轉換。這是一個簡單的範例,其中一個函數接收模型的輸出並返回其前五個字母
prompt = ChatPromptTemplate.from_template("tell me a story about {topic}")
model = ChatOpenAI()
chain_with_coerced_function = prompt | model | (lambda x: x.content[:5])
chain_with_coerced_function.invoke({"topic": "bears"})
'Once '
請注意,我們不需要將自訂函數 (lambda x: x.content[:5])
包裝在 RunnableLambda
建構子中,因為管道運算符左側的模型已經是可執行物件。自訂函數被強制轉換為可執行物件。有關更多信息,請參閱本節。
傳遞運行元數據
可執行 lambda 可以選擇性地接受 RunnableConfig 參數,它們可以使用該參數將回調、標籤和其他配置信息傳遞給嵌套運行。
import json
from langchain_core.runnables import RunnableConfig
def parse_or_fix(text: str, config: RunnableConfig):
fixing_chain = (
ChatPromptTemplate.from_template(
"Fix the following text:\n\n\`\`\`text\n{input}\n\`\`\`\nError: {error}"
" Don't narrate, just respond with the fixed data."
)
| model
| StrOutputParser()
)
for _ in range(3):
try:
return json.loads(text)
except Exception as e:
text = fixing_chain.invoke({"input": text, "error": e}, config)
return "Failed to parse"
from langchain_community.callbacks import get_openai_callback
with get_openai_callback() as cb:
output = RunnableLambda(parse_or_fix).invoke(
"{foo: bar}", {"tags": ["my-tag"], "callbacks": [cb]}
)
print(output)
print(cb)
{'foo': 'bar'}
Tokens Used: 62
Prompt Tokens: 56
Completion Tokens: 6
Successful Requests: 1
Total Cost (USD): $9.6e-05
from langchain_community.callbacks import get_openai_callback
with get_openai_callback() as cb:
output = RunnableLambda(parse_or_fix).invoke(
"{foo: bar}", {"tags": ["my-tag"], "callbacks": [cb]}
)
print(output)
print(cb)
{'foo': 'bar'}
Tokens Used: 62
Prompt Tokens: 56
Completion Tokens: 6
Successful Requests: 1
Total Cost (USD): $9.6e-05
串流
RunnableLambda 最適合不需要支援串流的程式碼。如果您需要支援串流(即能夠操作輸入塊並產生輸出塊),請改為使用 RunnableGenerator,如下面的範例所示。
您可以在鏈中使用生成器函數(即使用 yield
關鍵字並像迭代器一樣運作的函數)。
這些生成器的簽名應為 Iterator[Input] -> Iterator[Output]
。或對於非同步生成器:AsyncIterator[Input] -> AsyncIterator[Output]
。
這些對於以下情況很有用
- 實作自訂輸出解析器
- 修改前一步驟的輸出,同時保留串流功能
這是一個逗號分隔列表的自訂輸出解析器範例。首先,我們建立一個將此類列表生成為文本的鏈
from typing import Iterator, List
prompt = ChatPromptTemplate.from_template(
"Write a comma-separated list of 5 animals similar to: {animal}. Do not include numbers"
)
str_chain = prompt | model | StrOutputParser()
for chunk in str_chain.stream({"animal": "bear"}):
print(chunk, end="", flush=True)
lion, tiger, wolf, gorilla, panda
接下來,我們定義一個自訂函數,該函數將聚合當前串流的輸出,並在模型生成列表中的下一個逗號時產生它
# This is a custom parser that splits an iterator of llm tokens
# into a list of strings separated by commas
def split_into_list(input: Iterator[str]) -> Iterator[List[str]]:
# hold partial input until we get a comma
buffer = ""
for chunk in input:
# add current chunk to buffer
buffer += chunk
# while there are commas in the buffer
while "," in buffer:
# split buffer on comma
comma_index = buffer.index(",")
# yield everything before the comma
yield [buffer[:comma_index].strip()]
# save the rest for the next iteration
buffer = buffer[comma_index + 1 :]
# yield the last chunk
yield [buffer.strip()]
list_chain = str_chain | split_into_list
for chunk in list_chain.stream({"animal": "bear"}):
print(chunk, flush=True)
['lion']
['tiger']
['wolf']
['gorilla']
['raccoon']
調用它會給出一個完整的數值陣列
list_chain.invoke({"animal": "bear"})
['lion', 'tiger', 'wolf', 'gorilla', 'raccoon']
非同步版本
如果您在 async
環境中工作,這是上面範例的非同步版本
from typing import AsyncIterator
async def asplit_into_list(
input: AsyncIterator[str],
) -> AsyncIterator[List[str]]: # async def
buffer = ""
async for (
chunk
) in input: # `input` is a `async_generator` object, so use `async for`
buffer += chunk
while "," in buffer:
comma_index = buffer.index(",")
yield [buffer[:comma_index].strip()]
buffer = buffer[comma_index + 1 :]
yield [buffer.strip()]
list_chain = str_chain | asplit_into_list
async for chunk in list_chain.astream({"animal": "bear"}):
print(chunk, flush=True)
['lion']
['tiger']
['wolf']
['gorilla']
['panda']
await list_chain.ainvoke({"animal": "bear"})
['lion', 'tiger', 'wolf', 'gorilla', 'panda']
後續步驟
現在您已經學習了幾種在鏈中使用自訂邏輯的不同方法,以及如何實作串流。
若要了解更多資訊,請參閱本節中關於可執行物件的其他操作指南。