arcade-mcp/libs/arcade-cli/arcade_cli/usage/command_tracker.py
Eric Gustin 113d0d3086
CLI Usage (#593)
TL;DR

The philosophy of CLI usage tracking is "fire and forget" and "best
effort". You can opt out by setting `ARCADE_USAGE_TRACKING=0`.

We capture two events: `CLI execution succeeded` and
`CLI execution failed`. Reporting to PostHog runs in a short-lived
subprocess (at most 10 seconds) that does not block the main CLI process.

`~/.arcade/usage.json` persists two values: `anon_id` and
`linked_principal_id`. The CLI user's login status determines which ID
is used. Upon `arcade login`, the `anon_id` is aliased to the
`linked_principal_id`. Upon `arcade logout`, the `linked_principal_id` is
removed and the `anon_id` is rotated.

## CLI Usage Tracking - How It Works

The usage tracking system implements an identity management and event
tracking pipeline. Here's how the pieces work together:

### **Identity State Management (`usage.json`)**

The system maintains a persistent identity file at
`~/.arcade/usage.json` with this structure:
```json
{
  "anon_id": "uuid",
  "linked_principal_id": "uuid" | null
}
```

**Key mechanics:**
- **`anon_id`**: Generated once on first CLI use and persists across
sessions. This UUID tracks all anonymous activity.
- **`linked_principal_id`**: Initially `null`. Once the user logs in and
we successfully alias their identity, this field stores their
`principal_id` to indicate this `anon_id` has been linked.
- **Atomic writes**: All updates use a temp file + atomic rename pattern
to prevent corruption from concurrent CLI processes.
- **File locking**: Uses `fcntl` (Unix) to coordinate reads/writes
across multiple simultaneous CLI invocations.
- **In-memory cache**: The `UsageIdentity` class caches the loaded data
to avoid repeated file I/O within a single CLI invocation.
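
The atomic-write and locking mechanics above can be sketched as follows. This is a minimal illustration, not the actual `UsageIdentity` code: the function name `write_usage_file` and the sidecar `.lock` file are assumptions made for the example, and `fcntl` limits it to Unix.

```python
import fcntl
import json
import os
import tempfile
from pathlib import Path


def write_usage_file(path: Path, data: dict) -> None:
    """Write `data` to `path` atomically while holding an exclusive lock."""
    path.parent.mkdir(parents=True, exist_ok=True)
    lock_path = path.with_suffix(".lock")  # hypothetical sidecar lock file
    with open(lock_path, "w") as lock_file:
        fcntl.flock(lock_file, fcntl.LOCK_EX)  # blocks until other holders release
        try:
            # The temp file lives in the same directory so the rename
            # never crosses a filesystem boundary.
            fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
            try:
                with os.fdopen(fd, "w") as tmp:
                    json.dump(data, tmp)
                    tmp.flush()
                    os.fsync(tmp.fileno())
                os.replace(tmp_name, path)  # atomic rename on POSIX
            except BaseException:
                # Remove the temp file if anything failed before the rename.
                if os.path.exists(tmp_name):
                    os.unlink(tmp_name)
                raise
        finally:
            fcntl.flock(lock_file, fcntl.LOCK_UN)
```

Readers either see the old file or the new one, never a partially written file, because the rename is the only step that touches the final path.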

### **Identity Resolution Flow**

When tracking an event, the system determines the `distinct_id` (who to
attribute the event to) via this waterfall:

1. **Check `linked_principal_id`** in `usage.json`
   - If present → use it (user was previously aliased)
   - This is the fastest path and avoids API calls

2. **Fetch `principal_id` from Arcade Cloud API**
   - Makes HTTP request to `/api/v1/auth/validate` with the user's API key
     from `~/.arcade/credentials.yaml`
   - If authenticated → returns `principal_id`
   - Has 2s timeout for responsiveness

3. **Fall back to `anon_id`**
   - If not authenticated or API call fails → use anonymous ID
   - Marks event with `is_anon=True` flag
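
The waterfall can be sketched as a single function. This is an illustrative sketch only: `resolve_distinct_id` and the injected `fetch_principal_id` callable are hypothetical names standing in for the real identity code and the API call.

```python
from typing import Callable, Optional


def resolve_distinct_id(
    usage_data: dict,
    fetch_principal_id: Callable[[], Optional[str]],
) -> tuple[str, bool]:
    """Return (distinct_id, is_anon) following the three-step waterfall."""
    # 1. Fast path: a previously linked principal needs no network call.
    linked = usage_data.get("linked_principal_id")
    if linked:
        return linked, False

    # 2. Ask the API; any failure is treated as "not authenticated".
    try:
        principal_id = fetch_principal_id()
    except Exception:
        principal_id = None
    if principal_id:
        return principal_id, False

    # 3. Fall back to the anonymous ID.
    return usage_data["anon_id"], True
```

Injecting the fetch step as a callable keeps the sketch free of any HTTP client and makes the fallback behavior easy to exercise.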

### **The Aliasing Lifecycle**

PostHog aliasing links anonymous activity to authenticated users. Here's
the state machine:

#### **Stage 1: Anonymous User**
```
usage.json: { "anon_id": "abc-123", "linked_principal_id": null }
All events → sent with distinct_id="abc-123" and is_anon=True
```

#### **Stage 2: Login Event**
1. User runs `arcade login`
2. Command completes successfully (auth token saved)
3. `CommandTracker` detects successful login
4. Fetches `principal_id` from API
5. Checks `should_alias()` → returns `True` because
`linked_principal_id` is `null`
6. **Calls `alias()` synchronously** (blocking):
   ```python
   posthog.alias(previous_id="abc-123", distinct_id="zyx-321")
   ```
7. Updates `usage.json`:
   ```json
   { "anon_id": "abc-123", "linked_principal_id": "zyx-321" }
   ```
8. PostHog backend merges all events with `distinct_id="abc-123"` into
the user profile for `"zyx-321"`

#### **Stage 3: Authenticated User**
```
usage.json: { "anon_id": "abc-123", "linked_principal_id": "zyx-321" }
All events → sent with distinct_id="zyx-321" and is_anon=False
```
- Events are directly attributed to the authenticated user
- No more API calls needed (uses cached `linked_principal_id`)

#### **Stage 4: Logout Event**
1. User runs `arcade logout`
2. Logout event is sent with the authenticated `distinct_id`
3. `CommandTracker` detects successful logout
4. **Rotates identity** by calling `reset_to_anonymous()`:
   ```json
   { "anon_id": "xyz-789", "linked_principal_id": null }
   ```
5. New `anon_id` prevents cross-contamination if another user logs in

### **Critical Constraint: Alias Timing**

PostHog requires that `alias()` is called **BEFORE** any events are sent
with the new `distinct_id`. This is why:
- **`alias()` is synchronous (blocking)**: Guarantees it completes
before the login success event is sent
- **Subsequent events use `linked_principal_id`**: Once aliased, all
future events use the authenticated ID
- **Lazy aliasing**: If a user authenticates via another mechanism (not
through `arcade login`), the system detects this on the next command and
performs aliasing before sending that command's event
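
The ordering constraint can be illustrated with a stand-in client that records calls instead of sending them. `RecordingClient` and `track` are hypothetical names for this sketch; they are not part of the PostHog SDK or of `UsageService`.

```python
class RecordingClient:
    """Stand-in analytics client that records calls in order."""

    def __init__(self) -> None:
        self.calls: list[tuple] = []

    def alias(self, previous_id: str, distinct_id: str) -> None:
        self.calls.append(("alias", previous_id, distinct_id))

    def capture(self, distinct_id: str, event: str) -> None:
        self.calls.append(("capture", distinct_id, event))


def track(client: RecordingClient, state: dict, principal_id, event: str) -> None:
    """Alias first (if needed), then send the event under the resolved ID."""
    if principal_id and state["linked_principal_id"] is None:
        # Must complete before any event uses the new distinct_id.
        client.alias(previous_id=state["anon_id"], distinct_id=principal_id)
        state["linked_principal_id"] = principal_id
    distinct_id = state["linked_principal_id"] or state["anon_id"]
    client.capture(distinct_id, event)
```

Calling `track` twice with the same principal records one `alias` followed by two `capture` calls, which is the ordering the constraint above requires.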

### **Event Capture Pipeline**

When `CommandTracker.track_command_execution()` is called:

1. **Resolve identity** → determines `distinct_id` and `is_anon` flag
2. **Build event properties**:
   ```python
   {
     "command_name": "toolkit.run",
     "cli_version": "1.2.3",
     "python_version": "3.11.0",
     "os_type": "Darwin",
     "os_release": "23.4.0",
     "duration": 1250.42,  # milliseconds
     "error_message": "..."  # if failed
   }
   ```
3. **Call `UsageService.capture()`**:
   - Serializes event data to JSON
   - Spawns detached subprocess: `python -m arcade_cli.usage`
   - Passes data via `ARCADE_USAGE_EVENT_DATA` env var
   - **Returns immediately** (non-blocking)

4. **Detached subprocess (`__main__.py`)**:
   - Runs independently, survives parent CLI exit
   - Deserializes event data
   - If `is_anon=True`, sets `$process_person_profile=False` (tells PostHog
     not to create a full profile)
   - Sends event to PostHog with 5s timeout
   - Exits (hard exit after 10s max via timeout thread)

### **Concurrency Handling**

Multiple CLI processes can run simultaneously. The system handles this
via:
- **File locking** on `usage.json` (shared lock for reads, exclusive for
writes)
- **Atomic writes** via temp files ensure incomplete writes never
corrupt the file
- **Idempotent aliasing**: `should_alias()` prevents redundant alias
calls
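
The read side of the locking scheme can be sketched with a shared lock, which lets several readers proceed at once while excluding a writer holding the exclusive lock. As before, `read_usage_file` and the sidecar `.lock` file are assumptions for illustration, and `fcntl` is Unix-only.

```python
import fcntl
import json
from pathlib import Path
from typing import Optional


def read_usage_file(path: Path) -> Optional[dict]:
    """Read the usage file under a shared lock; return None if unusable."""
    if not path.exists():
        return None
    lock_path = path.with_suffix(".lock")  # hypothetical sidecar lock file
    with open(lock_path, "a") as lock_file:
        fcntl.flock(lock_file, fcntl.LOCK_SH)  # many readers may hold this
        try:
            return json.loads(path.read_text())
        except (FileNotFoundError, json.JSONDecodeError):
            # Best effort: a missing or unreadable file is not an error.
            return None
        finally:
            fcntl.flock(lock_file, fcntl.LOCK_UN)
```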

### **Edge Cases Handled**

1. **Side-channel authentication**: User authenticates outside of
`arcade login` (e.g., manually editing credentials)
   - Detected via "lazy aliasing" check on every command
   - Performs alias if `linked_principal_id` doesn't match current
     `principal_id`

2. **API failures during identity fetch**: Falls back to anonymous
tracking
   - 2s timeout prevents hanging
   - Silent failure doesn't disrupt CLI

3. **PostHog merge restrictions**: Can't alias returning users who
already have a profile
   - System stores `linked_principal_id` to avoid retrying impossible
     aliases
   - New users (never logged in before) get full history stitched

4. **Multiple accounts on same machine**: Logout rotates `anon_id`
   - User A's anonymous activity won't leak into User B's profile

### **Privacy & Performance**

- **Opt-out**: `ARCADE_USAGE_TRACKING=0` disables all tracking
- **Non-blocking**: Events never slow down CLI (detached subprocess)
- **Anonymous profiles**: `$process_person_profile=False` for `anon_id`
events minimizes data collection
- **Silent failures**: Network issues or PostHog errors never surface to
users
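
The opt-out check can be sketched as a small predicate. This is a guess at the shape of `is_tracking_enabled`, not its actual implementation: the name `tracking_enabled` is hypothetical, and only the documented value `0` is treated as an opt-out here.

```python
import os
from typing import Mapping, Optional


def tracking_enabled(environ: Optional[Mapping[str, str]] = None) -> bool:
    """Return False only when ARCADE_USAGE_TRACKING is set to "0"."""
    env = os.environ if environ is None else environ
    return env.get("ARCADE_USAGE_TRACKING", "1").strip() != "0"
```

Checking this once at the top of the tracking entry point means an opted-out user triggers no file I/O, API calls, or subprocesses.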
2025-10-03 10:15:08 -07:00

import functools
import os
import platform
import sys
import time
from importlib import metadata
from typing import Any

import typer
from arcade_cli.constants import ARCADE_CONFIG_PATH
from arcade_cli.usage.constants import (
    EVENT_CLI_COMMAND_EXECUTED,
    EVENT_CLI_COMMAND_FAILED,
    PROP_CLI_VERSION,
    PROP_COMMAND_NAME,
    PROP_DEVICE_MONOTONIC_END,
    PROP_DEVICE_MONOTONIC_START,
    PROP_DURATION_MS,
    PROP_ERROR_MESSAGE,
    PROP_OS_RELEASE,
    PROP_OS_TYPE,
    PROP_RUNTIME_LANGUAGE,
    PROP_RUNTIME_VERSION,
)
from arcade_cli.usage.identity import UsageIdentity
from arcade_cli.usage.usage_service import UsageService
from arcade_cli.usage.utils import is_tracking_enabled
from rich.console import Console
from typer.core import TyperCommand, TyperGroup
from typer.models import Context

console = Console()

class CommandTracker:
    """Tracks CLI command execution for usage analytics."""

    def __init__(self) -> None:
        self.usage_service = UsageService()
        self.identity = UsageIdentity()
        self._cli_version: str | None = None
        self._runtime_version: str | None = None

    @property
    def cli_version(self) -> str:
        """Get CLI version, cached after first access."""
        if self._cli_version is None:
            try:
                self._cli_version = metadata.version("arcade-mcp")
            except Exception:
                self._cli_version = "unknown"
        return self._cli_version

    @property
    def runtime_language(self) -> str:
        """Get the runtime language (always 'python' for this CLI)."""
        return "python"

    @property
    def runtime_version(self) -> str:
        """Get runtime version, cached after first access."""
        if self._runtime_version is None:
            version_info = sys.version_info
            self._runtime_version = (
                f"{version_info.major}.{version_info.minor}.{version_info.micro}"
            )
        return self._runtime_version

    @property
    def user_id(self) -> str:
        """Get distinct_id based on authentication state."""
        return self.identity.get_distinct_id()

    def get_full_command_path(self, ctx: typer.Context) -> str:
        """Get the full command path by traversing the context hierarchy."""
        command_parts = []
        current_ctx: Any = ctx
        while current_ctx and current_ctx.parent:
            if current_ctx.command.name:
                command_parts.append(current_ctx.command.name)
            current_ctx = current_ctx.parent
        return ".".join(reversed(command_parts))

    def _handle_successful_login(self) -> None:
        """Handle a successful login event.

        Upon a successful login, we retrieve and persist the principal_id for the logged in user.
        We then alias the persisted anon_id to the known person with principal_id.
        As a result, the previous anonymous events will be attributed to the known person with principal_id.
        """
        principal_id = self.identity.get_principal_id()
        if principal_id:
            if self.identity.should_alias():
                # Alias the anon_id to the known person with principal_id
                self.usage_service.alias(
                    previous_id=self.identity.anon_id, distinct_id=principal_id
                )
            # Always update the linked principal_id on successful login
            self.identity.set_linked_principal_id(principal_id)

    def _handle_successful_logout(
        self,
        command_name: str,
        duration_ms: float | None = None,
        monotonic_start: float | None = None,
        monotonic_end: float | None = None,
    ) -> None:
        """Handle a successful logout event.

        Upon a successful logout, we rotate the anon_id and clear the linked principal_id.
        """
        # Check if user was authenticated before logout (has linked_principal_id)
        data = self.identity.load_or_create()
        was_authenticated = data.get("linked_principal_id") is not None

        # Send logout event as the authenticated user before resetting to anonymous
        properties: dict[str, Any] = {
            PROP_COMMAND_NAME: command_name,
            PROP_CLI_VERSION: self.cli_version,
            PROP_RUNTIME_LANGUAGE: self.runtime_language,
            PROP_RUNTIME_VERSION: self.runtime_version,
            PROP_OS_TYPE: platform.system(),
            PROP_OS_RELEASE: platform.release(),
        }
        if duration_ms:
            properties[PROP_DURATION_MS] = round(duration_ms)
        if monotonic_start is not None:
            properties[PROP_DEVICE_MONOTONIC_START] = monotonic_start
        if monotonic_end is not None:
            properties[PROP_DEVICE_MONOTONIC_END] = monotonic_end

        # Check if using anon_id
        is_anon = self.user_id == self.identity.anon_id
        self.usage_service.capture(
            EVENT_CLI_COMMAND_EXECUTED, self.user_id, properties=properties, is_anon=is_anon
        )

        # Only rotate anon_id if user was actually authenticated
        if was_authenticated:
            self.identity.reset_to_anonymous()

    def track_command_execution(
        self,
        command_name: str,
        success: bool,
        duration_ms: float | None = None,
        error_message: str | None = None,
        is_login: bool = False,
        is_logout: bool = False,
        monotonic_start: float | None = None,
        monotonic_end: float | None = None,
    ) -> None:
        """Track command execution event.

        Args:
            command_name: The name of the CLI command that was executed.
            success: Whether the command was successfully executed.
            duration_ms: The duration of the command execution in milliseconds.
            error_message: The error message if the command failed.
            is_login: Whether this is a login command.
            is_logout: Whether this is a logout command.
            monotonic_start: Monotonic clock timestamp at command start.
            monotonic_end: Monotonic clock timestamp at command end.
        """
        if not is_tracking_enabled():
            return

        if is_login and success:
            self._handle_successful_login()
        elif is_logout and success:
            self._handle_successful_logout(
                command_name, duration_ms, monotonic_start, monotonic_end
            )
            return
        # Edge case: Lazy alias check for other commands (if user authenticated via side path)
        elif not is_login and not is_logout and self.identity.should_alias():
            principal_id = self.identity.get_principal_id()
            if principal_id:
                self.usage_service.alias(
                    previous_id=self.identity.anon_id, distinct_id=principal_id
                )
                self.identity.set_linked_principal_id(principal_id)

        event_name = EVENT_CLI_COMMAND_EXECUTED if success else EVENT_CLI_COMMAND_FAILED

        properties: dict[str, Any] = {
            PROP_COMMAND_NAME: command_name,
            PROP_CLI_VERSION: self.cli_version,
            PROP_RUNTIME_LANGUAGE: self.runtime_language,
            PROP_RUNTIME_VERSION: self.runtime_version,
            PROP_OS_TYPE: platform.system(),
            PROP_OS_RELEASE: platform.release(),
        }
        if not success and error_message:
            properties[PROP_ERROR_MESSAGE] = error_message
        if duration_ms:
            properties[PROP_DURATION_MS] = round(duration_ms)
        if monotonic_start is not None:
            properties[PROP_DEVICE_MONOTONIC_START] = monotonic_start
        if monotonic_end is not None:
            properties[PROP_DEVICE_MONOTONIC_END] = monotonic_end

        # Check if using anon_id (not authenticated)
        is_anon = self.user_id == self.identity.anon_id
        self.usage_service.capture(event_name, self.user_id, properties=properties, is_anon=is_anon)


# Global tracker instance
command_tracker = CommandTracker()

class TrackedTyperCommand(TyperCommand):
    """Custom TyperCommand that tracks individual command execution."""

    def invoke(self, ctx: Any) -> Any:
        """Override invoke to track command execution."""
        if not os.path.exists(ARCADE_CONFIG_PATH):
            console.print(
                "[yellow]Arcade collects CLI usage data to help us debug and improve the service. "
                "By continuing to use the Arcade CLI, you agree to the terms of our Privacy Policy. "
                "To opt out, set the ARCADE_USAGE_TRACKING environment variable to 0.[/yellow]"
            )

        command_name = ctx.command.name
        is_login = command_name == "login"
        is_logout = command_name == "logout"

        try:
            start_time = time.time()
            start_monotonic = time.monotonic()
            result = super().invoke(ctx)
            end_time = time.time()
            end_monotonic = time.monotonic()
            duration = end_time - start_time
            command_tracker.track_command_execution(
                command_tracker.get_full_command_path(ctx),
                success=True,
                duration_ms=duration * 1000,
                is_login=is_login,
                is_logout=is_logout,
                monotonic_start=start_monotonic,
                monotonic_end=end_monotonic,
            )
        except Exception as e:
            end_time = time.time()
            end_monotonic = time.monotonic()
            duration = end_time - start_time

            from arcade_cli.utils import CLIError

            error_msg = str(e)[:300]
            command_tracker.track_command_execution(
                command_tracker.get_full_command_path(ctx),
                success=False,
                duration_ms=duration * 1000,
                error_message=error_msg,
                is_login=is_login,
                is_logout=is_logout,
                monotonic_start=start_monotonic,
                monotonic_end=end_monotonic,
            )
            if isinstance(e, CLIError):
                raise typer.Exit(code=1)
            else:
                raise
        else:
            return result

class TrackedTyperGroup(TyperGroup):
    """Custom TyperGroup that creates tracked commands."""

    def command(self, *args: Any, **kwargs: Any) -> Any:
        """Override command decorator to use TrackedTyperCommand."""
        # Set the custom command class
        kwargs["cls"] = TrackedTyperCommand
        result: Any = super().command(*args, **kwargs)
        return result

    def list_commands(self, ctx: Context) -> list[str]:  # type: ignore[override]
        """Return the list of commands in the order they appear."""
        return list(self.commands)

class TrackedTyper(typer.Typer):
    """Custom Typer that creates tracked commands."""

    def command(
        self, name: str | None = None, *, cls: type[TyperCommand] | None = None, **kwargs: Any
    ) -> Any:
        """Override command decorator to use TrackedTyperCommand."""
        if cls is None:
            cls = TrackedTyperCommand
        result: Any = super().command(name, cls=cls, **kwargs)
        return result

    def callback(self, name: str | None = None, **kwargs: Any) -> Any:
        """Override callback decorator to track callback execution."""
        original_callback_decorator: Any = super().callback(name, **kwargs)

        def decorator(func: Any) -> Any:
            @functools.wraps(func)
            def tracked_callback(*args: Any, **cb_kwargs: Any) -> Any:
                """Wrapper that tracks callback execution."""
                # Get the context from kwargs (Typer passes it)
                ctx = cb_kwargs.get("ctx") or (
                    args[0] if args and isinstance(args[0], typer.Context) else None
                )
                command_name = ctx.invoked_subcommand if ctx and ctx.invoked_subcommand else "root"
                start_time = time.time()
                start_monotonic = time.monotonic()
                try:
                    result = func(*args, **cb_kwargs)
                except Exception as e:
                    # Track callback failure (auth failures, version checks, etc.)
                    end_time = time.time()
                    end_monotonic = time.monotonic()
                    duration = (end_time - start_time) * 1000

                    from arcade_cli.utils import CLIError

                    command_tracker.track_command_execution(
                        command_name,
                        success=False,
                        duration_ms=duration,
                        error_message=str(e)[:300],
                        monotonic_start=start_monotonic,
                        monotonic_end=end_monotonic,
                    )
                    if isinstance(e, CLIError):
                        raise typer.Exit(code=1)
                    else:
                        raise
                else:
                    return result

            result: Any = original_callback_decorator(tracked_callback)
            return result

        return decorator