64 lines
2.3 KiB
Python
64 lines
2.3 KiB
Python
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import json
|
||
|
|
from datetime import datetime, timezone
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import Any
|
||
|
|
|
||
|
|
|
||
|
|
DEFAULT_REPLAY_ROOT = Path('/home/openclaw/.openclaw/workspace/data/replay_buffer')
|
||
|
|
DEFAULT_POLICY_STATS = Path('/home/openclaw/.openclaw/workspace/data/policy_stats.json')
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
def _safe_slug(value: str) -> str:
|
||
|
|
out = ''.join(ch if ch.isalnum() or ch in {'_', '-'} else '_' for ch in (value or 'unknown').strip().lower())
|
||
|
|
return out[:80] or 'unknown'
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
def replay_path(record: dict[str, Any], replay_root: Path = DEFAULT_REPLAY_ROOT) -> Path:
|
||
|
|
ts = str(record.get('ts') or datetime.now(timezone.utc).isoformat())
|
||
|
|
day = ts[:10]
|
||
|
|
task = _safe_slug(str(record.get('normalized_task') or 'unknown'))
|
||
|
|
return replay_root / day / f'{task}.jsonl'
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
def append_replay_record(record: dict[str, Any], replay_root: Path = DEFAULT_REPLAY_ROOT) -> Path:
|
||
|
|
path = replay_path(record, replay_root)
|
||
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
||
|
|
with path.open('a', encoding='utf-8') as f:
|
||
|
|
f.write(json.dumps(record, ensure_ascii=False) + '\n')
|
||
|
|
return path
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
def update_policy_stats(record: dict[str, Any], stats_path: Path = DEFAULT_POLICY_STATS) -> dict[str, Any]:
|
||
|
|
stats_path.parent.mkdir(parents=True, exist_ok=True)
|
||
|
|
try:
|
||
|
|
data = json.loads(stats_path.read_text(encoding='utf-8'))
|
||
|
|
except Exception:
|
||
|
|
data = {'plans': {}, 'families': {}, 'updated_at': ''}
|
||
|
|
|
||
|
|
plan_key = str(record.get('chosen_plan') or 'single_tool')
|
||
|
|
family_key = str(record.get('family') or 'unknown')
|
||
|
|
reward = float(record.get('reward') or 0.0)
|
||
|
|
status = str(record.get('outcome_status') or '')
|
||
|
|
|
||
|
|
for bucket_name, key in [('plans', plan_key), ('families', family_key)]:
|
||
|
|
bucket = data.setdefault(bucket_name, {})
|
||
|
|
row = bucket.setdefault(key, {'count': 0, 'success': 0, 'failure': 0, 'clarification': 0, 'reward_sum': 0.0})
|
||
|
|
row['count'] += 1
|
||
|
|
row['reward_sum'] += reward
|
||
|
|
if status == 'success':
|
||
|
|
row['success'] += 1
|
||
|
|
elif status == 'needs_clarification':
|
||
|
|
row['clarification'] += 1
|
||
|
|
else:
|
||
|
|
row['failure'] += 1
|
||
|
|
|
||
|
|
data['updated_at'] = datetime.now(timezone.utc).isoformat()
|
||
|
|
stats_path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding='utf-8')
|
||
|
|
return data
|