CLI Usage (#593)

TLDR; 

The philosophy of CLI usage is "fire and forget" and "best effort". You
can opt out by setting `ARCADE_USAGE_TRACKING=0`.

We are capturing two events: `CLI execution succeeded` and `CLI
execution failed`. Reporting to PostHog is a short lived (maximum 10
seconds) subprocess that does not block the main CLI execution process.

`~/.arcade/usage.json` persists two values `anon_id` and
`linked_principal_id`. The logged in status of the CLI user determines
which ID is used. Upon `arcade login`, the `anon_id` is aliased with
`linked_principal_id`. Upon `arcade logout` the `linked_principal_id` is
removed and the `anon_id` is rotated.

## CLI Usage Tracking - How It Works

The usage tracking system implements an identity management and event
tracking pipeline. Here's how the pieces work together:

### **Identity State Management (`usage.json`)**

The system maintains a persistent identity file at
`~/.arcade/usage.json` with this structure:
```json
{
  "anon_id": "uuid",
  "linked_principal_id": "uuid" | null
}
```

**Key mechanics:**
- **`anon_id`**: Generated once on first CLI use and persists across
sessions. This UUID tracks all anonymous activity.
- **`linked_principal_id`**: Initially `null`. Once the user logs in and
we successfully alias their identity, this field stores their
`principal_id` to indicate this `anon_id` has been linked.
- **Atomic writes**: All updates use a temp file + atomic rename pattern
to prevent corruption from concurrent CLI processes
- **File locking**: Uses `fcntl` (Unix) to coordinate reads/writes
across multiple simultaneous CLI invocations
- **In-memory cache**: The `UsageIdentity` class caches the loaded data
to avoid repeated file I/O within a single CLI invocation

### **Identity Resolution Flow**

When tracking an event, the system determines the `distinct_id` (who to
attribute the event to) via this waterfall:

1. **Check `linked_principal_id`** in `usage.json`
   - If present → use it (user was previously aliased)
   - This is the fastest path and avoids API calls

2. **Fetch `principal_id` from Arcade Cloud API**
- Makes HTTP request to `/api/v1/auth/validate` with the user's API key
from `~/.arcade/credentials.yaml`
   - If authenticated → returns `principal_id`
   - Has 2s timeout for responsiveness

3. **Fall back to `anon_id`**
   - If not authenticated or API call fails → use anonymous ID
   - Marks event with `is_anon=True` flag

### **The Aliasing Lifecycle**

PostHog aliasing links anonymous activity to authenticated users. Here's
the state machine:

#### **Stage 1: Anonymous User**
```
usage.json: { "anon_id": "abc-123", "linked_principal_id": null }
All events → sent with distinct_id="abc-123" and is_anon=True
```

#### **Stage 2: Login Event**
1. User runs `arcade login`
2. Command completes successfully (auth token saved)
3. `CommandTracker` detects successful login
4. Fetches `principal_id` from API
5. Checks `should_alias()` → returns `True` because
`linked_principal_id` is `null`
6. **Calls `alias()` synchronously** (blocking):
   ```python
   posthog.alias(previous_id="abc-123", distinct_id="zyx-321")
   ```
7. Updates `usage.json`:
   ```json
   { "anon_id": "abc-123", "linked_principal_id": "zyx-321" }
   ```
8. PostHog backend merges all events with `distinct_id="abc-123"` into
the user profile for `"zyx-321"`

#### **Stage 3: Authenticated User**
```
usage.json: { "anon_id": "abc-123", "linked_principal_id": "zyx-321" }
All events → sent with distinct_id="zyx-321" and is_anon=False
```
- Events are directly attributed to the authenticated user
- No more API calls needed (uses cached `linked_principal_id`)

#### **Stage 4: Logout Event**
1. User runs `arcade logout`
2. Logout event is sent with the authenticated `distinct_id`
3. `CommandTracker` detects successful logout
4. **Rotates identity** by calling `reset_to_anonymous()`:
   ```json
   { "anon_id": "xyz-789", "linked_principal_id": null }
   ```
5. New `anon_id` prevents cross-contamination if another user logs in

### **Critical Constraint: Alias Timing**

PostHog requires that `alias()` is called **BEFORE** any events are sent
with the new `distinct_id`. This is why:
- **`alias()` is synchronous (blocking)**: Guarantees it completes
before the login success event is sent
- **Subsequent events use `linked_principal_id`**: Once aliased, all
future events use the authenticated ID
- **Lazy aliasing**: If a user authenticates via another mechanism (not
through `arcade login`), the system detects this on the next command and
performs aliasing before sending that command's event

### **Event Capture Pipeline**

When `CommandTracker.track_command_execution()` is called:

1. **Resolve identity** → determines `distinct_id` and `is_anon` flag
2. **Build event properties**:
   ```python
   {
     "command_name": "toolkit.run",
     "cli_version": "1.2.3",
     "python_version": "3.11.0",
     "os_type": "Darwin",
     "os_release": "23.4.0",
     "duration": 1250.42,  # milliseconds
     "error_message": "..."  # if failed
   }
   ```
3. **Call `UsageService.capture()`**:
   - Serializes event data to JSON
   - Spawns detached subprocess: `python -m arcade_cli.usage`
   - Passes data via `ARCADE_USAGE_EVENT_DATA` env var
   - **Returns immediately** (non-blocking)

4. **Detached subprocess (`__main__.py`)**:
   - Runs independently, survives parent CLI exit
   - Deserializes event data
- If `is_anon=True`, sets `$process_person_profile=False` (tells PostHog
not to create a full profile)
   - Sends event to PostHog with 5s timeout
   - Exits (hard exit after 10s max via timeout thread)

### **Concurrency Handling**

Multiple CLI processes can run simultaneously. The system handles this
via:
- **File locking** on `usage.json` (shared lock for reads, exclusive for
writes)
- **Atomic writes** via temp files ensure incomplete writes never
corrupt the file
- **Idempotent aliasing**: `should_alias()` prevents redundant alias
calls

### **Edge Cases Handled**

1. **Side-channel authentication**: User authenticates outside of
`arcade login` (e.g., manually editing credentials)
   - Detected via "lazy aliasing" check on every command
- Performs alias if `linked_principal_id` doesn't match current
`principal_id`

2. **API failures during identity fetch**: Falls back to anonymous
tracking
   - 2s timeout prevents hanging
   - Silent failure doesn't disrupt CLI

3. **PostHog merge restrictions**: Can't alias returning users who
already have a profile
- System stores `linked_principal_id` to avoid retrying impossible
aliases
   - New users (never logged in before) get full history stitched

4. **Multiple accounts on same machine**: Logout rotates `anon_id`
   - User A's anonymous activity won't leak into User B's profile

### **Privacy & Performance**

- **Opt-out**: `ARCADE_USAGE_TRACKING=0` disables all tracking
- **Non-blocking**: Events never slow down CLI (detached subprocess)
- **Anonymous profiles**: `$process_person_profile=False` for `anon_id`
events minimizes data collection
- **Silent failures**: Network issues or PostHog errors never surface to
users
This commit is contained in:
Eric Gustin 2025-10-03 10:15:08 -07:00 committed by GitHub
parent a11f79b32d
commit 113d0d3086
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 1737 additions and 80 deletions

11
.vscode/launch.json vendored
View file

@ -11,6 +11,17 @@
"jinja": true,
"justMyCode": true,
"cwd": "${workspaceFolder}/toolkits/<your_server_name>"
},
{
"name": "Debug `arcade login`",
"type": "debugpy",
"request": "launch",
"program": "${workspaceFolder}/libs/arcade-cli/run_cli.py",
"args": ["login"],
"console": "integratedTerminal",
"jinja": true,
"justMyCode": true,
"cwd": "${workspaceFolder}"
}
]
}

View file

@ -11,6 +11,9 @@ ARCADE_CONFIG_PATH = os.path.join(os.path.expanduser(os.getenv("ARCADE_WORK_DIR"
# The path to the file containing the user's Arcade-related credentials (e.g., ARCADE_API_KEY).
CREDENTIALS_FILE_PATH = os.path.join(ARCADE_CONFIG_PATH, "credentials.yaml")
# The path to the file containing usage analytics identity data.
USAGE_FILE_PATH = os.path.join(ARCADE_CONFIG_PATH, "usage.json")
_style_block = b"""
<link rel="icon" href="https://cdn.arcade.dev/favicons/favicon.ico" sizes="any">
<link rel="apple-touch-icon" href="https://cdn.arcade.dev/favicons/apple-touch-icon.png">

View file

@ -29,8 +29,8 @@ from arcade_cli.display import (
)
from arcade_cli.show import show_logic
from arcade_cli.toolkit_docs import generate_toolkit_docs
from arcade_cli.usage.command_tracker import TrackedTyper, TrackedTyperGroup
from arcade_cli.utils import (
OrderCommands,
Provider,
compute_base_url,
compute_login_url,
@ -44,8 +44,8 @@ from arcade_cli.utils import (
version_callback,
)
cli = typer.Typer(
cls=OrderCommands,
cli = TrackedTyper(
cls=TrackedTyperGroup,
add_completion=False,
no_args_is_help=True,
pretty_exceptions_enable=True,
@ -271,16 +271,11 @@ def mcp(
handle_cli_error("Failed to run MCP server")
except KeyboardInterrupt:
console.print("\n[yellow]MCP server stopped[/yellow]")
raise typer.Exit(0)
console.print("\n[yellow]MCP server gracefully shutdown[/yellow]")
except FileNotFoundError:
console.print(
"[red]arcade_mcp_server module not found. Make sure arcade-mcp-server is installed.[/red]"
handle_cli_error(
"arcade_mcp_server module not found. Make sure arcade-mcp-server is installed"
)
raise typer.Exit(1)
except Exception as e:
console.print(f"[red]Error running MCP server: {e}[/red]")
raise typer.Exit(1)
@cli.command(
@ -770,7 +765,10 @@ def docs(
"Please choose one of the 'gpt-4o' and 'gpt-5' series models.",
style="bold red",
)
raise typer.Exit()
handle_cli_error(
f"Attention: '{openai_model}' is not a valid OpenAI model. "
"Please choose one of the 'gpt-4o' and 'gpt-5' series models."
)
try:
success = generate_toolkit_docs(
@ -831,6 +829,4 @@ def main_callback(
return
if not check_existing_login(suppress_message=True):
console.print("Not logged in to Arcade CLI. Use ", style="bold red", end="")
console.print("arcade login", style="bold green")
raise typer.Exit()
handle_cli_error("Not logged in to Arcade CLI. Use `arcade login` to log in.")

View file

@ -243,15 +243,15 @@ def create_new_toolkit_minimal(output_directory: str, toolkit_name: str) -> None
# Check for illegal characters in the toolkit name
if re.match(r"^[a-z0-9_]+$", toolkit_name):
if (toolkit_directory / toolkit_name).exists():
console.print(f"[red]Toolkit '{toolkit_name}' already exists.[/red]")
exit(1)
raise FileExistsError(
f"Server with name '{toolkit_name}' already exists at '{toolkit_directory / toolkit_name}'"
)
else:
console.print(
"[red]Toolkit name contains illegal characters. "
raise ValueError(
f"Server name '{toolkit_name}' contains illegal characters. "
"Only lowercase alphanumeric characters and underscores are allowed. "
"Please try again.[/red]"
"Please try again."
)
exit(1)
context = {
"toolkit_name": toolkit_name,

View file

@ -6,8 +6,8 @@ from rich.table import Table
from arcade_cli.constants import (
PROD_ENGINE_HOST,
)
from arcade_cli.usage.command_tracker import TrackedTyper, TrackedTyperGroup
from arcade_cli.utils import (
OrderCommands,
compute_base_url,
validate_and_get_config,
)
@ -15,8 +15,8 @@ from arcade_cli.utils import (
console = Console()
app = typer.Typer(
cls=OrderCommands,
app = TrackedTyper(
cls=TrackedTyperGroup,
add_completion=False,
no_args_is_help=True,
pretty_exceptions_enable=False,

View file

@ -1,14 +1,14 @@
from typing import Optional
import typer
from rich.console import Console
from rich.markup import escape
from arcade_cli.display import display_tool_details, display_tools_table
from arcade_cli.utils import (
CLIError,
create_cli_catalog,
create_cli_catalog_local,
get_tools_from_engine,
handle_cli_error,
)
@ -26,7 +26,6 @@ def show_logic(
"""Wrapper function for the `arcade show` CLI command
Handles the logic for showing tools/toolkits.
"""
console = Console()
try:
if local:
catalog = create_cli_catalog() if toolkit else create_cli_catalog_local()
@ -46,16 +45,14 @@ def show_logic(
None,
)
if not tool_def:
console.print(f"❌ Tool '{tool}' not found.", style="bold red")
typer.Exit(code=1)
handle_cli_error(f"Tool '{tool}' not found.")
else:
display_tool_details(tool_def, worker=worker)
else:
# Display the list of tools as a table
display_tools_table(tools)
except CLIError:
raise
except Exception as e:
if debug:
raise
error_message = f"❌ Failed to list tools: {escape(str(e))}"
console.print(error_message, style="bold red")
handle_cli_error(f"Failed to list tools: {escape(str(e))}", debug=debug)

View file

@ -0,0 +1,62 @@
"""Entry point for detached usage tracking subprocess.
This module is invoked as `python -m arcade_cli.usage` and expects
event data to be passed via the ARCADE_USAGE_EVENT_DATA environment variable.
"""
import json
import os
import threading
from arcade_cli.usage.constants import (
ARCADE_USAGE_EVENT_DATA,
MAX_RETRIES_POSTHOG,
PROP_PROCESS_PERSON_PROFILE,
TIMEOUT_POSTHOG_CAPTURE,
TIMEOUT_SUBPROCESS_EXIT,
)
from posthog import Posthog
def _timeout_exit() -> None:
"""Force exit after timeout"""
os._exit(1)
def main() -> None:
"""Capture a PostHog event from environment variable."""
timeout_timer = threading.Timer(TIMEOUT_SUBPROCESS_EXIT, _timeout_exit)
timeout_timer.daemon = True
timeout_timer.start()
try:
event_data = json.loads(os.environ[ARCADE_USAGE_EVENT_DATA])
if event_data.get("is_anon", False):
event_data["properties"][PROP_PROCESS_PERSON_PROFILE] = False
posthog = Posthog(
project_api_key=event_data["api_key"],
host=event_data["host"],
timeout=TIMEOUT_POSTHOG_CAPTURE,
max_retries=MAX_RETRIES_POSTHOG,
)
posthog.capture(
event_data["event_name"],
distinct_id=event_data["distinct_id"],
properties=event_data["properties"],
)
posthog.flush()
timeout_timer.cancel()
except Exception:
# Silent failure. We don't want to disrupt anything
timeout_timer.cancel()
pass
if __name__ == "__main__":
main()

View file

@ -0,0 +1,352 @@
import functools
import os
import platform
import sys
import time
from importlib import metadata
from typing import Any
import typer
from arcade_cli.constants import ARCADE_CONFIG_PATH
from arcade_cli.usage.constants import (
EVENT_CLI_COMMAND_EXECUTED,
EVENT_CLI_COMMAND_FAILED,
PROP_CLI_VERSION,
PROP_COMMAND_NAME,
PROP_DEVICE_MONOTONIC_END,
PROP_DEVICE_MONOTONIC_START,
PROP_DURATION_MS,
PROP_ERROR_MESSAGE,
PROP_OS_RELEASE,
PROP_OS_TYPE,
PROP_RUNTIME_LANGUAGE,
PROP_RUNTIME_VERSION,
)
from arcade_cli.usage.identity import UsageIdentity
from arcade_cli.usage.usage_service import UsageService
from arcade_cli.usage.utils import is_tracking_enabled
from rich.console import Console
from typer.core import TyperCommand, TyperGroup
from typer.models import Context
console = Console()
class CommandTracker:
"""Tracks CLI command execution for usage analytics."""
def __init__(self) -> None:
self.usage_service = UsageService()
self.identity = UsageIdentity()
self._cli_version: str | None = None
self._runtime_version: str | None = None
@property
def cli_version(self) -> str:
"""Get CLI version, cached after first access."""
if self._cli_version is None:
try:
self._cli_version = metadata.version("arcade-mcp")
except Exception:
self._cli_version = "unknown"
return self._cli_version
@property
def runtime_language(self) -> str:
"""Get the runtime language (always 'python' for this CLI)."""
return "python"
@property
def runtime_version(self) -> str:
"""Get runtime version, cached after first access."""
if self._runtime_version is None:
version_info = sys.version_info
self._runtime_version = (
f"{version_info.major}.{version_info.minor}.{version_info.micro}"
)
return self._runtime_version
@property
def user_id(self) -> str:
"""Get distinct_id based on authentication state."""
return self.identity.get_distinct_id()
def get_full_command_path(self, ctx: typer.Context) -> str:
"""Get the full command path by traversing the context hierarchy."""
command_parts = []
current_ctx: Any = ctx
while current_ctx and current_ctx.parent:
if current_ctx.command.name:
command_parts.append(current_ctx.command.name)
current_ctx = current_ctx.parent
return ".".join(reversed(command_parts))
def _handle_successful_login(self) -> None:
"""Handle a successful login event.
Upon a successful login, we retrieve and persist the principal_id for the logged in user.
We then alias the persisted anon_id to the known person with principal_id.
As a result, the previous anonymous events will be attributed to the known person with principal_id.
"""
principal_id = self.identity.get_principal_id()
if principal_id:
if self.identity.should_alias():
# Alias the anon_id to the known person with principal_id
self.usage_service.alias(
previous_id=self.identity.anon_id, distinct_id=principal_id
)
# Always update the linked principal_id on successful login
self.identity.set_linked_principal_id(principal_id)
def _handle_successful_logout(
self,
command_name: str,
duration_ms: float | None = None,
monotonic_start: float | None = None,
monotonic_end: float | None = None,
) -> None:
"""Handle a successful logout event.
Upon a successful logout, we rotate the anon_id and clear the linked principal_id.
"""
# Check if user was authenticated before logout (has linked_principal_id)
data = self.identity.load_or_create()
was_authenticated = data.get("linked_principal_id") is not None
# Send logout event as the authenticated user before resetting to anonymous
properties: dict[str, Any] = {
PROP_COMMAND_NAME: command_name,
PROP_CLI_VERSION: self.cli_version,
PROP_RUNTIME_LANGUAGE: self.runtime_language,
PROP_RUNTIME_VERSION: self.runtime_version,
PROP_OS_TYPE: platform.system(),
PROP_OS_RELEASE: platform.release(),
}
if duration_ms:
properties[PROP_DURATION_MS] = round(duration_ms)
if monotonic_start is not None:
properties[PROP_DEVICE_MONOTONIC_START] = monotonic_start
if monotonic_end is not None:
properties[PROP_DEVICE_MONOTONIC_END] = monotonic_end
# Check if using anon_id
is_anon = self.user_id == self.identity.anon_id
self.usage_service.capture(
EVENT_CLI_COMMAND_EXECUTED, self.user_id, properties=properties, is_anon=is_anon
)
# Only rotate anon_id if user was actually authenticated
if was_authenticated:
self.identity.reset_to_anonymous()
def track_command_execution(
self,
command_name: str,
success: bool,
duration_ms: float | None = None,
error_message: str | None = None,
is_login: bool = False,
is_logout: bool = False,
monotonic_start: float | None = None,
monotonic_end: float | None = None,
) -> None:
"""Track command execution event.
Args:
command_name: The name of the CLI command that was executed.
success: Whether the command was successfully executed.
duration_ms: The duration of the command execution in milliseconds.
error_message: The error message if the command failed.
is_login: Whether this is a login command.
is_logout: Whether this is a logout command.
monotonic_start: Monotonic clock timestamp at command start.
monotonic_end: Monotonic clock timestamp at command end.
"""
if not is_tracking_enabled():
return
if is_login and success:
self._handle_successful_login()
elif is_logout and success:
self._handle_successful_logout(
command_name, duration_ms, monotonic_start, monotonic_end
)
return
# Edge case: Lazy alias check for other commands (if user authenticated via side path)
elif not is_login and not is_logout and self.identity.should_alias():
principal_id = self.identity.get_principal_id()
if principal_id:
self.usage_service.alias(
previous_id=self.identity.anon_id, distinct_id=principal_id
)
self.identity.set_linked_principal_id(principal_id)
event_name = EVENT_CLI_COMMAND_EXECUTED if success else EVENT_CLI_COMMAND_FAILED
properties: dict[str, Any] = {
PROP_COMMAND_NAME: command_name,
PROP_CLI_VERSION: self.cli_version,
PROP_RUNTIME_LANGUAGE: self.runtime_language,
PROP_RUNTIME_VERSION: self.runtime_version,
PROP_OS_TYPE: platform.system(),
PROP_OS_RELEASE: platform.release(),
}
if not success and error_message:
properties[PROP_ERROR_MESSAGE] = error_message
if duration_ms:
properties[PROP_DURATION_MS] = round(duration_ms)
if monotonic_start is not None:
properties[PROP_DEVICE_MONOTONIC_START] = monotonic_start
if monotonic_end is not None:
properties[PROP_DEVICE_MONOTONIC_END] = monotonic_end
# Check if using anon_id (not authenticated)
is_anon = self.user_id == self.identity.anon_id
self.usage_service.capture(event_name, self.user_id, properties=properties, is_anon=is_anon)
# Global tracker instance
command_tracker = CommandTracker()
class TrackedTyperCommand(TyperCommand):
"""Custom TyperCommand that tracks individual command execution."""
def invoke(self, ctx: Any) -> Any:
"""Override invoke to track command execution."""
if not os.path.exists(ARCADE_CONFIG_PATH):
console.print(
"[yellow]Arcade collects CLI usage data to help us debug and improve the service. "
"By continuing to use the Arcade CLI, you agree to the terms of our Privacy Policy. "
"To opt out, set the ARCADE_USAGE_TRACKING environment variable to 0.[/yellow]"
)
command_name = ctx.command.name
is_login = command_name == "login"
is_logout = command_name == "logout"
try:
start_time = time.time()
start_monotonic = time.monotonic()
result = super().invoke(ctx)
end_time = time.time()
end_monotonic = time.monotonic()
duration = end_time - start_time
command_tracker.track_command_execution(
command_tracker.get_full_command_path(ctx),
success=True,
duration_ms=duration * 1000,
is_login=is_login,
is_logout=is_logout,
monotonic_start=start_monotonic,
monotonic_end=end_monotonic,
)
except Exception as e:
end_time = time.time()
end_monotonic = time.monotonic()
duration = end_time - start_time
from arcade_cli.utils import CLIError
error_msg = str(e)[:300]
command_tracker.track_command_execution(
command_tracker.get_full_command_path(ctx),
success=False,
duration_ms=duration * 1000,
error_message=error_msg,
is_login=is_login,
is_logout=is_logout,
monotonic_start=start_monotonic,
monotonic_end=end_monotonic,
)
if isinstance(e, CLIError):
raise typer.Exit(code=1)
else:
raise
else:
return result
class TrackedTyperGroup(TyperGroup):
"""Custom TyperGroup that creates tracked commands."""
def command(self, *args: Any, **kwargs: Any) -> Any:
"""Override command decorator to use TrackedTyperCommand."""
# Set the custom command class
kwargs["cls"] = TrackedTyperCommand
result: Any = super().command(*args, **kwargs)
return result
def list_commands(self, ctx: Context) -> list[str]: # type: ignore[override]
"""Return list of commands in the order appear."""
return list(self.commands)
class TrackedTyper(typer.Typer):
"""Custom Typer that creates tracked commands."""
def command(
self, name: str | None = None, *, cls: type[TyperCommand] | None = None, **kwargs: Any
) -> Any:
"""Override command decorator to use TrackedTyperCommand."""
if cls is None:
cls = TrackedTyperCommand
result: Any = super().command(name, cls=cls, **kwargs)
return result
def callback(self, name: str | None = None, **kwargs: Any) -> Any:
"""Override callback decorator to track callback execution."""
original_callback_decorator: Any = super().callback(name, **kwargs)
def decorator(func: Any) -> Any:
@functools.wraps(func)
def tracked_callback(*args: Any, **cb_kwargs: Any) -> Any:
"""Wrapper that tracks callback execution."""
# Get the context from kwargs (Typer passes it)
ctx = cb_kwargs.get("ctx") or (
args[0] if args and isinstance(args[0], typer.Context) else None
)
command_name = ctx.invoked_subcommand if ctx and ctx.invoked_subcommand else "root"
start_time = time.time()
start_monotonic = time.monotonic()
try:
result = func(*args, **cb_kwargs)
except Exception as e:
# Track callback failure (auth failures, version checks, etc.)
end_time = time.time()
end_monotonic = time.monotonic()
duration = (end_time - start_time) * 1000
from arcade_cli.utils import CLIError
command_tracker.track_command_execution(
command_name,
success=False,
duration_ms=duration,
error_message=str(e)[:300],
monotonic_start=start_monotonic,
monotonic_end=end_monotonic,
)
if isinstance(e, CLIError):
raise typer.Exit(code=1)
else:
raise
else:
return result
result: Any = original_callback_decorator(tracked_callback)
return result
return decorator

View file

@ -0,0 +1,41 @@
"""Constants for usage tracking and analytics."""
# Event Names
EVENT_CLI_COMMAND_EXECUTED = "CLI execution succeeded"
EVENT_CLI_COMMAND_FAILED = "CLI execution failed"
# Property Names
PROP_COMMAND_NAME = "command_name"
PROP_CLI_VERSION = "cli_version"
PROP_RUNTIME_LANGUAGE = "runtime_language"
PROP_RUNTIME_VERSION = "runtime_version"
PROP_OS_TYPE = "os_type"
PROP_OS_RELEASE = "os_release"
PROP_DURATION_MS = "duration_ms"
PROP_ERROR_MESSAGE = "error_message"
PROP_DEVICE_MONOTONIC_START = "device_start_timestamp"
PROP_DEVICE_MONOTONIC_END = "device_end_timestamp"
# Only used for anonymous usage
PROP_PROCESS_PERSON_PROFILE = "$process_person_profile"
# Identity Keys
KEY_ANON_ID = "anon_id"
KEY_LINKED_PRINCIPAL_ID = "linked_principal_id"
# File Names
USAGE_FILE_NAME = "usage.json"
# Environment Variables
# how props are passed to the usage tracking subprocess
ARCADE_USAGE_EVENT_DATA = "ARCADE_USAGE_EVENT_DATA"
# whether usage tracking is enabled. 1 is enabled, 0 is disabled.
ARCADE_USAGE_TRACKING = "ARCADE_USAGE_TRACKING"
# Timeouts and Limits (in seconds)
TIMEOUT_POSTHOG_ALIAS = 2
TIMEOUT_POSTHOG_CAPTURE = 5
TIMEOUT_ARCADE_API = 2.0
TIMEOUT_SUBPROCESS_EXIT = 10.0
# Retry Configuration
MAX_RETRIES_POSTHOG = 1

View file

@ -0,0 +1,207 @@
"""
Identity management for PostHog analytics tracking.
Handles anonymous/authenticated identity tracking with PostHog aliasing,
supporting pre-login anonymous tracking, post-login identity stitching,
and logout identity rotation.
"""
import fcntl
import json
import os
import tempfile
import uuid
from typing import Any
import httpx
import yaml
from arcade_cli.constants import ARCADE_CONFIG_PATH, CREDENTIALS_FILE_PATH
from arcade_cli.usage.constants import (
KEY_ANON_ID,
KEY_LINKED_PRINCIPAL_ID,
TIMEOUT_ARCADE_API,
USAGE_FILE_NAME,
)
class UsageIdentity:
"""Manages user identity for PostHog analytics tracking."""
def __init__(self) -> None:
self.usage_file_path = os.path.join(ARCADE_CONFIG_PATH, USAGE_FILE_NAME)
self._data: dict[str, Any] | None = None
def load_or_create(self) -> dict[str, Any]:
"""Load or create usage.json file with atomic writes and file locking.
Returns:
dict: The usage data containing anon_id and optionally linked_email
"""
if self._data is not None:
return self._data
os.makedirs(ARCADE_CONFIG_PATH, exist_ok=True)
if os.path.exists(self.usage_file_path):
try:
with open(self.usage_file_path) as f:
# lock file
if os.name != "nt": # Unix-like systems
fcntl.flock(f.fileno(), fcntl.LOCK_SH)
try:
data = json.load(f)
if isinstance(data, dict) and KEY_ANON_ID in data:
self._data = data
return self._data
finally:
# unlock file
if os.name != "nt":
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
except Exception: # noqa: S110
pass
new_data = {KEY_ANON_ID: str(uuid.uuid4()), KEY_LINKED_PRINCIPAL_ID: None}
self._write_atomic(new_data)
self._data = new_data
return self._data
def _write_atomic(self, data: dict[str, Any]) -> None:
"""Write data atomically to usage.json file
Args:
data: The data to write to the usage file
"""
# Create temp file in same directory for atomic rename
temp_fd, temp_path = tempfile.mkstemp(
dir=ARCADE_CONFIG_PATH, prefix=".usage_", suffix=".tmp"
)
try:
with os.fdopen(temp_fd, "w") as f:
# lock file
if os.name != "nt": # Unix-like systems
fcntl.flock(f.fileno(), fcntl.LOCK_EX)
try:
json.dump(data, f, indent=2)
f.flush()
os.fsync(f.fileno()) # ensure data is written to disk
finally:
if os.name != "nt":
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
os.rename(temp_path, self.usage_file_path)
except Exception:
# clean up
import contextlib
with contextlib.suppress(OSError):
os.unlink(temp_path)
raise
def get_distinct_id(self) -> str:
"""Get distinct_id based on authentication state.
We use principal_id for authenticated users and anon_id for anonymous users.
Returns:
str: Principal ID if authenticated, otherwise anon_id
"""
data = self.load_or_create()
# Check if we have a persisted principal_id first
linked_principal_id = data.get(KEY_LINKED_PRINCIPAL_ID)
if linked_principal_id:
return str(linked_principal_id)
# Try to fetch principal_id from API if not persisted
principal_id = self.get_principal_id()
if principal_id:
return principal_id
# Fall back to anon_id if not authenticated
return str(data[KEY_ANON_ID])
def get_principal_id(self) -> str | None:
"""Fetch principal_id from Arcade Cloud API.
Returns:
str | None: Principal ID if authenticated and API call succeeds, None otherwise
"""
if not os.path.exists(CREDENTIALS_FILE_PATH):
return None
try:
with open(CREDENTIALS_FILE_PATH) as f:
config = yaml.safe_load(f)
cloud_config = config.get("cloud", {})
api_key = cloud_config.get("api", {}).get("key")
if not api_key:
return None
response = httpx.get(
"https://cloud.arcade.dev/api/v1/auth/validate",
headers={"accept": "application/json", "Authorization": f"Bearer {api_key}"},
timeout=TIMEOUT_ARCADE_API,
)
if response.status_code == 200:
data = response.json()
principal_id = data.get("data", {}).get("principal_id")
return str(principal_id) if principal_id else None
except Exception: # noqa: S110
# Silent failure - don't disrupt CLI
pass
return None
def should_alias(self) -> bool:
"""Check if PostHog alias is needed.
Alias is needed when the user is authenticated,
but the retrieved principal_id doesn't match the persisted linked_principal_id
Returns:
bool: True if user is authenticated but not yet aliased
"""
data = self.load_or_create()
principal_id = self.get_principal_id()
return principal_id is not None and principal_id != data.get(KEY_LINKED_PRINCIPAL_ID)
def reset_to_anonymous(self) -> None:
"""Generate new anonymous ID and clear linked principal_id.
Used after logout to prevent cross-contamination between multiple
accounts on the same machine
"""
# Create fresh data with only anon_id
new_data = {KEY_ANON_ID: str(uuid.uuid4()), KEY_LINKED_PRINCIPAL_ID: None}
self._write_atomic(new_data)
self._data = new_data
def set_linked_principal_id(self, principal_id: str) -> None:
"""Update linked_principal_id in usage.json.
Args:
principal_id: The principal_id to link to the current anon_id
"""
data = self.load_or_create()
data[KEY_LINKED_PRINCIPAL_ID] = principal_id
self._write_atomic(data)
self._data = data
@property
def anon_id(self) -> str:
"""Get the current anonymous ID.
Returns:
str: The anonymous ID
"""
data = self.load_or_create()
return str(data[KEY_ANON_ID])

View file

@ -0,0 +1,104 @@
import json
import os
import subprocess
import sys
from arcade_cli.usage.constants import (
ARCADE_USAGE_EVENT_DATA,
MAX_RETRIES_POSTHOG,
TIMEOUT_POSTHOG_ALIAS,
)
from arcade_cli.usage.utils import is_tracking_enabled
class UsageService:
def __init__(self) -> None:
self.api_key = "phc_hIqUQyJpf2TP4COePO5jEpkGeUXipa7KqTEyDeRsTmB"
self.host = "https://us.i.posthog.com"
def alias(self, previous_id: str, distinct_id: str) -> None:
"""Perform PostHog alias synchronously (blocking).
Must be called BEFORE the first event with the new distinct_id.
This is done synchronously to guarantee ordering.
Args:
previous_id: The previous distinct_id (usually anon_id)
distinct_id: The new distinct_id (usually email)
"""
if not is_tracking_enabled():
return
try:
from posthog import Posthog
posthog = Posthog(
project_api_key=self.api_key,
host=self.host,
timeout=TIMEOUT_POSTHOG_ALIAS,
max_retries=MAX_RETRIES_POSTHOG,
)
posthog.alias(previous_id=previous_id, distinct_id=distinct_id)
posthog.flush()
except Exception: # noqa: S110
# Silent failure - don't disrupt CLI
pass
def capture(
self, event_name: str, distinct_id: str, properties: dict, is_anon: bool = False
) -> None:
"""Capture event in a detached subprocess that is non-blocking.
Spawns a completely independent subprocess that continues running
even after the parent CLI process exits. Works cross-platform.
Args:
event_name: Name of the event to capture
distinct_id: The distinct_id for the user
properties: Event properties
is_anon: Whether this is an anonymous user (sets $process_person_profile to false)
"""
if not is_tracking_enabled():
return
event_data = json.dumps({
"event_name": event_name,
"properties": properties,
"distinct_id": distinct_id,
"api_key": self.api_key,
"host": self.host,
"is_anon": is_anon,
})
cmd = [sys.executable, "-m", "arcade_cli.usage"]
# Pass data via environment variable (works on all platforms)
env = os.environ.copy()
env[ARCADE_USAGE_EVENT_DATA] = event_data
if sys.platform == "win32":
# Windows: Use DETACHED_PROCESS to fully detach from parent console
DETACHED_PROCESS = 0x00000008
CREATE_NEW_PROCESS_GROUP = 0x00000200
subprocess.Popen(
cmd,
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
creationflags=DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP,
close_fds=True,
env=env,
)
else:
# Unix: Use start_new_session to detach from terminal
subprocess.Popen(
cmd,
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
start_new_session=True,
close_fds=True,
env=env,
)

View file

@ -0,0 +1,13 @@
import os
from arcade_cli.usage.constants import ARCADE_USAGE_TRACKING
def is_tracking_enabled() -> bool:
"""Check if usage tracking is enabled via environment variable.
Returns:
bool: True if tracking is enabled (default), False if explicitly disabled.
"""
value = os.environ.get(ARCADE_USAGE_TRACKING, "1")
return value.lower() not in ("false", "0", "no", "off")

View file

@ -15,7 +15,6 @@ from typing import Any, Callable, Union, cast
from urllib.parse import urlencode, urlparse
import idna
import typer
from arcade_core import ToolCatalog, Toolkit
from arcade_core.config_model import Config
from arcade_core.discovery import (
@ -80,6 +79,23 @@ class Provider(str, Enum):
OPENAI = "openai"
class CLIError(Exception):
"""Custom exception for CLI errors that preserves error messages for tracking.
Never use this exception directly. Use handle_cli_error utility function instead.
"""
def __init__(self, message: str, original_error: Exception | None = None) -> None:
self.message = message
self.original_error = original_error
super().__init__(message)
def __str__(self) -> str:
if self.original_error:
return f"{self.message}: {self.original_error!s}"
return self.message
def handle_cli_error(
message: str,
error: Exception | None = None,
@ -95,7 +111,7 @@ def handle_cli_error(
console.print(f"{message}", style="bold red")
if should_exit:
raise typer.Exit(code=1)
raise CLIError(message, error)
def create_cli_catalog(
@ -114,14 +130,12 @@ def create_cli_catalog(
try: # try without prefix
toolkits = [Toolkit.from_package(toolkit)]
except ToolkitLoadError as e:
console.print(f"{e}", style="bold red")
typer.Exit(code=1)
handle_cli_error(f"{e}")
else:
toolkits = Toolkit.find_all_arcade_toolkits()
if not toolkits:
console.print("❌ No toolkits found or specified", style="bold red")
typer.Exit(code=1)
handle_cli_error("No toolkits found or specified")
catalog = ToolCatalog()
for loaded_toolkit in toolkits:
@ -417,18 +431,12 @@ def validate_and_get_config(
handle_cli_error("Not logged in", e, debug=False)
if validate_api and (not config.api or not config.api.key):
console.print(
"❌ API configuration not found or key is missing. Please run `arcade login`.",
style="bold red",
handle_cli_error(
"API configuration not found or key is missing. Please run `arcade login`."
)
raise typer.Exit(code=1)
if validate_user and (not config.user or not config.user.email):
console.print(
"❌ User email not found in configuration. Please run `arcade login`.",
style="bold red",
)
raise typer.Exit(code=1)
handle_cli_error("User email not found in configuration. Please run `arcade login`.")
return config
@ -966,12 +974,7 @@ def require_dependency(
try:
importlib.import_module(package_name.replace("-", "_"))
except ImportError:
console.print(
f"The '{package_name}' package is required to run the '{command_name}' command but is not installed.",
style="bold red",
handle_cli_error(
f"The '{package_name}' package is required to run the '{command_name}' command but is not installed. "
f"To install it, run the following command: {install_command}"
)
console.print(
f"To install it, run the following command:\n* [green]{install_command}[/green]",
style="bold",
)
raise typer.Exit(code=1)

View file

@ -8,17 +8,19 @@ from arcade_cli.constants import (
PROD_CLOUD_HOST,
PROD_ENGINE_HOST,
)
from arcade_cli.usage.command_tracker import TrackedTyper, TrackedTyperGroup
from arcade_cli.utils import (
OrderCommands,
CLIError,
compute_base_url,
handle_cli_error,
validate_and_get_config,
)
console = Console()
app = typer.Typer(
cls=OrderCommands,
app = TrackedTyper(
cls=TrackedTyperGroup,
add_completion=False,
no_args_is_help=True,
pretty_exceptions_enable=False,
@ -107,7 +109,7 @@ def list_workers(
response.raise_for_status()
deployments = response.json()["data"]["workers"]
except Exception as e:
console.log(f"Failed to get cloud deployments: {e}")
handle_cli_error(f"Failed to list deployed servers: {e}")
print_worker_table(client, deployments)
@ -182,8 +184,7 @@ def enable_worker(
try:
arcade.workers.update(worker_id, enabled=True)
except Exception as e:
console.print(f"Error enabling worker: {e}", style="bold red")
raise typer.Exit(code=1)
handle_cli_error(f"Failed to enable worker '{worker_id}': {e}")
@app.command("disable", help="Disable a worker")
@ -196,8 +197,7 @@ def disable_worker(
try:
arcade.workers.update(worker_id, enabled=False)
except Exception as e:
console.print(f"Error disabling worker: {e}", style="bold red")
raise typer.Exit(code=1)
handle_cli_error(f"Failed to disable worker '{worker_id}': {e}")
@app.command("rm", help="Remove a worker")
@ -251,17 +251,13 @@ def rm_worker(
response.raise_for_status()
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
console.print(
"Deployment not found. To deregister the worker from the engine, use the --deregister flag.",
style="bold red",
handle_cli_error(
"Deployment not found. To deregister the worker from the engine, use the --deregister flag."
)
raise typer.Exit(code=1)
else:
console.print(f"Error deleting deployment: {e}", style="bold red")
raise typer.Exit(code=1)
handle_cli_error(f"Error deleting deployment: {e}")
except Exception as e:
console.print(f"Error deleting deployment: {e}", style="bold red")
raise typer.Exit(code=1)
handle_cli_error(f"Error deleting deployment: {e}")
# Then try to delete from the engine
try:
@ -270,8 +266,7 @@ def rm_worker(
except NotFoundError:
console.print("Worker not found", style="bold red")
except Exception as e:
console.print(f"Error deleting worker from engine: {e}", style="bold red")
raise typer.Exit(code=1)
handle_cli_error(f"Error deleting worker from engine: {e}")
@app.command("logs", help="Get logs for a worker")
@ -318,14 +313,15 @@ def worker_logs(
if not line or "[DONE]" in line: # Skip empty lines
continue
if "event: error" in line:
console.print("Could not stream logs", style="bold red")
handle_cli_error("Could not stream logs")
if line.startswith("data:"):
# Extract just the data portion after 'data:'
data = line[5:].strip() # Remove 'data:' prefix and whitespace
console.print(data, markup=False)
except CLIError:
raise
except Exception as e:
console.print(f"Error connecting to log stream: {e}", style="bold red")
raise typer.Exit(code=1)
handle_cli_error(f"Error connecting to log stream: {e}")
def get_toolkits(client: Arcade, worker_id: str | None) -> str:
@ -347,5 +343,5 @@ def get_toolkits(client: Arcade, worker_id: str | None) -> str:
except NotFoundError:
return ""
except Exception as e:
console.print(f"Error getting worker tools: {e}", style="bold red")
raise typer.Exit(code=1)
handle_cli_error(f"Error getting server tools: {e}")
return ""

View file

@ -0,0 +1,48 @@
import pytest
from arcade_cli.usage.utils import is_tracking_enabled
@pytest.mark.parametrize(
"env_value, expected",
[
pytest.param("1", True, id="enabled_with_1"),
pytest.param("true", True, id="enabled_with_true_lowercase"),
pytest.param("TRUE", True, id="enabled_with_true_uppercase"),
pytest.param("True", True, id="enabled_with_true_titlecase"),
pytest.param("yes", True, id="enabled_with_yes_lowercase"),
pytest.param("YES", True, id="enabled_with_yes_uppercase"),
pytest.param("Yes", True, id="enabled_with_yes_titlecase"),
pytest.param("on", True, id="enabled_with_on_lowercase"),
pytest.param("ON", True, id="enabled_with_on_uppercase"),
pytest.param("On", True, id="enabled_with_on_titlecase"),
pytest.param("anything_else", True, id="enabled_with_random_string"),
pytest.param("0", False, id="disabled_with_0"),
pytest.param("false", False, id="disabled_with_false_lowercase"),
pytest.param("FALSE", False, id="disabled_with_false_uppercase"),
pytest.param("False", False, id="disabled_with_false_titlecase"),
pytest.param("no", False, id="disabled_with_no_lowercase"),
pytest.param("NO", False, id="disabled_with_no_uppercase"),
pytest.param("No", False, id="disabled_with_no_titlecase"),
pytest.param("off", False, id="disabled_with_off_lowercase"),
pytest.param("OFF", False, id="disabled_with_off_uppercase"),
pytest.param("Off", False, id="disabled_with_off_titlecase"),
],
)
def test_is_tracking_enabled_with_env_var(
monkeypatch: pytest.MonkeyPatch, env_value: str, expected: bool
) -> None:
"""Test is_tracking_enabled() with various environment variable values."""
monkeypatch.setenv("ARCADE_USAGE_TRACKING", env_value)
assert is_tracking_enabled() == expected
def test_is_tracking_enabled_default_when_not_set(monkeypatch: pytest.MonkeyPatch) -> None:
"""Test is_tracking_enabled() returns True when environment variable is not set."""
monkeypatch.delenv("ARCADE_USAGE_TRACKING", raising=False)
assert is_tracking_enabled() is True
def test_is_tracking_enabled_default_when_empty_string(monkeypatch: pytest.MonkeyPatch) -> None:
"""Test is_tracking_enabled() returns True when environment variable is empty string."""
monkeypatch.setenv("ARCADE_USAGE_TRACKING", "")
assert is_tracking_enabled() is True

View file

@ -0,0 +1,440 @@
import sys
from unittest.mock import MagicMock, patch
import pytest
from arcade_cli.usage.command_tracker import CommandTracker
class MockCommand:
"""Mock Typer command for testing."""
def __init__(self, name: str) -> None:
self.name = name
class MockContext:
"""Mock Typer context for testing."""
def __init__(self, name: str, parent: "MockContext | None" = None) -> None:
self.command = MockCommand(name)
self.parent = parent
@pytest.fixture
def mock_dependencies() -> dict:
"""Mock all external dependencies for CommandTracker."""
with (
patch("arcade_cli.usage.command_tracker.UsageService") as mock_service_cls,
patch("arcade_cli.usage.command_tracker.UsageIdentity") as mock_identity_cls,
patch("arcade_cli.usage.command_tracker.is_tracking_enabled") as mock_tracking,
):
# Setup mock instances
mock_service = MagicMock()
mock_service_cls.return_value = mock_service
mock_identity = MagicMock()
mock_identity_cls.return_value = mock_identity
mock_identity.anon_id = "anon-123"
mock_tracking.return_value = True
yield {
"service_cls": mock_service_cls,
"service": mock_service,
"identity_cls": mock_identity_cls,
"identity": mock_identity,
"tracking": mock_tracking,
}
class TestCliVersion:
"""Tests for cli_version property."""
@patch("arcade_cli.usage.command_tracker.metadata.version")
def test_returns_version_on_success(self, mock_version: MagicMock) -> None:
"""Test that cli_version returns the version from metadata."""
mock_version.return_value = "1.2.3"
tracker = CommandTracker()
assert tracker.cli_version == "1.2.3"
@patch("arcade_cli.usage.command_tracker.metadata.version")
def test_returns_unknown_on_exception(self, mock_version: MagicMock) -> None:
"""Test that cli_version returns 'unknown' when metadata fails."""
mock_version.side_effect = Exception("Package not found")
tracker = CommandTracker()
assert tracker.cli_version == "unknown"
class TestRuntimeLanguage:
"""Tests for runtime_language property."""
def test_returns_python(self) -> None:
"""Test that runtime_language returns 'python'."""
tracker = CommandTracker()
assert tracker.runtime_language == "python"
class TestRuntimeVersion:
"""Tests for runtime_version property."""
def test_matches_sys_version_info(self) -> None:
"""Test that runtime_version matches sys.version_info."""
tracker = CommandTracker()
version = tracker.runtime_version
expected = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
assert version == expected
def test_caches_version(self) -> None:
"""Test that runtime_version caches the version after first access."""
tracker = CommandTracker()
first_access = tracker.runtime_version
second_access = tracker.runtime_version
# Should return the same object b/c it's cached
assert first_access is second_access
class TestUserId:
"""Tests for user_id property."""
def test_delegates_to_identity(self, mock_dependencies: dict) -> None:
"""Test that user_id delegates to identity.get_distinct_id()."""
mock_dependencies["identity"].get_distinct_id.return_value = "user-456"
tracker = CommandTracker()
assert tracker.user_id == "user-456"
mock_dependencies["identity"].get_distinct_id.assert_called_once()
class TestGetFullCommandPath:
"""Tests for get_full_command_path() method."""
def test_single_command(self) -> None:
"""Test get_full_command_path with a single command."""
ctx = MockContext("login", parent=MockContext("root"))
tracker = CommandTracker()
path = tracker.get_full_command_path(ctx)
assert path == "login"
def test_nested_commands(self) -> None:
"""Test get_full_command_path with nested commands."""
root = MockContext("root")
toolkit = MockContext("toolkit", parent=root)
list_cmd = MockContext("list", parent=toolkit)
tracker = CommandTracker()
path = tracker.get_full_command_path(list_cmd)
assert path == "toolkit.list"
def test_deeply_nested_commands(self) -> None:
"""Test get_full_command_path with deeply nested commands."""
root = MockContext("root")
level1 = MockContext("level1", parent=root)
level2 = MockContext("level2", parent=level1)
level3 = MockContext("level3", parent=level2)
tracker = CommandTracker()
path = tracker.get_full_command_path(level3)
assert path == "level1.level2.level3"
def test_root_command_no_parent(self) -> None:
"""Test get_full_command_path with root command (no parent)."""
ctx = MockContext("root", parent=None)
tracker = CommandTracker()
path = tracker.get_full_command_path(ctx)
assert path == ""
class TestHandleSuccessfulLogin:
"""Tests for _handle_successful_login() method."""
def test_aliases_when_should_alias_true(self, mock_dependencies: dict) -> None:
"""Test that login aliases anon_id to principal_id when should_alias is True."""
mock_dependencies["identity"].get_principal_id.return_value = "principal-789"
mock_dependencies["identity"].should_alias.return_value = True
mock_dependencies["identity"].anon_id = "anon-456"
tracker = CommandTracker()
tracker._handle_successful_login()
mock_dependencies["service"].alias.assert_called_once_with(
previous_id="anon-456", distinct_id="principal-789"
)
def test_does_not_alias_when_should_alias_false(self, mock_dependencies: dict) -> None:
"""Test that login does not alias when should_alias is False."""
mock_dependencies["identity"].get_principal_id.return_value = "principal-789"
mock_dependencies["identity"].should_alias.return_value = False
tracker = CommandTracker()
tracker._handle_successful_login()
mock_dependencies["service"].alias.assert_not_called()
def test_always_sets_linked_principal_id(self, mock_dependencies: dict) -> None:
"""Test that login always sets linked_principal_id on success."""
mock_dependencies["identity"].get_principal_id.return_value = "principal-999"
mock_dependencies["identity"].should_alias.return_value = False
tracker = CommandTracker()
tracker._handle_successful_login()
mock_dependencies["identity"].set_linked_principal_id.assert_called_once_with(
"principal-999"
)
def test_does_nothing_when_no_principal_id(self, mock_dependencies: dict) -> None:
"""Test that login does nothing when principal_id is None."""
mock_dependencies["identity"].get_principal_id.return_value = None
tracker = CommandTracker()
tracker._handle_successful_login()
mock_dependencies["service"].alias.assert_not_called()
mock_dependencies["identity"].set_linked_principal_id.assert_not_called()
class TestHandleSuccessfulLogout:
"""Tests for _handle_successful_logout() method."""
@patch("arcade_cli.usage.command_tracker.platform")
def test_sends_logout_event(self, mock_platform: MagicMock, mock_dependencies: dict) -> None:
"""Test that logout sends a logout event."""
mock_platform.system.return_value = "Darwin"
mock_platform.release.return_value = "23.4.0"
mock_dependencies["identity"].load_or_create.return_value = {
"linked_principal_id": "user-123"
}
mock_dependencies["identity"].get_distinct_id.return_value = "user-123"
tracker = CommandTracker()
tracker._handle_successful_logout("logout", duration_ms=100.5)
mock_dependencies["service"].capture.assert_called_once()
call_args = mock_dependencies["service"].capture.call_args
assert call_args[0][0] == "CLI execution succeeded"
assert call_args[0][1] == "user-123"
assert call_args[1]["properties"]["command_name"] == "logout"
assert call_args[1]["properties"]["duration_ms"] == round(100.5)
def test_resets_to_anonymous_when_was_authenticated(self, mock_dependencies: dict) -> None:
"""Test that logout resets to anonymous when user was authenticated."""
mock_dependencies["identity"].load_or_create.return_value = {
"linked_principal_id": "user-123"
}
mock_dependencies["identity"].get_distinct_id.return_value = "user-123"
tracker = CommandTracker()
tracker._handle_successful_logout("logout")
mock_dependencies["identity"].reset_to_anonymous.assert_called_once()
def test_does_not_reset_when_not_authenticated(self, mock_dependencies: dict) -> None:
"""Test that logout does not reset anon_id when user was not authenticated."""
mock_dependencies["identity"].load_or_create.return_value = {"linked_principal_id": None}
mock_dependencies["identity"].get_distinct_id.return_value = "anon-123"
tracker = CommandTracker()
tracker._handle_successful_logout("logout")
mock_dependencies["identity"].reset_to_anonymous.assert_not_called()
@patch("arcade_cli.usage.command_tracker.platform")
def test_includes_duration_when_provided(
self, mock_platform: MagicMock, mock_dependencies: dict
) -> None:
"""Test that logout includes duration in event properties."""
mock_platform.system.return_value = "Darwin"
mock_platform.release.return_value = "23.4.0"
mock_dependencies["identity"].load_or_create.return_value = {"linked_principal_id": None}
mock_dependencies["identity"].get_distinct_id.return_value = "anon-123"
tracker = CommandTracker()
tracker._handle_successful_logout("logout", duration_ms=250.75)
call_args = mock_dependencies["service"].capture.call_args
assert call_args[1]["properties"]["duration_ms"] == round(250.75)
class TestTrackCommandExecution:
"""Tests for track_command_execution() method."""
def test_does_nothing_when_tracking_disabled(self, mock_dependencies: dict) -> None:
"""Test that track_command_execution does nothing when tracking is disabled."""
mock_dependencies["tracking"].return_value = False
tracker = CommandTracker()
tracker.track_command_execution("test.command", success=True)
mock_dependencies["service"].capture.assert_not_called()
@patch("arcade_cli.usage.command_tracker.platform")
def test_tracks_successful_command(
self, mock_platform: MagicMock, mock_dependencies: dict
) -> None:
"""Test that successful command execution is tracked."""
mock_platform.system.return_value = "Darwin"
mock_platform.release.return_value = "23.4.0"
mock_dependencies["identity"].get_distinct_id.return_value = "user-789"
mock_dependencies["identity"].anon_id = "anon-456"
tracker = CommandTracker()
tracker.track_command_execution("test.command", success=True, duration_ms=50.25)
mock_dependencies["service"].capture.assert_called_once()
call_args = mock_dependencies["service"].capture.call_args
assert call_args[0][0] == "CLI execution succeeded"
assert call_args[0][1] == "user-789"
assert call_args[1]["properties"]["command_name"] == "test.command"
assert call_args[1]["properties"]["duration_ms"] == round(50.25)
@patch("arcade_cli.usage.command_tracker.platform")
def test_tracks_failed_command(self, mock_platform: MagicMock, mock_dependencies: dict) -> None:
"""Test that failed command execution is tracked."""
mock_platform.system.return_value = "Linux"
mock_platform.release.return_value = "5.15.0"
mock_dependencies["identity"].get_distinct_id.return_value = "user-999"
mock_dependencies["identity"].anon_id = "anon-888"
tracker = CommandTracker()
tracker.track_command_execution(
"failed.command", success=False, error_message="Something went wrong"
)
mock_dependencies["service"].capture.assert_called_once()
call_args = mock_dependencies["service"].capture.call_args
assert call_args[0][0] == "CLI execution failed"
assert call_args[1]["properties"]["error_message"] == "Something went wrong"
def test_handles_login_command(self, mock_dependencies: dict) -> None:
"""Test that login command triggers _handle_successful_login."""
mock_dependencies["identity"].get_principal_id.return_value = "principal-123"
mock_dependencies["identity"].should_alias.return_value = True
mock_dependencies["identity"].anon_id = "anon-789"
tracker = CommandTracker()
tracker.track_command_execution("login", success=True, is_login=True)
# Should call alias as part of login handling
mock_dependencies["service"].alias.assert_called_once()
def test_handles_logout_command(self, mock_dependencies: dict) -> None:
"""Test that logout command triggers _handle_successful_logout."""
mock_dependencies["identity"].load_or_create.return_value = {
"linked_principal_id": "user-123"
}
mock_dependencies["identity"].get_distinct_id.return_value = "user-123"
tracker = CommandTracker()
tracker.track_command_execution("logout", success=True, is_logout=True)
# Should reset to anonymous as part of logout handling
mock_dependencies["identity"].reset_to_anonymous.assert_called_once()
def test_lazy_alias_check_on_other_commands(self, mock_dependencies: dict) -> None:
"""Test that non-login commands trigger lazy alias check."""
mock_dependencies["identity"].should_alias.return_value = True
mock_dependencies["identity"].get_principal_id.return_value = "principal-456"
mock_dependencies["identity"].anon_id = "anon-999"
mock_dependencies["identity"].get_distinct_id.return_value = "principal-456"
tracker = CommandTracker()
tracker.track_command_execution("other.command", success=True)
# Should call alias due to lazy check
mock_dependencies["service"].alias.assert_called_once_with(
previous_id="anon-999", distinct_id="principal-456"
)
def test_rounds_duration_to_two_decimals(self, mock_dependencies: dict) -> None:
"""Test that duration is rounded to 2 decimal places."""
mock_dependencies["identity"].get_distinct_id.return_value = "user-123"
mock_dependencies["identity"].anon_id = "anon-456"
tracker = CommandTracker()
tracker.track_command_execution("test", success=True, duration_ms=123.456789)
call_args = mock_dependencies["service"].capture.call_args
assert call_args[1]["properties"]["duration_ms"] == round(123.46)
@patch("arcade_cli.usage.command_tracker.platform")
def test_sets_is_anon_flag_correctly(
self, mock_platform: MagicMock, mock_dependencies: dict
) -> None:
"""Test that is_anon flag is set correctly."""
mock_platform.system.return_value = "Darwin"
mock_platform.release.return_value = "23.4.0"
mock_dependencies["identity"].get_distinct_id.return_value = "anon-123"
mock_dependencies["identity"].anon_id = "anon-123"
tracker = CommandTracker()
tracker.track_command_execution("test", success=True)
call_args = mock_dependencies["service"].capture.call_args
assert call_args[1]["is_anon"] is True
@patch("arcade_cli.usage.command_tracker.platform")
def test_is_anon_false_for_authenticated_user(
self, mock_platform: MagicMock, mock_dependencies: dict
) -> None:
"""Test that is_anon is False for authenticated users."""
mock_platform.system.return_value = "Darwin"
mock_platform.release.return_value = "23.4.0"
mock_dependencies["identity"].get_distinct_id.return_value = "user-authenticated"
mock_dependencies["identity"].anon_id = "anon-123"
tracker = CommandTracker()
tracker.track_command_execution("test", success=True)
call_args = mock_dependencies["service"].capture.call_args
assert call_args[1]["is_anon"] is False
@patch("arcade_cli.usage.command_tracker.platform")
def test_includes_os_info_in_properties(
self, mock_platform: MagicMock, mock_dependencies: dict
) -> None:
"""Test that OS information is included in event properties."""
mock_platform.system.return_value = "Windows"
mock_platform.release.return_value = "10"
mock_dependencies["identity"].get_distinct_id.return_value = "user-123"
mock_dependencies["identity"].anon_id = "anon-456"
tracker = CommandTracker()
tracker.track_command_execution("test", success=True)
call_args = mock_dependencies["service"].capture.call_args
properties = call_args[1]["properties"]
assert properties["os_type"] == "Windows"
assert properties["os_release"] == "10"
def test_does_not_include_duration_when_not_provided(self, mock_dependencies: dict) -> None:
"""Test that duration is not included in properties when not provided."""
mock_dependencies["identity"].get_distinct_id.return_value = "user-123"
mock_dependencies["identity"].anon_id = "anon-456"
tracker = CommandTracker()
tracker.track_command_execution("test", success=True)
call_args = mock_dependencies["service"].capture.call_args
properties = call_args[1]["properties"]
assert "duration_ms" not in properties

View file

@ -0,0 +1,383 @@
import json
import uuid
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
import yaml
from arcade_cli.usage.identity import UsageIdentity
@pytest.fixture
def temp_config_path(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path:
"""Setup temporary config directory for testing."""
config_dir = tmp_path / ".arcade"
config_dir.mkdir()
credentials_file = config_dir / "credentials.yaml"
monkeypatch.setattr("arcade_cli.usage.identity.ARCADE_CONFIG_PATH", str(config_dir))
monkeypatch.setattr("arcade_cli.usage.identity.CREDENTIALS_FILE_PATH", str(credentials_file))
return config_dir
@pytest.fixture
def identity(temp_config_path: Path) -> UsageIdentity:
"""Create a UsageIdentity instance with temp config path."""
# NOTE: Although temp_config_path is directly used, it's required to ensure that
# this fixture depends on the temp_config_path fixture to apply the monkeypatch
# before creating the UsageIdentity instance
return UsageIdentity()
class TestLoadOrCreate:
"""Tests for load_or_create() method."""
def test_creates_new_file_when_not_exists(
self, identity: UsageIdentity, temp_config_path: Path
) -> None:
"""Test that load_or_create creates a new usage.json file when it doesn't exist."""
data = identity.load_or_create()
assert "anon_id" in data
assert data["linked_principal_id"] is None
assert uuid.UUID(data["anon_id"]) # Validate UUID format
# Verify file was created
usage_file = temp_config_path / "usage.json"
assert usage_file.exists()
def test_loads_existing_file(self, identity: UsageIdentity, temp_config_path: Path) -> None:
"""Test that load_or_create loads existing usage.json file."""
existing_data = {"anon_id": str(uuid.uuid4()), "linked_principal_id": "user-123"}
usage_file = temp_config_path / "usage.json"
usage_file.write_text(json.dumps(existing_data))
data = identity.load_or_create()
assert data["anon_id"] == existing_data["anon_id"]
assert data["linked_principal_id"] == existing_data["linked_principal_id"]
def test_caches_data_after_first_load(self, identity: UsageIdentity) -> None:
"""Test that load_or_create caches data after first load."""
first_data = identity.load_or_create()
second_data = identity.load_or_create()
# Should return the same object b/c it's cached
assert first_data is second_data
def test_creates_new_data_on_corrupted_json(
self, identity: UsageIdentity, temp_config_path: Path
) -> None:
"""Test that load_or_create creates new data if JSON is corrupted."""
usage_file = temp_config_path / "usage.json"
usage_file.write_text("{ invalid json }")
data = identity.load_or_create()
assert "anon_id" in data
assert data["linked_principal_id"] is None
def test_creates_new_data_on_missing_anon_id(
self, identity: UsageIdentity, temp_config_path: Path
) -> None:
"""Test that load_or_create creates new data if anon_id is missing."""
usage_file = temp_config_path / "usage.json"
usage_file.write_text(json.dumps({"some_other_key": "value"}))
data = identity.load_or_create()
assert "anon_id" in data
assert data["linked_principal_id"] is None
def test_creates_new_data_on_non_dict_json(
self, identity: UsageIdentity, temp_config_path: Path
) -> None:
"""Test that load_or_create creates new data if JSON is not a dict."""
usage_file = temp_config_path / "usage.json"
usage_file.write_text(json.dumps(["not", "a", "dict"]))
data = identity.load_or_create()
assert "anon_id" in data
assert isinstance(data, dict)
class TestWriteAtomic:
"""Tests for _write_atomic() method."""
def test_writes_data_to_file(self, identity: UsageIdentity, temp_config_path: Path) -> None:
"""Test that _write_atomic writes data to usage.json."""
test_data = {"anon_id": str(uuid.uuid4()), "linked_principal_id": "user-456"}
identity._write_atomic(test_data)
usage_file = temp_config_path / "usage.json"
assert usage_file.exists()
with usage_file.open() as f:
loaded_data = json.load(f)
assert loaded_data == test_data
def test_atomic_write_cleans_up_on_failure(
self, identity: UsageIdentity, temp_config_path: Path
) -> None:
"""Test that _write_atomic cleans up temp file on failure."""
with (
patch(
"tempfile.mkstemp", return_value=(999, str(temp_config_path / ".usage_temp.tmp"))
),
patch("os.fdopen", side_effect=Exception("Write failed")),
):
with pytest.raises(Exception, match="Write failed"):
identity._write_atomic({"anon_id": "test"})
# Verify no temp files are left behind
temp_files = list(temp_config_path.glob(".usage_*.tmp"))
assert len(temp_files) == 0
class TestGetDistinctId:
"""Tests for get_distinct_id() method."""
def test_returns_linked_principal_id_when_persisted(
self, identity: UsageIdentity, temp_config_path: Path
) -> None:
"""Test that get_distinct_id returns persisted linked_principal_id."""
usage_file = temp_config_path / "usage.json"
usage_file.write_text(
json.dumps({"anon_id": str(uuid.uuid4()), "linked_principal_id": "persisted-user-123"})
)
distinct_id = identity.get_distinct_id()
assert distinct_id == "persisted-user-123"
@patch("arcade_cli.usage.identity.UsageIdentity.get_principal_id")
def test_returns_principal_id_from_api_when_not_persisted(
self, mock_get_principal: MagicMock, identity: UsageIdentity
) -> None:
"""Test that get_distinct_id fetches principal_id from API when not persisted."""
mock_get_principal.return_value = "api-user-456"
distinct_id = identity.get_distinct_id()
assert distinct_id == "api-user-456"
mock_get_principal.assert_called_once()
@patch("arcade_cli.usage.identity.UsageIdentity.get_principal_id")
def test_returns_anon_id_when_not_authenticated(
self, mock_get_principal: MagicMock, identity: UsageIdentity
) -> None:
"""Test that get_distinct_id returns anon_id when not authenticated."""
mock_get_principal.return_value = None
distinct_id = identity.get_distinct_id()
data = identity.load_or_create()
assert distinct_id == data["anon_id"]
class TestGetPrincipalId:
"""Tests for get_principal_id() method."""
def test_returns_none_when_credentials_file_not_exists(self, identity: UsageIdentity) -> None:
"""Test that get_principal_id returns None when credentials file doesn't exist."""
principal_id = identity.get_principal_id()
assert principal_id is None
@patch("httpx.get")
def test_returns_principal_id_on_successful_api_call(
self, mock_get: MagicMock, identity: UsageIdentity, temp_config_path: Path
) -> None:
"""Test that get_principal_id returns principal_id from API."""
# Create credentials file
credentials_file = temp_config_path / "credentials.yaml"
credentials_file.write_text(yaml.dump({"cloud": {"api": {"key": "test-api-key"}}}))
# Mock API response
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"data": {"principal_id": "api-principal-123"}}
mock_get.return_value = mock_response
principal_id = identity.get_principal_id()
assert principal_id == "api-principal-123"
mock_get.assert_called_once_with(
"https://cloud.arcade.dev/api/v1/auth/validate",
headers={"accept": "application/json", "Authorization": "Bearer test-api-key"},
timeout=2.0,
)
@patch("httpx.get")
def test_returns_none_on_api_failure(
self, mock_get: MagicMock, identity: UsageIdentity, temp_config_path: Path
) -> None:
"""Test that get_principal_id returns None on API failure."""
credentials_file = temp_config_path / "credentials.yaml"
credentials_file.write_text(yaml.dump({"cloud": {"api": {"key": "test-api-key"}}}))
mock_get.side_effect = Exception("Network error")
principal_id = identity.get_principal_id()
assert principal_id is None
def test_returns_none_when_api_key_missing(
self, identity: UsageIdentity, temp_config_path: Path
) -> None:
"""Test that get_principal_id returns None when API key is missing."""
credentials_file = temp_config_path / "credentials.yaml"
credentials_file.write_text(yaml.dump({"cloud": {}}))
principal_id = identity.get_principal_id()
assert principal_id is None
@patch("httpx.get")
def test_returns_none_on_non_200_status(
self, mock_get: MagicMock, identity: UsageIdentity, temp_config_path: Path
) -> None:
"""Test that get_principal_id returns None on non-200 status code."""
credentials_file = temp_config_path / "credentials.yaml"
credentials_file.write_text(yaml.dump({"cloud": {"api": {"key": "test-api-key"}}}))
mock_response = MagicMock()
mock_response.status_code = 401
mock_get.return_value = mock_response
principal_id = identity.get_principal_id()
assert principal_id is None
class TestShouldAlias:
"""Tests for should_alias() method."""
@patch("arcade_cli.usage.identity.UsageIdentity.get_principal_id")
def test_returns_true_when_authenticated_but_not_linked(
self, mock_get_principal: MagicMock, identity: UsageIdentity
) -> None:
"""Test that should_alias returns True when user is authenticated but not yet aliased."""
mock_get_principal.return_value = "new-principal-id"
should_alias = identity.should_alias()
assert should_alias is True
@patch("arcade_cli.usage.identity.UsageIdentity.get_principal_id")
def test_returns_false_when_already_linked(
self, mock_get_principal: MagicMock, identity: UsageIdentity, temp_config_path: Path
) -> None:
"""Test that should_alias returns False when principal_id already linked."""
principal_id = "already-linked-123"
mock_get_principal.return_value = principal_id
usage_file = temp_config_path / "usage.json"
usage_file.write_text(
json.dumps({"anon_id": str(uuid.uuid4()), "linked_principal_id": principal_id})
)
should_alias = identity.should_alias()
assert should_alias is False
@patch("arcade_cli.usage.identity.UsageIdentity.get_principal_id")
def test_returns_false_when_not_authenticated(
self, mock_get_principal: MagicMock, identity: UsageIdentity
) -> None:
"""Test that should_alias returns False when not authenticated."""
mock_get_principal.return_value = None
should_alias = identity.should_alias()
assert should_alias is False
class TestResetToAnonymous:
"""Tests for reset_to_anonymous() method."""
def test_generates_new_anon_id(self, identity: UsageIdentity) -> None:
"""Test that reset_to_anonymous generates a new anon_id."""
original_data = identity.load_or_create()
original_anon_id = original_data["anon_id"]
identity.reset_to_anonymous()
new_data = identity.load_or_create()
assert new_data["anon_id"] != original_anon_id
assert uuid.UUID(new_data["anon_id"]) # Validates UUID format
def test_clears_linked_principal_id(self, identity: UsageIdentity) -> None:
"""Test that reset_to_anonymous clears linked_principal_id."""
identity.set_linked_principal_id("user-to-clear")
identity.reset_to_anonymous()
data = identity.load_or_create()
assert data["linked_principal_id"] is None
def test_persists_to_file(self, identity: UsageIdentity, temp_config_path: Path) -> None:
"""Test that reset_to_anonymous persists changes to file."""
identity.reset_to_anonymous()
usage_file = temp_config_path / "usage.json"
assert usage_file.exists()
with usage_file.open() as f:
file_data = json.load(f)
assert "anon_id" in file_data
assert file_data["linked_principal_id"] is None
class TestSetLinkedPrincipalId:
"""Tests for set_linked_principal_id() method."""
def test_updates_linked_principal_id(self, identity: UsageIdentity) -> None:
"""Test that set_linked_principal_id updates the linked_principal_id."""
identity.load_or_create() # Initialize
identity.set_linked_principal_id("new-user-789")
data = identity.load_or_create()
assert data["linked_principal_id"] == "new-user-789"
def test_persists_to_file(self, identity: UsageIdentity, temp_config_path: Path) -> None:
"""Test that set_linked_principal_id persists changes to file."""
identity.set_linked_principal_id("persisted-user-999")
usage_file = temp_config_path / "usage.json"
with usage_file.open() as f:
file_data = json.load(f)
assert file_data["linked_principal_id"] == "persisted-user-999"
def test_updates_cache(self, identity: UsageIdentity) -> None:
"""Test that set_linked_principal_id updates the cached data."""
identity.load_or_create()
identity.set_linked_principal_id("cached-user-111")
# Access _data directly to verify cache updated
assert identity._data is not None
assert identity._data["linked_principal_id"] == "cached-user-111"
class TestAnonIdProperty:
"""Tests for anon_id property."""
def test_returns_anon_id(self, identity: UsageIdentity) -> None:
"""Test that anon_id property returns the anon_id."""
data = identity.load_or_create()
assert identity.anon_id == data["anon_id"]
def test_returns_valid_uuid(self, identity: UsageIdentity) -> None:
"""Test that anon_id property returns a valid UUID."""
anon_id = identity.anon_id
assert uuid.UUID(anon_id) # Validate UUID format

View file

@ -1,6 +1,6 @@
[project]
name = "arcade-mcp"
version = "1.0.0rc3"
version = "1.0.0rc4"
description = "Arcade.dev - Tool Calling platform for Agents"
readme = "README.md"
license = {file = "LICENSE"}
@ -30,6 +30,7 @@ dependencies = [
"tqdm==4.67.1",
"openai==1.82.1",
"click==8.1.8",
"posthog==6.7.6",
]
[project.optional-dependencies]