Kafka
Kafka 是一個分散式訊息系統,用於發布和訂閱記錄流。此示範展示如何使用 KafkaChatMessageHistory
從 Kafka 叢集儲存和檢索聊天訊息。
執行示範需要運作中的 Kafka 叢集。您可以按照此指示在本機建立 Kafka 叢集。
from langchain_community.chat_message_histories import KafkaChatMessageHistory
chat_session_id = "chat-message-history-kafka"
bootstrap_servers = "localhost:64797" # host:port. `localhost:Plaintext Ports` if setup Kafka cluster locally
history = KafkaChatMessageHistory(
chat_session_id,
bootstrap_servers,
)
API 參考:KafkaChatMessageHistory
建構 KafkaChatMessageHistory
的選用參數
ttl_ms
:聊天訊息的存活時間,以毫秒為單位。partition
:用於儲存聊天訊息的主題分區號碼。replication_factor
:用於儲存聊天訊息的主題複寫因數。
KafkaChatMessageHistory
內部使用 Kafka 消費者來讀取聊天訊息,並且能夠持久性地標記已消費的位置。它具有以下方法來檢索聊天訊息
messages
:從最後一個訊息繼續消費聊天訊息。messages_from_beginning
:將消費者重置到歷史記錄的開頭並消費訊息。選用參數max_message_count
:要讀取的最大訊息數。max_time_sec
:讀取訊息的最大時間,以秒為單位。
messages_from_latest
:將消費者重置到聊天歷史記錄的結尾,並嘗試消費訊息。選用參數與上述相同。messages_from_last_consumed
:傳回從最後消費的訊息繼續的訊息,與messages
類似,但具有選用參數。
max_message_count
和 max_time_sec
用於避免在檢索訊息時無限期地封鎖。因此,messages
和其他檢索訊息的方法可能不會傳回聊天歷史記錄中的所有訊息。您需要指定 max_message_count
和 max_time_sec
以在單一批次中檢索所有聊天歷史記錄。
新增訊息並檢索。
history.add_user_message("hi!")
history.add_ai_message("whats up?")
history.messages
[HumanMessage(content='hi!'), AIMessage(content='whats up?')]
再次呼叫 messages
會傳回一個空列表,因為消費者位於聊天歷史記錄的結尾。
history.messages
[]
新增新訊息並繼續消費。
history.add_user_message("hi again!")
history.add_ai_message("whats up again?")
history.messages
[HumanMessage(content='hi again!'), AIMessage(content='whats up again?')]
要重置消費者並從頭開始讀取
history.messages_from_beginning()
[HumanMessage(content='hi again!'),
AIMessage(content='whats up again?'),
HumanMessage(content='hi!'),
AIMessage(content='whats up?')]
將消費者設定到聊天歷史記錄的結尾,新增幾個新訊息,然後消費
history.messages_from_latest()
history.add_user_message("HI!")
history.add_ai_message("WHATS UP?")
history.messages
[HumanMessage(content='HI!'), AIMessage(content='WHATS UP?')]