Cassandra 資料庫工具組
Apache Cassandra®
是一個廣泛使用的資料庫,用於儲存交易應用程式資料。大型語言模型中函數和 >工具的引入,為生成式 AI 應用程式中的現有資料開啟了一些令人興奮的用例。
Cassandra Database
工具組使 AI 工程師能夠有效率地將代理程式與 Cassandra 資料整合,並提供以下功能
- 透過最佳化查詢快速存取資料。大多數查詢應在個位數毫秒或更短時間內執行。
- 結構描述內省,以增強 LLM 推理能力
- 與各種 Cassandra 部署的相容性,包括 Apache Cassandra®、DataStax Enterprise™ 和 DataStax Astra™
- 目前,此工具組僅限於 SELECT 查詢和結構描述內省操作。(安全第一)
如需建立 Cassandra DB 代理程式的更多資訊,請參閱 CQL 代理程式食譜
快速開始
- 安裝
cassio
程式庫 - 設定您要連線的 Cassandra 資料庫的環境變數
- 初始化
CassandraDatabase
- 使用
toolkit.get_tools()
將工具傳遞給您的代理程式 - 放鬆並觀看它為您完成所有工作
操作理論
Cassandra Query Language (CQL)
是與 Cassandra 資料庫互動的主要以人為本的方式。雖然在產生查詢時提供了一些彈性,但它需要 Cassandra 資料建模最佳實務的知識。 LLM 函數呼叫使代理程式能夠推理,然後選擇工具來滿足請求。使用 LLM 的代理程式在選擇適當的工具組或工具組鏈時,應使用 Cassandra 特定的邏輯進行推理。這減少了 LLM 被迫提供由上而下解決方案時引入的隨機性。您希望 LLM 能夠完全不受限制地存取您的資料庫嗎? 耶。可能不會。為了實現這一點,我們提供了一個提示,用於在為代理程式建構問題時使用
您是一位 Apache Cassandra 專家查詢分析機器人,具有以下功能和規則
- 您將從最終使用者那裡接收到關於在資料庫中尋找特定資料的問題。
- 您將檢查資料庫的結構描述並建立查詢路徑。
- 您將為使用者提供正確的查詢,以尋找他們正在尋找的資料,並顯示查詢路徑提供的步驟。
- 您將使用最佳實務來查詢 Apache Cassandra,使用分割區鍵和叢集欄。
- 避免在查詢中使用 ALLOW FILTERING。
- 目標是找到查詢路徑,因此可能需要查詢其他表格才能獲得最終答案。
以下是以 JSON 格式呈現的查詢路徑範例
{
"query_paths": [
{
"description": "Direct query to users table using email",
"steps": [
{
"table": "user_credentials",
"query":
"SELECT userid FROM user_credentials WHERE email = 'example@example.com';"
},
{
"table": "users",
"query": "SELECT * FROM users WHERE userid = ?;"
}
]
}
]
}
提供的工具
cassandra_db_schema
收集已連線資料庫或特定結構描述的所有結構描述資訊。對於代理程式決定操作至關重要。
cassandra_db_select_table_data
從特定的 keyspace 和表格中選取資料。代理程式可以傳遞謂詞的參數以及傳回記錄數量的限制。
cassandra_db_query
cassandra_db_select_table_data
的實驗性替代方案,它接受由代理程式完全形成的查詢字串,而不是參數。警告:這可能會導致異常查詢,這些查詢的效能可能不如預期(甚至無法運作)。這可能會在未來版本中移除。如果它做了很酷的事情,我們也想知道。您永遠不知道!
環境設定
安裝以下 Python 模組
pip install ipykernel python-dotenv cassio langchain_openai langchain langchain-community langchainhub
.env 檔案
連線是透過 cassio
使用 auto=True
參數,並且 notebook 使用 OpenAI。您應據此建立 .env
檔案。
對於 Casssandra,設定
CASSANDRA_CONTACT_POINTS
CASSANDRA_USERNAME
CASSANDRA_PASSWORD
CASSANDRA_KEYSPACE
對於 Astra,設定
ASTRA_DB_APPLICATION_TOKEN
ASTRA_DB_DATABASE_ID
ASTRA_DB_KEYSPACE
例如
# Connection to Astra:
ASTRA_DB_DATABASE_ID=a1b2c3d4-...
ASTRA_DB_APPLICATION_TOKEN=AstraCS:...
ASTRA_DB_KEYSPACE=notebooks
# Also set
OPENAI_API_KEY=sk-....
(您也可以修改以下程式碼以直接與 cassio
連線。)
from dotenv import load_dotenv
load_dotenv(override=True)
# Import necessary libraries
import os
import cassio
from langchain import hub
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_community.agent_toolkits.cassandra_database.toolkit import (
CassandraDatabaseToolkit,
)
from langchain_community.tools.cassandra_database.prompt import QUERY_PATH_PROMPT
from langchain_community.utilities.cassandra_database import CassandraDatabase
from langchain_openai import ChatOpenAI
連線到 Cassandra 資料庫
cassio.init(auto=True)
session = cassio.config.resolve_session()
if not session:
raise Exception(
"Check environment configuration or manually configure cassio connection parameters"
)
# Test data pep
session = cassio.config.resolve_session()
session.execute("""DROP KEYSPACE IF EXISTS langchain_agent_test; """)
session.execute(
"""
CREATE KEYSPACE if not exists langchain_agent_test
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
"""
)
session.execute(
"""
CREATE TABLE IF NOT EXISTS langchain_agent_test.user_credentials (
user_email text PRIMARY KEY,
user_id UUID,
password TEXT
);
"""
)
session.execute(
"""
CREATE TABLE IF NOT EXISTS langchain_agent_test.users (
id UUID PRIMARY KEY,
name TEXT,
email TEXT
);"""
)
session.execute(
"""
CREATE TABLE IF NOT EXISTS langchain_agent_test.user_videos (
user_id UUID,
video_id UUID,
title TEXT,
description TEXT,
PRIMARY KEY (user_id, video_id)
);
"""
)
user_id = "522b1fe2-2e36-4cef-a667-cd4237d08b89"
video_id = "27066014-bad7-9f58-5a30-f63fe03718f6"
session.execute(
f"""
INSERT INTO langchain_agent_test.user_credentials (user_id, user_email)
VALUES ({user_id}, 'patrick@datastax.com');
"""
)
session.execute(
f"""
INSERT INTO langchain_agent_test.users (id, name, email)
VALUES ({user_id}, 'Patrick McFadin', 'patrick@datastax.com');
"""
)
session.execute(
f"""
INSERT INTO langchain_agent_test.user_videos (user_id, video_id, title)
VALUES ({user_id}, {video_id}, 'Use Langflow to Build a LangChain LLM Application in 5 Minutes');
"""
)
session.set_keyspace("langchain_agent_test")
# Create a CassandraDatabase instance
# Uses the cassio session to connect to the database
db = CassandraDatabase()
# Choose the LLM that will drive the agent
# Only certain models support this
llm = ChatOpenAI(temperature=0, model="gpt-4-1106-preview")
toolkit = CassandraDatabaseToolkit(db=db)
tools = toolkit.get_tools()
print("Available tools:")
for tool in tools:
print(tool.name + "\t- " + tool.description)
Available tools:
cassandra_db_schema -
Input to this tool is a keyspace name, output is a table description
of Apache Cassandra tables.
If the query is not correct, an error message will be returned.
If an error is returned, report back to the user that the keyspace
doesn't exist and stop.
cassandra_db_query -
Execute a CQL query against the database and get back the result.
If the query is not correct, an error message will be returned.
If an error is returned, rewrite the query, check the query, and try again.
cassandra_db_select_table_data -
Tool for getting data from a table in an Apache Cassandra database.
Use the WHERE clause to specify the predicate for the query that uses the
primary key. A blank predicate will return all rows. Avoid this if possible.
Use the limit to specify the number of rows to return. A blank limit will
return all rows.
prompt = hub.pull("hwchase17/openai-tools-agent")
# Construct the OpenAI Tools agent
agent = create_openai_tools_agent(llm, tools, prompt)
input = (
QUERY_PATH_PROMPT
+ "\n\nHere is your task: Find all the videos that the user with the email address 'patrick@datastax.com' has uploaded to the langchain_agent_test keyspace."
)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
response = agent_executor.invoke({"input": input})
print(response["output"])
[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3m
Invoking: `cassandra_db_schema` with `{'keyspace': 'langchain_agent_test'}`
[0m[36;1m[1;3mTable Name: user_credentials
- Keyspace: langchain_agent_test
- Columns
- password (text)
- user_email (text)
- user_id (uuid)
- Partition Keys: (user_email)
- Clustering Keys:
Table Name: user_videos
- Keyspace: langchain_agent_test
- Columns
- description (text)
- title (text)
- user_id (uuid)
- video_id (uuid)
- Partition Keys: (user_id)
- Clustering Keys: (video_id asc)
Table Name: users
- Keyspace: langchain_agent_test
- Columns
- email (text)
- id (uuid)
- name (text)
- Partition Keys: (id)
- Clustering Keys:
[0m[32;1m[1;3m
Invoking: `cassandra_db_select_table_data` with `{'keyspace': 'langchain_agent_test', 'table': 'user_credentials', 'predicate': "user_email = 'patrick@datastax.com'", 'limit': 1}`
[0m[38;5;200m[1;3mRow(user_email='patrick@datastax.com', password=None, user_id=UUID('522b1fe2-2e36-4cef-a667-cd4237d08b89'))[0m[32;1m[1;3m
Invoking: `cassandra_db_select_table_data` with `{'keyspace': 'langchain_agent_test', 'table': 'user_videos', 'predicate': 'user_id = 522b1fe2-2e36-4cef-a667-cd4237d08b89', 'limit': 10}`
[0m[38;5;200m[1;3mRow(user_id=UUID('522b1fe2-2e36-4cef-a667-cd4237d08b89'), video_id=UUID('27066014-bad7-9f58-5a30-f63fe03718f6'), description='DataStax Academy is a free resource for learning Apache Cassandra.', title='DataStax Academy')[0m[32;1m[1;3mTo find all the videos that the user with the email address 'patrick@datastax.com' has uploaded to the `langchain_agent_test` keyspace, we can follow these steps:
1. Query the `user_credentials` table to find the `user_id` associated with the email 'patrick@datastax.com'.
2. Use the `user_id` obtained from the first step to query the `user_videos` table to retrieve all the videos uploaded by the user.
Here is the query path in JSON format:
\`\`\`json
{
"query_paths": [
{
"description": "Find user_id from user_credentials and then query user_videos for all videos uploaded by the user",
"steps": [
{
"table": "user_credentials",
"query": "SELECT user_id FROM user_credentials WHERE user_email = 'patrick@datastax.com';"
},
{
"table": "user_videos",
"query": "SELECT * FROM user_videos WHERE user_id = 522b1fe2-2e36-4cef-a667-cd4237d08b89;"
}
]
}
]
}
\`\`\`
Following this query path, we found that the user with the user_id `522b1fe2-2e36-4cef-a667-cd4237d08b89` has uploaded at least one video with the title 'DataStax Academy' and the description 'DataStax Academy is a free resource for learning Apache Cassandra.' The video_id for this video is `27066014-bad7-9f58-5a30-f63fe03718f6`. If there are more videos, the same query can be used to retrieve them, possibly with an increased limit if necessary.[0m
[1m> Finished chain.[0m
To find all the videos that the user with the email address 'patrick@datastax.com' has uploaded to the `langchain_agent_test` keyspace, we can follow these steps:
1. Query the `user_credentials` table to find the `user_id` associated with the email 'patrick@datastax.com'.
2. Use the `user_id` obtained from the first step to query the `user_videos` table to retrieve all the videos uploaded by the user.
Here is the query path in JSON format:
\`\`\`json
{
"query_paths": [
{
"description": "Find user_id from user_credentials and then query user_videos for all videos uploaded by the user",
"steps": [
{
"table": "user_credentials",
"query": "SELECT user_id FROM user_credentials WHERE user_email = 'patrick@datastax.com';"
},
{
"table": "user_videos",
"query": "SELECT * FROM user_videos WHERE user_id = 522b1fe2-2e36-4cef-a667-cd4237d08b89;"
}
]
}
]
}
\`\`\`
Following this query path, we found that the user with the user_id `522b1fe2-2e36-4cef-a667-cd4237d08b89` has uploaded at least one video with the title 'DataStax Academy' and the description 'DataStax Academy is a free resource for learning Apache Cassandra.' The video_id for this video is `27066014-bad7-9f58-5a30-f63fe03718f6`. If there are more videos, the same query can be used to retrieve them, possibly with an increased limit if necessary.