openclaw-intelligence-core-.../syncpatch/outcome_logging.py

141 lines
5.1 KiB
Python
Raw Permalink Normal View History

2026-03-21 07:34:09 +00:00
#!/usr/bin/env python3
from __future__ import annotations

import json
import logging
from datetime import datetime, timezone

from trajectory_schema import build_trajectory_record
from replay_buffer import append_replay_record, update_policy_stats
from reward_signals import derive_cap_breaches, reward_row
def log_intent_family_shadow(
    message: str,
    family_info: dict,
    before_tools: list[str],
    after_tools: list[str],
    *,
    log_path,
    collect_intent_families,
    service_hints,
) -> None:
    """Append one JSON line comparing tool choice before/after a family decision.

    Nothing is written when ``family_info`` carries no non-empty ``'family'``.
    Otherwise a row with the timestamp (UTC, ISO-8601), the message, the
    family, the candidate families, the first tool of each list and an
    ``overridden`` flag is appended to ``log_path``.

    This is best-effort logging: any failure while building or writing the
    row is reported at DEBUG level and never propagates to the caller.

    Args:
        message: The user message the decision was made for.
        family_info: Mapping with a ``'family'`` entry; ``None`` is tolerated.
        before_tools: Tool list before the family decision; ``None`` is
            treated as empty.
        after_tools: Tool list after the family decision; ``None`` is
            treated as empty.
        log_path: Path-like object exposing ``.parent.mkdir`` and ``.open``
            (e.g. ``pathlib.Path``). Parent directories are created on demand.
        collect_intent_families: Callable ``(message, service_hints)``
            returning an iterable of mappings with a ``'family'`` entry.
        service_hints: Passed through unchanged to ``collect_intent_families``.
    """
    family = str((family_info or {}).get('family') or '')
    if not family:
        return
    try:
        # Normalise once: slicing ``None`` below would raise and silently
        # drop the row even though the ``[0]`` lookups tolerate ``None``.
        before = list(before_tools or [])
        after = list(after_tools or [])
        log_path.parent.mkdir(parents=True, exist_ok=True)
        try:
            candidates = [
                item.get('family')
                for item in collect_intent_families(message, service_hints)
                if item.get('family')
            ]
        except Exception:
            # A broken collector must not cost us the row; fall back to the
            # family we already know about.
            candidates = [family]
        row = {
            'ts': datetime.now(timezone.utc).isoformat(),
            'message': message,
            'family': family,
            'family_candidates': candidates,
            'before_tool': before[0] if before else '',
            'after_tool': after[0] if after else '',
            'overridden': before[:1] != after[:1],
        }
        with log_path.open('a', encoding='utf-8') as f:
            f.write(json.dumps(row, ensure_ascii=False) + '\n')
    except Exception:
        # Deliberately non-fatal, but leave a trace for debugging.
        logging.getLogger(__name__).debug(
            'failed to write intent-family shadow log row', exc_info=True
        )
def log_intent_composition(
    message: str,
    family_candidates: list[dict],
    analysis_before: dict,
    analysis_after: dict,
    composition: dict,
    *,
    log_path,
) -> None:
    """Append one JSON line describing an applied intent composition.

    Nothing is written unless ``composition`` is truthy and has a truthy
    ``'composed'`` entry. Otherwise a row with the timestamp (UTC, ISO-8601),
    the message, the candidate families, the tool lists before and after,
    the composition reason/policy and the ``force_sequential`` flag is
    appended to ``log_path``.

    This is best-effort logging: any failure while building or writing the
    row is reported at DEBUG level and never propagates to the caller.

    Args:
        message: The user message the composition was made for.
        family_candidates: Mappings with a ``'family'`` entry; entries with a
            falsy family are skipped. ``None`` is treated as empty.
        analysis_before: Analysis mapping before composition (``'tools'`` is
            read). ``None`` is treated as empty.
        analysis_after: Analysis mapping after composition (``'tools'``,
            ``'composition_policy'`` and ``'force_sequential'`` are read).
            ``None`` is treated as empty.
        composition: Mapping with ``'composed'`` and optional ``'reason'`` /
            ``'policy'`` entries.
        log_path: Path-like object exposing ``.parent.mkdir`` and ``.open``
            (e.g. ``pathlib.Path``). Parent directories are created on demand.
    """
    if not composition or not composition.get('composed'):
        return
    try:
        # Tolerate missing inputs instead of silently losing the row.
        before = analysis_before or {}
        after = analysis_after or {}
        families = [
            item.get('family')
            for item in (family_candidates or [])
            if item.get('family')
        ]
        log_path.parent.mkdir(parents=True, exist_ok=True)
        row = {
            'ts': datetime.now(timezone.utc).isoformat(),
            'message': message,
            'family_candidates': families,
            'before_tools': list(before.get('tools') or []),
            'after_tools': list(after.get('tools') or []),
            'reason': composition.get('reason', ''),
            # The composition's own policy wins; the analysis value is only
            # the fallback when the key is absent.
            'policy': composition.get('policy', after.get('composition_policy', '')),
            'force_sequential': bool(after.get('force_sequential')),
        }
        with log_path.open('a', encoding='utf-8') as f:
            f.write(json.dumps(row, ensure_ascii=False) + '\n')
    except Exception:
        # Deliberately non-fatal, but leave a trace for debugging.
        logging.getLogger(__name__).debug(
            'failed to write intent-composition log row', exc_info=True
        )
def record_task_outcome(
    message: str,
    analysis: dict,
    final_text: str,
    evidence_items: list[dict],
    *,
    status: str = 'success',
    error_text: str = '',
    log_path,
    classify_intent_family,
    collect_intent_families,
    service_hints,
    refresh_composition_policy_async,
) -> None:
    """Record the outcome of one task in the outcome log and the replay buffer.

    Steps, in order:

    1. Build a summary row (status, planned/used tools, family, evidence and
       grounding counts, answer length, truncated error text, composition
       info, cap breaches) and merge in the mapping returned by
       ``reward_row``.
    2. Append the row as one JSON line to ``log_path``.
    3. Build a trajectory record via ``build_trajectory_record``, merge the
       same reward info into it, then pass it to ``append_replay_record``
       and ``update_policy_stats``.
    4. If the row has a composition reason or policy, call
       ``refresh_composition_policy_async()``.

    This is best-effort: any failure is reported at DEBUG level and never
    propagates to the caller. Because the steps share one guard, a failure in
    an earlier step skips the later ones.

    Args:
        message: The user message that was handled.
        analysis: Analysis mapping (``role``, ``task_type``, ``tools``,
            ``needs_memory``, ``needs_setup_context``, ``composition_reason``
            and ``composition_policy`` are read). ``None`` is treated as empty.
        final_text: Final answer text; only its stripped length is logged in
            the row, the full text goes to the trajectory record.
        evidence_items: Mappings with ``'tool'`` and ``'grounded'`` entries.
            ``None`` is treated as empty.
        status: Outcome status label, ``'success'`` by default.
        error_text: Error description; truncated to 300 characters in the row.
        log_path: Path-like object exposing ``.parent.mkdir`` and ``.open``.
        classify_intent_family: Callable ``(message, service_hints)``
            returning a mapping with a ``'family'`` entry (or a falsy value).
        collect_intent_families: Callable ``(message, service_hints)``
            returning an iterable of mappings with a ``'family'`` entry.
        service_hints: Passed through unchanged to both family callables.
        refresh_composition_policy_async: Zero-argument callable invoked when
            the outcome involved a composition.
    """
    try:
        # Tolerate missing inputs instead of silently dropping the outcome.
        analysis = {} if analysis is None else analysis
        evidence_items = [] if evidence_items is None else evidence_items

        log_path.parent.mkdir(parents=True, exist_ok=True)
        grounded_items = [item for item in evidence_items if item.get('grounded')]
        try:
            candidates = [
                item.get('family')
                for item in collect_intent_families(message, service_hints)
                if item.get('family')
            ]
        except Exception:
            candidates = []
        # Guarded like the collector above: a classifier failure should cost
        # only the family label, not the whole outcome record.
        try:
            family = str(
                (classify_intent_family(message, service_hints) or {}).get('family') or ''
            )
        except Exception:
            family = ''
        cap_breaches = derive_cap_breaches(error_text, analysis, evidence_items)
        row = {
            'ts': datetime.now(timezone.utc).isoformat(),
            'status': status,
            'message': message,
            'role': analysis.get('role'),
            'task_type': analysis.get('task_type'),
            'planned_tools': list(analysis.get('tools') or []),
            'used_tools': [item.get('tool') for item in evidence_items],
            'family': family,
            'family_candidates': candidates,
            'grounded_count': len(grounded_items),
            'evidence_count': len(evidence_items),
            'answer_len': len((final_text or '').strip()),
            'needs_memory': bool(analysis.get('needs_memory')),
            'needs_setup_context': bool(analysis.get('needs_setup_context')),
            'error_text': (error_text or '')[:300],
            'composition_reason': str(analysis.get('composition_reason') or ''),
            'composition_policy': str(analysis.get('composition_policy') or ''),
            'cap_breaches': cap_breaches,
        }
        reward_info = reward_row(
            status, analysis, evidence_items, final_text, cap_breaches=cap_breaches
        )
        row.update(reward_info)
        with log_path.open('a', encoding='utf-8') as f:
            f.write(json.dumps(row, ensure_ascii=False) + '\n')

        # Reuse the row's timestamp so the log line and the replay record
        # can be correlated.
        record = build_trajectory_record(
            message=message,
            analysis=analysis,
            final_text=final_text,
            evidence_items=evidence_items,
            status=status,
            family=family,
            family_candidates=candidates,
            error_text=error_text,
            ts=row['ts'],
            cap_breaches=cap_breaches,
        ).to_dict()
        record.update(reward_info)
        append_replay_record(record)
        update_policy_stats(record)
        if row.get('composition_reason') or row.get('composition_policy'):
            refresh_composition_policy_async()
    except Exception:
        # Deliberately non-fatal, but leave a trace for debugging.
        logging.getLogger(__name__).debug(
            'failed to record task outcome', exc_info=True
        )