Cassandra 資料庫工具組
Apache Cassandra®
是一個廣泛使用的資料庫,用於儲存交易應用程式資料。大型語言模型中函數和工具的引入,為生成式 AI 應用程式中現有資料開啟了一些令人興奮的用例。
Cassandra Database
工具組使 AI 工程師能夠有效地將 Agent 與 Cassandra 資料整合,並提供以下功能
- 通過最佳化查詢快速存取資料。大多數查詢應在個位數毫秒或更短時間內執行。
- Schema 自省以增強 LLM 推理能力
- 與各種 Cassandra 部署的相容性,包括 Apache Cassandra®、DataStax Enterprise™ 和 DataStax Astra™
- 目前,此工具組僅限於 SELECT 查詢和 Schema 自省操作。(安全第一)
有關建立 Cassandra DB Agent 的更多資訊,請參閱 CQL Agent 食譜
快速開始
- 安裝
cassio
函式庫 - 設定您要連接的 Cassandra 資料庫的環境變數
- 初始化
CassandraDatabase
- 使用
toolkit.get_tools()
將工具傳遞給您的 Agent - 放鬆一下,觀看它為您完成所有工作
運作原理
Cassandra Query Language (CQL)
是與 Cassandra 資料庫互動的主要以人為本的方式。雖然在產生查詢時提供了一些彈性,但它需要 Cassandra 資料建模最佳實務的知識。 LLM 函數呼叫使 Agent 能夠推理,然後選擇工具來滿足請求。使用 LLM 的 Agent 在選擇適當的工具組或工具組鏈時,應使用 Cassandra 特定的邏輯進行推理。這減少了 LLM 被迫提供由上而下解決方案時引入的隨機性。您是否希望 LLM 完全不受限制地存取您的資料庫?是的。可能不是。為了實現這一點,我們提供了一個提示,用於在為 Agent 構建問題時使用
您是一位 Apache Cassandra 專家查詢分析機器人,具有以下功能和規則
- 您將從最終使用者那裡收到有關在資料庫中尋找特定資料的問題。
- 您將檢查資料庫的 Schema 並建立查詢路徑。
- 您將為使用者提供正確的查詢,以找到他們正在尋找的資料,並顯示查詢路徑提供的步驟。
- 您將使用最佳實務來查詢 Apache Cassandra,使用分割區鍵和叢集欄。
- 避免在查詢中使用 ALLOW FILTERING。
- 目標是找到查詢路徑,因此可能需要查詢其他表才能獲得最終答案。
以下是以 JSON 格式表示的查詢路徑範例
{
"query_paths": [
{
"description": "Direct query to users table using email",
"steps": [
{
"table": "user_credentials",
"query":
"SELECT userid FROM user_credentials WHERE email = 'example@example.com';"
},
{
"table": "users",
"query": "SELECT * FROM users WHERE userid = ?;"
}
]
}
]
}
提供的工具
cassandra_db_schema
收集已連線資料庫或特定 Schema 的所有 Schema 資訊。對於 Agent 確定操作至關重要。
cassandra_db_select_table_data
從特定的 Keyspace 和表選取資料。 Agent 可以傳遞用於謂詞的參數以及傳回記錄數量的限制。
cassandra_db_query
cassandra_db_select_table_data
的實驗性替代方案,它接受由 Agent 完全形成的查詢字串,而不是參數。警告:這可能會導致不尋常的查詢,這些查詢可能效能不佳(甚至無法運作)。這可能會在未來版本中移除。如果它做了什麼很酷的事情,我們也想知道。您永遠不知道!
環境設定
安裝以下 Python 模組
pip install ipykernel python-dotenv cassio langchain_openai langchain langchain-community langchainhub
.env 檔案
連線是透過 cassio
使用 auto=True
參數,並且 Notebook 使用 OpenAI。您應該相應地建立 .env
檔案。
對於 Casssandra,設定
CASSANDRA_CONTACT_POINTS
CASSANDRA_USERNAME
CASSANDRA_PASSWORD
CASSANDRA_KEYSPACE
對於 Astra,設定
ASTRA_DB_APPLICATION_TOKEN
ASTRA_DB_DATABASE_ID
ASTRA_DB_KEYSPACE
範例
# Connection to Astra:
ASTRA_DB_DATABASE_ID=a1b2c3d4-...
ASTRA_DB_APPLICATION_TOKEN=AstraCS:...
ASTRA_DB_KEYSPACE=notebooks
# Also set
OPENAI_API_KEY=sk-....
(您也可以修改以下程式碼以直接與 cassio
連線。)
from dotenv import load_dotenv
load_dotenv(override=True)
# Import necessary libraries
import os
import cassio
from langchain import hub
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_community.agent_toolkits.cassandra_database.toolkit import (
CassandraDatabaseToolkit,
)
from langchain_community.tools.cassandra_database.prompt import QUERY_PATH_PROMPT
from langchain_community.utilities.cassandra_database import CassandraDatabase
from langchain_openai import ChatOpenAI
連接至 Cassandra 資料庫
cassio.init(auto=True)
session = cassio.config.resolve_session()
if not session:
raise Exception(
"Check environment configuration or manually configure cassio connection parameters"
)
# Test data pep
session = cassio.config.resolve_session()
session.execute("""DROP KEYSPACE IF EXISTS langchain_agent_test; """)
session.execute(
"""
CREATE KEYSPACE if not exists langchain_agent_test
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
"""
)
session.execute(
"""
CREATE TABLE IF NOT EXISTS langchain_agent_test.user_credentials (
user_email text PRIMARY KEY,
user_id UUID,
password TEXT
);
"""
)
session.execute(
"""
CREATE TABLE IF NOT EXISTS langchain_agent_test.users (
id UUID PRIMARY KEY,
name TEXT,
email TEXT
);"""
)
session.execute(
"""
CREATE TABLE IF NOT EXISTS langchain_agent_test.user_videos (
user_id UUID,
video_id UUID,
title TEXT,
description TEXT,
PRIMARY KEY (user_id, video_id)
);
"""
)
user_id = "522b1fe2-2e36-4cef-a667-cd4237d08b89"
video_id = "27066014-bad7-9f58-5a30-f63fe03718f6"
session.execute(
f"""
INSERT INTO langchain_agent_test.user_credentials (user_id, user_email)
VALUES ({user_id}, 'patrick@datastax.com');
"""
)
session.execute(
f"""
INSERT INTO langchain_agent_test.users (id, name, email)
VALUES ({user_id}, 'Patrick McFadin', 'patrick@datastax.com');
"""
)
session.execute(
f"""
INSERT INTO langchain_agent_test.user_videos (user_id, video_id, title)
VALUES ({user_id}, {video_id}, 'Use Langflow to Build a LangChain LLM Application in 5 Minutes');
"""
)
session.set_keyspace("langchain_agent_test")
# Create a CassandraDatabase instance
# Uses the cassio session to connect to the database
db = CassandraDatabase()
# Choose the LLM that will drive the agent
# Only certain models support this
llm = ChatOpenAI(temperature=0, model="gpt-4-1106-preview")
toolkit = CassandraDatabaseToolkit(db=db)
tools = toolkit.get_tools()
print("Available tools:")
for tool in tools:
print(tool.name + "\t- " + tool.description)
Available tools:
cassandra_db_schema -
Input to this tool is a keyspace name, output is a table description
of Apache Cassandra tables.
If the query is not correct, an error message will be returned.
If an error is returned, report back to the user that the keyspace
doesn't exist and stop.
cassandra_db_query -
Execute a CQL query against the database and get back the result.
If the query is not correct, an error message will be returned.
If an error is returned, rewrite the query, check the query, and try again.
cassandra_db_select_table_data -
Tool for getting data from a table in an Apache Cassandra database.
Use the WHERE clause to specify the predicate for the query that uses the
primary key. A blank predicate will return all rows. Avoid this if possible.
Use the limit to specify the number of rows to return. A blank limit will
return all rows.
prompt = hub.pull("hwchase17/openai-tools-agent")
# Construct the OpenAI Tools agent
agent = create_openai_tools_agent(llm, tools, prompt)
input = (
QUERY_PATH_PROMPT
+ "\n\nHere is your task: Find all the videos that the user with the email address 'patrick@datastax.com' has uploaded to the langchain_agent_test keyspace."
)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
response = agent_executor.invoke({"input": input})
print(response["output"])
[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3m
Invoking: `cassandra_db_schema` with `{'keyspace': 'langchain_agent_test'}`
[0m[36;1m[1;3mTable Name: user_credentials
- Keyspace: langchain_agent_test
- Columns
- password (text)
- user_email (text)
- user_id (uuid)
- Partition Keys: (user_email)
- Clustering Keys:
Table Name: user_videos
- Keyspace: langchain_agent_test
- Columns
- description (text)
- title (text)
- user_id (uuid)
- video_id (uuid)
- Partition Keys: (user_id)
- Clustering Keys: (video_id asc)
Table Name: users
- Keyspace: langchain_agent_test
- Columns
- email (text)
- id (uuid)
- name (text)
- Partition Keys: (id)
- Clustering Keys:
[0m[32;1m[1;3m
Invoking: `cassandra_db_select_table_data` with `{'keyspace': 'langchain_agent_test', 'table': 'user_credentials', 'predicate': "user_email = 'patrick@datastax.com'", 'limit': 1}`
[0m[38;5;200m[1;3mRow(user_email='patrick@datastax.com', password=None, user_id=UUID('522b1fe2-2e36-4cef-a667-cd4237d08b89'))[0m[32;1m[1;3m
Invoking: `cassandra_db_select_table_data` with `{'keyspace': 'langchain_agent_test', 'table': 'user_videos', 'predicate': 'user_id = 522b1fe2-2e36-4cef-a667-cd4237d08b89', 'limit': 10}`
[0m[38;5;200m[1;3mRow(user_id=UUID('522b1fe2-2e36-4cef-a667-cd4237d08b89'), video_id=UUID('27066014-bad7-9f58-5a30-f63fe03718f6'), description='DataStax Academy is a free resource for learning Apache Cassandra.', title='DataStax Academy')[0m[32;1m[1;3mTo find all the videos that the user with the email address 'patrick@datastax.com' has uploaded to the `langchain_agent_test` keyspace, we can follow these steps:
1. Query the `user_credentials` table to find the `user_id` associated with the email 'patrick@datastax.com'.
2. Use the `user_id` obtained from the first step to query the `user_videos` table to retrieve all the videos uploaded by the user.
Here is the query path in JSON format:
\`\`\`json
{
"query_paths": [
{
"description": "Find user_id from user_credentials and then query user_videos for all videos uploaded by the user",
"steps": [
{
"table": "user_credentials",
"query": "SELECT user_id FROM user_credentials WHERE user_email = 'patrick@datastax.com';"
},
{
"table": "user_videos",
"query": "SELECT * FROM user_videos WHERE user_id = 522b1fe2-2e36-4cef-a667-cd4237d08b89;"
}
]
}
]
}
\`\`\`
Following this query path, we found that the user with the user_id `522b1fe2-2e36-4cef-a667-cd4237d08b89` has uploaded at least one video with the title 'DataStax Academy' and the description 'DataStax Academy is a free resource for learning Apache Cassandra.' The video_id for this video is `27066014-bad7-9f58-5a30-f63fe03718f6`. If there are more videos, the same query can be used to retrieve them, possibly with an increased limit if necessary.[0m
[1m> Finished chain.[0m
To find all the videos that the user with the email address 'patrick@datastax.com' has uploaded to the `langchain_agent_test` keyspace, we can follow these steps:
1. Query the `user_credentials` table to find the `user_id` associated with the email 'patrick@datastax.com'.
2. Use the `user_id` obtained from the first step to query the `user_videos` table to retrieve all the videos uploaded by the user.
Here is the query path in JSON format:
\`\`\`json
{
"query_paths": [
{
"description": "Find user_id from user_credentials and then query user_videos for all videos uploaded by the user",
"steps": [
{
"table": "user_credentials",
"query": "SELECT user_id FROM user_credentials WHERE user_email = 'patrick@datastax.com';"
},
{
"table": "user_videos",
"query": "SELECT * FROM user_videos WHERE user_id = 522b1fe2-2e36-4cef-a667-cd4237d08b89;"
}
]
}
]
}
\`\`\`
Following this query path, we found that the user with the user_id `522b1fe2-2e36-4cef-a667-cd4237d08b89` has uploaded at least one video with the title 'DataStax Academy' and the description 'DataStax Academy is a free resource for learning Apache Cassandra.' The video_id for this video is `27066014-bad7-9f58-5a30-f63fe03718f6`. If there are more videos, the same query can be used to retrieve them, possibly with an increased limit if necessary.