feat: Add skill staleness detection (review tracking, dependency health, schema drift)

Skills go stale as APIs change and data sources move. This adds a three-layer
staleness detection system: review date tracking with git fallback, HTTP health
checks for declared dependencies, and top-level key comparison for schema drift.
All new frontmatter fields are optional — existing skills work unchanged.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
francylisboacharuto 2026-02-27 09:09:48 -03:00
parent 6fc2f35d7c
commit cb66ad17cc
7 changed files with 1064 additions and 2 deletions

View file

@ -275,18 +275,78 @@ Every skill goes through automated checks before delivery and on every publish:
|------|---------------|
| **Spec Validation** | SKILL.md structure, frontmatter format, naming rules, file references |
| **Security Scan** | No hardcoded API keys, no credentials, no injection patterns |
| **Staleness Check** | Review dates, dependency health, API schema drift |
Run them independently anytime:
```bash
python3 scripts/validate.py ./my-skill/
python3 scripts/security_scan.py ./my-skill/
python3 scripts/staleness_check.py ./my-skill/
python3 scripts/staleness_check.py ./my-skill/ --check-deps --check-drift
```
Skills that fail validation cannot be published. Skills with high-severity security issues are blocked.
---
## Staleness Detection
Skills go stale. APIs change, compliance rules update, data sources move. A skill that worked six months ago may silently produce wrong results today. Staleness detection surfaces this before users hit it.
Three layers, each opt-in:
**Review tracking** — Every skill can declare when it was last reviewed and how often it should be. The staleness checker compares these dates and flags overdue skills. Skills without explicit dates fall back to the last git commit date on SKILL.md.
```bash
python3 scripts/staleness_check.py ./my-skill/
# Exit code 0 = fresh, 1 = overdue for review
```
**Dependency health** — Skills can declare external URLs they depend on (APIs, data sources). The `--check-deps` flag HTTP-checks each one and reports failures.
```bash
python3 scripts/staleness_check.py ./my-skill/ --check-deps
# Exit code 2 = one or more dependencies unreachable
```
**Schema drift** — Skills can declare the expected top-level keys in API responses. The `--check-drift` flag fetches each endpoint and compares actual keys against expected. Missing keys = the API changed under you.
```bash
python3 scripts/staleness_check.py ./my-skill/ --check-drift
```
All three layers are controlled by optional frontmatter fields. Existing skills work unchanged — the tool just suggests adding the metadata:
```yaml
metadata:
created: 2026-02-27
last_reviewed: 2026-02-27
review_interval_days: 90
dependencies:
- url: https://api.example.com/v1
name: Example API
type: api
schema_expectations:
- url: https://api.example.com/v1/data
method: GET
expected_keys:
- id
- price
- volume
```
For teams using the skill registry, `stale` scans every published skill at once:
```bash
python3 scripts/skill_registry.py stale
# NAME VERSION STATUS DAYS SINCE SOURCE INTERVAL
# sales-report 1.2.0 OVERDUE 127 last_reviewed 90
# deploy-check 2.0.1 FRESH 12 published 90
```
---
## Tools Reference
### Registry Commands
@ -299,15 +359,21 @@ python3 scripts/skill_registry.py search "query" # Search sk
python3 scripts/skill_registry.py info skill-name # Skill details
python3 scripts/skill_registry.py install skill-name # Install a skill
python3 scripts/skill_registry.py remove skill-name --force # Remove a skill
python3 scripts/skill_registry.py stale # Report stale skills
python3 scripts/skill_registry.py stale --json # Machine-readable output
```
### Validation and Security
### Validation, Security, and Staleness
```bash
python3 scripts/validate.py ./skill/ # Spec compliance
python3 scripts/validate.py ./skill/ --json # Machine-readable output
python3 scripts/security_scan.py ./skill/ # Security audit
python3 scripts/security_scan.py ./skill/ --json # Machine-readable output
python3 scripts/staleness_check.py ./skill/ # Review staleness
python3 scripts/staleness_check.py ./skill/ --check-deps # + dependency health
python3 scripts/staleness_check.py ./skill/ --check-drift # + schema drift
python3 scripts/staleness_check.py ./skill/ --json # Machine-readable output
```
### Export
@ -342,6 +408,7 @@ agent-skill-creator/
scripts/
validate.py # Spec compliance checker
security_scan.py # Security scanner
staleness_check.py # Staleness detection (review, deps, drift)
export_utils.py # Cross-platform export
skill_registry.py # Team skill registry
install-template.sh # Template for generated installers

View file

@ -410,6 +410,20 @@ license: MIT # or appropriate license
metadata:
author: Author Name
version: 1.0.0
created: YYYY-MM-DD # When the skill was created
last_reviewed: YYYY-MM-DD # Last time content was verified current
review_interval_days: 90 # Days between required reviews
dependencies: # External URLs the skill depends on (optional)
- url: https://api.example.com/v1
name: Example API
type: api
schema_expectations: # Expected API response shapes (optional)
- url: https://api.example.com/v1/data
method: GET
expected_keys:
- id
- name
- value
---
# /skill-name — Short Description

View file

@ -852,6 +852,13 @@ license: MIT
metadata:
author: Author Name
version: 1.0.0
created: 2026-02-27
last_reviewed: 2026-02-27
review_interval_days: 90
dependencies:
- url: https://api.example.com/v1
name: Example API
type: api
---
```
@ -1281,6 +1288,7 @@ See README.md for complete multi-platform installation instructions.
- [ ] SKILL.md created FIRST with spec-compliant frontmatter
- [ ] SKILL.md is <500 lines
- [ ] Frontmatter has: name, description (<=1024 chars), license, metadata (author, version)
- [ ] Temporal metadata included (metadata.created, metadata.last_reviewed, metadata.review_interval_days)
- [ ] Name is kebab-case, no `-cskill`, matches directory
- [ ] All Python scripts implemented with functional code
- [ ] No TODO, no `pass`, no `NotImplementedError`, no placeholders
@ -1292,6 +1300,7 @@ See README.md for complete multi-platform installation instructions.
- [ ] `requirements.txt` created (if third-party dependencies used)
- [ ] Spec validation passed (`scripts/validate.py`)
- [ ] Security scan passed (`scripts/security_scan.py`)
- [ ] Staleness check passed (`scripts/staleness_check.py`)
- [ ] Results reported to user
---

View file

@ -17,6 +17,14 @@
- Concrete examples, not abstract
- Not just external links
**Current, Not Stale**
- Include `metadata.created` and `metadata.last_reviewed` dates in frontmatter
- Set `metadata.review_interval_days` (default: 90 days)
- Declare external dependencies in `metadata.dependencies` so health can be checked
- Declare expected API response shapes in `metadata.schema_expectations` for drift detection
- Run `python3 scripts/staleness_check.py path/to/skill/` periodically to detect stale skills
- When publishing to a registry, use `python3 scripts/skill_registry.py stale` to audit all skills
---
## Standards by File Type

View file

@ -38,6 +38,7 @@ if str(_SCRIPTS_DIR) not in sys.path:
from validate import validate_skill, _parse_frontmatter, _parse_yaml_field, _parse_subfield_value
from security_scan import security_scan
from staleness_check import DEFAULT_REVIEW_INTERVAL_DAYS
# --- Constants ---
@ -148,12 +149,20 @@ def extract_skill_metadata(skill_path: Path) -> dict:
# Author: try metadata.author first
author = _parse_subfield_value(frontmatter, "metadata", "author") or ""
# Temporal metadata for staleness tracking
created = _parse_subfield_value(frontmatter, "metadata", "created") or ""
last_reviewed = _parse_subfield_value(frontmatter, "metadata", "last_reviewed") or ""
interval = _parse_subfield_value(frontmatter, "metadata", "review_interval_days") or ""
return {
"name": name.strip(),
"description": description.strip(),
"version": version.strip(),
"author": author.strip(),
"license": license_val.strip(),
"created": created.strip(),
"last_reviewed": last_reviewed.strip(),
"review_interval_days": interval.strip(),
}
@ -369,7 +378,18 @@ def cmd_publish(args: argparse.Namespace) -> None:
shutil.rmtree(dest)
shutil.copytree(skill_path, dest, ignore=COPY_IGNORE_PATTERNS)
# Step 7: Add entry
# Step 7: Add entry (including staleness metadata)
staleness_data = {}
if metadata.get("created"):
staleness_data["created"] = metadata["created"]
if metadata.get("last_reviewed"):
staleness_data["last_reviewed"] = metadata["last_reviewed"]
if metadata.get("review_interval_days"):
try:
staleness_data["review_interval_days"] = int(metadata["review_interval_days"])
except ValueError:
pass
entry = {
"name": name,
"description": metadata["description"],
@ -389,6 +409,7 @@ def cmd_publish(args: argparse.Namespace) -> None:
"clean": scan["clean"],
"issues": len(scan["issues"]),
},
"staleness": staleness_data,
}
data["skills"].append(entry)
save_registry(registry_path, data)
@ -589,6 +610,116 @@ def cmd_remove(args: argparse.Namespace) -> None:
print(f"Removed '{args.skill_name}' from registry.")
def cmd_stale(args: argparse.Namespace) -> None:
"""Report skills that are overdue for review."""
from datetime import date, timedelta
registry_path = Path(args.registry).resolve()
data = load_registry(registry_path)
today = date.today()
results: list[dict] = []
for skill in data["skills"]:
staleness = skill.get("staleness", {})
published = skill.get("published", "")
# Determine reference date: last_reviewed > created > published
ref_date = None
date_source = "none"
lr = staleness.get("last_reviewed", "")
cr = staleness.get("created", "")
if lr:
try:
parts = lr.split("-")
ref_date = date(int(parts[0]), int(parts[1]), int(parts[2]))
date_source = "last_reviewed"
except (ValueError, IndexError):
pass
if ref_date is None and cr:
try:
parts = cr.split("-")
ref_date = date(int(parts[0]), int(parts[1]), int(parts[2]))
date_source = "created"
except (ValueError, IndexError):
pass
if ref_date is None and published:
try:
parts = published[:10].split("-")
ref_date = date(int(parts[0]), int(parts[1]), int(parts[2]))
date_source = "published"
except (ValueError, IndexError):
pass
interval = staleness.get("review_interval_days", DEFAULT_REVIEW_INTERVAL_DAYS)
if not isinstance(interval, int):
try:
interval = int(interval)
except (ValueError, TypeError):
interval = DEFAULT_REVIEW_INTERVAL_DAYS
days_since = None
status = "unknown"
if ref_date:
days_since = (today - ref_date).days
deadline = ref_date + timedelta(days=interval)
if today > deadline:
status = "overdue"
elif (deadline - today).days <= 30:
status = "due_soon"
else:
status = "fresh"
results.append({
"name": skill.get("name", ""),
"version": skill.get("version", ""),
"status": status,
"days_since_review": days_since,
"date_source": date_source,
"review_interval_days": interval,
})
if getattr(args, "json", False):
print(json.dumps(results, indent=2))
return
# Text table output
if not results:
print("No skills in registry.")
return
headers = ["NAME", "VERSION", "STATUS", "DAYS SINCE", "SOURCE", "INTERVAL"]
rows = []
for r in results:
rows.append([
r["name"],
r["version"],
r["status"].upper(),
str(r["days_since_review"]) if r["days_since_review"] is not None else "N/A",
r["date_source"],
str(r["review_interval_days"]),
])
widths = [len(h) for h in headers]
for row in rows:
for i, cell in enumerate(row):
widths[i] = max(widths[i], len(cell))
header_line = " ".join(h.ljust(widths[i]) for i, h in enumerate(headers))
print(header_line)
for row in rows:
print(" ".join(cell.ljust(widths[i]) for i, cell in enumerate(row)))
# Summary
overdue = sum(1 for r in results if r["status"] == "overdue")
due_soon = sum(1 for r in results if r["status"] == "due_soon")
if overdue or due_soon:
print(f"\nSummary: {overdue} overdue, {due_soon} due soon, {len(results)} total")
# --- CLI ---
def _add_registry_arg(parser: argparse.ArgumentParser) -> None:
@ -652,6 +783,11 @@ def build_parser() -> argparse.ArgumentParser:
_add_registry_arg(p_remove)
p_remove.add_argument("--force", action="store_true", help="Confirm removal")
# stale
p_stale = subparsers.add_parser("stale", help="Report skills overdue for review")
_add_registry_arg(p_stale)
p_stale.add_argument("--json", action="store_true", help="Output as JSON")
return parser
@ -672,6 +808,7 @@ def main() -> None:
"install": cmd_install,
"info": cmd_info,
"remove": cmd_remove,
"stale": cmd_stale,
}
cmd_func = commands.get(args.command)

796
scripts/staleness_check.py Normal file
View file

@ -0,0 +1,796 @@
#!/usr/bin/env python3
"""
Staleness Detection for Agent Skills.
Checks whether a skill is overdue for review, validates dependency health,
and detects schema drift in declared API endpoints. Designed to surface
skills that may have gone stale as APIs change, compliance rules update,
and data sources move.
Usage:
python3 scripts/staleness_check.py <skill-path> [--json] [--check-deps] [--check-drift]
Exit codes:
0 - Fresh (no staleness issues)
1 - Stale (overdue for review)
2 - Degraded (dependency failures or schema drift)
"""
import json
import subprocess
import sys
from datetime import date, timedelta
from pathlib import Path
from typing import Optional
from urllib.error import URLError
from urllib.request import Request, urlopen
# --- Import sibling scripts ---
_SCRIPTS_DIR = Path(__file__).resolve().parent
if str(_SCRIPTS_DIR) not in sys.path:
sys.path.insert(0, str(_SCRIPTS_DIR))
from validate import _parse_frontmatter, _parse_subfield_value # noqa: E402
# --- Constants ---
DEFAULT_REVIEW_INTERVAL_DAYS = 90
STALENESS_WARNING_THRESHOLD_DAYS = 60
HTTP_TIMEOUT_SECONDS = 10
DATE_PATTERN_RE = None # Lazy-compiled below
def _date_pattern():
"""Return compiled regex for YYYY-MM-DD date format."""
global DATE_PATTERN_RE
if DATE_PATTERN_RE is None:
import re
DATE_PATTERN_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$")
return DATE_PATTERN_RE
def _parse_date(value: str) -> Optional[date]:
"""
Parse a YYYY-MM-DD string into a date object.
Args:
value: Date string in YYYY-MM-DD format.
Returns:
A date object, or None if parsing fails.
"""
if not value or not _date_pattern().match(value.strip()):
return None
try:
parts = value.strip().split("-")
return date(int(parts[0]), int(parts[1]), int(parts[2]))
except (ValueError, IndexError):
return None
def _get_git_last_modified(skill_path: str) -> Optional[date]:
"""
Get the last git commit date for a skill directory.
Runs ``git log -1 --format=%aI`` on the SKILL.md file as a fallback
for skills without explicit review dates.
Args:
skill_path: Path to the skill directory.
Returns:
The date of the last git commit touching SKILL.md, or None
if git is unavailable or the file is untracked.
"""
skill_md = Path(skill_path).resolve() / "SKILL.md"
if not skill_md.exists():
return None
try:
result = subprocess.run(
["git", "log", "-1", "--format=%aI", "--", str(skill_md)],
capture_output=True,
text=True,
timeout=10,
cwd=str(Path(skill_path).resolve()),
)
if result.returncode != 0 or not result.stdout.strip():
return None
# ISO format: 2025-01-15T10:30:00+00:00 -- take the date part
iso_str = result.stdout.strip()
return _parse_date(iso_str[:10])
except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
return None
def _check_review_staleness(frontmatter: str, git_last_modified: Optional[date]) -> list[dict]:
"""
Check whether a skill is overdue for review.
Compares ``metadata.last_reviewed`` against ``metadata.review_interval_days``.
Falls back to the git commit date when explicit review dates are absent.
Args:
frontmatter: The frontmatter text (without delimiters).
git_last_modified: Fallback date from git log.
Returns:
List of issue dicts with keys: level, message, detail.
"""
issues: list[dict] = []
today = date.today()
# Extract temporal fields
created_str = _parse_subfield_value(frontmatter, "metadata", "created")
last_reviewed_str = _parse_subfield_value(frontmatter, "metadata", "last_reviewed")
interval_str = _parse_subfield_value(frontmatter, "metadata", "review_interval_days")
# Validate formats if present
if created_str and not _parse_date(created_str):
issues.append({
"level": "warning",
"message": "Invalid 'metadata.created' date format",
"detail": f"Expected YYYY-MM-DD, got: '{created_str}'",
})
if last_reviewed_str and not _parse_date(last_reviewed_str):
issues.append({
"level": "warning",
"message": "Invalid 'metadata.last_reviewed' date format",
"detail": f"Expected YYYY-MM-DD, got: '{last_reviewed_str}'",
})
if interval_str:
try:
int(interval_str)
except ValueError:
issues.append({
"level": "warning",
"message": "Invalid 'metadata.review_interval_days' value",
"detail": f"Expected integer, got: '{interval_str}'",
})
# Determine review interval
interval_days = DEFAULT_REVIEW_INTERVAL_DAYS
if interval_str:
try:
interval_days = int(interval_str)
except ValueError:
pass
# Determine the reference date (last_reviewed > git date > None)
reference_date = None
date_source = "unknown"
last_reviewed = _parse_date(last_reviewed_str) if last_reviewed_str else None
if last_reviewed:
reference_date = last_reviewed
date_source = "last_reviewed"
elif git_last_modified:
reference_date = git_last_modified
date_source = "git_commit"
else:
issues.append({
"level": "info",
"message": "No review date available",
"detail": "No 'metadata.last_reviewed' and no git history found. "
"Consider adding temporal metadata.",
})
# Check staleness
days_since = None
review_status = "unknown"
if reference_date:
days_since = (today - reference_date).days
deadline = reference_date + timedelta(days=interval_days)
warning_date = reference_date + timedelta(days=STALENESS_WARNING_THRESHOLD_DAYS)
if today > deadline:
review_status = "overdue"
issues.append({
"level": "error",
"message": f"Skill is overdue for review ({days_since} days since last review)",
"detail": f"Review interval is {interval_days} days. "
f"Last review: {reference_date} (source: {date_source}). "
f"Deadline was: {deadline}.",
})
elif today > warning_date:
review_status = "due_soon"
days_remaining = (deadline - today).days
issues.append({
"level": "warning",
"message": f"Review due in {days_remaining} days",
"detail": f"Last review: {reference_date} (source: {date_source}). "
f"Deadline: {deadline}.",
})
else:
review_status = "fresh"
# Missing temporal metadata suggestion
has_any_temporal = bool(created_str or last_reviewed_str or interval_str)
if not has_any_temporal:
issues.append({
"level": "info",
"message": "No temporal metadata found",
"detail": "Consider adding metadata.created, metadata.last_reviewed, "
"and metadata.review_interval_days to frontmatter.",
})
return issues, review_status, days_since, date_source
def _parse_yaml_list(frontmatter: str, parent: str, child: str) -> list[dict]:
"""
Parse a YAML list-of-objects under a parent.child path in frontmatter.
Handles the pattern::
metadata:
dependencies:
- url: https://example.com
name: Example
type: api
Args:
frontmatter: The frontmatter text.
parent: Top-level field (e.g. ``metadata``).
child: Second-level field (e.g. ``dependencies``).
Returns:
List of dicts, each representing one list item.
"""
lines = frontmatter.split("\n")
items: list[dict] = []
# Find the parent block
in_parent = False
in_child = False
current_item: Optional[dict] = None
child_indent = -1
for line in lines:
stripped = line.strip()
if not in_parent:
if stripped.startswith(f"{parent}:"):
in_parent = True
continue
# Inside parent -- check if we've left it
if line and line[0] != " " and line[0] != "\t" and stripped:
break
if not in_child:
if stripped.startswith(f"{child}:"):
in_child = True
continue
# Inside child list
if not stripped:
continue
# Detect indent level of list items
raw_indent = len(line) - len(line.lstrip())
if child_indent == -1 and stripped.startswith("- "):
child_indent = raw_indent
# Check if we've left the child block
if raw_indent <= child_indent and not stripped.startswith("- "):
# Check if this is a sibling of child (another metadata key)
if ":" in stripped:
break
if stripped.startswith("- "):
# New list item
if current_item is not None:
items.append(current_item)
current_item = {}
# Parse "- key: value" on the same line
rest = stripped[2:].strip()
if ":" in rest:
key, _, val = rest.partition(":")
current_item[key.strip()] = val.strip()
elif current_item is not None and ":" in stripped:
# Continuation key-value in the same list item
key, _, val = stripped.partition(":")
current_item[key.strip()] = val.strip()
if current_item is not None:
items.append(current_item)
return items
def _check_dependency_health(dependencies: list[dict]) -> list[dict]:
"""
HTTP HEAD each declared dependency URL and report health status.
Args:
dependencies: List of dependency dicts with at least a ``url`` key.
Returns:
List of issue dicts reporting the health of each dependency.
"""
issues: list[dict] = []
for dep in dependencies:
url = dep.get("url", "").strip()
name = dep.get("name", url)
if not url:
issues.append({
"level": "warning",
"message": f"Dependency '{name}' has no URL",
"detail": "Cannot check health without a URL.",
})
continue
if not url.startswith(("http://", "https://")):
issues.append({
"level": "warning",
"message": f"Dependency '{name}' has non-HTTP URL",
"detail": f"Skipping health check for: {url}",
})
continue
try:
req = Request(url, method="HEAD")
req.add_header("User-Agent", "agent-skill-staleness-check/1.0")
with urlopen(req, timeout=HTTP_TIMEOUT_SECONDS) as resp:
status = resp.status
if 200 <= status < 400:
issues.append({
"level": "info",
"message": f"Dependency '{name}' is healthy",
"detail": f"HTTP {status} from {url}",
})
elif 400 <= status < 500:
issues.append({
"level": "warning",
"message": f"Dependency '{name}' returned client error",
"detail": f"HTTP {status} from {url}. "
"The endpoint may have moved or require authentication.",
})
else:
issues.append({
"level": "error",
"message": f"Dependency '{name}' returned server error",
"detail": f"HTTP {status} from {url}",
})
except URLError as exc:
issues.append({
"level": "error",
"message": f"Dependency '{name}' is unreachable",
"detail": f"Failed to connect to {url}: {exc.reason}",
})
except Exception as exc:
issues.append({
"level": "error",
"message": f"Dependency '{name}' check failed",
"detail": f"Error checking {url}: {exc}",
})
return issues
def _parse_schema_expectations(frontmatter: str) -> list[dict]:
"""
Extract ``metadata.schema_expectations`` list from frontmatter.
Each expectation has: url, method (default GET), expected_keys (list).
Args:
frontmatter: The frontmatter text.
Returns:
List of schema expectation dicts.
"""
raw_items = _parse_yaml_list(frontmatter, "metadata", "schema_expectations")
expectations: list[dict] = []
for item in raw_items:
url = item.get("url", "").strip()
method = item.get("method", "GET").strip().upper()
# expected_keys are parsed as a sub-list, but our simple parser
# puts them inline. We need to re-parse from frontmatter directly.
expectations.append({
"url": url,
"method": method,
"expected_keys": [], # Will be filled by deeper parse
})
# Deeper parse for expected_keys (list items under each schema_expectations entry)
expectations = _parse_schema_expectations_deep(frontmatter)
return expectations
def _parse_schema_expectations_deep(frontmatter: str) -> list[dict]:
"""
Deep-parse schema_expectations including expected_keys sub-lists.
Args:
frontmatter: The frontmatter text.
Returns:
List of expectation dicts with url, method, expected_keys.
"""
lines = frontmatter.split("\n")
expectations: list[dict] = []
current: Optional[dict] = None
in_metadata = False
in_schema = False
in_expected_keys = False
for line in lines:
stripped = line.strip()
if not in_metadata:
if stripped.startswith("metadata:"):
in_metadata = True
continue
# Left metadata block?
if line and line[0] != " " and line[0] != "\t" and stripped:
break
if not in_schema:
if stripped.startswith("schema_expectations:"):
in_schema = True
continue
# Detect new list item
if stripped.startswith("- url:") or stripped.startswith("- method:"):
if current is not None:
expectations.append(current)
in_expected_keys = False
current = {"url": "", "method": "GET", "expected_keys": []}
if stripped.startswith("- url:"):
current["url"] = stripped.split(":", 1)[1].strip()
elif stripped.startswith("- method:"):
current["method"] = stripped.split(":", 1)[1].strip().upper()
elif current is not None:
if stripped.startswith("url:"):
current["url"] = stripped.split(":", 1)[1].strip()
elif stripped.startswith("method:"):
current["method"] = stripped.split(":", 1)[1].strip().upper()
elif stripped.startswith("expected_keys:"):
in_expected_keys = True
elif in_expected_keys and stripped.startswith("- "):
current["expected_keys"].append(stripped[2:].strip())
elif not stripped.startswith("-") and ":" in stripped:
# Another key at the same level -- might be leaving schema block
key = stripped.split(":")[0].strip()
if key not in ("url", "method", "expected_keys"):
in_expected_keys = False
if current is not None:
expectations.append(current)
return expectations
def _check_schema_drift(expectations: list[dict]) -> list[dict]:
"""
GET each declared endpoint and compare top-level JSON keys against expected.
Args:
expectations: List of expectation dicts with url, method, expected_keys.
Returns:
List of issue dicts reporting drift status.
"""
issues: list[dict] = []
for exp in expectations:
url = exp.get("url", "").strip()
method = exp.get("method", "GET").upper()
expected_keys = exp.get("expected_keys", [])
if not url:
continue
if not url.startswith(("http://", "https://")):
issues.append({
"level": "warning",
"message": f"Schema check skipped for non-HTTP URL: {url}",
"detail": "Only HTTP/HTTPS URLs are supported.",
})
continue
if not expected_keys:
issues.append({
"level": "info",
"message": f"No expected_keys declared for {url}",
"detail": "Skipping drift check.",
})
continue
try:
req = Request(url, method=method)
req.add_header("User-Agent", "agent-skill-staleness-check/1.0")
req.add_header("Accept", "application/json")
with urlopen(req, timeout=HTTP_TIMEOUT_SECONDS) as resp:
body = resp.read().decode("utf-8", errors="replace")
data = json.loads(body)
if not isinstance(data, dict):
issues.append({
"level": "warning",
"message": f"Response from {url} is not a JSON object",
"detail": f"Got {type(data).__name__}, expected dict. "
"Cannot compare keys.",
})
continue
actual_keys = set(data.keys())
expected_set = set(expected_keys)
missing = expected_set - actual_keys
new_keys = actual_keys - expected_set
if missing:
issues.append({
"level": "error",
"message": f"Schema drift: missing keys from {url}",
"detail": f"Expected keys not found: {sorted(missing)}. "
"The API response structure may have changed.",
})
if new_keys:
issues.append({
"level": "info",
"message": f"Schema drift: new keys in {url}",
"detail": f"Unexpected keys found: {sorted(new_keys)}. "
"The API may have added new fields.",
})
if not missing and not new_keys:
issues.append({
"level": "info",
"message": f"Schema matches for {url}",
"detail": f"All {len(expected_keys)} expected keys present, "
"no unexpected keys.",
})
except json.JSONDecodeError:
issues.append({
"level": "error",
"message": f"Response from {url} is not valid JSON",
"detail": "Cannot perform schema drift check.",
})
except URLError as exc:
issues.append({
"level": "error",
"message": f"Cannot reach {url} for schema check",
"detail": f"Error: {exc.reason}",
})
except Exception as exc:
issues.append({
"level": "error",
"message": f"Schema check failed for {url}",
"detail": f"Error: {exc}",
})
return issues
def staleness_check(
skill_path: str,
check_deps: bool = False,
check_drift: bool = False,
) -> dict:
"""
Main entry point for staleness detection.
Args:
skill_path: Path to the skill directory.
check_deps: If True, HTTP-check declared dependencies.
check_drift: If True, check for schema drift in declared endpoints.
Returns:
Dict with keys:
- fresh (bool): True if no errors found.
- review_status (str): "fresh", "due_soon", "overdue", or "unknown".
- days_since_review (int or None): Days since last review.
- date_source (str): Where the reference date came from.
- issues (list[dict]): All issues found.
"""
all_issues: list[dict] = []
skill_dir = Path(skill_path).resolve()
# --- Check: directory exists ---
if not skill_dir.exists():
return {
"fresh": False,
"review_status": "unknown",
"days_since_review": None,
"date_source": "none",
"issues": [{"level": "error", "message": f"Path does not exist: {skill_dir}", "detail": ""}],
}
if not skill_dir.is_dir():
return {
"fresh": False,
"review_status": "unknown",
"days_since_review": None,
"date_source": "none",
"issues": [{"level": "error", "message": f"Path is not a directory: {skill_dir}", "detail": ""}],
}
# --- Read SKILL.md ---
skill_md = skill_dir / "SKILL.md"
if not skill_md.exists():
return {
"fresh": False,
"review_status": "unknown",
"days_since_review": None,
"date_source": "none",
"issues": [{"level": "error", "message": "SKILL.md not found", "detail": ""}],
}
try:
content = skill_md.read_text(encoding="utf-8")
except Exception as exc:
return {
"fresh": False,
"review_status": "unknown",
"days_since_review": None,
"date_source": "none",
"issues": [{"level": "error", "message": f"Could not read SKILL.md: {exc}", "detail": ""}],
}
frontmatter, _ = _parse_frontmatter(content)
if frontmatter is None:
return {
"fresh": False,
"review_status": "unknown",
"days_since_review": None,
"date_source": "none",
"issues": [{"level": "error", "message": "No valid frontmatter found", "detail": ""}],
}
# --- Phase 1: Review staleness ---
git_date = _get_git_last_modified(skill_path)
review_issues, review_status, days_since, date_source = _check_review_staleness(
frontmatter, git_date
)
all_issues.extend(review_issues)
# --- Phase 2: Dependency health ---
if check_deps:
deps = _parse_yaml_list(frontmatter, "metadata", "dependencies")
if deps:
dep_issues = _check_dependency_health(deps)
all_issues.extend(dep_issues)
else:
all_issues.append({
"level": "info",
"message": "No dependencies declared",
"detail": "Add metadata.dependencies to enable health checks.",
})
# --- Phase 3: Schema drift ---
if check_drift:
expectations = _parse_schema_expectations(frontmatter)
if expectations:
drift_issues = _check_schema_drift(expectations)
all_issues.extend(drift_issues)
else:
all_issues.append({
"level": "info",
"message": "No schema expectations declared",
"detail": "Add metadata.schema_expectations to enable drift detection.",
})
# Determine overall freshness
has_errors = any(i["level"] == "error" for i in all_issues)
fresh = not has_errors
return {
"fresh": fresh,
"review_status": review_status,
"days_since_review": days_since,
"date_source": date_source,
"issues": all_issues,
}
def _print_human_readable(result: dict, skill_path: str) -> None:
"""Print staleness check results in a human-readable format."""
print(f"Staleness check: {skill_path}")
print(f"{'=' * 60}")
status_label = result["review_status"].upper().replace("_", " ")
print(f"Review status: {status_label}")
if result["days_since_review"] is not None:
print(f"Days since review: {result['days_since_review']} (source: {result['date_source']})")
if result["fresh"]:
print("Overall: FRESH")
else:
print("Overall: STALE / DEGRADED")
if result["issues"]:
errors = [i for i in result["issues"] if i["level"] == "error"]
warnings = [i for i in result["issues"] if i["level"] == "warning"]
infos = [i for i in result["issues"] if i["level"] == "info"]
if errors:
print(f"\nErrors ({len(errors)}):")
for issue in errors:
print(f" [ERROR] {issue['message']}")
if issue["detail"]:
print(f" {issue['detail']}")
if warnings:
print(f"\nWarnings ({len(warnings)}):")
for issue in warnings:
print(f" [WARN] {issue['message']}")
if issue["detail"]:
print(f" {issue['detail']}")
if infos:
print(f"\nInfo ({len(infos)}):")
for issue in infos:
print(f" [INFO] {issue['message']}")
if issue["detail"]:
print(f" {issue['detail']}")
print(f"{'=' * 60}")
def main() -> None:
"""CLI entry point for the staleness checker."""
if len(sys.argv) < 2:
print(
"Usage: python3 scripts/staleness_check.py <skill-path> [--json] [--check-deps] [--check-drift]\n"
"\n"
"Arguments:\n"
" skill-path Path to the skill directory to check\n"
"\n"
"Options:\n"
" --json Output results as JSON to stdout\n"
" --check-deps HTTP-check declared dependency URLs\n"
" --check-drift Detect schema drift in declared API endpoints\n"
"\n"
"Exit codes:\n"
" 0 Fresh (no staleness issues)\n"
" 1 Stale (overdue for review)\n"
" 2 Degraded (dependency failures or schema drift)\n",
file=sys.stderr,
)
sys.exit(1)
skill_path = sys.argv[1]
use_json = "--json" in sys.argv
check_deps = "--check-deps" in sys.argv
check_drift = "--check-drift" in sys.argv
result = staleness_check(skill_path, check_deps=check_deps, check_drift=check_drift)
if use_json:
print(json.dumps(result, indent=2))
else:
_print_human_readable(result, skill_path)
# Exit codes: 0=fresh, 1=stale, 2=degraded
if result["review_status"] == "overdue":
sys.exit(1)
has_dep_or_drift_errors = any(
i["level"] == "error" and "review" not in i["message"].lower()
for i in result["issues"]
)
if has_dep_or_drift_errors:
sys.exit(2)
sys.exit(0)
if __name__ == "__main__":
main()

View file

@ -31,6 +31,9 @@ MAX_BODY_LINES_WARNING = 500
NAME_PATTERN = re.compile(r"^[a-z0-9]([a-z0-9-]*[a-z0-9])?$")
CONSECUTIVE_HYPHENS_PATTERN = re.compile(r"--")
# Pattern for YYYY-MM-DD date format
DATE_FORMAT_PATTERN = re.compile(r"^\d{4}-\d{2}-\d{2}$")
# Pattern for local file references in markdown: [text](path) excluding http/https/mailto/#
LOCAL_LINK_PATTERN = re.compile(
r"\[([^\]]*)\]\(([^)]+)\)"
@ -345,6 +348,34 @@ def validate_skill(skill_path: str) -> dict:
if not _subfield_exists(frontmatter, "metadata", "version"):
warnings.append("'metadata.version' sub-field is missing")
# Temporal metadata validation (optional, warnings only)
created_val = _parse_subfield_value(frontmatter, "metadata", "created")
reviewed_val = _parse_subfield_value(frontmatter, "metadata", "last_reviewed")
interval_val = _parse_subfield_value(frontmatter, "metadata", "review_interval_days")
if created_val and not DATE_FORMAT_PATTERN.match(created_val.strip()):
warnings.append(
f"'metadata.created' should be YYYY-MM-DD format (found: '{created_val}')"
)
if reviewed_val and not DATE_FORMAT_PATTERN.match(reviewed_val.strip()):
warnings.append(
f"'metadata.last_reviewed' should be YYYY-MM-DD format (found: '{reviewed_val}')"
)
if interval_val:
try:
int(interval_val.strip())
except ValueError:
warnings.append(
f"'metadata.review_interval_days' should be an integer (found: '{interval_val}')"
)
has_temporal = bool(created_val or reviewed_val or interval_val)
if not has_temporal:
warnings.append(
"Consider adding temporal metadata (metadata.created, metadata.last_reviewed, "
"metadata.review_interval_days) for staleness tracking"
)
# Referenced local files
if body is not None:
local_links = _extract_local_links(body)