ryanyen22's picture
feat: add reason_first_program/program_space.py
bee361b verified
"""
Program Space: the set of valid implementations for a stub.
A ProgramSpace is a collection of Program instances that all satisfy the stub's
constraints. It supports:
- Adding/removing programs
- Filtering by execution correctness
- Computing diversity metrics (DA@K from AlgoDiv, 2503.00691)
- Clustering by behavioral equivalence
"""
from __future__ import annotations
import ast
import hashlib
import traceback
import sys
import io
from math import comb, log, exp
from dataclasses import dataclass, field
from typing import Any, Optional, Callable
from collections import defaultdict
from reason_first_program.stub import Stub
@dataclass
class ExecutionTrace:
"""Execution trace for a single program on a single input."""
input_args: dict[str, Any]
output: Any = None
stdout: str = ""
stderr: str = ""
exception: Optional[str] = None
execution_time_ms: float = 0.0
# Behavioral fingerprint components
branch_coverage: Optional[list[bool]] = None
variable_snapshots: Optional[dict[str, list[Any]]] = None
@property
def succeeded(self) -> bool:
return self.exception is None
@property
def output_hash(self) -> str:
"""Hash of the output for functional equivalence checking."""
return hashlib.md5(repr(self.output).encode()).hexdigest()
@dataclass
class Program:
"""
A single implementation in the program space.
Attributes:
source: The implementation source code (function body only)
full_source: The complete function including signature
stub_id: ID of the stub this implements
model_id: Which model generated this (for diversity tracking)
traces: Execution traces from running against test inputs
metadata: Arbitrary metadata (temperature, prompt style, etc.)
"""
source: str
full_source: str
stub_id: str
model_id: str = "unknown"
traces: list[ExecutionTrace] = field(default_factory=list)
metadata: dict[str, Any] = field(default_factory=dict)
concept_scores: dict[str, float] = field(default_factory=dict)
@property
def program_id(self) -> str:
"""Unique identifier based on source content."""
return hashlib.sha256(self.source.encode()).hexdigest()[:16]
@property
def is_valid(self) -> bool:
"""Whether all execution traces succeeded."""
return len(self.traces) > 0 and all(t.succeeded for t in self.traces)
@property
def functional_signature(self) -> str:
"""
A string that identifies the program's functional behavior.
Two programs with the same functional signature produce identical
outputs on all test inputs (functional equivalence, per 2302.05433).
"""
if not self.traces:
return "untested"
return "|".join(t.output_hash for t in self.traces if t.succeeded)
@property
def syntactic_hash(self) -> str:
"""Normalized AST hash for syntactic deduplication."""
try:
tree = ast.parse(self.source)
return hashlib.md5(ast.dump(tree).encode()).hexdigest()[:12]
except SyntaxError:
return hashlib.md5(self.source.encode()).hexdigest()[:12]
def behavioral_vector(self) -> list[float]:
"""
Extract a behavioral feature vector from execution traces.
Based on TRACED (2306.07487): quantized variable values + branch coverage.
"""
features = []
for trace in self.traces:
if trace.succeeded:
# Output type encoding
output_type = type(trace.output).__name__
type_map = {
"int": 0, "float": 1, "str": 2, "list": 3,
"dict": 4, "tuple": 5, "set": 6, "bool": 7, "NoneType": 8
}
features.append(type_map.get(output_type, 9))
# Output magnitude (quantized)
try:
if isinstance(trace.output, (int, float)):
features.append(float(trace.output))
elif isinstance(trace.output, (list, tuple, set, dict)):
features.append(float(len(trace.output)))
else:
features.append(0.0)
except (TypeError, ValueError):
features.append(0.0)
# Execution time quantile
features.append(trace.execution_time_ms)
return features
def execute_program(
program: Program,
stub: Stub,
test_inputs: list[dict[str, Any]],
timeout_seconds: float = 5.0,
) -> Program:
"""
Execute a program against test inputs and record traces.
Args:
program: The program to execute
stub: The stub definition (for function name, signature)
test_inputs: List of input dicts
timeout_seconds: Max execution time per input
Returns:
The program with traces populated
"""
import time
program.traces = []
for inputs in test_inputs:
trace = ExecutionTrace(input_args=inputs)
try:
# Build execution namespace with the program
namespace: dict[str, Any] = {}
# Include imports from the stub's module context
if stub.module_source:
try:
exec(
"\n".join(
line
for line in stub.module_source.split("\n")
if line.strip().startswith(("import ", "from "))
),
namespace,
)
except Exception:
pass
# Execute the full function definition
exec(program.full_source, namespace)
# Call the function
func = namespace[stub.name]
start = time.perf_counter()
result = func(**inputs)
elapsed = (time.perf_counter() - start) * 1000
trace.output = result
trace.execution_time_ms = elapsed
except Exception as e:
trace.exception = f"{type(e).__name__}: {e}"
trace.stderr = traceback.format_exc()
program.traces.append(trace)
return program
class ProgramSpace:
"""
The set of valid implementations for a stub.
This is the central data structure of the framework. It holds all sampled
programs, supports filtering, clustering, and diversity measurement.
"""
def __init__(self, stub: Stub):
self.stub = stub
self._programs: dict[str, Program] = {} # program_id -> Program
self._clusters: Optional[list[list[str]]] = None # Cached clustering
self._cluster_labels: Optional[dict[str, str]] = None
@property
def programs(self) -> list[Program]:
return list(self._programs.values())
@property
def valid_programs(self) -> list[Program]:
return [p for p in self._programs.values() if p.is_valid]
def __len__(self) -> int:
return len(self._programs)
def add(self, program: Program) -> str:
"""Add a program to the space. Returns program_id."""
self._programs[program.program_id] = program
self._clusters = None # Invalidate cache
return program.program_id
def add_many(self, programs: list[Program]) -> list[str]:
"""Add multiple programs. Returns list of program_ids."""
return [self.add(p) for p in programs]
def get(self, program_id: str) -> Optional[Program]:
return self._programs.get(program_id)
def remove(self, program_id: str) -> None:
self._programs.pop(program_id, None)
self._clusters = None
def filter_valid(self) -> ProgramSpace:
"""Return a new ProgramSpace containing only valid programs."""
new_space = ProgramSpace(self.stub)
for p in self.valid_programs:
new_space.add(p)
return new_space
def deduplicate_syntactic(self) -> ProgramSpace:
"""Remove syntactically identical programs (normalized AST)."""
seen: set[str] = set()
new_space = ProgramSpace(self.stub)
for p in self.programs:
h = p.syntactic_hash
if h not in seen:
seen.add(h)
new_space.add(p)
return new_space
def deduplicate_functional(self) -> ProgramSpace:
"""Remove functionally equivalent programs (same outputs on all inputs)."""
seen: set[str] = set()
new_space = ProgramSpace(self.stub)
for p in self.programs:
sig = p.functional_signature
if sig not in seen:
seen.add(sig)
new_space.add(p)
return new_space
# ---- Diversity Metrics (from AlgoDiv, 2503.00691) ----
def functional_clusters(self) -> list[list[Program]]:
"""
Cluster programs by functional equivalence.
Programs with identical output signatures go in the same cluster.
"""
clusters: dict[str, list[Program]] = defaultdict(list)
for p in self.valid_programs:
clusters[p.functional_signature].append(p)
return list(clusters.values())
def da_at_k(self, k: int) -> float:
"""
DA@K: Expected number of distinct algorithms in K samples.
From Definition 3.1 of AlgoDiv (2503.00691):
DA@K = Σ_m (1 - C(N - s_m, K) / C(N, K))
where M = number of clusters, s_m = size of cluster m, N = total programs.
"""
clusters = self.functional_clusters()
n = len(self.valid_programs)
if n == 0 or k == 0 or k > n:
return 0.0
da = 0.0
for cluster in clusters:
s_m = len(cluster)
# Probability that cluster m is represented in a sample of size K
if n - s_m >= k:
prob_missing = comb(n - s_m, k) / comb(n, k)
else:
prob_missing = 0.0
da += (1 - prob_missing)
return da
def entropy_diversity(self) -> float:
"""
EA: Entropy-based algorithmic diversity index.
EA = exp(-Σ p_m log p_m) where p_m = s_m / N
"""
clusters = self.functional_clusters()
n = len(self.valid_programs)
if n == 0:
return 0.0
entropy = 0.0
for cluster in clusters:
p = len(cluster) / n
if p > 0:
entropy -= p * log(p)
return exp(entropy)
def diversity_report(self) -> dict[str, Any]:
"""Comprehensive diversity report for the program space."""
clusters = self.functional_clusters()
n_valid = len(self.valid_programs)
n_total = len(self._programs)
return {
"total_programs": n_total,
"valid_programs": n_valid,
"syntactically_unique": len(
set(p.syntactic_hash for p in self.programs)
),
"functionally_unique_clusters": len(clusters),
"cluster_sizes": sorted(
[len(c) for c in clusters], reverse=True
),
"da_at_5": self.da_at_k(5) if n_valid >= 5 else None,
"da_at_10": self.da_at_k(10) if n_valid >= 10 else None,
"entropy_diversity": self.entropy_diversity(),
"models_used": list(
set(p.model_id for p in self.programs)
),
}
def to_dict(self) -> dict[str, Any]:
"""Serialize the program space for storage."""
return {
"stub_id": self.stub.stub_id,
"stub_name": self.stub.name,
"programs": [
{
"program_id": p.program_id,
"source": p.source,
"full_source": p.full_source,
"model_id": p.model_id,
"is_valid": p.is_valid,
"functional_signature": p.functional_signature,
"concept_scores": p.concept_scores,
"metadata": p.metadata,
}
for p in self.programs
],
"diversity": self.diversity_report(),
}