
Valthera

Enable AI agents to engage users at the moments they are most likely to respond.

Overview

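Valthera is an open-source framework that enables LLM agents to engage users in more meaningful ways. It is built on BJ Fogg's Behavior Model (B=MAT) and draws on data from multiple sources (such as HubSpot, PostHog, and Snowflake) to assess a user's motivation and ability before triggering an action.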
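To make the B=MAT idea concrete, here is a minimal sketch. It is not Valthera's implementation. It assumes motivation and ability are scores in [0, 1] and that a trigger is only worth sending when both clear a threshold. The function name `should_fire_trigger` and the default threshold of 0.75 are illustrative assumptions.

```python
def should_fire_trigger(
    motivation: float, ability: float, threshold: float = 0.75
) -> bool:
    """Simplified B=MAT check (an assumption, not Valthera's API).

    A trigger is considered useful only when the user is both motivated
    enough and able enough to perform the behavior.
    """
    for name, score in (("motivation", motivation), ("ability", ability)):
        if not 0.0 <= score <= 1.0:
            raise ValueError(f"{name} must be in [0, 1], got {score}")
    return motivation >= threshold and ability >= threshold


print(should_fire_trigger(0.9, 0.8))  # both scores clear the threshold
print(should_fire_trigger(0.9, 0.4))  # ability is too low
```

In this simplified view, a low score on either axis means the agent should work on that axis first rather than prompt the user.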

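In this guide, you will learn:

  • Core concepts: An overview of the components (data aggregator, scorer, reasoning engine, and trigger generator).
  • System architecture: How data flows through the system and how decisions are made.
  • Customization: How to extend connectors, scoring metrics, and decision rules to fit your needs.

Let's dive in!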

Setup

This section covers installing Valthera's dependencies and setting up custom data connectors.

pip install openai langchain langchain_openai valthera langchain_valthera langgraph

from typing import Any, Dict, List

from valthera.connectors.base import BaseConnector


class MockHubSpotConnector(BaseConnector):
    """
    Simulates data retrieval from HubSpot. Provides information such as lead score,
    lifecycle stage, and marketing metrics.
    """

    def get_user_data(self, user_id: str) -> Dict[str, Any]:
        """
        Retrieve mock HubSpot data for a given user.

        Args:
            user_id: The unique identifier for the user

        Returns:
            A dictionary containing HubSpot user data
        """
        return {
            "hubspot_contact_id": "999-ZZZ",
            "lifecycle_stage": "opportunity",
            "lead_status": "engaged",
            "hubspot_lead_score": 100,
            "company_name": "MaxMotivation Corp.",
            "last_contacted_date": "2023-09-20",
            "hubspot_marketing_emails_opened": 20,
            "marketing_emails_clicked": 10,
        }


class MockPostHogConnector(BaseConnector):
    """
    Simulates data retrieval from PostHog. Provides session data and engagement events.
    """

    def get_user_data(self, user_id: str) -> Dict[str, Any]:
        """
        Retrieve mock PostHog data for a given user.

        Args:
            user_id: The unique identifier for the user

        Returns:
            A dictionary containing PostHog user data
        """
        return {
            "distinct_ids": [user_id, f"email_{user_id}"],
            "last_event_timestamp": "2023-09-20T12:34:56Z",
            "feature_flags": ["beta_dashboard", "early_access"],
            "posthog_session_count": 30,
            "avg_session_duration_sec": 400,
            "recent_event_types": ["pageview", "button_click", "premium_feature_used"],
            "posthog_events_count_past_30days": 80,
            "posthog_onboarding_steps_completed": 5,
        }


class MockSnowflakeConnector(BaseConnector):
    """
    Simulates retrieval of additional user profile data from Snowflake.
    """

    def get_user_data(self, user_id: str) -> Dict[str, Any]:
        """
        Retrieve mock Snowflake data for a given user.

        Args:
            user_id: The unique identifier for the user

        Returns:
            A dictionary containing Snowflake user data
        """
        return {
            "user_id": user_id,
            "email": f"{user_id}@example.com",
            "subscription_status": "paid",
            "plan_tier": "premium",
            "account_creation_date": "2023-01-01",
            "preferred_language": "en",
            "last_login_datetime": "2023-09-20T12:00:00Z",
            "behavior_complexity": 3,
        }
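
The mock connectors above return hard-coded values. When moving toward real data, a connector would typically wrap some data source and return a flat dictionary for the requested user. The sketch below shows one possible shape for that, using an in-memory store. `InMemoryConnector` and its `records` argument are hypothetical names, and the class deliberately does not subclass `BaseConnector` so that the example runs without Valthera installed; in real use it would presumably inherit from `BaseConnector` as the mocks do.

```python
from typing import Any, Dict


class InMemoryConnector:
    """Hypothetical connector backed by a dict keyed by user ID."""

    def __init__(self, records: Dict[str, Dict[str, Any]]) -> None:
        self._records = records

    def get_user_data(self, user_id: str) -> Dict[str, Any]:
        # Return a shallow copy so callers cannot mutate the stored record.
        # Unknown users yield an empty dict rather than raising.
        return dict(self._records.get(user_id, {}))


connector = InMemoryConnector({"user_12345": {"plan_tier": "premium"}})
print(connector.get_user_data("user_12345"))
print(connector.get_user_data("missing_user"))
```

Returning an empty dictionary for unknown users is a design choice made for this sketch; whether downstream scoring tolerates missing keys is not covered by this guide.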

Instantiation

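In this section, we instantiate the core components. First, we create a data aggregator that merges data from the custom connectors. Then we configure the scoring metrics for motivation and ability.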
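
As a rough mental model of the aggregation step, one can picture each connector's dictionary being merged into a single flat user context, which is consistent with the scoring configuration looking up flat keys such as `hubspot_lead_score`. The sketch below assumes a simple last-writer-wins merge. `merge_user_context` and the `Fetcher` alias are hypothetical, and Valthera's actual `DataAggregator` may merge or namespace keys differently.

```python
from typing import Any, Callable, Dict

# A fetcher takes a user ID and returns that source's data (hypothetical alias).
Fetcher = Callable[[str], Dict[str, Any]]


def merge_user_context(fetchers: Dict[str, Fetcher], user_id: str) -> Dict[str, Any]:
    """Merge every source's data into one flat dict (later sources win on key clashes)."""
    context: Dict[str, Any] = {}
    for fetch in fetchers.values():
        context.update(fetch(user_id))
    return context


fetchers: Dict[str, Fetcher] = {
    "hubspot": lambda uid: {"hubspot_lead_score": 100},
    "posthog": lambda uid: {"posthog_session_count": 30},
    "snowflake": lambda uid: {"user_id": uid, "behavior_complexity": 3},
}

context = merge_user_context(fetchers, "user_12345")
print(context)
```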

from valthera.aggregator import DataAggregator

# Constants for configuration
LEAD_SCORE_MAX = 100
EVENTS_COUNT_MAX = 50
EMAILS_OPENED_FACTOR = 10.0
SESSION_COUNT_FACTOR_1 = 5.0
ONBOARDING_STEPS_FACTOR = 5.0
SESSION_COUNT_FACTOR_2 = 10.0
BEHAVIOR_COMPLEXITY_MAX = 5.0

# Initialize data aggregator
data_aggregator = DataAggregator(
    connectors={
        "hubspot": MockHubSpotConnector(),
        "posthog": MockPostHogConnector(),
        "snowflake": MockSnowflakeConnector(),
    }
)

# You can now fetch unified user data by calling data_aggregator.get_user_context(user_id)

from typing import Callable, Union

from valthera.scorer import ValtheraScorer


# Define transform functions with proper type annotations
def transform_lead_score(x: Union[int, float]) -> float:
    """Transform lead score to a value between 0 and 1."""
    return min(x, LEAD_SCORE_MAX) / LEAD_SCORE_MAX


def transform_events_count(x: Union[int, float]) -> float:
    """Transform events count to a value between 0 and 1."""
    return min(x, EVENTS_COUNT_MAX) / EVENTS_COUNT_MAX


def transform_emails_opened(x: Union[int, float]) -> float:
    """Transform emails opened to a value between 0 and 1."""
    return min(x / EMAILS_OPENED_FACTOR, 1.0)


def transform_session_count_1(x: Union[int, float]) -> float:
    """Transform session count for motivation to a value between 0 and 1."""
    return min(x / SESSION_COUNT_FACTOR_1, 1.0)


def transform_onboarding_steps(x: Union[int, float]) -> float:
    """Transform onboarding steps to a value between 0 and 1."""
    return min(x / ONBOARDING_STEPS_FACTOR, 1.0)


def transform_session_count_2(x: Union[int, float]) -> float:
    """Transform session count for ability to a value between 0 and 1."""
    return min(x / SESSION_COUNT_FACTOR_2, 1.0)


def transform_behavior_complexity(x: Union[int, float]) -> float:
    """Transform behavior complexity to a value between 0 and 1 (higher complexity lowers the score)."""
    return 1 - (min(x, BEHAVIOR_COMPLEXITY_MAX) / BEHAVIOR_COMPLEXITY_MAX)


# Scoring configuration for user motivation
motivation_config = [
    {"key": "hubspot_lead_score", "weight": 0.30, "transform": transform_lead_score},
    {
        "key": "posthog_events_count_past_30days",
        "weight": 0.30,
        "transform": transform_events_count,
    },
    {
        "key": "hubspot_marketing_emails_opened",
        "weight": 0.20,
        "transform": transform_emails_opened,
    },
    {
        "key": "posthog_session_count",
        "weight": 0.20,
        "transform": transform_session_count_1,
    },
]

# Scoring configuration for user ability
ability_config = [
    {
        "key": "posthog_onboarding_steps_completed",
        "weight": 0.30,
        "transform": transform_onboarding_steps,
    },
    {
        "key": "posthog_session_count",
        "weight": 0.30,
        "transform": transform_session_count_2,
    },
    {
        "key": "behavior_complexity",
        "weight": 0.40,
        "transform": transform_behavior_complexity,
    },
]

# Instantiate the scorer
scorer = ValtheraScorer(motivation_config, ability_config)
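
To see what these configurations imply for the mock user, the sketch below computes both scores by hand. It assumes the scorer takes a weighted sum of the transformed metrics; that is an assumption about `ValtheraScorer`, not something this guide confirms. `weighted_score` is a hypothetical helper, and the lambdas restate the transforms defined above so the example is self-contained.

```python
from typing import Any, Callable, Dict, List

Metric = Dict[str, Any]


def weighted_score(context: Dict[str, Any], config: List[Metric]) -> float:
    """Sum weight * transform(value) over all configured metrics (assumed behavior)."""
    return sum(m["weight"] * m["transform"](context[m["key"]]) for m in config)


# Values taken from the mock connectors above.
context = {
    "hubspot_lead_score": 100,
    "posthog_events_count_past_30days": 80,
    "hubspot_marketing_emails_opened": 20,
    "posthog_session_count": 30,
    "posthog_onboarding_steps_completed": 5,
    "behavior_complexity": 3,
}

motivation_cfg: List[Metric] = [
    {"key": "hubspot_lead_score", "weight": 0.30, "transform": lambda x: min(x, 100) / 100},
    {"key": "posthog_events_count_past_30days", "weight": 0.30, "transform": lambda x: min(x, 50) / 50},
    {"key": "hubspot_marketing_emails_opened", "weight": 0.20, "transform": lambda x: min(x / 10.0, 1.0)},
    {"key": "posthog_session_count", "weight": 0.20, "transform": lambda x: min(x / 5.0, 1.0)},
]

ability_cfg: List[Metric] = [
    {"key": "posthog_onboarding_steps_completed", "weight": 0.30, "transform": lambda x: min(x / 5.0, 1.0)},
    {"key": "posthog_session_count", "weight": 0.30, "transform": lambda x: min(x / 10.0, 1.0)},
    {"key": "behavior_complexity", "weight": 0.40, "transform": lambda x: 1 - min(x, 5.0) / 5.0},
]

motivation = weighted_score(context, motivation_cfg)
ability = weighted_score(context, ability_cfg)
print(round(motivation, 2), round(ability, 2))
```

Under this assumption the mock user saturates every motivation metric (score 1.0), while ability comes out at about 0.76 because a behavior complexity of 3 out of 5 contributes only 0.4 × 0.4 = 0.16. Because the weights in each configuration sum to 1.0, both scores stay within [0, 1].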

Invocation

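Next, we set up the reasoning engine and the trigger generator, then bring all the components together by instantiating the Valthera tool. Finally, we run the agent workflow to process an input message.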

import os

from langchain_openai import ChatOpenAI
from valthera.reasoning_engine import ReasoningEngine

# Define threshold as constant
SCORE_THRESHOLD = 0.75


# Function to safely get API key
def get_openai_api_key() -> str:
    """Get OpenAI API key with error handling."""
    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        raise ValueError("OPENAI_API_KEY not found in environment variables")
    return api_key


# Decision rules using constant
decision_rules = [
    {
        "condition": f"motivation >= {SCORE_THRESHOLD} and ability >= {SCORE_THRESHOLD}",
        "action": "trigger",
        "description": "Both scores are high enough.",
    },
    {
        "condition": f"motivation < {SCORE_THRESHOLD}",
        "action": "improve_motivation",
        "description": "User motivation is low.",
    },
    {
        "condition": f"ability < {SCORE_THRESHOLD}",
        "action": "improve_ability",
        "description": "User ability is low.",
    },
    {
        "condition": "otherwise",
        "action": "defer",
        "description": "No action needed at this time.",
    },
]

try:
    api_key = get_openai_api_key()
    reasoning_engine = ReasoningEngine(
        llm=ChatOpenAI(
            model_name="gpt-4-turbo", temperature=0.0, openai_api_key=api_key
        ),
        decision_rules=decision_rules,
    )
except ValueError as e:
    print(f"Error initializing reasoning engine: {e}")
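
This guide does not show how `ReasoningEngine` interprets the decision rules. As an illustration only, the sketch below assumes first-match-wins evaluation in listed order and replaces the condition strings with plain Python callables, so no LLM or string parsing is involved. `RULES` and `choose_action` are hypothetical names.

```python
from typing import Callable, List, Tuple

SCORE_THRESHOLD = 0.75

# Each rule pairs a predicate over (motivation, ability) with an action name.
Rule = Tuple[Callable[[float, float], bool], str]

RULES: List[Rule] = [
    (lambda m, a: m >= SCORE_THRESHOLD and a >= SCORE_THRESHOLD, "trigger"),
    (lambda m, a: m < SCORE_THRESHOLD, "improve_motivation"),
    (lambda m, a: a < SCORE_THRESHOLD, "improve_ability"),
    (lambda m, a: True, "defer"),  # stands in for the "otherwise" rule
]


def choose_action(motivation: float, ability: float) -> str:
    """Return the action of the first rule whose condition holds (assumed semantics)."""
    for condition, action in RULES:
        if condition(motivation, ability):
            return action
    return "defer"


print(choose_action(1.0, 0.76))  # both scores at or above the threshold
print(choose_action(0.5, 0.9))   # motivation below the threshold
print(choose_action(0.9, 0.5))   # ability below the threshold
print(choose_action(0.2, 0.2))   # both below: the earlier rule wins
```

Under these assumed semantics, two things follow from the rule order: when both scores are low, `improve_motivation` is chosen because it is listed first, and the `otherwise` rule is never reached because the first three conditions already cover every combination of scores.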
from valthera.trigger_generator import TriggerGenerator

try:
    api_key = get_openai_api_key()  # Reuse the function for consistency
    trigger_generator = TriggerGenerator(
        llm=ChatOpenAI(
            model_name="gpt-4-turbo", temperature=0.7, openai_api_key=api_key
        )
    )
except ValueError as e:
    print(f"Error initializing trigger generator: {e}")

from langchain_valthera.tools import ValtheraTool
from langgraph.prebuilt import create_react_agent

try:
    api_key = get_openai_api_key()

    # Initialize Valthera tool
    valthera_tool = ValtheraTool(
        data_aggregator=data_aggregator,
        motivation_config=motivation_config,
        ability_config=ability_config,
        reasoning_engine=reasoning_engine,
        trigger_generator=trigger_generator,
    )

    # Create agent with LLM
    llm = ChatOpenAI(model_name="gpt-4-turbo", temperature=0.0, openai_api_key=api_key)
    tools = [valthera_tool]
    graph = create_react_agent(llm, tools=tools)

    # Define input message for testing
    inputs = {
        "messages": [("user", "Evaluate behavior for user_12345: Finish Onboarding")]
    }

    # Process the input and display responses
    print("Running Valthera agent workflow...")
    for response in graph.stream(inputs, stream_mode="values"):
        print(response)

except Exception as e:
    print(f"Error running Valthera workflow: {e}")

Chaining

This integration does not currently support chaining. Future versions may add chaining support.

API reference

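Below is an overview of the main APIs provided by the Valthera integration:

  • Data Aggregator: Use data_aggregator.get_user_context(user_id) to fetch aggregated user data.
  • Scorer: ValtheraScorer computes motivation and ability scores based on the provided configurations.
  • Reasoning Engine: ReasoningEngine evaluates the decision rules to determine the appropriate action (trigger, improve motivation, improve ability, or defer).
  • Trigger Generator: Uses the LLM to produce personalized trigger messages.
  • Valthera Tool: Integrates all components to process inputs and run the agent workflow.

For detailed usage, see the inline documentation in the source code.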

