from __future__ import annotations

from dataclasses import dataclass, asdict, field
from datetime import datetime, timezone
from typing import Any
import hashlib
import json

# Stamped into every TrajectoryRecord so readers can tell record layouts apart.
SCHEMA_VERSION = 1


@dataclass
class TrajectoryRecord:
    """Flat record describing one handled request.

    The first eight fields are required; the rest default to empty values.
    ``build_trajectory_record`` is the constructor used in this module.
    """

    schema_version: int
    trajectory_id: str
    ts: str
    user_query: str
    normalized_task: str
    role: str
    task_type: str
    family: str
    family_candidates: list[str] = field(default_factory=list)
    chosen_plan: str = ''
    chosen_tools: list[str] = field(default_factory=list)
    used_tools: list[str] = field(default_factory=list)
    memory_mode: str = ''
    uncertainty: str = ''
    grounded_count: int = 0
    evidence_count: int = 0
    answer_len: int = 0
    latency_ms: int | None = None
    verification_status: str = ''
    outcome_status: str = ''
    cap_breaches: list[str] = field(default_factory=list)
    user_feedback: str = ''
    error_text: str = ''
    composition_reason: str = ''
    composition_policy: str = ''
    needs_memory: bool = False
    needs_setup_context: bool = False
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the record as a plain dict (recursive ``asdict`` copy)."""
        return asdict(self)

    def to_json(self) -> str:
        """Serialise the record to a JSON string, keeping non-ASCII text as-is."""
        return json.dumps(self.to_dict(), ensure_ascii=False)


def utc_now_iso() -> str:
    """Return the current UTC time as a timezone-aware ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def make_trajectory_id(
    message: str,
    ts: str,
    planned_tools: list[str],
    used_tools: list[str],
) -> str:
    """Derive a 24-hex-character id from the message, timestamp and tool lists.

    The id is the truncated SHA-256 of the four inputs fed in order; ``None``
    or empty inputs contribute nothing. Tool names are joined with ``|``.
    """
    # NOTE: the four parts are hashed back to back without a delimiter, so the
    # byte stream (and therefore every previously issued id) stays unchanged.
    parts = (
        message or '',
        ts or '',
        '|'.join(planned_tools or []),
        '|'.join(used_tools or []),
    )
    digest = hashlib.sha256()
    for part in parts:
        # errors='ignore' drops characters that cannot be encoded as UTF-8.
        digest.update(part.encode('utf-8', errors='ignore'))
    return digest.hexdigest()[:24]


def infer_memory_mode(
    task_type: str,
    analysis: dict[str, Any],
    used_tools: list[str],
) -> str:
    """Classify how the ``memory_profile`` tool was used, or ``''`` if unused.

    Returns ``'profile'`` for memory tasks, ``'setup'`` when the analysis
    flags ``needs_setup_context``, ``'preference'`` when it flags
    ``needs_memory``, and ``'profile'`` otherwise.
    """
    if 'memory_profile' not in used_tools:
        return ''
    if task_type == 'memory':
        return 'profile'
    if analysis.get('needs_setup_context'):
        return 'setup'
    if analysis.get('needs_memory'):
        return 'preference'
    return 'profile'


def _count_grounded(evidence_items: list[dict[str, Any]] | None) -> int:
    """Return how many evidence items carry a truthy ``grounded`` flag."""
    return sum(1 for item in (evidence_items or []) if item.get('grounded'))


def _coerce_confidence(raw: Any) -> float:
    """Convert a confidence value to float, treating junk or missing as 0.0."""
    try:
        return float(raw or 0.0)
    except (TypeError, ValueError):
        # A malformed confidence must not abort record building; 0.0 maps to
        # the most cautious ('high') uncertainty bucket.
        return 0.0


def infer_uncertainty(
    status: str,
    analysis: dict[str, Any],
    evidence_items: list[dict[str, Any]] | None,
) -> str:
    """Bucket the outcome into ``'low'``, ``'medium'`` or ``'high'`` uncertainty.

    Failure-like statuses are always ``'high'`` and ``needs_clarification`` is
    always ``'medium'``. Otherwise the bucket depends on
    ``analysis['confidence']`` and on whether any evidence item is grounded.
    ``evidence_items`` may be ``None``; an unparsable confidence counts as 0.0.
    """
    if status in {'tool_failed', 'no_result', 'tool_output_unverified'}:
        return 'high'
    if status == 'needs_clarification':
        return 'medium'

    confidence = _coerce_confidence(analysis.get('confidence', 0.0))
    if confidence >= 0.93 and _count_grounded(evidence_items) >= 1:
        return 'low'
    if confidence >= 0.8:
        return 'medium'
    return 'high'


def build_trajectory_record(
    *,
    message: str,
    analysis: dict[str, Any],
    final_text: str,
    evidence_items: list[dict[str, Any]],
    status: str,
    family: str,
    family_candidates: list[str],
    error_text: str = '',
    ts: str | None = None,
    latency_ms: int | None = None,
    verification_status: str = '',
    cap_breaches: list[str] | None = None,
    user_feedback: str = '',
) -> TrajectoryRecord:
    """Assemble a ``TrajectoryRecord`` from the analysis and collected evidence.

    Args:
        message: The user's query; stored verbatim and hashed into the id.
        analysis: Planner output. Keys read here: ``role``, ``task_type``,
            ``tools``, ``confidence``, ``composition_reason``,
            ``composition_policy``, ``needs_memory``, ``needs_setup_context``
            and ``force_sequential``.
        final_text: The answer text; only its stripped length is recorded.
        evidence_items: Dicts with optional ``tool`` and ``grounded`` keys.
            ``None`` is treated as an empty list.
        status: Outcome status, stored as ``outcome_status``.
        family: Chosen family label.
        family_candidates: Candidate family labels (copied).
        error_text: Error message; truncated to 300 characters.
        ts: ISO timestamp; defaults to the current UTC time.
        latency_ms: Optional latency measurement.
        verification_status: Explicit status; when empty it is derived as
            ``'grounded'`` if any evidence is grounded, else ``'unverified'``.
        cap_breaches: Optional list of breached caps (copied).
        user_feedback: Optional feedback text.

    Returns:
        The populated ``TrajectoryRecord``.
    """
    ts = ts or utc_now_iso()
    evidence = list(evidence_items or [])

    # Normalise role once so `role` and `normalized_task` always agree; an
    # explicit None used to leak into normalized_task as the text "None".
    role = str(analysis.get('role') or '')
    task_type = str(analysis.get('task_type') or '')

    planned_tools = list(analysis.get('tools') or [])
    used_tools = [str(item.get('tool')) for item in evidence if item.get('tool')]
    grounded_count = _count_grounded(evidence)

    if not verification_status:
        verification_status = 'grounded' if grounded_count else 'unverified'

    return TrajectoryRecord(
        schema_version=SCHEMA_VERSION,
        trajectory_id=make_trajectory_id(message, ts, planned_tools, used_tools),
        ts=ts,
        user_query=message,
        normalized_task=f"{role}:{task_type}",
        role=role,
        task_type=task_type,
        family=family,
        family_candidates=list(family_candidates or []),
        chosen_plan=str(analysis.get('composition_reason') or 'single_tool'),
        chosen_tools=planned_tools,
        used_tools=used_tools,
        memory_mode=infer_memory_mode(task_type, analysis, used_tools),
        uncertainty=infer_uncertainty(status, analysis, evidence),
        grounded_count=grounded_count,
        evidence_count=len(evidence),
        answer_len=len((final_text or '').strip()),
        latency_ms=latency_ms,
        verification_status=verification_status,
        outcome_status=status,
        cap_breaches=list(cap_breaches or []),
        user_feedback=user_feedback,
        error_text=(error_text or '')[:300],
        composition_reason=str(analysis.get('composition_reason') or ''),
        composition_policy=str(analysis.get('composition_policy') or ''),
        needs_memory=bool(analysis.get('needs_memory')),
        needs_setup_context=bool(analysis.get('needs_setup_context')),
        metadata={
            'force_sequential': bool(analysis.get('force_sequential')),
        },
    )