agent-skill-creator/scripts/staleness_check.py
francylisboacharuto cb66ad17cc feat: Add skill staleness detection (review tracking, dependency health, schema drift)
Skills go stale as APIs change and data sources move. This adds a three-layer
staleness detection system: review date tracking with git fallback, HTTP health
checks for declared dependencies, and top-level key comparison for schema drift.
All new frontmatter fields are optional — existing skills work unchanged.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 09:09:48 -03:00

796 lines
26 KiB
Python

#!/usr/bin/env python3
"""
Staleness Detection for Agent Skills.
Checks whether a skill is overdue for review, validates dependency health,
and detects schema drift in declared API endpoints. Designed to surface
skills that may have gone stale as APIs change, compliance rules update,
and data sources move.
Usage:
python3 scripts/staleness_check.py <skill-path> [--json] [--check-deps] [--check-drift]
Exit codes:
0 - Fresh (no staleness issues)
1 - Stale (overdue for review)
2 - Degraded (dependency failures or schema drift)
"""
import json
import subprocess
import sys
from datetime import date, timedelta
from pathlib import Path
from typing import Optional
from urllib.error import URLError
from urllib.request import Request, urlopen
# --- Import sibling scripts ---
_SCRIPTS_DIR = Path(__file__).resolve().parent
if str(_SCRIPTS_DIR) not in sys.path:
sys.path.insert(0, str(_SCRIPTS_DIR))
from validate import _parse_frontmatter, _parse_subfield_value # noqa: E402
# --- Constants ---
DEFAULT_REVIEW_INTERVAL_DAYS = 90
STALENESS_WARNING_THRESHOLD_DAYS = 60
HTTP_TIMEOUT_SECONDS = 10
DATE_PATTERN_RE = None # Lazy-compiled below
def _date_pattern():
"""Return compiled regex for YYYY-MM-DD date format."""
global DATE_PATTERN_RE
if DATE_PATTERN_RE is None:
import re
DATE_PATTERN_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$")
return DATE_PATTERN_RE
def _parse_date(value: str) -> Optional[date]:
"""
Parse a YYYY-MM-DD string into a date object.
Args:
value: Date string in YYYY-MM-DD format.
Returns:
A date object, or None if parsing fails.
"""
if not value or not _date_pattern().match(value.strip()):
return None
try:
parts = value.strip().split("-")
return date(int(parts[0]), int(parts[1]), int(parts[2]))
except (ValueError, IndexError):
return None
def _get_git_last_modified(skill_path: str) -> Optional[date]:
"""
Get the last git commit date for a skill directory.
Runs ``git log -1 --format=%aI`` on the SKILL.md file as a fallback
for skills without explicit review dates.
Args:
skill_path: Path to the skill directory.
Returns:
The date of the last git commit touching SKILL.md, or None
if git is unavailable or the file is untracked.
"""
skill_md = Path(skill_path).resolve() / "SKILL.md"
if not skill_md.exists():
return None
try:
result = subprocess.run(
["git", "log", "-1", "--format=%aI", "--", str(skill_md)],
capture_output=True,
text=True,
timeout=10,
cwd=str(Path(skill_path).resolve()),
)
if result.returncode != 0 or not result.stdout.strip():
return None
# ISO format: 2025-01-15T10:30:00+00:00 -- take the date part
iso_str = result.stdout.strip()
return _parse_date(iso_str[:10])
except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
return None
def _check_review_staleness(frontmatter: str, git_last_modified: Optional[date]) -> list[dict]:
"""
Check whether a skill is overdue for review.
Compares ``metadata.last_reviewed`` against ``metadata.review_interval_days``.
Falls back to the git commit date when explicit review dates are absent.
Args:
frontmatter: The frontmatter text (without delimiters).
git_last_modified: Fallback date from git log.
Returns:
List of issue dicts with keys: level, message, detail.
"""
issues: list[dict] = []
today = date.today()
# Extract temporal fields
created_str = _parse_subfield_value(frontmatter, "metadata", "created")
last_reviewed_str = _parse_subfield_value(frontmatter, "metadata", "last_reviewed")
interval_str = _parse_subfield_value(frontmatter, "metadata", "review_interval_days")
# Validate formats if present
if created_str and not _parse_date(created_str):
issues.append({
"level": "warning",
"message": "Invalid 'metadata.created' date format",
"detail": f"Expected YYYY-MM-DD, got: '{created_str}'",
})
if last_reviewed_str and not _parse_date(last_reviewed_str):
issues.append({
"level": "warning",
"message": "Invalid 'metadata.last_reviewed' date format",
"detail": f"Expected YYYY-MM-DD, got: '{last_reviewed_str}'",
})
if interval_str:
try:
int(interval_str)
except ValueError:
issues.append({
"level": "warning",
"message": "Invalid 'metadata.review_interval_days' value",
"detail": f"Expected integer, got: '{interval_str}'",
})
# Determine review interval
interval_days = DEFAULT_REVIEW_INTERVAL_DAYS
if interval_str:
try:
interval_days = int(interval_str)
except ValueError:
pass
# Determine the reference date (last_reviewed > git date > None)
reference_date = None
date_source = "unknown"
last_reviewed = _parse_date(last_reviewed_str) if last_reviewed_str else None
if last_reviewed:
reference_date = last_reviewed
date_source = "last_reviewed"
elif git_last_modified:
reference_date = git_last_modified
date_source = "git_commit"
else:
issues.append({
"level": "info",
"message": "No review date available",
"detail": "No 'metadata.last_reviewed' and no git history found. "
"Consider adding temporal metadata.",
})
# Check staleness
days_since = None
review_status = "unknown"
if reference_date:
days_since = (today - reference_date).days
deadline = reference_date + timedelta(days=interval_days)
warning_date = reference_date + timedelta(days=STALENESS_WARNING_THRESHOLD_DAYS)
if today > deadline:
review_status = "overdue"
issues.append({
"level": "error",
"message": f"Skill is overdue for review ({days_since} days since last review)",
"detail": f"Review interval is {interval_days} days. "
f"Last review: {reference_date} (source: {date_source}). "
f"Deadline was: {deadline}.",
})
elif today > warning_date:
review_status = "due_soon"
days_remaining = (deadline - today).days
issues.append({
"level": "warning",
"message": f"Review due in {days_remaining} days",
"detail": f"Last review: {reference_date} (source: {date_source}). "
f"Deadline: {deadline}.",
})
else:
review_status = "fresh"
# Missing temporal metadata suggestion
has_any_temporal = bool(created_str or last_reviewed_str or interval_str)
if not has_any_temporal:
issues.append({
"level": "info",
"message": "No temporal metadata found",
"detail": "Consider adding metadata.created, metadata.last_reviewed, "
"and metadata.review_interval_days to frontmatter.",
})
return issues, review_status, days_since, date_source
def _parse_yaml_list(frontmatter: str, parent: str, child: str) -> list[dict]:
"""
Parse a YAML list-of-objects under a parent.child path in frontmatter.
Handles the pattern::
metadata:
dependencies:
- url: https://example.com
name: Example
type: api
Args:
frontmatter: The frontmatter text.
parent: Top-level field (e.g. ``metadata``).
child: Second-level field (e.g. ``dependencies``).
Returns:
List of dicts, each representing one list item.
"""
lines = frontmatter.split("\n")
items: list[dict] = []
# Find the parent block
in_parent = False
in_child = False
current_item: Optional[dict] = None
child_indent = -1
for line in lines:
stripped = line.strip()
if not in_parent:
if stripped.startswith(f"{parent}:"):
in_parent = True
continue
# Inside parent -- check if we've left it
if line and line[0] != " " and line[0] != "\t" and stripped:
break
if not in_child:
if stripped.startswith(f"{child}:"):
in_child = True
continue
# Inside child list
if not stripped:
continue
# Detect indent level of list items
raw_indent = len(line) - len(line.lstrip())
if child_indent == -1 and stripped.startswith("- "):
child_indent = raw_indent
# Check if we've left the child block
if raw_indent <= child_indent and not stripped.startswith("- "):
# Check if this is a sibling of child (another metadata key)
if ":" in stripped:
break
if stripped.startswith("- "):
# New list item
if current_item is not None:
items.append(current_item)
current_item = {}
# Parse "- key: value" on the same line
rest = stripped[2:].strip()
if ":" in rest:
key, _, val = rest.partition(":")
current_item[key.strip()] = val.strip()
elif current_item is not None and ":" in stripped:
# Continuation key-value in the same list item
key, _, val = stripped.partition(":")
current_item[key.strip()] = val.strip()
if current_item is not None:
items.append(current_item)
return items
def _check_dependency_health(dependencies: list[dict]) -> list[dict]:
"""
HTTP HEAD each declared dependency URL and report health status.
Args:
dependencies: List of dependency dicts with at least a ``url`` key.
Returns:
List of issue dicts reporting the health of each dependency.
"""
issues: list[dict] = []
for dep in dependencies:
url = dep.get("url", "").strip()
name = dep.get("name", url)
if not url:
issues.append({
"level": "warning",
"message": f"Dependency '{name}' has no URL",
"detail": "Cannot check health without a URL.",
})
continue
if not url.startswith(("http://", "https://")):
issues.append({
"level": "warning",
"message": f"Dependency '{name}' has non-HTTP URL",
"detail": f"Skipping health check for: {url}",
})
continue
try:
req = Request(url, method="HEAD")
req.add_header("User-Agent", "agent-skill-staleness-check/1.0")
with urlopen(req, timeout=HTTP_TIMEOUT_SECONDS) as resp:
status = resp.status
if 200 <= status < 400:
issues.append({
"level": "info",
"message": f"Dependency '{name}' is healthy",
"detail": f"HTTP {status} from {url}",
})
elif 400 <= status < 500:
issues.append({
"level": "warning",
"message": f"Dependency '{name}' returned client error",
"detail": f"HTTP {status} from {url}. "
"The endpoint may have moved or require authentication.",
})
else:
issues.append({
"level": "error",
"message": f"Dependency '{name}' returned server error",
"detail": f"HTTP {status} from {url}",
})
except URLError as exc:
issues.append({
"level": "error",
"message": f"Dependency '{name}' is unreachable",
"detail": f"Failed to connect to {url}: {exc.reason}",
})
except Exception as exc:
issues.append({
"level": "error",
"message": f"Dependency '{name}' check failed",
"detail": f"Error checking {url}: {exc}",
})
return issues
def _parse_schema_expectations(frontmatter: str) -> list[dict]:
"""
Extract ``metadata.schema_expectations`` list from frontmatter.
Each expectation has: url, method (default GET), expected_keys (list).
Args:
frontmatter: The frontmatter text.
Returns:
List of schema expectation dicts.
"""
raw_items = _parse_yaml_list(frontmatter, "metadata", "schema_expectations")
expectations: list[dict] = []
for item in raw_items:
url = item.get("url", "").strip()
method = item.get("method", "GET").strip().upper()
# expected_keys are parsed as a sub-list, but our simple parser
# puts them inline. We need to re-parse from frontmatter directly.
expectations.append({
"url": url,
"method": method,
"expected_keys": [], # Will be filled by deeper parse
})
# Deeper parse for expected_keys (list items under each schema_expectations entry)
expectations = _parse_schema_expectations_deep(frontmatter)
return expectations
def _parse_schema_expectations_deep(frontmatter: str) -> list[dict]:
"""
Deep-parse schema_expectations including expected_keys sub-lists.
Args:
frontmatter: The frontmatter text.
Returns:
List of expectation dicts with url, method, expected_keys.
"""
lines = frontmatter.split("\n")
expectations: list[dict] = []
current: Optional[dict] = None
in_metadata = False
in_schema = False
in_expected_keys = False
for line in lines:
stripped = line.strip()
if not in_metadata:
if stripped.startswith("metadata:"):
in_metadata = True
continue
# Left metadata block?
if line and line[0] != " " and line[0] != "\t" and stripped:
break
if not in_schema:
if stripped.startswith("schema_expectations:"):
in_schema = True
continue
# Detect new list item
if stripped.startswith("- url:") or stripped.startswith("- method:"):
if current is not None:
expectations.append(current)
in_expected_keys = False
current = {"url": "", "method": "GET", "expected_keys": []}
if stripped.startswith("- url:"):
current["url"] = stripped.split(":", 1)[1].strip()
elif stripped.startswith("- method:"):
current["method"] = stripped.split(":", 1)[1].strip().upper()
elif current is not None:
if stripped.startswith("url:"):
current["url"] = stripped.split(":", 1)[1].strip()
elif stripped.startswith("method:"):
current["method"] = stripped.split(":", 1)[1].strip().upper()
elif stripped.startswith("expected_keys:"):
in_expected_keys = True
elif in_expected_keys and stripped.startswith("- "):
current["expected_keys"].append(stripped[2:].strip())
elif not stripped.startswith("-") and ":" in stripped:
# Another key at the same level -- might be leaving schema block
key = stripped.split(":")[0].strip()
if key not in ("url", "method", "expected_keys"):
in_expected_keys = False
if current is not None:
expectations.append(current)
return expectations
def _check_schema_drift(expectations: list[dict]) -> list[dict]:
"""
GET each declared endpoint and compare top-level JSON keys against expected.
Args:
expectations: List of expectation dicts with url, method, expected_keys.
Returns:
List of issue dicts reporting drift status.
"""
issues: list[dict] = []
for exp in expectations:
url = exp.get("url", "").strip()
method = exp.get("method", "GET").upper()
expected_keys = exp.get("expected_keys", [])
if not url:
continue
if not url.startswith(("http://", "https://")):
issues.append({
"level": "warning",
"message": f"Schema check skipped for non-HTTP URL: {url}",
"detail": "Only HTTP/HTTPS URLs are supported.",
})
continue
if not expected_keys:
issues.append({
"level": "info",
"message": f"No expected_keys declared for {url}",
"detail": "Skipping drift check.",
})
continue
try:
req = Request(url, method=method)
req.add_header("User-Agent", "agent-skill-staleness-check/1.0")
req.add_header("Accept", "application/json")
with urlopen(req, timeout=HTTP_TIMEOUT_SECONDS) as resp:
body = resp.read().decode("utf-8", errors="replace")
data = json.loads(body)
if not isinstance(data, dict):
issues.append({
"level": "warning",
"message": f"Response from {url} is not a JSON object",
"detail": f"Got {type(data).__name__}, expected dict. "
"Cannot compare keys.",
})
continue
actual_keys = set(data.keys())
expected_set = set(expected_keys)
missing = expected_set - actual_keys
new_keys = actual_keys - expected_set
if missing:
issues.append({
"level": "error",
"message": f"Schema drift: missing keys from {url}",
"detail": f"Expected keys not found: {sorted(missing)}. "
"The API response structure may have changed.",
})
if new_keys:
issues.append({
"level": "info",
"message": f"Schema drift: new keys in {url}",
"detail": f"Unexpected keys found: {sorted(new_keys)}. "
"The API may have added new fields.",
})
if not missing and not new_keys:
issues.append({
"level": "info",
"message": f"Schema matches for {url}",
"detail": f"All {len(expected_keys)} expected keys present, "
"no unexpected keys.",
})
except json.JSONDecodeError:
issues.append({
"level": "error",
"message": f"Response from {url} is not valid JSON",
"detail": "Cannot perform schema drift check.",
})
except URLError as exc:
issues.append({
"level": "error",
"message": f"Cannot reach {url} for schema check",
"detail": f"Error: {exc.reason}",
})
except Exception as exc:
issues.append({
"level": "error",
"message": f"Schema check failed for {url}",
"detail": f"Error: {exc}",
})
return issues
def staleness_check(
skill_path: str,
check_deps: bool = False,
check_drift: bool = False,
) -> dict:
"""
Main entry point for staleness detection.
Args:
skill_path: Path to the skill directory.
check_deps: If True, HTTP-check declared dependencies.
check_drift: If True, check for schema drift in declared endpoints.
Returns:
Dict with keys:
- fresh (bool): True if no errors found.
- review_status (str): "fresh", "due_soon", "overdue", or "unknown".
- days_since_review (int or None): Days since last review.
- date_source (str): Where the reference date came from.
- issues (list[dict]): All issues found.
"""
all_issues: list[dict] = []
skill_dir = Path(skill_path).resolve()
# --- Check: directory exists ---
if not skill_dir.exists():
return {
"fresh": False,
"review_status": "unknown",
"days_since_review": None,
"date_source": "none",
"issues": [{"level": "error", "message": f"Path does not exist: {skill_dir}", "detail": ""}],
}
if not skill_dir.is_dir():
return {
"fresh": False,
"review_status": "unknown",
"days_since_review": None,
"date_source": "none",
"issues": [{"level": "error", "message": f"Path is not a directory: {skill_dir}", "detail": ""}],
}
# --- Read SKILL.md ---
skill_md = skill_dir / "SKILL.md"
if not skill_md.exists():
return {
"fresh": False,
"review_status": "unknown",
"days_since_review": None,
"date_source": "none",
"issues": [{"level": "error", "message": "SKILL.md not found", "detail": ""}],
}
try:
content = skill_md.read_text(encoding="utf-8")
except Exception as exc:
return {
"fresh": False,
"review_status": "unknown",
"days_since_review": None,
"date_source": "none",
"issues": [{"level": "error", "message": f"Could not read SKILL.md: {exc}", "detail": ""}],
}
frontmatter, _ = _parse_frontmatter(content)
if frontmatter is None:
return {
"fresh": False,
"review_status": "unknown",
"days_since_review": None,
"date_source": "none",
"issues": [{"level": "error", "message": "No valid frontmatter found", "detail": ""}],
}
# --- Phase 1: Review staleness ---
git_date = _get_git_last_modified(skill_path)
review_issues, review_status, days_since, date_source = _check_review_staleness(
frontmatter, git_date
)
all_issues.extend(review_issues)
# --- Phase 2: Dependency health ---
if check_deps:
deps = _parse_yaml_list(frontmatter, "metadata", "dependencies")
if deps:
dep_issues = _check_dependency_health(deps)
all_issues.extend(dep_issues)
else:
all_issues.append({
"level": "info",
"message": "No dependencies declared",
"detail": "Add metadata.dependencies to enable health checks.",
})
# --- Phase 3: Schema drift ---
if check_drift:
expectations = _parse_schema_expectations(frontmatter)
if expectations:
drift_issues = _check_schema_drift(expectations)
all_issues.extend(drift_issues)
else:
all_issues.append({
"level": "info",
"message": "No schema expectations declared",
"detail": "Add metadata.schema_expectations to enable drift detection.",
})
# Determine overall freshness
has_errors = any(i["level"] == "error" for i in all_issues)
fresh = not has_errors
return {
"fresh": fresh,
"review_status": review_status,
"days_since_review": days_since,
"date_source": date_source,
"issues": all_issues,
}
def _print_human_readable(result: dict, skill_path: str) -> None:
"""Print staleness check results in a human-readable format."""
print(f"Staleness check: {skill_path}")
print(f"{'=' * 60}")
status_label = result["review_status"].upper().replace("_", " ")
print(f"Review status: {status_label}")
if result["days_since_review"] is not None:
print(f"Days since review: {result['days_since_review']} (source: {result['date_source']})")
if result["fresh"]:
print("Overall: FRESH")
else:
print("Overall: STALE / DEGRADED")
if result["issues"]:
errors = [i for i in result["issues"] if i["level"] == "error"]
warnings = [i for i in result["issues"] if i["level"] == "warning"]
infos = [i for i in result["issues"] if i["level"] == "info"]
if errors:
print(f"\nErrors ({len(errors)}):")
for issue in errors:
print(f" [ERROR] {issue['message']}")
if issue["detail"]:
print(f" {issue['detail']}")
if warnings:
print(f"\nWarnings ({len(warnings)}):")
for issue in warnings:
print(f" [WARN] {issue['message']}")
if issue["detail"]:
print(f" {issue['detail']}")
if infos:
print(f"\nInfo ({len(infos)}):")
for issue in infos:
print(f" [INFO] {issue['message']}")
if issue["detail"]:
print(f" {issue['detail']}")
print(f"{'=' * 60}")
def main() -> None:
"""CLI entry point for the staleness checker."""
if len(sys.argv) < 2:
print(
"Usage: python3 scripts/staleness_check.py <skill-path> [--json] [--check-deps] [--check-drift]\n"
"\n"
"Arguments:\n"
" skill-path Path to the skill directory to check\n"
"\n"
"Options:\n"
" --json Output results as JSON to stdout\n"
" --check-deps HTTP-check declared dependency URLs\n"
" --check-drift Detect schema drift in declared API endpoints\n"
"\n"
"Exit codes:\n"
" 0 Fresh (no staleness issues)\n"
" 1 Stale (overdue for review)\n"
" 2 Degraded (dependency failures or schema drift)\n",
file=sys.stderr,
)
sys.exit(1)
skill_path = sys.argv[1]
use_json = "--json" in sys.argv
check_deps = "--check-deps" in sys.argv
check_drift = "--check-drift" in sys.argv
result = staleness_check(skill_path, check_deps=check_deps, check_drift=check_drift)
if use_json:
print(json.dumps(result, indent=2))
else:
_print_human_readable(result, skill_path)
# Exit codes: 0=fresh, 1=stale, 2=degraded
if result["review_status"] == "overdue":
sys.exit(1)
has_dep_or_drift_errors = any(
i["level"] == "error" and "review" not in i["message"].lower()
for i in result["issues"]
)
if has_dep_or_drift_errors:
sys.exit(2)
sys.exit(0)
if __name__ == "__main__":
main()