"""
Orchestrate the Honegumi RAG Assistant workflow.
This module assembles the various nodes defined in :mod:`honegumi_rag_assistant.nodes`
into a LangGraph state machine. The :func:`build_graph` function
constructs the graph and returns a compiled version ready for
execution. The :func:`run` convenience function reads a problem
description from disk, runs the graph and writes the resulting Python
script to the configured output directory.
You can customise the workflow by modifying the nodes or adding new
conditional branches. See the LangGraph documentation for further
details on graph construction.
"""
from __future__ import annotations
import os
import time
from pathlib import Path
from typing import Dict, Any
try:
from langgraph.graph import StateGraph, START, END # type: ignore[import]
from langgraph.constants import Send # type: ignore[import]
from langgraph.checkpoint.memory import MemorySaver # type: ignore[import]
except ImportError:
# These imports are optional to allow type checking without installing
# LangGraph. If LangGraph is not available the graph cannot be
# constructed or run; this will be surfaced when build_graph is called.
StateGraph = None # type: ignore
START = END = None # type: ignore
MemorySaver = None # type: ignore
Send = None # type: ignore
from .states import HonegumiRAGState
from .nodes import (
ParameterSelector,
SkeletonGenerator,
RetrievalPlannerAgent,
RetrieverAgent,
CodeWriterAgent,
ReviewerAgent,
)
from .nodes.retriever import retrieve_single_query
from .app_config import settings
[docs]
def _vectorstore_available() -> bool:
    """Return True if ``settings.retrieval_vectorstore_path`` names an existing directory.

    An unset or empty path counts as unavailable.
    """
    path = settings.retrieval_vectorstore_path
    # Path.is_dir() is False for non-existent paths, so no separate exists() check.
    return bool(path) and Path(path).is_dir()


def build_graph(skip_review: bool = False, enable_review: bool = False) -> Any:
    """Construct and return the compiled LangGraph for the agentic pipeline.

    The graph is wired as::

        START -> select_parameters -> generate_skeleton -> plan_retrieval
              -> retrieve_parallel (one branch per planned query, via Send)
              -> write_code [-> review_code] -> END

    If the planner produces no queries, or the configured vector store
    directory does not exist, ``plan_retrieval`` routes straight to
    ``write_code``.

    Parameters
    ----------
    skip_review : bool, optional
        DEPRECATED: use ``enable_review`` instead. If True, the review step is
        omitted even when ``enable_review`` is True. Defaults to False.
    enable_review : bool, optional
        If True, include the reviewer step. Defaults to False (reviewer
        disabled by default).

    Returns
    -------
    Any
        The compiled graph returned by ``StateGraph.compile``, using an
        in-memory ``MemorySaver`` checkpointer.

    Raises
    ------
    RuntimeError
        If LangGraph is not installed.
    """
    if StateGraph is None:
        raise RuntimeError(
            "LangGraph is not installed. Please add 'langgraph' to your dependencies."
        )

    # skip_review is deprecated but still honoured: it vetoes enable_review.
    use_reviewer = enable_review and not skip_review

    builder = StateGraph(HonegumiRAGState)

    # --- Nodes ---
    builder.add_node("select_parameters", ParameterSelector.select_parameters)
    builder.add_node("generate_skeleton", SkeletonGenerator.generate_skeleton)
    builder.add_node("plan_retrieval", RetrievalPlannerAgent.plan_retrieval)
    # Each parallel branch receives the {"query", "index"} payload built in
    # continue_to_retrieval, not the full pipeline state.
    builder.add_node(
        "retrieve_parallel",
        lambda state: retrieve_single_query(state["query"], state["index"]),
    )
    builder.add_node("write_code", CodeWriterAgent.write_code)
    if use_reviewer:
        builder.add_node("review_code", ReviewerAgent.review_code)

    # --- Routing functions ---
    def continue_to_retrieval(state: HonegumiRAGState) -> list:
        """Fan out to parallel retrieval, or skip straight to code writing.

        Returns a list of ``Send`` objects: one ``retrieve_parallel`` branch
        per planned query, or a single ``write_code`` send carrying the
        current state when retrieval is skipped.
        """
        queries = state.get("retrieval_queries", [])
        if not queries:
            if settings.debug:
                print("\n[ORCHESTRATOR] No retrieval queries - skipping to code writer\n")
            return [Send("write_code", state)]

        if not _vectorstore_available():
            # Warn in every mode: no retriever runs on this path, so nothing
            # else would tell the user that retrieval was skipped.
            print("\n⚠️ Vector store not found - generating code without documentation retrieval")
            print(" To enable RAG, build the vector store with:")
            print(" python -m honegumi_rag_assistant.build_vector_store\n")
            return [Send("write_code", state)]

        if settings.debug:
            print(f"\n[ORCHESTRATOR] Fanning out to {len(queries)} parallel retrievers\n")
        # Each Send starts an independent execution of retrieve_parallel.
        return [
            Send("retrieve_parallel", {"query": query, "index": idx})
            for idx, query in enumerate(queries)
        ]

    def route_after_review(state: HonegumiRAGState) -> str:
        """Return END if the reviewer set ``final_code``, else ``"write_code"``.

        Retrieval happens up front, so a revision re-runs only the code
        writer with the contexts already gathered.
        """
        # NOTE(review): nothing here bounds write_code <-> review_code cycles;
        # presumably ReviewerAgent sets final_code after its own limit (it
        # tracks review_count), otherwise LangGraph's recursion limit stops
        # the run — confirm.
        if state.get("final_code"):
            return END
        return "write_code"

    # --- Edges ---
    builder.add_edge(START, "select_parameters")
    builder.add_edge("select_parameters", "generate_skeleton")
    builder.add_edge("generate_skeleton", "plan_retrieval")
    # Dynamic fan-out: continue_to_retrieval returns Send objects, so no path
    # map is needed.
    builder.add_conditional_edges("plan_retrieval", continue_to_retrieval)
    # Fan-in: once all parallel retriever branches finish, their updates are
    # merged into the state and write_code runs.
    builder.add_edge("retrieve_parallel", "write_code")

    if use_reviewer:
        builder.add_edge("write_code", "review_code")
        builder.add_conditional_edges(
            "review_code",
            route_after_review,
            {
                END: END,
                "write_code": "write_code",
            },
        )
    else:
        builder.add_edge("write_code", END)

    # In-memory checkpointing lets callers read the final state via get_state().
    return builder.compile(checkpointer=MemorySaver())
[docs]
def run(
    problem_txt_path: str,
    output_dir: str | None = None,
    debug: bool = False,
    enable_review: bool = False,
) -> str:
    """Run the full pipeline on the given problem file and write the code to disk.

    Parameters
    ----------
    problem_txt_path : str
        Path to a UTF-8 encoded text file containing the natural
        language problem description. Its stem is used as the run ID
        (and therefore in the output filename).
    output_dir : str or None, optional
        Directory where the generated script should be saved. If
        ``None``, the value from :data:`settings.output_dir` is used.
    debug : bool, optional
        Whether to enable debug mode with verbose output. Defaults to False.
    enable_review : bool, optional
        If True, include the reviewer step. Defaults to False.

    Returns
    -------
    str
        The generated Python script.

    Raises
    ------
    RuntimeError
        If the pipeline reports an error or produces no code.
    """
    source = Path(problem_txt_path)
    problem = source.read_text(encoding="utf-8").strip()

    # run_from_text only writes to disk when given a directory, so resolve the
    # documented default here; otherwise run() would silently save nothing.
    target_dir = output_dir or settings.output_dir

    return run_from_text(
        problem,
        output_dir=target_dir,
        debug=debug,
        run_id=source.stem,
        enable_review=enable_review,
    )
[docs]
def _print_context_summary(contexts: list) -> None:
    """Print a truncated preview of each retrieved context snippet (debug output)."""
    print("\n" + "=" * 80)
    print("DEBUG: RETRIEVED CONTEXT SNIPPETS SUMMARY")
    print("=" * 80)
    for i, ctx in enumerate(contexts, 1):
        if isinstance(ctx, dict):
            ctx_text = ctx.get("text", "")
            query = ctx.get("query", "Unknown")
        else:
            ctx_text = str(ctx)
            query = "Unknown"
        print(f"\n--- Context Snippet {i} (Query: {query}, {len(ctx_text)} chars) ---")
        # Only the first 300 characters, to keep the summary readable.
        preview = ctx_text[:300] + "..." if len(ctx_text) > 300 else ctx_text
        print(preview)
    print("=" * 80 + "\n")


def run_from_text(
    problem: str,
    output_dir: str | None = None,
    debug: bool = False,
    run_id: str | None = None,
    skip_review: bool = False,
    enable_review: bool = False,
) -> str:
    """Run the full pipeline on a problem description string and optionally write the code to disk.

    Side effect: sets ``settings.debug`` to ``debug`` for the duration of the
    process (it is not restored afterwards).

    Parameters
    ----------
    problem : str
        Natural language problem description.
    output_dir : str or None, optional
        Directory where the generated script should be saved (created if
        needed). If ``None``, the code is NOT saved to disk (only returned).
    debug : bool, optional
        Whether to enable debug mode with verbose output. Defaults to False.
    run_id : str or None, optional
        Unique identifier for this run (used for thread ID and output filename).
        If None, the first 12 hex chars of the SHA-256 of the problem text are used.
    skip_review : bool, optional
        DEPRECATED: Use enable_review instead. If True, skip the review step. Defaults to False.
    enable_review : bool, optional
        If True, enable the reviewer step. Defaults to False (reviewer disabled by default).

    Returns
    -------
    str
        The generated Python script: ``final_code`` if set, otherwise
        ``candidate_code``.

    Raises
    ------
    RuntimeError
        If the final state carries an ``error`` or no code was produced.
    """
    import hashlib

    settings.debug = debug
    start_time = time.time()

    target_dir = None
    if output_dir:
        target_dir = Path(output_dir)
        target_dir.mkdir(parents=True, exist_ok=True)

    if run_id is None:
        run_id = hashlib.sha256(problem.encode()).hexdigest()[:12]

    graph = build_graph(skip_review=skip_review, enable_review=enable_review)
    # A unique thread ID per run keeps checkpointed state isolated.
    thread = {"configurable": {"thread_id": f"honegumi_rag_{run_id}"}}
    inputs: Dict[str, Any] = {
        "problem": problem,
        "retrieval_count": 0,
        "review_count": 0,
        "retrieval_queries": [],
        "contexts": [],
    }

    # Progress tracking. With stream_mode="values" each item is the full state
    # after a step; the checkpoint keeps the final state for inspection below.
    retrieval_started = False
    retrieval_completed = False
    retrieval_start_time: float | None = None
    vectorstore_missing = False
    last_context_count = 0

    for state_update in graph.stream(inputs, thread, stream_mode="values"):
        # Retrieval starts once the planner has emitted queries.
        if not retrieval_started and state_update.get("retrieval_queries"):
            retrieval_started = True
            retrieval_start_time = time.time()

        # Retrievers may flag a missing vector store. Warn as soon as the flag
        # appears: they may return no contexts at all in that case, so waiting
        # for the context count to grow could mean never warning.
        if state_update.get("vectorstore_missing") is True and not vectorstore_missing:
            vectorstore_missing = True
            print("\n⚠️ Vector store not found - generating code without documentation retrieval")
            print(" To enable RAG, build the vector store with: python scripts/build_vector_store.py\n")

        # Retrieval is complete on the first step where the contexts grow
        # (i.e. after the parallel retrievers' updates are merged).
        if retrieval_started and not retrieval_completed:
            current_context_count = len(state_update.get("contexts") or [])
            if current_context_count > last_context_count:
                retrieval_completed = True
                if not vectorstore_missing:
                    elapsed = time.time() - retrieval_start_time if retrieval_start_time else 0
                    print(f"✓ Retrieved {current_context_count} contexts in {elapsed:.2f}s\n")
            last_context_count = current_context_count

    final_state: HonegumiRAGState = graph.get_state(thread).values  # type: ignore[assignment]
    if final_state.get("error"):
        raise RuntimeError(f"Pipeline failed: {final_state['error']}")

    if debug and final_state.get("contexts"):
        _print_context_summary(final_state["contexts"])

    # Prefer final_code; fall back to candidate_code in case final_code was not set.
    code = final_state.get("final_code") or final_state.get("candidate_code")
    if not code:
        if debug:
            print("\n[DEBUG] Final state keys:", list(final_state.keys()))
            print("[DEBUG] final_code:", final_state.get("final_code"))
            print("[DEBUG] candidate_code:", final_state.get("candidate_code"))
        raise RuntimeError("Pipeline did not produce final code.")

    if debug:
        print("\n" + "=" * 80)
        print("DEBUG: FINAL APPROVED CODE")
        print("=" * 80)
        print(code)
        print("=" * 80 + "\n")

    if target_dir:
        outfile = target_dir / f"honegumi_generated_{run_id}.py"
        with open(outfile, "w", encoding="utf-8") as f:
            f.write(code)
        if debug:
            print(f"Code saved to: {outfile}\n")

    if debug:
        total_time = time.time() - start_time
        print(f"\n{'='*80}")
        print(f"Total execution time: {total_time:.2f}s")
        print(f"{'='*80}\n")

    return code
[docs]
def run_from_text_with_state(
    problem: str,
    output_dir: str | None = None,
    run_id: str | None = None,
) -> Dict[str, Any]:
    """Run the pipeline and return the full state (for batch processing).

    Never raises for pipeline failures: any exception raised while running the
    graph or writing the output is caught and reported in the ``error`` field.

    Parameters
    ----------
    problem : str
        Natural language problem description.
    output_dir : str or None, optional
        Directory where the generated script should be saved. If
        ``None``, the value from :data:`settings.output_dir` is used.
    run_id : str or None, optional
        Unique identifier for this run. If None, the first 12 hex chars of
        the SHA-256 of the problem text are used.

    Returns
    -------
    Dict[str, Any]
        Dictionary containing:
        - problem: Original problem text
        - run_id: Identifier used for the thread and output filename
        - bo_params: Extracted parameters (dict)
        - skeleton_code: Generated skeleton
        - retrieval_count: Number of retrievals performed
        - retrieval_queries: List of queries produced by the retrieval planner
        - review_count: Number of reviews performed
        - critique_reports: List of review feedback
        - contexts: Retrieved documentation snippets
        - final_code: Generated Python code (``final_code``, else ``candidate_code``)
        - error: Error message if failed (or None)
    """
    import hashlib

    # Batch runs are quiet: turn off the pipeline's debug printing.
    settings.debug = False

    target_dir = Path(output_dir or settings.output_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    if run_id is None:
        run_id = hashlib.sha256(problem.encode()).hexdigest()[:12]

    graph = build_graph()
    thread = {"configurable": {"thread_id": f"honegumi_rag_{run_id}"}}
    # Same initial state as run_from_text; the planner writes retrieval_queries.
    inputs: Dict[str, Any] = {
        "problem": problem,
        "retrieval_count": 0,
        "review_count": 0,
        "retrieval_queries": [],
        "contexts": [],
    }

    critique_reports: list = []
    try:
        last_report = None
        for state in graph.stream(inputs, thread, stream_mode="values"):
            # stream_mode="values" yields the full state after every step, so an
            # unchanged report is seen repeatedly; record it only when it changes.
            # NOTE(review): if critique_report is an accumulating list, this still
            # re-adds earlier entries on change — confirm its reducer.
            report = state.get("critique_report")
            if report and report != last_report:
                if isinstance(report, (list, tuple)):
                    critique_reports.extend(report)
                else:
                    # A single report (e.g. a string) must not be split into chars.
                    critique_reports.append(report)
                last_report = report

        final_state: HonegumiRAGState = graph.get_state(thread).values  # type: ignore[assignment]

        # Same fallback as run_from_text when final_code was not set.
        code = final_state.get("final_code") or final_state.get("candidate_code") or ""

        result = {
            "problem": problem,
            "run_id": run_id,
            "bo_params": final_state.get("bo_params", {}),
            "skeleton_code": final_state.get("skeleton_code", ""),
            "retrieval_count": final_state.get("retrieval_count", 0),
            "retrieval_queries": list(final_state.get("retrieval_queries") or []),
            "review_count": final_state.get("review_count", 0),
            "critique_reports": critique_reports,
            "contexts": final_state.get("contexts", []),
            "final_code": code,
            "error": final_state.get("error"),
        }

        if result["final_code"] and not result["error"]:
            outfile = target_dir / f"honegumi_generated_{run_id}.py"
            with open(outfile, "w", encoding="utf-8") as f:
                f.write(result["final_code"])

        return result
    except Exception as exc:
        # Best-effort for batch processing: report the failure instead of raising.
        return {
            "problem": problem,
            "run_id": run_id,
            "bo_params": {},
            "skeleton_code": "",
            "retrieval_count": 0,
            "retrieval_queries": [],
            "review_count": 0,
            "critique_reports": [],
            "contexts": [],
            "final_code": "",
            "error": str(exc),
        }