Source code for honegumi_rag_assistant.nodes.retriever

"""
Node: Retrieve relevant documentation contexts.

This node provides the code writer with additional grounding by
extracting snippets from the Ax documentation.  When configured with
``settings.retrieval_vectorstore_path``, it attempts to load a vector
store (for example a FAISS index) created from the documentation and
performs a similarity search using a query derived from the problem
description and optimisation parameters.  If the vector store or
dependencies are unavailable, it returns an empty list of contexts.

Each returned context should be a dictionary containing at least a
``text`` key with the content.  Additional metadata such as ``source``
and ``score`` may be included depending on the vector store
implementation.
"""

from __future__ import annotations

from typing import Dict, Any, List
import json
import time

from ..states import HonegumiRAGState
from ..app_config import settings
from ..timing_utils import time_node

# Optional dependencies.  LangChain is not always installed.
try:
    from langchain_community.vectorstores import FAISS  # type: ignore[import]
    from langchain_openai import OpenAIEmbeddings  # type: ignore[import]
except ImportError:  # pragma: no cover - optional
    FAISS = None  # type: ignore
    OpenAIEmbeddings = None  # type: ignore


[docs] def retrieve_single_query(query: str, query_index: int) -> Dict[str, Any]: """Retrieve contexts for a single query (used in parallel fan-out). This is a simpler version designed for parallel execution via Send API. Each parallel retriever handles one query independently. Parameters ---------- query : str The retrieval query to execute query_index : int Index of this query (for debugging) Returns ------- Dict[str, Any] Dictionary with 'contexts' key containing retrieved docs """ start_time = time.time() if settings.debug: print(f"\n[PARALLEL RETRIEVER {query_index + 1}] Query: {query}") # If no vector store is configured, return empty with error marker if not settings.retrieval_vectorstore_path: if settings.debug: print(f"[PARALLEL RETRIEVER {query_index + 1}] No vector store configured\n") # Only set the flag on the first retriever to avoid duplicates return {"contexts": [], "vectorstore_missing": True if query_index == 0 else None} # Check dependencies if FAISS is None or OpenAIEmbeddings is None: if settings.debug: print(f"[PARALLEL RETRIEVER {query_index + 1}] Dependencies not available\n") # Only set the flag on the first retriever to avoid duplicates return {"contexts": [], "vectorstore_missing": True if query_index == 0 else None} try: # Load the vector store and embeddings embeddings = OpenAIEmbeddings( openai_api_key=settings.openai_api_key, model="text-embedding-3-large" ) vectorstore = FAISS.load_local( settings.retrieval_vectorstore_path, embeddings, allow_dangerous_deserialization=True ) # Search using the query docs = vectorstore.similarity_search(query, k=settings.retrieval_top_k) # Convert documents to dictionaries contexts: List[Dict[str, Any]] = [] for doc in docs: contexts.append({ "text": doc.page_content, "metadata": doc.metadata, "query": query, # Track which query retrieved this "query_index": query_index, # Track which parallel retriever this came from }) elapsed_time = time.time() - start_time if settings.debug: print(f"[PARALLEL RETRIEVER {query_index + 1}] Retrieved {len(contexts)} contexts") print(f"[PARALLEL RETRIEVER {query_index + 1}] Time: {elapsed_time:.2f}s\n") return {"contexts": contexts} except Exception as exc: if settings.debug: print(f"[PARALLEL RETRIEVER {query_index + 1}] ERROR: {exc}\n") import traceback traceback.print_exc() # Mark as missing if we can't load the vector store (only on first retriever) return {"contexts": [], "vectorstore_missing": True if query_index == 0 else None}
[docs] class RetrieverAgent: """Retrieve relevant documentation snippets for the current problem. The retriever constructs a query by concatenating the natural language problem description with a JSON representation of the optimisation parameters. It then executes a similarity search against the configured vector store to return the top ``k`` most relevant chunks. If no vector store is configured or the required dependencies are missing the retriever returns an empty list. """
[docs] @staticmethod @time_node("Retriever Agent") def retrieve_context(state: HonegumiRAGState) -> Dict[str, Any]: """Retrieve documentation contexts based on a specific question. Parameters ---------- state : HonegumiRAGState The current pipeline state containing ``retrieval_query`` with a specific question from the Code Writer agent. Returns ------- Dict[str, Any] A dictionary with keys: - ``contexts``: list of context snippets (empty if retrieval fails) - ``retrieval_count``: incremented counter - ``retrieval_query``: cleared (set to None) """ # If no vector store is configured, return empty if not settings.retrieval_vectorstore_path: return { "contexts": state.get("contexts", []), "retrieval_count": state.get("retrieval_count", 0), "retrieval_query": None, } # Check that the necessary dependencies are present if FAISS is None or OpenAIEmbeddings is None: return { "contexts": state.get("contexts", []), "retrieval_count": state.get("retrieval_count", 0), "retrieval_query": None, "error": "Retrieval dependencies not installed; unable to load vector store." } # Get the specific question from the Code Writer retrieval_query = state.get("retrieval_query") if not retrieval_query: # No query provided, return current state return { "contexts": state.get("contexts", []), "retrieval_count": state.get("retrieval_count", 0), "retrieval_query": None, } retrieval_count = state.get("retrieval_count", 0) existing_contexts = state.get("contexts", []) if settings.debug: # DEBUG: Print retrieval query print("\n" + "="*80) print(f"DEBUG: RETRIEVER PROCESSING QUERY (Attempt {retrieval_count + 1}/3)") print("="*80) print(f"Query: {retrieval_query}") print(f"Existing contexts from state: {len(existing_contexts)}") print("="*80 + "\n") try: # Load the vector store and embeddings # Use text-embedding-3-large to match the vector store creation embeddings = OpenAIEmbeddings( openai_api_key=settings.openai_api_key, model="text-embedding-3-large" ) vectorstore = FAISS.load_local( settings.retrieval_vectorstore_path, embeddings, allow_dangerous_deserialization=True ) # Search using the specific question from Code Writer docs = vectorstore.similarity_search(retrieval_query, k=settings.retrieval_top_k) # Convert documents to dictionaries new_contexts: List[Dict[str, Any]] = [] for doc in docs: new_contexts.append({ "text": doc.page_content, "metadata": doc.metadata, }) # Append new contexts to existing ones (accumulate across retrieval loops) all_contexts = existing_contexts + new_contexts if settings.debug: # DEBUG: Print retrieval results print(f"Retrieved {len(new_contexts)} new context snippets") print(f"Total contexts accumulated: {len(all_contexts)}") print(f"Incrementing retrieval_count from {retrieval_count} to {retrieval_count + 1}\n") return { "contexts": all_contexts, "retrieval_count": retrieval_count + 1, "retrieval_query": None, # Clear the query after use } except Exception as exc: # On error, return current state without incrementing counter if settings.debug: print(f"ERROR in retriever: {exc}") import traceback traceback.print_exc() return { "contexts": existing_contexts, "retrieval_count": retrieval_count, "retrieval_query": None, "error": f"Failed to retrieve contexts: {exc}", }