# Source code for honegumi_rag_assistant.orchestrator

"""
Orchestrate the Honegumi RAG Assistant workflow.

This module assembles the various nodes defined in :mod:`honegumi_rag_assistant.nodes`
into a LangGraph state machine.  The :func:`build_graph` function
constructs the graph and returns a compiled version ready for
execution.  The :func:`run` convenience function reads a problem
description from disk, runs the graph and writes the resulting Python
script to the configured output directory.

You can customise the workflow by modifying the nodes or adding new
conditional branches.  See the LangGraph documentation for further
details on graph construction.
"""

from __future__ import annotations

import os
import time
from pathlib import Path
from typing import Dict, Any

try:
    from langgraph.graph import StateGraph, START, END  # type: ignore[import]
    from langgraph.constants import Send  # type: ignore[import]
    from langgraph.checkpoint.memory import MemorySaver  # type: ignore[import]
except ImportError:
    # These imports are optional to allow type checking without installing
    # LangGraph.  If LangGraph is not available the graph cannot be
    # constructed or run; this will be surfaced when build_graph is called.
    StateGraph = None  # type: ignore
    START = END = None  # type: ignore
    MemorySaver = None  # type: ignore
    Send = None  # type: ignore

from .states import HonegumiRAGState
from .nodes import (
    ParameterSelector,
    SkeletonGenerator,
    RetrievalPlannerAgent,
    RetrieverAgent,
    CodeWriterAgent,
    ReviewerAgent,
)
from .nodes.retriever import retrieve_single_query
from .app_config import settings


def build_graph(skip_review: bool = False, enable_review: bool = False) -> Any:
    """Construct and return the compiled LangGraph for the agentic pipeline.

    Graph shape::

        START -> select_parameters -> generate_skeleton -> plan_retrieval
              -> (fan-out) retrieve_parallel x N -> (fan-in) write_code
              -> [review_code ->] END

    Parameters
    ----------
    skip_review : bool, optional
        DEPRECATED: Use ``enable_review`` instead. If True, the review step is
        skipped even when ``enable_review`` is True. Defaults to False.
    enable_review : bool, optional
        If True, include the reviewer step. Defaults to False (reviewer
        disabled by default).

    Returns
    -------
    Any
        The compiled graph returned by ``StateGraph.compile``, using an
        in-memory :class:`MemorySaver` checkpointer. It contains:

        - Retrieval Planner -> Parallel Retrieval (fan-out/fan-in via ``Send``)
        - Optional Reviewer (if ``enable_review=True`` and not ``skip_review``)

    Raises
    ------
    RuntimeError
        If LangGraph is not installed.
    """
    if StateGraph is None:
        raise RuntimeError(
            "LangGraph is not installed. Please add 'langgraph' to your dependencies."
        )

    # skip_review is deprecated but still honoured for backward compatibility:
    # it overrides enable_review.
    use_reviewer = enable_review and not skip_review

    builder = StateGraph(HonegumiRAGState)

    # --- Nodes -----------------------------------------------------------
    builder.add_node("select_parameters", ParameterSelector.select_parameters)
    builder.add_node("generate_skeleton", SkeletonGenerator.generate_skeleton)
    builder.add_node("plan_retrieval", RetrievalPlannerAgent.plan_retrieval)
    # Each Send() from continue_to_retrieval delivers a small payload
    # {"query": ..., "index": ...} to this node, not the full pipeline state.
    builder.add_node(
        "retrieve_parallel",
        lambda payload: retrieve_single_query(payload["query"], payload["index"]),
    )
    builder.add_node("write_code", CodeWriterAgent.write_code)
    if use_reviewer:
        builder.add_node("review_code", ReviewerAgent.review_code)

    # --- Routing functions ----------------------------------------------
    def continue_to_retrieval(state: HonegumiRAGState) -> list:
        """Fan out to parallel retrieval, or skip straight to code writing.

        Returns a list of :class:`Send` packets. There is one
        ``retrieve_parallel`` packet per planner query. Instead, a single
        ``write_code`` packet carrying the current state is returned when the
        planner produced no queries or the configured vector store directory
        does not exist.
        """
        queries = state.get("retrieval_queries", [])
        if not queries:
            # No retrieval needed, go directly to code writing.
            if settings.debug:
                print("\n[ORCHESTRATOR] No retrieval queries - skipping to code writer\n")
            return [Send("write_code", state)]

        # Check the vector store before fanning out. is_dir() is False for
        # non-existent paths, so it also covers the exists() check.
        store_path = settings.retrieval_vectorstore_path
        if not (store_path and Path(store_path).is_dir()):
            # User-facing warning: print it regardless of debug mode (it was
            # previously suppressed in debug runs by an inverted condition).
            print("\n⚠️ Vector store not found - generating code without documentation retrieval")
            print(" To enable RAG, build the vector store with:")
            print(" python -m honegumi_rag_assistant.build_vector_store\n")
            return [Send("write_code", state)]

        if settings.debug:
            print(f"\n[ORCHESTRATOR] Fanning out to {len(queries)} parallel retrievers\n")

        # Each Send creates an independent execution of retrieve_parallel.
        return [
            Send("retrieve_parallel", {"query": query, "index": i})
            for i, query in enumerate(queries)
        ]

    def route_after_review(state: HonegumiRAGState) -> str:
        """Return END once ``final_code`` is set, otherwise route back to ``write_code``.

        Retrieval happens up front, so a revision reuses the existing contexts.

        NOTE(review): nothing here bounds the write -> review loop. It is only
        capped by LangGraph's recursion limit. Consider checking
        ``review_count`` if the reviewer increments it.
        """
        if state.get("final_code"):
            return END
        return "write_code"

    # --- Wiring ----------------------------------------------------------
    builder.add_edge(START, "select_parameters")
    builder.add_edge("select_parameters", "generate_skeleton")
    builder.add_edge("generate_skeleton", "plan_retrieval")

    # Fan-out from the planner to parallel retrievers (or skip to the code
    # writer). No path map: the Send packets carry their own destinations.
    builder.add_conditional_edges("plan_retrieval", continue_to_retrieval)

    # Fan-in: once all Send branches of the superstep complete, their state
    # updates are merged and write_code runs.
    builder.add_edge("retrieve_parallel", "write_code")

    if use_reviewer:
        builder.add_edge("write_code", "review_code")
        builder.add_conditional_edges(
            "review_code",
            route_after_review,
            {
                END: END,
                "write_code": "write_code",
            },
        )
    else:
        builder.add_edge("write_code", END)

    # Compile with in-memory checkpointing so callers can read the final
    # state via graph.get_state(thread).
    return builder.compile(checkpointer=MemorySaver())
def run(
    problem_txt_path: str,
    output_dir: str | None = None,
    debug: bool = False,
    enable_review: bool = False,
) -> str:
    """Run the full pipeline on the given problem file and write the code to disk.

    Parameters
    ----------
    problem_txt_path : str
        Path to a UTF-8 encoded text file containing the natural language
        problem description. Leading/trailing whitespace is stripped.
    output_dir : str or None, optional
        Directory where the generated script should be saved. If ``None``,
        the value from :data:`settings.output_dir` is used.
    debug : bool, optional
        Whether to enable debug mode with verbose output. Defaults to False.
    enable_review : bool, optional
        If True, enable the reviewer step. Defaults to False.

    Returns
    -------
    str
        The generated Python script.

    Raises
    ------
    OSError
        If the problem file cannot be read.
    RuntimeError
        If the pipeline fails or produces no code.
    """
    with open(problem_txt_path, "r", encoding="utf-8") as f:
        problem = f.read().strip()

    # Honour the documented contract: fall back to the configured output
    # directory. run_from_text itself treats None as "do not save to disk".
    if output_dir is None:
        output_dir = settings.output_dir

    # The file stem doubles as the run ID (thread ID and output filename).
    run_id = Path(problem_txt_path).stem
    return run_from_text(
        problem,
        output_dir=output_dir,
        debug=debug,
        run_id=run_id,
        enable_review=enable_review,
    )
def _print_debug_context_summary(contexts: Any) -> None:
    """Print a truncated (300-char) preview of each retrieved context snippet."""
    if not contexts:
        return
    print("\n" + "=" * 80)
    print("DEBUG: RETRIEVED CONTEXT SNIPPETS SUMMARY")
    print("=" * 80)
    for i, ctx in enumerate(contexts, 1):
        # Contexts are normally dicts with "text"/"query" keys; tolerate others.
        if isinstance(ctx, dict):
            ctx_text = ctx.get("text", "")
            query = ctx.get("query", "Unknown")
        else:
            ctx_text = str(ctx)
            query = "Unknown"
        print(f"\n--- Context Snippet {i} (Query: {query}, {len(ctx_text)} chars) ---")
        preview = ctx_text[:300] + "..." if len(ctx_text) > 300 else ctx_text
        print(preview)
    print("=" * 80 + "\n")


def run_from_text(
    problem: str,
    output_dir: str | None = None,
    debug: bool = False,
    run_id: str | None = None,
    skip_review: bool = False,
    enable_review: bool = False,
) -> str:
    """Run the full pipeline on a problem description string.

    Optionally writes the generated code to disk.

    Parameters
    ----------
    problem : str
        Natural language problem description.
    output_dir : str or None, optional
        Directory where the generated script should be saved. If ``None``,
        the code is NOT saved to disk (only returned). The directory is
        created up front, so an unusable path fails before the pipeline runs.
    debug : bool, optional
        Whether to enable debug mode with verbose output. Defaults to False.
        Note: this is written to the shared global ``settings.debug``.
    run_id : str or None, optional
        Unique identifier for this run (used for thread ID and output
        filename). If None, the first 12 hex chars of the SHA-256 of the
        problem text are used.
    skip_review : bool, optional
        DEPRECATED: Use enable_review instead. If True, skip the review step.
        Defaults to False.
    enable_review : bool, optional
        If True, enable the reviewer step. Defaults to False.

    Returns
    -------
    str
        The generated Python script (``final_code``, falling back to
        ``candidate_code``).

    Raises
    ------
    RuntimeError
        If the final state carries an ``error`` or no code was produced.
    """
    import hashlib

    # Global side effect: the graph's routing reads settings.debug.
    settings.debug = debug
    start_time = time.time()

    # Create the output directory early so a bad path fails fast.
    target_dir = None
    if output_dir:
        target_dir = Path(output_dir)
        target_dir.mkdir(parents=True, exist_ok=True)

    if run_id is None:
        run_id = hashlib.sha256(problem.encode()).hexdigest()[:12]

    graph = build_graph(skip_review=skip_review, enable_review=enable_review)

    # A per-run thread ID keeps checkpointed state isolated between runs.
    thread = {"configurable": {"thread_id": f"honegumi_rag_{run_id}"}}

    inputs: Dict[str, Any] = {
        "problem": problem,
        "retrieval_count": 0,
        "review_count": 0,
        "retrieval_queries": [],
        "contexts": [],
    }

    # Progress tracking across streamed full-state snapshots.
    retrieval_start_time: float | None = None  # set once the planner emits queries
    retrieval_completed = False
    vectorstore_missing = False
    last_context_count = 0

    for state_update in graph.stream(inputs, thread, stream_mode="values"):
        if retrieval_start_time is None and state_update.get("retrieval_queries"):
            retrieval_start_time = time.time()

        # Retrievers may flag a missing vector store. Warn once, as soon as it
        # is seen. Previously the warning only fired when contexts grew, which
        # a missing store prevents.
        if not vectorstore_missing and state_update.get("vectorstore_missing") is True:
            vectorstore_missing = True
            # NOTE(review): build_graph suggests
            # "python -m honegumi_rag_assistant.build_vector_store" instead.
            print("\n⚠️ Vector store not found - generating code without documentation retrieval")
            print(" To enable RAG, build the vector store with: python scripts/build_vector_store.py\n")

        if retrieval_start_time is not None and not retrieval_completed:
            current_context_count = len(state_update.get("contexts") or [])
            # Retrieval is considered complete the first time contexts grow.
            if current_context_count > last_context_count:
                retrieval_completed = True
                if not vectorstore_missing:
                    total_retrieval_time = time.time() - retrieval_start_time
                    print(f"✓ Retrieved {current_context_count} contexts in {total_retrieval_time:.2f}s\n")
            last_context_count = current_context_count

    final_state: HonegumiRAGState = graph.get_state(thread).values  # type: ignore[assignment]

    if final_state.get("error"):
        raise RuntimeError(f"Pipeline failed: {final_state['error']}")

    if debug:
        _print_debug_context_summary(final_state.get("contexts", []))

    # Prefer reviewer-approved code; fall back to the writer's candidate
    # (e.g. when the reviewer is disabled and final_code is never set).
    code = final_state.get("final_code") or final_state.get("candidate_code")
    if not code:
        if debug:
            print("\n[DEBUG] Final state keys:", list(final_state.keys()))
            print("[DEBUG] final_code:", final_state.get("final_code"))
            print("[DEBUG] candidate_code:", final_state.get("candidate_code"))
        raise RuntimeError("Pipeline did not produce final code.")

    if debug:
        # Header is printed only once code is known to exist.
        print("\n" + "=" * 80)
        print("DEBUG: FINAL APPROVED CODE")
        print("=" * 80)
        print(code)
        print("=" * 80 + "\n")

    if target_dir:
        outfile = target_dir / f"honegumi_generated_{run_id}.py"
        outfile.write_text(code, encoding="utf-8")
        if debug:
            print(f"Code saved to: {outfile}\n")

    if debug:
        total_time = time.time() - start_time
        print(f"\n{'='*80}")
        print(f"Total execution time: {total_time:.2f}s")
        print(f"{'='*80}\n")

    return code
def run_from_text_with_state(
    problem: str,
    output_dir: str | None = None,
    run_id: str | None = None,
    enable_review: bool = False,
) -> Dict[str, Any]:
    """Run the pipeline and return the full state (for batch processing).

    Parameters
    ----------
    problem : str
        Natural language problem description.
    output_dir : str or None, optional
        Directory where the generated script should be saved. If ``None``,
        the value from :data:`settings.output_dir` is used.
    run_id : str or None, optional
        Unique identifier for this run. If None, the first 12 hex chars of
        the SHA-256 of the problem text are used.
    enable_review : bool, optional
        If True, enable the reviewer step. Defaults to False.

    Returns
    -------
    Dict[str, Any]
        Dictionary containing:
        - problem: Original problem text
        - run_id: Identifier used for this run
        - bo_params: Extracted parameters (dict)
        - skeleton_code: Generated skeleton
        - retrieval_count: Number of retrievals performed
        - retrieval_queries: List of questions asked
        - review_count: Number of reviews performed
        - critique_reports: List of review feedback
        - contexts: Retrieved documentation snippets
        - final_code: Generated Python code (``final_code``, falling back to
          ``candidate_code``)
        - error: Error message if failed (or None)

        Exceptions raised while running the graph are caught and reported
        via ``error`` with empty payload fields.
    """
    import hashlib

    # Batch runs are silent: force debug output off. Note this mutates the
    # shared global settings object.
    settings.debug = False

    target_dir = Path(output_dir or settings.output_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    if run_id is None:
        run_id = hashlib.sha256(problem.encode()).hexdigest()[:12]

    graph = build_graph(enable_review=enable_review)
    thread = {"configurable": {"thread_id": f"honegumi_rag_{run_id}"}}

    inputs: Dict[str, Any] = {
        "problem": problem,
        "retrieval_count": 0,
        "review_count": 0,
        "retrieval_query": None,  # legacy single-query key, kept for compatibility
        "retrieval_queries": [],  # key the planner populates (matches run_from_text)
        "contexts": [],
    }

    legacy_queries: list = []
    critique_reports: list = []

    try:
        # "updates" mode yields {node_name: node_output} once per node run, so
        # each critique is collected exactly once. "values" mode re-emits the
        # whole accumulated state after every step, which duplicated entries.
        for chunk in graph.stream(inputs, thread, stream_mode="updates"):
            if not isinstance(chunk, dict):
                continue
            for node_output in chunk.values():
                if not isinstance(node_output, dict):
                    continue
                if node_output.get("retrieval_query"):
                    legacy_queries.append(node_output["retrieval_query"])
                report = node_output.get("critique_report")
                if report:
                    # Don't explode a single string report into characters.
                    if isinstance(report, (list, tuple)):
                        critique_reports.extend(report)
                    else:
                        critique_reports.append(report)

        final_state: HonegumiRAGState = graph.get_state(thread).values  # type: ignore[assignment]

        # The planner stores its queries under "retrieval_queries"; fall back to
        # any legacy single-query values seen during streaming.
        retrieval_queries = list(final_state.get("retrieval_queries") or legacy_queries)

        # Match run_from_text: use candidate_code when final_code is unset
        # (e.g. reviewer disabled, which is the default here).
        final_code = final_state.get("final_code") or final_state.get("candidate_code") or ""

        result = {
            "problem": problem,
            "run_id": run_id,
            "bo_params": final_state.get("bo_params", {}),
            "skeleton_code": final_state.get("skeleton_code", ""),
            "retrieval_count": final_state.get("retrieval_count", 0),
            "retrieval_queries": retrieval_queries,
            "review_count": final_state.get("review_count", 0),
            "critique_reports": critique_reports,
            "contexts": final_state.get("contexts", []),
            "final_code": final_code,
            "error": final_state.get("error"),
        }

        if result["final_code"] and not result["error"]:
            outfile = target_dir / f"honegumi_generated_{run_id}.py"
            outfile.write_text(result["final_code"], encoding="utf-8")

        return result
    except Exception as exc:
        # Deliberate batch boundary: report the failure instead of raising.
        return {
            "problem": problem,
            "run_id": run_id,
            "bo_params": {},
            "skeleton_code": "",
            "retrieval_count": 0,
            "retrieval_queries": [],
            "review_count": 0,
            "critique_reports": [],
            "contexts": [],
            "final_code": "",
            "error": str(exc),
        }