Building a Research Assistant Agent with Bifrost: A Complete Guide to Tool Calling

Introduction

Tool calling transforms static AI models into dynamic, action-capable agents. Instead of just generating text, AI models can interact with external systems - search the web, query databases, read files, and execute business logic. In this comprehensive guide, you'll learn how to build a production-ready Research Assistant Agent using Bifrost's Model Context Protocol (MCP) integration.

By the end of this tutorial, you'll have built an agent that can:

  • Search the web for current information
  • Read and analyze files from your filesystem
  • Execute Python code for data analysis
  • Operate with proper governance controls and observability

What is Bifrost?

Bifrost is an open-source LLM gateway built in Go that provides a unified interface for multiple AI providers (OpenAI, Anthropic, Bedrock, and more). It acts as an intelligent routing layer with built-in features like load balancing, semantic caching, governance controls, and comprehensive observability.

Why Use Bifrost for Tool Calling Agents?

  • Security-First Design: By default, Bifrost does not execute tool calls automatically - you maintain explicit control over every action
  • Multi-Provider Support: Use any LLM provider with the same tool definitions
  • Built-in Governance: Virtual keys, budget controls, and rate limiting
  • Production-Ready Observability: Request tracing, metrics, and real-time monitoring
  • Zero Code Changes: Drop-in replacement for existing AI SDKs

Prerequisites

Before we begin, ensure you have:

  • Node.js (for NPX installation) or Docker
  • API Keys for at least one AI provider (OpenAI, Anthropic, etc.)
  • Basic understanding of REST APIs and command-line tools
  • Python 3.8+ (optional, for testing code execution tools)

Part 1: Setting Up Bifrost Gateway

Installation

Bifrost offers two installation methods. Choose the one that fits your workflow:

Option 1: NPX (Recommended for Quick Start)

# Install and run Bifrost locally
npx -y @maximhq/bifrost

# Or install a specific version
npx -y @maximhq/bifrost --transport-version v1.3.9

Option 2: Docker

# Pull and run Bifrost
docker pull maximhq/bifrost
docker run -p 8080:8080 maximhq/bifrost

# For configuration persistence across restarts
docker run -p 8080:8080 -v $(pwd)/data:/app/data maximhq/bifrost

Bifrost launches with zero configuration needed. It automatically creates a web interface at http://localhost:8080 where you can configure providers, MCP tools, and monitor requests in real-time.

Understanding Bifrost's Configuration Modes

Bifrost supports two configuration approaches that cannot be used simultaneously:

Mode 1: Web UI Configuration (Recommended for Getting Started)

When no config.json exists, Bifrost automatically creates a SQLite database for configuration storage. This enables:

  • Real-time configuration through the web UI
  • Dynamic updates without restarts
  • Visual provider and tool management
  • Built-in request logging and analytics

Mode 2: File-Based Configuration (For Advanced Users)

Create a config.json file in your app directory for GitOps workflows or when UI is not needed. Without config_store enabled in the file, Bifrost runs in read-only mode and requires restarts for configuration changes.

For this tutorial, we'll use the Web UI approach for easier visualization and real-time feedback.

Configuring Your First Provider

Open http://localhost:8080 in your browser and add your AI provider:

  1. Navigate to Providers in the sidebar
  2. Click Add Provider
  3. Select your provider (e.g., OpenAI)
  4. Add your API key
  5. Configure which models to enable

Via API (Alternative)

curl -X POST http://localhost:8080/api/providers \
  -H "Content-Type: application/json" \
  -d '{
    "provider": "openai",
    "keys": [
      {
        "name": "openai-key-1",
        "value": "sk-your-actual-api-key-here",
        "models": ["gpt-4o-mini", "gpt-4o"],
        "weight": 1.0
      }
    ]
  }'

Test Your Setup

Verify Bifrost is working with a simple API call:

curl -X POST http://localhost:8080/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "model": "openai/gpt-4o-mini",
    "messages": [{"role": "user", "content": "Hello, Bifrost!"}]
  }'

You should receive a response from the AI model. Notice the model format: openai/gpt-4o-mini - Bifrost uses the pattern provider/model for routing.
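
Because every request names its model as provider/model, it can help to split the two parts before logging or validating a request. The helper below is a minimal sketch; `split_model_id` is a hypothetical name and not part of Bifrost, and it assumes the provider is always the segment before the first slash.

```python
from typing import Tuple


def split_model_id(model_id: str) -> Tuple[str, str]:
    """Split a 'provider/model' string into (provider, model)."""
    # partition() splits on the first slash only, so model names that
    # contain further slashes stay intact in the second part.
    provider, sep, model = model_id.partition("/")
    if not sep or not provider or not model:
        raise ValueError(f"expected 'provider/model', got {model_id!r}")
    return provider, model


print(split_model_id("openai/gpt-4o-mini"))
```

Rejecting strings without a provider prefix early gives a clearer error than a failed request to the gateway.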

Part 2: Understanding MCP (Model Context Protocol)

Before connecting tools, let's understand how Bifrost implements tool calling:

The MCP Architecture

Model Context Protocol (MCP) is an open standard that enables AI models to discover and execute external tools at runtime. Bifrost acts as an MCP client that connects to external MCP servers hosting tools.

Key Security Principle: Bifrost follows a stateless, explicit execution pattern:

  1. Discovery: Bifrost connects to MCP servers and discovers available tools
  2. Integration: Tools are added to the AI model's function calling schema
  3. Suggestion: Chat completions return tool call suggestions (NOT executed)
  4. Execution: Separate API calls explicitly execute approved tool calls
  5. Continuation: Your application manages conversation state

This means that, by default, Bifrost does not execute tool calls automatically. You maintain complete control over which tools run and when.

Supported MCP Connection Types

Bifrost supports three connection protocols:

  • STDIO: Run MCP servers as local processes via command line
  • HTTP: Connect to MCP servers over HTTP/HTTPS
  • SSE: Server-Sent Events for streaming tool responses

For this guide, we'll use STDIO connections as they're easiest to set up and test locally.

Part 3: Building the Research Assistant Agent

Architecture Overview

Our Research Assistant will use three MCP tools:

  1. Filesystem Tool: Read and analyze files
  2. Web Search Tool: Fetch current information
  3. Python Execution Tool: Run code for data analysis

Let's connect each tool to Bifrost and build the complete agent.

Tool 1: Filesystem Access

The filesystem MCP server allows the AI to read, write, and navigate directories.

Connect the Filesystem Tool:

curl -X POST http://localhost:8080/api/mcp/client \
  -H "Content-Type: application/json" \
  -d '{
    "name": "filesystem",
    "connection_type": "stdio",
    "stdio_config": {
      "command": "npx",
      "args": ["@modelcontextprotocol/server-filesystem", "/tmp"]
    }
  }'

This configures the filesystem tool with access to the /tmp directory. You can change this path to match your needs, but be cautious about granting broad filesystem access.

Via Web UI:

  1. Go to MCP Clients in the sidebar
  2. Click Add MCP Client
  3. Fill in the configuration:
    • Name: filesystem
    • Connection Type: STDIO
    • Command: npx
    • Args: @modelcontextprotocol/server-filesystem /tmp
  4. Click Create

Tool 2: Web Search

For this example, we'll assume you have access to a web search MCP server. Many MCP servers are available in the community, including Brave Search, DuckDuckGo, and custom implementations.

Connect the Web Search Tool:

curl -X POST http://localhost:8080/api/mcp/client \
  -H "Content-Type: application/json" \
  -d '{
    "name": "web-search",
    "connection_type": "http",
    "connection_string": "http://your-search-mcp-server:8080"
  }'

Tool 3: Python Execution (Optional but Powerful)

For data analysis capabilities, you can connect a Python execution MCP server.

Security Note: Code execution tools should only be used in controlled environments with proper sandboxing. Never expose code execution to untrusted users.

curl -X POST http://localhost:8080/api/mcp/client \
  -H "Content-Type: application/json" \
  -d '{
    "name": "python-executor",
    "connection_type": "stdio",
    "stdio_config": {
      "command": "python",
      "args": ["-m", "mcp_python_server"]
    }
  }'

Verify MCP Client Configuration

List all connected MCP clients to verify your setup:

curl http://localhost:8080/api/mcp/clients

You should see all three tools listed with their connection details and available functions.

Part 4: Implementing the Agent Logic

Now that our tools are connected, let's implement the agent's conversation flow.

Understanding the Stateless Tool Flow

Bifrost's tool execution follows a stateless pattern:

1. POST /v1/chat/completions → Get tool call suggestions (stateless)
2. Your App Reviews Tool Calls → Decides which to execute
3. POST /v1/mcp/tool/execute → Execute specific tool calls (stateless)
4. Your App Assembles History → Continue with complete conversation

This pattern ensures explicit control while providing responses optimized for conversation continuity.
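
Step 2 of this flow, the review, is where your application decides what may run. The following is a minimal sketch, assuming tool calls shaped like OpenAI-style function calls (an `id` plus a `function` with a `name`) and an allow-list kept by your application. `review_tool_calls` and the `delete_file` tool name are hypothetical and used only for illustration.

```python
from typing import Any, Dict, List, Set, Tuple

ToolCall = Dict[str, Any]


def review_tool_calls(
    tool_calls: List[ToolCall], approved_names: Set[str]
) -> Tuple[List[ToolCall], List[ToolCall]]:
    """Partition suggested tool calls into (approved, rejected) by tool name."""
    approved: List[ToolCall] = []
    rejected: List[ToolCall] = []
    for call in tool_calls:
        name = call["function"]["name"]
        if name in approved_names:
            approved.append(call)
        else:
            rejected.append(call)
    return approved, rejected


# Two suggestions from the model; only read_file is on the allow-list.
suggested = [
    {"id": "call_1", "type": "function",
     "function": {"name": "read_file", "arguments": '{"path": "/tmp/notes.txt"}'}},
    {"id": "call_2", "type": "function",
     "function": {"name": "delete_file", "arguments": '{"path": "/tmp/notes.txt"}'}},
]
approved, rejected = review_tool_calls(suggested, {"read_file", "write_file"})
print([c["id"] for c in approved], [c["id"] for c in rejected])
```

Rejected calls usually still need a `tool` message in the conversation history (for example, one saying the call was declined), because OpenAI-style chat APIs expect a reply for every `tool_call_id`.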

Example: Research Assistant Conversation

Here's a complete Python implementation of the agent logic:

import requests
import json
from typing import List, Dict, Any

BIFROST_BASE_URL = "http://localhost:8080"

class ResearchAssistant:
    def __init__(self, model: str = "openai/gpt-4o-mini"):
        self.model = model
        self.conversation_history: List[Dict[str, Any]] = []

    def chat(self, user_message: str) -> str:
        """Send a message and handle tool execution automatically."""

        # Add user message to history
        self.conversation_history.append({
            "role": "user",
            "content": user_message
        })

        # Step 1: Get AI response (possibly with tool calls)
        response = self._make_completion_request()
        assistant_message = response["choices"][0]["message"]

        # Step 2: Keep going while the AI asks for tools. Multi-step tasks
        # (search, then save) need more than one round, so loop with a cap.
        max_tool_rounds = 5
        rounds = 0
        while assistant_message.get("tool_calls") and rounds < max_tool_rounds:
            rounds += 1
            print(f"🔧 AI wants to use {len(assistant_message['tool_calls'])} tools")

            # Add assistant's tool call request to history
            self.conversation_history.append(assistant_message)

            # Step 3: Execute each tool call
            for tool_call in assistant_message["tool_calls"]:
                print(f"   Executing: {tool_call['function']['name']}")
                tool_result = self._execute_tool(tool_call)

                # Add tool result to history
                self.conversation_history.append({
                    "role": "tool",
                    "tool_call_id": tool_call["id"],
                    "name": tool_call["function"]["name"],
                    "content": json.dumps(tool_result)
                })

            # Step 4: Ask again with the tool results included
            response = self._make_completion_request()
            assistant_message = response["choices"][0]["message"]

        # Add final assistant response to history (content can be missing
        # or null when the model stopped on a tool call)
        final_content = assistant_message.get("content") or ""
        self.conversation_history.append({
            "role": "assistant",
            "content": final_content
        })

        return final_content

    def _make_completion_request(self) -> Dict[str, Any]:
        """Make a chat completion request to Bifrost."""
        response = requests.post(
            f"{BIFROST_BASE_URL}/v1/chat/completions",
            headers={"Content-Type": "application/json"},
            json={
                "model": self.model,
                "messages": self.conversation_history
            }
        )
        response.raise_for_status()
        return response.json()

    def _execute_tool(self, tool_call: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a single tool call via Bifrost's MCP endpoint."""
        response = requests.post(
            f"{BIFROST_BASE_URL}/v1/mcp/tool/execute",
            headers={"Content-Type": "application/json"},
            json={
                "tool_call": tool_call
            }
        )
        response.raise_for_status()
        return response.json()

# Usage Example
if __name__ == "__main__":
    assistant = ResearchAssistant()

    # Example: Research query that requires multiple tools
    response = assistant.chat(
        "Can you search for the latest news about AI safety, "
        "then save a summary to /tmp/ai_safety_summary.txt?"
    )

    print(f"\n🤖 Assistant: {response}")

    # Continue the conversation
    response = assistant.chat(
        "Now read that file and tell me the key points"
    )

    print(f"\n🤖 Assistant: {response}")

How the Agent Works

  1. User Query: The user asks a question that requires external tools
  2. AI Analysis: Bifrost forwards the request to the LLM with all available tools in the schema
  3. Tool Suggestions: The LLM responds with structured tool calls (NOT executed)
  4. Explicit Execution: Your code reviews and executes approved tools via /v1/mcp/tool/execute
  5. Result Integration: Tool results are added to conversation history
  6. Final Response: The LLM generates a natural language response using tool results
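
In step 3, each structured tool call carries its arguments as a JSON-encoded string rather than a parsed object, at least in OpenAI-style responses. Below is a small sketch for decoding one before reviewing or logging it. `parse_tool_call` is a hypothetical helper, not a Bifrost function.

```python
import json
from typing import Any, Dict, Tuple


def parse_tool_call(tool_call: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """Return (tool name, decoded arguments) for one suggested tool call."""
    function = tool_call["function"]
    raw_args = function.get("arguments") or "{}"
    # Decode the JSON string; pass through if a client already parsed it.
    args = json.loads(raw_args) if isinstance(raw_args, str) else raw_args
    return function["name"], args


example_call = {
    "id": "call_123",
    "type": "function",
    "function": {"name": "read_file", "arguments": '{"path": "/tmp/test.txt"}'},
}
name, args = parse_tool_call(example_call)
print(name, args["path"])
```

Decoding the arguments first makes it possible to check specific values, such as a file path, before the call is approved for execution.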

Testing the Agent

Let's test with a real research query:

assistant = ResearchAssistant(model="openai/gpt-4o-mini")

# Complex multi-step research task
response = assistant.chat("""
I need to research recent developments in quantum computing.
1. Search for the latest news about quantum computing breakthroughs
2. Save the top 3 findings to a file called quantum_research.txt
3. Analyze the findings and tell me which one is most significant
""")

print(response)

The agent will automatically:

  • Use the web search tool to find recent news
  • Use the filesystem tool to save results
  • Analyze and synthesize the information into a coherent response

Part 5: Adding Governance & Security

Production agents need proper access controls, budget management, and rate limiting. Bifrost provides comprehensive governance through Virtual Keys.

Understanding Virtual Keys

Virtual Keys (VKs) are Bifrost's primary governance mechanism. They provide:

  • Access Control: Specify which providers and models can be used
  • Budget Management: Set spending limits with automatic resets
  • Rate Limiting: Control token and request rates
  • Tool Filtering: Restrict which MCP tools are available

Creating a Virtual Key for the Research Agent

Via Web UI:

  1. Navigate to Virtual Keys
  2. Click Add Virtual Key
  3. Configure:
    • Name: research-assistant-key
    • Allowed Providers: OpenAI (50% weight), Anthropic (50% weight)
    • Allowed Models: gpt-4o-mini, claude-3-sonnet
    • Budget: $50.00 per month
    • Rate Limits: 10,000 tokens/hour, 100 requests/minute
  4. Click Create

Via API:

curl -X POST http://localhost:8080/api/governance/virtual-keys \
  -H "Content-Type: application/json" \
  -d '{
    "name": "research-assistant-key",
    "description": "Governance key for research assistant agent",
    "provider_configs": [
      {
        "provider": "openai",
        "weight": 0.5,
        "allowed_models": ["gpt-4o-mini"]
      },
      {
        "provider": "anthropic",
        "weight": 0.5,
        "allowed_models": ["claude-3-sonnet-20240229"]
      }
    ],
    "budget": {
      "max_limit": 50.00,
      "reset_duration": "1M"
    },
    "rate_limit": {
      "token_max_limit": 10000,
      "token_reset_duration": "1h",
      "request_max_limit": 100,
      "request_reset_duration": "1m"
    },
    "is_active": true
  }'

This creates a virtual key with ID format sk-bf-* that you'll use in requests.

Restricting MCP Tools per Virtual Key

Control which tools the research agent can access:

curl -X PUT http://localhost:8080/api/governance/virtual-keys/{vk_id} \
  -H "Content-Type: application/json" \
  -d '{
    "mcp_configs": [
      {
        "mcp_client_name": "filesystem",
        "tools_to_execute": ["read_file", "write_file"]
      },
      {
        "mcp_client_name": "web-search",
        "tools_to_execute": ["*"]
      }
    ]
  }'

This configuration:

  • Allows only read_file and write_file from the filesystem tool
  • Allows all tools from web-search (using wildcard)
  • Blocks all other MCP clients not listed
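
The three rules above can be modeled locally, which is handy for unit-testing a configuration before applying it. This is only a sketch of the semantics as described here; Bifrost enforces the real rules on the server, and `is_tool_allowed` is a hypothetical helper.

```python
from typing import Any, Dict, List


def is_tool_allowed(mcp_configs: List[Dict[str, Any]], client: str, tool: str) -> bool:
    """Apply the filtering rules: listed tools, wildcard, unlisted clients blocked."""
    for config in mcp_configs:
        if config["mcp_client_name"] == client:
            tools = config["tools_to_execute"]
            return "*" in tools or tool in tools
    # Clients that do not appear in mcp_configs are blocked entirely.
    return False


mcp_configs = [
    {"mcp_client_name": "filesystem", "tools_to_execute": ["read_file", "write_file"]},
    {"mcp_client_name": "web-search", "tools_to_execute": ["*"]},
]
print(is_tool_allowed(mcp_configs, "filesystem", "read_file"))
print(is_tool_allowed(mcp_configs, "python-executor", "run_code"))
```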

Using Virtual Keys in Your Agent

Update your agent code to include the virtual key header:

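class ResearchAssistant:
    # Only the new and changed methods are shown here; chat() and
    # _execute_tool() stay as defined in Part 4.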
    def __init__(self, model: str = "openai/gpt-4o-mini", virtual_key: str = None):
        self.model = model
        self.virtual_key = virtual_key
        self.conversation_history: List[Dict[str, Any]] = []

    def _get_headers(self) -> Dict[str, str]:
        """Get request headers including virtual key if provided."""
        headers = {"Content-Type": "application/json"}
        if self.virtual_key:
            headers["x-bf-vk"] = self.virtual_key
        return headers

    def _make_completion_request(self) -> Dict[str, Any]:
        """Make a chat completion request to Bifrost."""
        response = requests.post(
            f"{BIFROST_BASE_URL}/v1/chat/completions",
            headers=self._get_headers(),
            json={
                "model": self.model,
                "messages": self.conversation_history
            }
        )
        response.raise_for_status()
        return response.json()

# Usage with governance
assistant = ResearchAssistant(
    model="openai/gpt-4o-mini",
    virtual_key="sk-bf-your-virtual-key-here"
)

Making Virtual Keys Mandatory

For production environments, enforce that all requests must include a virtual key:

Via Web UI:

  1. Go to Config → Security
  2. Enable Enforce Virtual Keys

Via API:

curl -X PUT http://localhost:8080/api/config \
  -H "Content-Type: application/json" \
  -d '{
    "client_config": {
      "enforce_governance_header": true
    }
  }'

Now any request without a virtual key will be rejected with a 400 error.

Handling Governance Errors

Update your agent to handle governance-related errors gracefully:

def chat(self, user_message: str) -> str:
    """Send a message with error handling for governance."""
    try:
        # Add user message to history
        self.conversation_history.append({
            "role": "user",
            "content": user_message
        })

        # Get AI response
        response = self._make_completion_request()
        # ... rest of the logic

    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 429:
            # Rate limit exceeded
            error_data = e.response.json()
            return f"⚠️ Rate limit exceeded: {error_data['error']['message']}"
        elif e.response.status_code == 402:
            # Budget exceeded
            error_data = e.response.json()
            return f"⚠️ Budget exceeded: {error_data['error']['message']}"
        elif e.response.status_code == 403:
            # Model or provider blocked
            error_data = e.response.json()
            return f"⚠️ Access denied: {error_data['error']['message']}"
        else:
            raise

Common governance error codes:

  • 400: Virtual key required but not provided
  • 402: Budget limit exceeded
  • 403: Model/provider/tool not allowed
  • 429: Rate limit exceeded (token or request)
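
The status codes above lend themselves to a table-driven lookup, which avoids a growing if/elif chain. The sketch below uses only the codes listed in this section; `explain_governance_error` is a hypothetical helper name.

```python
from typing import Optional

# Status codes and meanings as listed in this section.
GOVERNANCE_ERRORS = {
    400: "Virtual key required but not provided",
    402: "Budget limit exceeded",
    403: "Model, provider, or tool not allowed",
    429: "Rate limit exceeded",
}


def explain_governance_error(status_code: int, detail: Optional[str] = None) -> Optional[str]:
    """Return a readable message for a governance status code, else None."""
    label = GOVERNANCE_ERRORS.get(status_code)
    if label is None:
        return None  # not a governance error; let the caller re-raise
    return f"{label}: {detail}" if detail else label


print(explain_governance_error(402, "monthly cap reached"))
```

Returning `None` for unknown codes lets the caller fall back to re-raising the original exception, as the `else: raise` branch in the handler does.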

Part 6: Observability & Monitoring

Production agents require comprehensive monitoring to track performance, debug issues, and understand usage patterns.

Built-in Request Tracing

Bifrost automatically captures detailed information about every request when logging is enabled. This includes:

Request Data:

  • Complete conversation history
  • Model parameters (temperature, max_tokens, etc.)
  • Provider and model used

Response Data:

  • AI responses and tool calls
  • Performance metrics (latency, tokens)
  • Success or error details

Tool Execution Data:

  • Which tools were called
  • Tool arguments and results
  • Tool execution latency

Enabling Observability

Via Web UI:

  1. Navigate to Settings
  2. Toggle Enable Logs

Via API:

curl -X PUT http://localhost:8080/api/config \
  -H "Content-Type: application/json" \
  -d '{
    "client_config": {
      "enable_logging": true,
      "disable_content_logging": false
    }
  }'

Setting disable_content_logging: true logs only metadata (latency, cost, tokens) without request/response content - useful for privacy-sensitive applications.

Accessing Logs via Web UI

Open http://localhost:8080 and navigate to the Logs section. You'll see:

  • Real-time log streaming of all requests
  • Advanced filtering by provider, model, status, time range
  • Detailed inspection of individual requests with full conversation history
  • Performance analytics showing token usage, costs, and latency trends

Querying Logs Programmatically

Use the logs API to build custom dashboards or analytics:

import requests
from datetime import datetime, timedelta

def get_agent_metrics(start_time: datetime, end_time: datetime):
    """Fetch research assistant metrics for a time period."""

    response = requests.get(
        f"{BIFROST_BASE_URL}/api/logs",
        params={
            "start_time": start_time.isoformat(),
            "end_time": end_time.isoformat(),
            "status": "success",
            "limit": 1000
        }
    )

    response.raise_for_status()
    data = response.json()

    return {
        "total_requests": data["stats"]["total_requests"],
        "success_rate": data["stats"]["success_rate"],
        "average_latency": data["stats"]["average_latency"],
        "total_tokens": data["stats"]["total_tokens"],
        "total_cost": data["stats"]["total_cost"]
    }

# Get last 24 hours of metrics
end_time = datetime.now()
start_time = end_time - timedelta(days=1)

metrics = get_agent_metrics(start_time, end_time)
print(f"Agent Performance (Last 24h):")
print(f"  Requests: {metrics['total_requests']}")
print(f"  Success Rate: {metrics['success_rate']*100:.1f}%")
print(f"  Avg Latency: {metrics['average_latency']}ms")
print(f"  Total Cost: ${metrics['total_cost']:.2f}")

Real-time Monitoring with WebSockets

Subscribe to live log updates for real-time monitoring:

const ws = new WebSocket('ws://localhost:8080/ws');

ws.onmessage = (event) => {
  const logUpdate = JSON.parse(event.data);

  console.log(`New Request: ${logUpdate.model}`);
  console.log(`Latency: ${logUpdate.latency}ms`);
  console.log(`Tokens: ${logUpdate.total_tokens}`);
  console.log(`Cost: $${logUpdate.cost}`);

  // Trigger alerts or update dashboards
  if (logUpdate.latency > 5000) {
    alert('High latency detected!');
  }
};

Cost Tracking and Budgets

Monitor spending in real-time and set up alerts:

def check_budget_status(virtual_key_id: str):
    """Check current budget usage for a virtual key."""

    response = requests.get(
        f"{BIFROST_BASE_URL}/api/governance/virtual-keys/{virtual_key_id}"
    )

    response.raise_for_status()
    vk_data = response.json()
    budget = vk_data["budget"]

    usage_percent = (budget["current_usage"] / budget["max_limit"]) * 100

    print(f"Budget Status:")
    print(f"  Used: ${budget['current_usage']:.2f}")
    print(f"  Limit: ${budget['max_limit']:.2f}")
    print(f"  Remaining: ${budget['max_limit'] - budget['current_usage']:.2f}")
    print(f"  Usage: {usage_percent:.1f}%")

    if usage_percent > 80:
        print("⚠️  WARNING: Budget is over 80% used!")

    return budget

# Check budget before running expensive operations
budget = check_budget_status("vk-id-here")
if budget["current_usage"] < budget["max_limit"] * 0.9:
    # Safe to proceed
    response = assistant.chat("Perform complex research task...")
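
The budget arithmetic can be pulled into pure functions so it is testable without a running gateway. This is a sketch under stated assumptions: `budget_usage_percent` and `can_spend` are hypothetical helpers, and the estimated cost of an operation is something your application would have to supply.

```python
def budget_usage_percent(current_usage: float, max_limit: float) -> float:
    """Percentage of the budget already used."""
    if max_limit <= 0:
        return 100.0  # treat a zero or negative limit as fully consumed
    return 100.0 * current_usage / max_limit


def can_spend(current_usage: float, max_limit: float, estimated_cost: float) -> bool:
    """True if an operation with the given estimated cost fits in the budget."""
    return current_usage + estimated_cost <= max_limit


print(budget_usage_percent(40.0, 50.0))
print(can_spend(49.0, 50.0, 2.0))
```

Guarding against a zero limit avoids the division error that a key with no budget configured could otherwise trigger.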

Part 7: Advanced Features

Semantic Caching for Cost Reduction

Bifrost includes semantic caching to reduce costs and improve latency for similar queries:

class CachedResearchAssistant(ResearchAssistant):
    def __init__(self, model: str, virtual_key: str, cache_key_prefix: str):
        super().__init__(model, virtual_key)
        self.cache_key_prefix = cache_key_prefix

    def _get_cache_key(self) -> str:
        """Return the cache key shared by every request in this session."""
        # Keep the key stable (a session or user ID). Embedding a hash of the
        # message would give each rewording its own key, so similar queries
        # could never match each other.
        return self.cache_key_prefix

    def _get_headers(self) -> Dict[str, str]:
        """Get headers with cache settings for semantic caching."""
        headers = super()._get_headers()
        headers["x-bf-cache-key"] = self._get_cache_key()
        headers["x-bf-cache-threshold"] = "0.85"  # 85% similarity threshold
        headers["x-bf-cache-ttl"] = "1h"  # Cache for 1 hour
        return headers

# Usage - identical queries will use cached responses
assistant = CachedResearchAssistant(
    model="openai/gpt-4o-mini",
    virtual_key="sk-bf-your-key",
    cache_key_prefix="research-session-123"
)

# First call - hits the LLM
response1 = assistant.chat("What are the latest developments in quantum computing?")

# Similar query - uses semantic cache (much faster, no cost)
response2 = assistant.chat("Tell me about recent quantum computing breakthroughs")

Semantic caching can reduce costs by 60-80% for applications with repeated or similar queries.
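
To build intuition for the 0.85 threshold, here is a toy illustration. It assumes the threshold is compared against the cosine similarity of query embeddings, which this guide does not state explicitly. The two-dimensional vectors are made up for the example; real embeddings have hundreds of dimensions.

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def is_cache_hit(query_vec: Sequence[float], cached_vec: Sequence[float],
                 threshold: float = 0.85) -> bool:
    """A cached answer is reused only if similarity reaches the threshold."""
    return cosine_similarity(query_vec, cached_vec) >= threshold


cached = [1.0, 0.0]      # made-up embedding of the first question
reworded = [0.9, 0.3]    # a close paraphrase
unrelated = [0.2, 1.0]   # a different topic
print(is_cache_hit(reworded, cached), is_cache_hit(unrelated, cached))
```

Raising the threshold makes cache hits rarer but safer; lowering it saves more calls at the risk of returning an answer to a question that was not quite the one asked.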

Provider Fallbacks and Load Balancing

Configure automatic failover between providers:

# Your virtual key configuration already handles this
# With multiple provider configs, Bifrost automatically:
# 1. Load balances based on weights
# 2. Falls back if primary provider fails
# 3. Retries with exponential backoff

# Example: OpenAI primary with Anthropic fallback
{
    "provider_configs": [
        {
            "provider": "openai",
            "weight": 0.7,  # 70% of traffic
            "allowed_models": ["gpt-4o-mini"]
        },
        {
            "provider": "anthropic",
            "weight": 0.3,  # 30% of traffic (automatic fallback)
            "allowed_models": ["claude-3-sonnet-20240229"]
        }
    ]
}
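
Weighted routing with fallback can be illustrated in a few lines. How Bifrost selects a provider internally is not shown in this guide, so treat the following as a sketch of the general technique; `pick_provider` and `fallback_order` are hypothetical names.

```python
import random
from typing import Any, Dict, List, Optional


def pick_provider(configs: List[Dict[str, Any]],
                  rng: Optional[random.Random] = None) -> str:
    """Choose a provider at random, in proportion to its weight."""
    if rng is None:
        rng = random.Random()
    weights = [c["weight"] for c in configs]
    return rng.choices(configs, weights=weights, k=1)[0]["provider"]


def fallback_order(configs: List[Dict[str, Any]], primary: str) -> List[str]:
    """Providers to try after `primary` fails, highest weight first."""
    others = [c for c in configs if c["provider"] != primary]
    ranked = sorted(others, key=lambda c: c["weight"], reverse=True)
    return [c["provider"] for c in ranked]


configs = [
    {"provider": "openai", "weight": 0.7},
    {"provider": "anthropic", "weight": 0.3},
]
first = pick_provider(configs)
print(first, fallback_order(configs, first))
```

Over many requests, roughly 70% would go to the first provider and 30% to the second, with the other provider serving as the fallback in each case.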

Multi-Turn Conversations with Context

Manage long-running research sessions:

class ResearchSession:
    def __init__(self, session_id: str):
        self.session_id = session_id
        self.assistant = ResearchAssistant(
            model="openai/gpt-4o-mini",
            virtual_key="sk-bf-your-key"
        )

    def research(self, query: str) -> str:
        """Execute a research query with full context."""
        return self.assistant.chat(query)

    def get_conversation_summary(self) -> str:
        """Get a summary of the research session."""
        summary_prompt = """
        Please provide a concise summary of our research session so far,
        including the key questions asked, tools used, and main findings.
        """
        return self.assistant.chat(summary_prompt)

    def save_session(self, filepath: str):
        """Save the conversation history for later."""
        import json
        with open(filepath, 'w') as f:
            json.dump({
                'session_id': self.session_id,
                'history': self.assistant.conversation_history
            }, f, indent=2)

    def load_session(self, filepath: str):
        """Restore a previous conversation."""
        import json
        with open(filepath, 'r') as f:
            data = json.load(f)
            self.assistant.conversation_history = data['history']

# Usage
session = ResearchSession("quantum-research-jan-2025")

# Multi-turn research with context preservation
session.research("Find recent quantum computing papers")
session.research("Which of those papers mentions error correction?")
session.research("Summarize the error correction approaches")

# Save for later
session.save_session("/tmp/quantum_research_session.json")

# Get summary
summary = session.get_conversation_summary()
print(summary)
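
Long sessions eventually outgrow the model's context window, so the history may need trimming before each request. The sketch below keeps the most recent messages and then drops leading entries until the window starts with a user message, so a tool result is never sent without the request that produced it. `trim_history` is a hypothetical helper, and counting messages rather than tokens is a simplifying assumption.

```python
from typing import Any, Dict, List

Message = Dict[str, Any]


def trim_history(history: List[Message], max_messages: int) -> List[Message]:
    """Keep recent messages, starting the window at a user message."""
    if max_messages <= 0:
        return []
    if len(history) <= max_messages:
        return list(history)
    window = history[-max_messages:]
    start = 0
    # Skip orphaned tool results or assistant replies at the window's start.
    while start < len(window) and window[start]["role"] != "user":
        start += 1
    return window[start:]


history = [
    {"role": "user", "content": "Find recent papers"},
    {"role": "assistant", "content": None, "tool_calls": [{"id": "call_1"}]},
    {"role": "tool", "tool_call_id": "call_1", "content": "[...]"},
    {"role": "assistant", "content": "Here are three papers."},
    {"role": "user", "content": "Which mention error correction?"},
    {"role": "assistant", "content": "The second one."},
]
print([m["role"] for m in trim_history(history, 4)])
```

A cut that lands in the middle of a tool exchange discards the rest of that exchange, which loses some context but keeps the message sequence valid.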

Part 8: Production Best Practices

Security Considerations

  1. Never expose filesystem tools with broad access
    • Limit to specific directories
    • Use read-only access when possible
    • Validate all file paths
  2. Implement tool approval workflows
    • Review tool calls before execution for sensitive operations
    • Add human-in-the-loop for destructive actions
    • Log all tool executions with full context
  3. Use environment-specific virtual keys
    • Development keys with relaxed limits
    • Staging keys with moderate limits
    • Production keys with strict governance
  4. Secure your API keys
    • Never commit keys to version control
    • Use environment variables or secrets management
    • Rotate keys regularly

Error Handling and Resilience

import time
from typing import Optional

class ResilientResearchAssistant(ResearchAssistant):
    def __init__(self, model: str, virtual_key: str, max_retries: int = 3):
        super().__init__(model, virtual_key)
        self.max_retries = max_retries

    def _make_completion_request(self) -> Dict[str, Any]:
        """Make a request, retrying 429 and 5xx responses with exponential backoff."""
        # Overriding the base method means chat() picks up the retry logic.
        for attempt in range(self.max_retries):
            try:
                return super()._make_completion_request()
            except requests.exceptions.HTTPError as e:
                status = e.response.status_code
                if status == 429:
                    reason = "Rate limited"
                elif status >= 500:
                    reason = "Server error"
                else:
                    # Other client errors will not succeed on retry
                    raise
                if attempt == self.max_retries - 1:
                    break  # no point sleeping after the last attempt
                wait_time = 2 ** attempt  # Exponential backoff
                print(f"{reason}. Retrying in {wait_time}s...")
                time.sleep(wait_time)

        raise Exception("Max retries exceeded")

    def chat(self, user_message: str) -> Optional[str]:
        """Send a message with comprehensive error handling."""
        try:
            return super().chat(user_message)
        except Exception as e:
            print(f"Error in chat: {e}")
            # Log error for debugging
            # Return graceful fallback
            return "I encountered an error processing your request. Please try again."
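
When many clients retry on the same schedule, they can hit the gateway in synchronized waves. A common refinement is to cap the delay and add random jitter. The sketch below computes the delay schedule separately from the sleeping so it can be tested; `backoff_delays` is a hypothetical helper, and the default base and cap values are arbitrary.

```python
import random
from typing import List, Optional


def backoff_delays(max_retries: int, base: float = 1.0, cap: float = 30.0,
                   rng: Optional[random.Random] = None) -> List[float]:
    """Delay before each retry: base * 2**attempt, capped at `cap`.

    If `rng` is given, each delay is drawn uniformly from [0, delay]
    ("full jitter"); otherwise the schedule is deterministic.
    """
    delays = []
    for attempt in range(max_retries):
        delay = min(cap, base * (2 ** attempt))
        if rng is not None:
            delay = rng.uniform(0, delay)
        delays.append(delay)
    return delays


print(backoff_delays(5, cap=10.0))
```

A retry loop would then call `time.sleep(delay)` for each entry in turn instead of computing `2 ** attempt` inline.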

Performance Optimization

  1. Use appropriate models for tasks
    • Fast models (gpt-4o-mini) for simple queries
    • Powerful models (gpt-4o) for complex analysis
    • Switch models dynamically based on complexity
  2. Batch similar requests
    • Group related tool calls
    • Execute in parallel where possible
  3. Implement request queuing
    • Handle burst traffic gracefully
    • Respect rate limits proactively
  4. Monitor and optimize token usage
    • Track prompt and completion tokens
    • Optimize prompts to reduce costs
    • Use semantic caching for repeated queries
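
Respecting rate limits proactively means checking on the client side before a request is sent, rather than waiting for a 429. One way to sketch this is a sliding-window limiter. `SlidingWindowLimiter` is a hypothetical class; the limits passed to it should mirror whatever is configured on the virtual key.

```python
import time
from collections import deque
from typing import Callable, Deque


class SlidingWindowLimiter:
    """Allow at most `max_requests` in any `window_seconds` interval."""

    def __init__(self, max_requests: int, window_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.clock = clock  # injectable so tests can control time
        self._sent: Deque[float] = deque()

    def try_acquire(self) -> bool:
        """Record and permit a request if the window has room."""
        now = self.clock()
        # Forget requests that have aged out of the window.
        while self._sent and now - self._sent[0] >= self.window_seconds:
            self._sent.popleft()
        if len(self._sent) >= self.max_requests:
            return False
        self._sent.append(now)
        return True


limiter = SlidingWindowLimiter(max_requests=100, window_seconds=60)
print(limiter.try_acquire())
```

When `try_acquire()` returns `False`, the caller can queue the request or wait briefly, which also covers the request-queuing item above.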

Deployment Configurations

Development:

{
  "client": {
    "enable_logging": true,
    "enable_governance": false,
    "drop_excess_requests": false
  }
}

Staging:

{
  "client": {
    "enable_logging": true,
    "enable_governance": true,
    "enforce_governance_header": true,
    "drop_excess_requests": false
  }
}

Production:

{
  "client": {
    "enable_logging": true,
    "disable_content_logging": true,
    "enable_governance": true,
    "enforce_governance_header": true,
    "drop_excess_requests": true
  }
}

Part 9: Testing Your Agent

Unit Testing Tool Execution

import unittest
from unittest.mock import patch, MagicMock

class TestResearchAssistant(unittest.TestCase):
    def setUp(self):
        self.assistant = ResearchAssistant(
            model="openai/gpt-4o-mini",
            virtual_key="test-key"
        )

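    @patch('requests.post')
    def test_tool_execution(self, mock_post):
        """Test that a suggested tool call is executed and the final reply returned."""
        # First completion: the model suggests a tool call
        tool_call_reply = MagicMock()
        tool_call_reply.json.return_value = {
            "choices": [{
                "message": {
                    "role": "assistant",
                    "tool_calls": [{
                        "id": "call_123",
                        "type": "function",
                        "function": {
                            "name": "read_file",
                            "arguments": '{"path": "/tmp/test.txt"}'
                        }
                    }]
                }
            }]
        }
        # Tool execution result (shape is illustrative), then the final completion
        tool_result = MagicMock()
        tool_result.json.return_value = {"content": "hello"}
        final_reply = MagicMock()
        final_reply.json.return_value = {
            "choices": [{"message": {"role": "assistant", "content": "The file says hello."}}]
        }
        mock_post.side_effect = [tool_call_reply, tool_result, final_reply]

        # Test the chat method
        response = self.assistant.chat("Read the test file")

        # Verify the three requests: completion, tool execution, completion
        self.assertEqual(mock_post.call_count, 3)
        self.assertEqual(response, "The file says hello.")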

    def test_error_handling(self):
        """Test governance error handling."""
        with patch('requests.post') as mock_post:
            # Mock a budget exceeded error
            mock_response = MagicMock()
            mock_response.status_code = 402
            mock_response.json.return_value = {
                "error": {
                    "type": "budget_exceeded",
                    "message": "Budget exceeded"
                }
            }
            mock_post.return_value = mock_response
            mock_post.return_value.raise_for_status.side_effect = \
                requests.exceptions.HTTPError(response=mock_response)

            # Should handle error gracefully
            response = self.assistant.chat("Test query")
            self.assertIn("Budget exceeded", response)

if __name__ == '__main__':
    unittest.main()

Integration Testing

import os

def test_full_research_workflow():
    """Integration test for complete research workflow."""
    assistant = ResearchAssistant(
        model="openai/gpt-4o-mini",
        virtual_key=os.getenv("BIFROST_VK")
    )

    # Test multi-step research
    response = assistant.chat("""
    Research the latest Python release:
    1. Find the current version
    2. Save it to /tmp/python_version.txt
    3. Read it back and confirm
    """)

    assert "Python" in response
    assert os.path.exists("/tmp/python_version.txt")

    # Verify file content
    with open("/tmp/python_version.txt", 'r') as f:
        content = f.read()
        assert "3." in content  # Python 3.x version

    print("✅ Integration test passed!")

if __name__ == "__main__":
    test_full_research_workflow()

Conclusion

You've now built a production-ready Research Assistant Agent with Bifrost that demonstrates:

Core Capabilities:

  • ✅ Multi-tool integration via MCP (filesystem, web search, code execution)
  • ✅ Stateless, explicit tool execution pattern
  • ✅ Natural language conversation flow with context preservation
  • ✅ Error handling and resilience

Production Features:

  • ✅ Governance with virtual keys, budgets, and rate limits
  • ✅ Tool filtering and access control
  • ✅ Comprehensive observability and monitoring
  • ✅ Semantic caching for cost reduction
  • ✅ Provider fallbacks and load balancing

Security:

  • ✅ Explicit tool execution (no automatic execution)
  • ✅ Granular access control per virtual key
  • ✅ Budget and rate limiting
  • ✅ Complete audit trail of all operations

Next Steps

  1. Explore Additional MCP Servers: The MCP ecosystem includes tools for databases, APIs, cloud services, and more
  2. Implement Agent Mode: Bifrost supports automatic tool execution for trusted tools with tools_to_auto_execute
  3. Try Code Mode: For 3+ MCP servers, use Code Mode to reduce token usage by 50%+
  4. Deploy to Production: Use Kubernetes deployment guides for scaling
  5. Add Custom Tools: Build your own MCP servers for business-specific functionality

Ready to build more advanced agents? Explore Bifrost's enterprise features including guardrails, clustering, adaptive load balancing, and federated authentication for MCP tools. Visit getmaxim.ai/bifrost/enterprise for a free 14-day trial.