Building an AI Product Review Analyzer: Structured Outputs with Together AI and Maxim Observability

In today's data-driven world, businesses need to extract structured insights from unstructured text at scale. Whether it's analyzing customer reviews, processing support tickets, or extracting key information from documents, the ability to get consistent, structured outputs from Large Language Models (LLMs) has become crucial.

In this comprehensive guide, we'll build a Product Review Analyzer that transforms messy customer reviews into structured data using Together AI's JSON mode, while implementing robust observability with Maxim to monitor our AI application's performance.

Learn about the single-line Maxim integration for Together AI in the cookbook linked under Resources below.

Why Structured Outputs Matter

Traditional LLM responses are often inconsistent in format, making them difficult to process programmatically. Structured outputs solve this by ensuring responses follow a predefined schema.
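
To make the contrast concrete, here is a toy illustration (the reply text and field names below are hypothetical; the real schema we use is defined in Step 3):

import json

# Free-form replies drift in format between calls (prose one time, bullet points the next),
# which makes downstream parsing brittle.
free_form_reply = "Overall the customer seems happy; main themes were shipping speed and build quality."

# With JSON mode, every reply parses into the same machine-readable shape.
structured_reply = '{"sentiment": "positive", "themes": ["shipping", "product_quality"], "priority_score": 2}'
analysis = json.loads(structured_reply)
print(analysis["sentiment"])  # -> positive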

Our Use Case: E-commerce Review Analysis

Imagine you're running an e-commerce platform with thousands of product reviews. You need to:

  1. Extract sentiment (positive/negative/neutral)
  2. Identify key themes (quality, shipping, customer service)
  3. Assign priority scores for customer service follow-up
  4. Categorize feedback by product aspects
  5. Generate actionable insights for product teams

Let's build this step by step!

Resources

Cookbook showing the single-line Maxim integration for Together AI:

maxim-cookbooks/python/observability-online-eval/together/example.ipynb at main · maximhq/maxim-cookbooks
Maxim is an end-to-end AI evaluation and observability platform that empowers modern AI teams to ship agents with quality, reliability, and speed. - maximhq/maxim-cookbooks

Step 1: Project Setup and Dependencies

First, let's set up our project with the necessary dependencies:

pip install together python-dotenv maxim-py

Create a .env file with your API keys:

TOGETHER_API_KEY=your_together_api_key_here
MAXIM_API_KEY=your_maxim_api_key_here
MAXIM_LOG_REPO_ID=your_maxim_log_repo_id_here
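
Before going further, a quick sanity check (optional, not part of either SDK) can catch missing keys early:

import os
from dotenv import load_dotenv

load_dotenv()
for var in ("TOGETHER_API_KEY", "MAXIM_API_KEY", "MAXIM_LOG_REPO_ID"):
    if not os.getenv(var):
        raise RuntimeError(f"Missing required environment variable: {var}")
print("Environment configured correctly.")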

Step 2: Basic Configuration and Imports

import os
import json
import asyncio
from datetime import datetime
from typing import List, Dict, Optional
from dataclasses import dataclass
from together import Together, AsyncTogether
from dotenv import load_dotenv
from maxim import Maxim
from maxim.logger.together import instrument_together

# Load environment variables
load_dotenv()
TOGETHER_API_KEY = os.getenv('TOGETHER_API_KEY')
MAXIM_API_KEY = os.getenv('MAXIM_API_KEY')
MAXIM_LOG_REPO_ID = os.getenv('MAXIM_LOG_REPO_ID')

# Configure Maxim for observability (single-line Together AI instrumentation)
maxim_client = Maxim()
instrument_together(maxim_client.logger())

# Initialize Together AI client
client = Together(api_key=TOGETHER_API_KEY)

Step 3: Define Our Structured Output Schema

The key to successful structured outputs is defining a clear, comprehensive JSON schema. For our review analyzer, we'll create a schema that captures all the insights we need:

REVIEW_ANALYSIS_SCHEMA = {
    "type": "object",
    "properties": {
        "sentiment": {
            "type": "object",
            "properties": {
                "overall": {
                    "type": "string",
                    "enum": ["positive", "negative", "neutral"]
                },
                "confidence": {
                    "type": "number",
                    "minimum": 0,
                    "maximum": 1,
                    "description": "Confidence score for sentiment classification"
                }
            },
            "required": ["overall", "confidence"]
        },
        "themes": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "category": {
                        "type": "string",
                        "enum": ["product_quality", "shipping", "customer_service", "pricing", "user_experience", "packaging", "other"]
                    },
                    "sentiment": {
                        "type": "string",
                        "enum": ["positive", "negative", "neutral"]
                    },
                    "keywords": {
                        "type": "array",
                        "items": {"type": "string"}
                    },
                    "importance": {
                        "type": "number",
                        "minimum": 1,
                        "maximum": 5,
                        "description": "Importance score (1-5) of this theme"
                    }
                },
                "required": ["category", "sentiment", "keywords", "importance"]
            },
            "minItems": 1,
            "maxItems": 10
        },
        "priority_score": {
            "type": "number",
            "minimum": 1,
            "maximum": 10,
            "description": "Priority score for customer service follow-up (1=low, 10=urgent)"
        },
        "actionable_insights": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "insight": {"type": "string"},
                    "suggested_action": {"type": "string"},
                    "department": {
                        "type": "string",
                        "enum": ["product_team", "customer_service", "shipping", "marketing", "management"]
                    }
                },
                "required": ["insight", "suggested_action", "department"]
            },
            "maxItems": 5
        },
        "key_phrases": {
            "type": "array",
            "items": {"type": "string"},
            "maxItems": 10,
            "description": "Important phrases or quotes from the review"
        }
    },
    "required": ["sentiment", "themes", "priority_score", "actionable_insights", "key_phrases"]
}

# Data class for type safety
@dataclass
class ReviewAnalysis:
    sentiment: Dict
    themes: List[Dict]
    priority_score: int
    actionable_insights: List[Dict]
    key_phrases: List[str]
    raw_response: str
    processing_time: float
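
Together's JSON mode constrains generation to the schema we pass, but if you want an extra client-side safety net you can validate parsed responses against REVIEW_ANALYSIS_SCHEMA with the jsonschema package. This is an optional sketch that assumes one extra dependency (pip install jsonschema); neither the Together nor the Maxim SDK requires it:

from jsonschema import ValidationError, validate

def validate_analysis(payload: dict) -> bool:
    """Return True if a parsed response conforms to REVIEW_ANALYSIS_SCHEMA."""
    try:
        validate(instance=payload, schema=REVIEW_ANALYSIS_SCHEMA)
        return True
    except ValidationError as e:
        print(f"Schema validation failed: {e.message}")
        return False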

Step 4: Building the Review Analyzer

Now let's create our main analyzer class with proper error handling and observability:

class ProductReviewAnalyzer:
    def __init__(self, model="meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo"):
        self.client = Together(api_key=TOGETHER_API_KEY)
        self.async_client = AsyncTogether(api_key=TOGETHER_API_KEY)
        self.model = model
        self.maxim_client = maxim_client  # module-level Maxim client configured in Step 2
    
    def _create_analysis_prompt(self, review_text: str, product_name: str = None) -> str:
        """Create a detailed prompt for review analysis."""
        product_context = f" for the product '{product_name}'" if product_name else ""
        
        return f"""You are an expert e-commerce analyst. Analyze this customer review{product_context} and provide a comprehensive structured analysis.

IMPORTANT: Respond ONLY in valid JSON format following the exact schema provided.

Customer Review:
"{review_text}"

Analyze this review and provide:
1. Overall sentiment with confidence score
2. Key themes with categories, sentiment, keywords, and importance scores
3. Priority score for customer service follow-up (1-10, where 10 is urgent)
4. Actionable insights with suggested actions for relevant departments
5. Important key phrases from the review

Be thorough in your analysis and ensure all fields are properly filled according to the schema."""

    async def analyze_review_async(self, review_text: str, product_name: str = None) -> ReviewAnalysis:
        """Analyze a single review asynchronously with structured output."""
        start_time = datetime.now()
        
        try:
            prompt = self._create_analysis_prompt(review_text, product_name)
            
            # Log the request to Maxim
            self.maxim_client.logger().log_event("review_analysis_start", {
                "review_length": len(review_text),
                "product_name": product_name,
                "model": self.model
            })
            
            response = await self.async_client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": "You are an expert e-commerce data analyst. Always respond in valid JSON format only."},
                    {"role": "user", "content": prompt}
                ],
                response_format={
                    "type": "json_object",
                    "schema": REVIEW_ANALYSIS_SCHEMA
                },
                temperature=0.3,  # Lower temperature for more consistent outputs
                max_tokens=2000
            )
            
            processing_time = (datetime.now() - start_time).total_seconds()
            raw_response = response.choices[0].message.content
            
            # Parse and validate the JSON response
            parsed_data = json.loads(raw_response)
            
            # Log successful analysis
            self.maxim_client.logger().log_event("review_analysis_success", {
                "processing_time": processing_time,
                "sentiment": parsed_data["sentiment"]["overall"],
                "num_themes": len(parsed_data["themes"]),
                "priority_score": parsed_data["priority_score"]
            })
            
            return ReviewAnalysis(
                sentiment=parsed_data["sentiment"],
                themes=parsed_data["themes"],
                priority_score=parsed_data["priority_score"],
                actionable_insights=parsed_data["actionable_insights"],
                key_phrases=parsed_data["key_phrases"],
                raw_response=raw_response,
                processing_time=processing_time
            )
            
        except json.JSONDecodeError as e:
            self.maxim_client.logger().log_event("json_parse_error", {
                "error": str(e),
                "raw_response": response.choices[0].message.content if 'response' in locals() else "No response"
            })
            raise ValueError(f"Failed to parse JSON response: {e}")
        
        except Exception as e:
            self.maxim_client.logger().log_event("analysis_error", {
                "error_type": type(e).__name__,
                "error_message": str(e)
            })
            raise

    def analyze_review(self, review_text: str, product_name: str = None) -> ReviewAnalysis:
        """Synchronous wrapper for review analysis."""
        return asyncio.run(self.analyze_review_async(review_text, product_name))

    async def batch_analyze_reviews(self, reviews: List[Dict[str, str]], max_concurrent: int = 5) -> List[ReviewAnalysis]:
        """Analyze multiple reviews concurrently."""
        semaphore = asyncio.Semaphore(max_concurrent)
        
        async def analyze_with_semaphore(review_data):
            async with semaphore:
                return await self.analyze_review_async(
                    review_data["text"], 
                    review_data.get("product_name")
                )
        
        tasks = [analyze_with_semaphore(review) for review in reviews]
        return await asyncio.gather(*tasks, return_exceptions=True)
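
Even with JSON mode, an occasional call can fail or return malformed output, and Step 6 below relies on an analyze_with_retry helper that the class above does not define. Here is a minimal sketch of one way to add it, attached after the class definition (you could equally paste it into the class body); the retry count and backoff values are illustrative assumptions, not tuned recommendations:

import random

async def analyze_with_retry(self, review_text: str, product_name: str = None,
                             max_retries: int = 3) -> ReviewAnalysis:
    """Retry analyze_review_async on failures such as malformed JSON or transient API errors."""
    last_error = None
    for attempt in range(max_retries):
        try:
            return await self.analyze_review_async(review_text, product_name)
        except Exception as e:
            last_error = e
            # Exponential backoff with a little jitter before the next attempt
            await asyncio.sleep((2 ** attempt) + random.random())
    raise last_error

# Attach the helper so it can be called like any other method of the analyzer
ProductReviewAnalyzer.analyze_with_retry = analyze_with_retry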

Step 5: Practical Examples and Testing

Let's test our analyzer with real-world examples:

# Sample reviews for testing
SAMPLE_REVIEWS = [
    {
        "text": "This laptop is absolutely fantastic! The build quality is exceptional and the battery lasts all day. Shipping was super fast, arrived in just 2 days. The customer service team was also very helpful when I had questions about the warranty. Highly recommend!",
        "product_name": "UltraBook Pro 15"
    },
    {
        "text": "Very disappointed with this purchase. The product arrived damaged and the packaging was terrible. When I contacted customer service, they were unhelpful and rude. It took 3 weeks to get a replacement. The product itself is okay but the experience was awful.",
        "product_name": "SmartWatch Series 5"
    },
    {
        "text": "Good value for money. The product works as expected, nothing special but does the job. Shipping was on time. Would buy again if the price stays competitive.",
        "product_name": "Wireless Headphones"
    }
]

async def demo_analysis():
    analyzer = ProductReviewAnalyzer()
    
    print("πŸš€ Starting Product Review Analysis Demo\n")
    
    # Analyze reviews individually
    for i, review_data in enumerate(SAMPLE_REVIEWS, 1):
        print(f"πŸ“ Analyzing Review #{i} for {review_data['product_name']}...")
        print(f"Review: {review_data['text'][:100]}...\n")
        
        try:
            result = await analyzer.analyze_review_async(
                review_data["text"], 
                review_data["product_name"]
            )
            
            print("πŸ“Š Analysis Results:")
            print(f"   Sentiment: {result.sentiment['overall'].upper()} (confidence: {result.sentiment['confidence']:.2f})")
            print(f"   Priority Score: {result.priority_score}/10")
            print(f"   Processing Time: {result.processing_time:.2f}s")
            print(f"   Themes Found: {len(result.themes)}")
            
            print("\n🎯 Key Themes:")
            for theme in result.themes:
                print(f"   β€’ {theme['category'].replace('_', ' ').title()}: {theme['sentiment']} (importance: {theme['importance']}/5)")
                print(f"     Keywords: {', '.join(theme['keywords'])}")
            
            print("\nπŸ’‘ Actionable Insights:")
            for insight in result.actionable_insights:
                print(f"   β€’ {insight['department'].replace('_', ' ').title()}: {insight['insight']}")
                print(f"     Action: {insight['suggested_action']}")
            
            print(f"\nπŸ”‘ Key Phrases: {', '.join(result.key_phrases)}")
            print("="*80 + "\n")
            
        except Exception as e:
            print(f"❌ Error analyzing review: {e}\n")

    # Demonstrate batch processing
    print("πŸ”„ Running Batch Analysis...")
    batch_results = await analyzer.batch_analyze_reviews(SAMPLE_REVIEWS)
    
    successful_analyses = [r for r in batch_results if isinstance(r, ReviewAnalysis)]
    print(f"βœ… Successfully analyzed {len(successful_analyses)} out of {len(SAMPLE_REVIEWS)} reviews")
    
    # Calculate aggregate metrics
    if successful_analyses:
        avg_processing_time = sum(r.processing_time for r in successful_analyses) / len(successful_analyses)
        sentiment_distribution = {}
        for result in successful_analyses:
            sentiment = result.sentiment['overall']
            sentiment_distribution[sentiment] = sentiment_distribution.get(sentiment, 0) + 1
        
        print(f"πŸ“ˆ Batch Processing Metrics:")
        print(f"   Average Processing Time: {avg_processing_time:.2f}s")
        print(f"   Sentiment Distribution: {sentiment_distribution}")

# Run the demo
if __name__ == "__main__":
    asyncio.run(demo_analysis())

Step 6: Integration with Business Workflows

Here's how to integrate this into a typical e-commerce workflow:

class EcommerceIntegration:
    def __init__(self):
        self.analyzer = ProductReviewAnalyzer()
        self.alerts_threshold = 8  # Priority score threshold for alerts
    
    async def process_new_review(self, review_data: Dict) -> Dict:
        """Process a new review and trigger appropriate workflows."""
        try:
            analysis = await self.analyzer.analyze_with_retry(
                review_data["text"], 
                review_data.get("product_name")
            )
            
            # Store in database (pseudo-code)
            review_id = self._save_analysis_to_db(review_data["id"], analysis)
            
            # Trigger alerts for high-priority issues
            if analysis.priority_score >= self.alerts_threshold:
                await self._send_priority_alert(review_data, analysis)
            
            # Update product metrics
            self._update_product_metrics(review_data.get("product_id"), analysis)
            
            # Generate automated responses for positive reviews
            if analysis.sentiment["overall"] == "positive" and analysis.sentiment["confidence"] > 0.8:
                await self._queue_thank_you_response(review_data["customer_id"], analysis)
            
            return {
                "status": "success",
                "review_id": review_id,
                "analysis": {
                    "sentiment": analysis.sentiment,
                    "priority_score": analysis.priority_score,
                    "processing_time": analysis.processing_time
                }
            }
            
        except Exception as e:
            return {"status": "error", "message": str(e)}
    
    async def _send_priority_alert(self, review_data: Dict, analysis: ReviewAnalysis):
        """Send alert for high-priority reviews."""
        alert_data = {
            "review_id": review_data["id"],
            "customer_id": review_data.get("customer_id"),
            "priority_score": analysis.priority_score,
            "sentiment": analysis.sentiment["overall"],
            "key_issues": [theme for theme in analysis.themes if theme["sentiment"] == "negative"],
            "suggested_actions": analysis.actionable_insights
        }
        
        # Send to customer service team (implement your notification system)
        print(f"🚨 HIGH PRIORITY ALERT: Review {review_data['id']} needs immediate attention!")
        print(f"   Priority Score: {analysis.priority_score}/10")
        print(f"   Key Issues: {[theme['category'] for theme in alert_data['key_issues']]}")
    
    def _save_analysis_to_db(self, review_id: str, analysis: ReviewAnalysis) -> str:
        """Save analysis to database (implement your DB logic)."""
        # Pseudo-code for database integration
        return f"saved_{review_id}"
    
    def _update_product_metrics(self, product_id: str, analysis: ReviewAnalysis):
        """Update product-level metrics (implement your logic)."""
        pass
    
    async def _queue_thank_you_response(self, customer_id: str, analysis: ReviewAnalysis):
        """Queue automated thank you for positive reviews."""
        pass
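
To exercise the integration end to end, you can feed it a single review. The dictionary keys below ("id", "customer_id", "product_id") are hypothetical placeholders for whatever your own data model uses:

async def run_integration_demo():
    integration = EcommerceIntegration()
    result = await integration.process_new_review({
        "id": "rev_1001",
        "customer_id": "cust_42",
        "product_id": "prod_7",
        "product_name": "SmartWatch Series 5",
        "text": "Arrived damaged and support took weeks to respond. Very frustrating experience.",
    })
    print(result)

asyncio.run(run_integration_demo())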

Conclusion

Structured outputs with Together AI unlock powerful possibilities for automated text analysis. By combining Together AI's JSON mode with Maxim's observability, we've built a production-ready system that can:

  • ✅ Analyze customer reviews consistently and reliably
  • ✅ Extract actionable insights for business teams
  • ✅ Scale to handle thousands of reviews
  • ✅ Provide comprehensive monitoring and alerting
  • ✅ Integrate seamlessly with existing workflows

The key to success is thoughtful schema design, robust error handling, and comprehensive monitoring. With these foundations in place, you can build AI applications that provide real business value while maintaining reliability and observability.

What's Next?

Consider extending this system with:

  • Multi-language support for global e-commerce
  • Real-time streaming analysis for live review feeds
  • Advanced analytics and trend detection
  • Integration with BI tools and dashboards

The structured output capabilities of modern LLMs, combined with proper observability, open up endless possibilities for intelligent automation. Start with a focused use case like this review analyzer, and gradually expand as you gain confidence and see business value.

Liked this?

If you found this use case interesting, you may also want to explore other AI agent use cases we've built in the past, leveraging different third-party integrations -

  1. Building a Resume Checker using LLamaIndex and Maxim - Link
  2. Observing Tool Calls made with Fireworks AI - Link
  3. Making a Financial Conversation Agent using Agno - Link
  4. Making an Interview Voice Agent using LiveKit - Link

Happy building! 🚀