跳到主要內容
Open In ColabOpen on GitHub

Kafka

Kafka 是一個分散式訊息系統,用於發布和訂閱記錄流。此示範展示如何使用 KafkaChatMessageHistory 從 Kafka 叢集儲存和檢索聊天訊息。

執行示範需要運作中的 Kafka 叢集。您可以按照此指示在本機建立 Kafka 叢集。

from langchain_community.chat_message_histories import KafkaChatMessageHistory

chat_session_id = "chat-message-history-kafka"
bootstrap_servers = "localhost:64797" # host:port. `localhost:Plaintext Ports` if setup Kafka cluster locally
history = KafkaChatMessageHistory(
chat_session_id,
bootstrap_servers,
)

建構 KafkaChatMessageHistory 的選用參數

  • ttl_ms:聊天訊息的存活時間,以毫秒為單位。
  • partition:用於儲存聊天訊息的主題分區號碼。
  • replication_factor:用於儲存聊天訊息的主題複寫因數。

KafkaChatMessageHistory 內部使用 Kafka 消費者來讀取聊天訊息,並且能夠持久性地標記已消費的位置。它具有以下方法來檢索聊天訊息

  • messages:從最後一個訊息繼續消費聊天訊息。
  • messages_from_beginning:將消費者重置到歷史記錄的開頭並消費訊息。選用參數
    1. max_message_count:要讀取的最大訊息數。
    2. max_time_sec:讀取訊息的最大時間,以秒為單位。
  • messages_from_latest:將消費者重置到聊天歷史記錄的結尾,並嘗試消費訊息。選用參數與上述相同。
  • messages_from_last_consumed:傳回從最後消費的訊息繼續的訊息,與 messages 類似,但具有選用參數。

max_message_countmax_time_sec 用於避免在檢索訊息時無限期地封鎖。因此,messages 和其他檢索訊息的方法可能不會傳回聊天歷史記錄中的所有訊息。您需要指定 max_message_countmax_time_sec 以在單一批次中檢索所有聊天歷史記錄。

新增訊息並檢索。

history.add_user_message("hi!")
history.add_ai_message("whats up?")

history.messages
[HumanMessage(content='hi!'), AIMessage(content='whats up?')]

再次呼叫 messages 會傳回一個空列表,因為消費者位於聊天歷史記錄的結尾。

history.messages
[]

新增新訊息並繼續消費。

history.add_user_message("hi again!")
history.add_ai_message("whats up again?")
history.messages
[HumanMessage(content='hi again!'), AIMessage(content='whats up again?')]

要重置消費者並從頭開始讀取

history.messages_from_beginning()
[HumanMessage(content='hi again!'),
AIMessage(content='whats up again?'),
HumanMessage(content='hi!'),
AIMessage(content='whats up?')]

將消費者設定到聊天歷史記錄的結尾,新增幾個新訊息,然後消費

history.messages_from_latest()
history.add_user_message("HI!")
history.add_ai_message("WHATS UP?")
history.messages
[HumanMessage(content='HI!'), AIMessage(content='WHATS UP?')]

此頁面是否對您有幫助?