MongoDB Aggregation Pipeline Examples

Master MongoDB aggregation pipelines with real-world examples. Learn essential stages like $match, $group, $lookup, and advanced aggregation techniques for powerful data analysis.

Pipeline Flow: $match → $group → $sort → $project

Published: September 25, 2024 | Reading time: 20 minutes

🍃 MongoDB Aggregation Quick Start

Pipeline Stages: Data flows through stages like a factory assembly line

Common Stages: $match → $group → $sort → $project → $limit

Performance: Index the fields used in $match, and filter or limit as early in the pipeline as possible

Understanding MongoDB Aggregation Pipelines

MongoDB aggregation pipelines are powerful tools for data processing and analysis. They work by passing documents through a series of stages, where each stage transforms the documents.
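As a rough mental model, each stage can be pictured as a function that takes an array of documents and returns a new array, with the pipeline being their composition. The sketch below imitates $match, $group, and $sort in plain JavaScript. The helper names (match, groupCount, sortBy, runPipeline) and the sample documents are invented for illustration and are not part of MongoDB, which executes stages on the server.

```javascript
// Toy model of a pipeline: every stage is a function docs -> docs.
// These helpers are illustrative only; MongoDB runs real stages server-side.
const match = (predicate) => (docs) => docs.filter(predicate);

const groupCount = (keyFn) => (docs) => {
  const counts = new Map();
  for (const doc of docs) {
    const key = keyFn(doc);
    counts.set(key, (counts.get(key) || 0) + 1);
  }
  return [...counts].map(([key, count]) => ({ _id: key, count }));
};

const sortBy = (field, direction) => (docs) =>
  [...docs].sort((a, b) => (a[field] - b[field]) * direction);

// Feed the output of each stage into the next one.
const runPipeline = (docs, stages) =>
  stages.reduce((current, stage) => stage(current), docs);

const sampleDocs = [
  { category: "books", status: "active" },
  { category: "toys", status: "inactive" },
  { category: "books", status: "active" },
  { category: "toys", status: "active" },
  { category: "books", status: "inactive" },
];

const result = runPipeline(sampleDocs, [
  match((doc) => doc.status === "active"), // like { $match: { status: "active" } }
  groupCount((doc) => doc.category),       // like { $group: { _id: "$category", count: { $sum: 1 } } }
  sortBy("count", -1),                     // like { $sort: { count: -1 } }
]);

console.log(result); // [ { _id: 'books', count: 2 }, { _id: 'toys', count: 1 } ]
```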

Raw Documents → $match → $group → $sort → $project → Results

Essential Aggregation Stages

$match (Basic)

Filters documents, like a WHERE clause in SQL
db.collection.aggregate([ { $match: { status: "active" } } ])

$group (Intermediate)

Groups documents and performs aggregations
db.collection.aggregate([ { $group: { _id: "$category", count: { $sum: 1 } } } ])

$lookup (Intermediate)

Performs a left outer join with another collection; matching documents are added as an array field
db.collection.aggregate([ { $lookup: { from: "users", localField: "userId", foreignField: "_id", as: "user" } } ])
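To illustrate what "left outer join" means here, the following plain-JavaScript sketch imitates the basic behavior of $lookup on two small arrays. The lookup helper and the sample data are hypothetical, and the sketch ignores details of the real stage such as array-valued local fields.

```javascript
// Simplified imitation of $lookup's left outer join semantics.
// lookup() is a hypothetical helper, not a MongoDB API.
function lookup(localDocs, foreignDocs, { localField, foreignField, as }) {
  return localDocs.map((doc) => ({
    ...doc,
    // Every matching foreign document is collected into an array.
    [as]: foreignDocs.filter((f) => f[foreignField] === doc[localField]),
  }));
}

const orders = [
  { _id: 1, userId: "u1" },
  { _id: 2, userId: "u9" }, // no such user
];
const users = [{ _id: "u1", name: "Ada" }];

const joined = lookup(orders, users, {
  localField: "userId",
  foreignField: "_id",
  as: "user",
});

console.log(JSON.stringify(joined));
```

The order without a matching user is kept and gets an empty "user" array. A following { $unwind: "$user" } would drop it unless preserveNullAndEmptyArrays: true is set, which effectively turns the join into an inner join.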

$project (Basic)

Reshapes documents, similar to SELECT in SQL
db.collection.aggregate([ { $project: { name: 1, email: 1, _id: 0 } } ])

Real-World Examples

Example 1: E-commerce Analytics

Analyze sales data to find top-selling products by category.

📊
Sales Analytics Pipeline
db.orders.aggregate([
  // Stage 1: Keep completed orders placed on or after 2024-08-25
  // (roughly the last 30 days at the time of writing)
  {
    $match: {
      orderDate: { $gte: new Date("2024-08-25") },
      status: "completed"
    }
  },

  // Stage 2: Unwind order items (one document per line item)
  { $unwind: "$items" },

  // Stage 3: Lookup product details
  {
    $lookup: {
      from: "products",
      localField: "items.productId",
      foreignField: "_id",
      as: "product"
    }
  },

  // Stage 4: Unwind product array (drops items with no matching product)
  { $unwind: "$product" },

  // Stage 5: Group by category and calculate totals
  {
    $group: {
      _id: "$product.category",
      totalRevenue: { $sum: { $multiply: ["$items.quantity", "$items.price"] } },
      totalQuantity: { $sum: "$items.quantity" },
      // Collect distinct order ids; after $unwind, { $sum: 1 } would count line items, not orders
      orderIds: { $addToSet: "$_id" }
    }
  },

  // Stage 6: Sort by revenue descending
  { $sort: { totalRevenue: -1 } },

  // Stage 7: Project final results
  {
    $project: {
      category: "$_id",
      totalRevenue: 1,
      totalQuantity: 1,
      orderCount: { $size: "$orderIds" },
      averageOrderValue: {
        $round: [{ $divide: ["$totalRevenue", { $size: "$orderIds" }] }, 2]
      },
      _id: 0
    }
  }
])

Expected Output:

[
  {
    "category": "Electronics",
    "totalRevenue": 125000,
    "totalQuantity": 250,
    "orderCount": 45,
    "averageOrderValue": 2777.78
  },
  {
    "category": "Clothing",
    "totalRevenue": 89000,
    "totalQuantity": 445,
    "orderCount": 67,
    "averageOrderValue": 1328.36
  }
]
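The first stage of this example hard-codes its cutoff date, so "last 30 days" drifts as time passes. One possible way to keep the window rolling is to compute the cutoff when the pipeline is built. In the sketch below, daysAgo and recentCompletedOrders are hypothetical helpers, and the optional "now" parameter exists only so the result can be checked against a fixed date.

```javascript
// Hypothetical helpers; "now" is injectable so the result is reproducible.
const MS_PER_DAY = 24 * 60 * 60 * 1000;

function daysAgo(days, now = new Date()) {
  return new Date(now.getTime() - days * MS_PER_DAY);
}

// Builds the Stage 1 filter with a rolling window instead of a fixed date.
function recentCompletedOrders(days, now = new Date()) {
  return {
    $match: {
      orderDate: { $gte: daysAgo(days, now) },
      status: "completed",
    },
  };
}

const stage = recentCompletedOrders(30, new Date("2024-09-24T00:00:00Z"));
console.log(stage.$match.orderDate.$gte.toISOString()); // 2024-08-25T00:00:00.000Z
```

In practice the stage would be built with recentCompletedOrders(30) and placed first in the array passed to db.orders.aggregate.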

Example 2: User Engagement Analysis

Analyze user activity patterns and engagement metrics.

👥
User Engagement Pipeline
db.userActivities.aggregate([
  // Stage 1: Match activities from last 7 days
  {
    $match: {
      timestamp: { $gte: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000) }
    }
  },

  // Stage 2: Lookup user information
  {
    $lookup: {
      from: "users",
      localField: "userId",
      foreignField: "_id",
      as: "user"
    }
  },

  // Stage 3: Unwind user array
  { $unwind: "$user" },

  // Stage 4: Group by user and calculate metrics
  {
    $group: {
      _id: "$userId",
      userEmail: { $first: "$user.email" },
      totalActivities: { $sum: 1 },
      uniqueDays: {
        $addToSet: { $dateToString: { format: "%Y-%m-%d", date: "$timestamp" } }
      },
      activityTypes: { $addToSet: "$activityType" },
      lastActivity: { $max: "$timestamp" }
    }
  },

  // Stage 5: Calculate engagement score
  {
    $addFields: {
      engagementScore: {
        $add: [
          { $multiply: ["$totalActivities", 0.1] },
          { $multiply: [{ $size: "$uniqueDays" }, 0.5] },
          { $multiply: [{ $size: "$activityTypes" }, 0.3] }
        ]
      }
    }
  },

  // Stage 6: Filter active users
  { $match: { totalActivities: { $gte: 5 } } },

  // Stage 7: Sort by engagement score
  { $sort: { engagementScore: -1 } },

  // Stage 8: Project final results
  {
    $project: {
      userId: "$_id",
      email: "$userEmail",
      totalActivities: 1,
      activeDays: { $size: "$uniqueDays" },
      activityTypes: { $size: "$activityTypes" },
      engagementScore: { $round: ["$engagementScore", 2] },
      lastActivity: 1,
      _id: 0
    }
  },

  // Stage 9: Limit to top 100 users
  { $limit: 100 }
])

Example 3: Time Series Analysis

Analyze website traffic patterns by hour and day of week.

📈
Traffic Analysis Pipeline
db.pageViews.aggregate([
  // Stage 1: Match recent page views
  { $match: { timestamp: { $gte: new Date("2024-09-01") } } },

  // Stage 2: Extract time components
  {
    $addFields: {
      hour: { $hour: "$timestamp" },
      dayOfWeek: { $dayOfWeek: "$timestamp" },
      date: { $dateToString: { format: "%Y-%m-%d", date: "$timestamp" } }
    }
  },

  // Stage 3: Group by hour and day of week
  {
    $group: {
      _id: { hour: "$hour", dayOfWeek: "$dayOfWeek" },
      totalViews: { $sum: 1 },
      uniqueUsers: { $addToSet: "$userId" },
      uniquePages: { $addToSet: "$page" }
    }
  },

  // Stage 4: Calculate additional metrics
  {
    $addFields: {
      uniqueUserCount: { $size: "$uniqueUsers" },
      uniquePageCount: { $size: "$uniquePages" },
      avgViewsPerUser: { $divide: ["$totalViews", { $size: "$uniqueUsers" }] }
    }
  },

  // Stage 5: Group by hour for daily patterns
  {
    $group: {
      _id: "$_id.hour",
      hourlyStats: {
        $push: {
          dayOfWeek: "$_id.dayOfWeek",
          totalViews: "$totalViews",
          uniqueUserCount: "$uniqueUserCount",
          avgViewsPerUser: { $round: ["$avgViewsPerUser", 2] }
        }
      },
      totalHourlyViews: { $sum: "$totalViews" }
    }
  },

  // Stage 6: Sort by hour
  { $sort: { "_id": 1 } },

  // Stage 7: Project final results
  {
    $project: {
      hour: "$_id",
      totalHourlyViews: 1,
      dailyBreakdown: "$hourlyStats",
      _id: 0
    }
  }
])

Advanced Aggregation Techniques

Faceted Search with $facet

Perform multiple aggregations in a single pipeline for faceted search results.

🔍
Faceted Search Pipeline
db.products.aggregate([
  // Stage 1: Match products based on search criteria
  {
    $match: {
      $or: [
        { name: { $regex: "laptop", $options: "i" } },
        { description: { $regex: "laptop", $options: "i" } }
      ],
      price: { $gte: 500, $lte: 2000 }
    }
  },

  // Stage 2: Use $facet for multiple aggregations
  {
    $facet: {
      // Facet 1: Products with pagination
      "products": [
        { $sort: { price: 1 } },
        { $skip: 0 },
        { $limit: 20 },
        { $project: { name: 1, price: 1, category: 1, brand: 1, rating: 1, image: 1 } }
      ],

      // Facet 2: Price ranges
      "priceRanges": [
        {
          $bucket: {
            groupBy: "$price",
            boundaries: [0, 500, 1000, 1500, 2000, 3000],
            default: "3000+",
            output: { count: { $sum: 1 } }
          }
        }
      ],

      // Facet 3: Categories
      "categories": [
        { $group: { _id: "$category", count: { $sum: 1 } } },
        { $sort: { count: -1 } }
      ],

      // Facet 4: Brands
      "brands": [
        { $group: { _id: "$brand", count: { $sum: 1 }, avgPrice: { $avg: "$price" } } },
        { $sort: { count: -1 } },
        { $limit: 10 }
      ],

      // Facet 5: Total count
      "totalCount": [
        { $count: "count" }
      ]
    }
  }
])
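In a real search page, the search term and page number usually come from user input. The sketch below shows one way the first stage and the $skip/$limit pair could be generated. The helper names (escapeRegex, searchMatchStage, pageStages) are hypothetical. Escaping the term is an added precaution, not something the pipeline above does, so that characters such as "+" or "(" are matched literally instead of being interpreted as regex syntax.

```javascript
// Hypothetical helpers for building parts of a faceted search pipeline.

// Escape regex metacharacters so user input is matched literally.
function escapeRegex(text) {
  return text.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
}

// Stage 1 built from a user-supplied search term and price range.
function searchMatchStage(term, minPrice, maxPrice) {
  const pattern = escapeRegex(term);
  return {
    $match: {
      $or: [
        { name: { $regex: pattern, $options: "i" } },
        { description: { $regex: pattern, $options: "i" } },
      ],
      price: { $gte: minPrice, $lte: maxPrice },
    },
  };
}

// $skip/$limit pair for a 1-based page number.
function pageStages(page, pageSize) {
  return [{ $skip: (page - 1) * pageSize }, { $limit: pageSize }];
}

console.log(escapeRegex("c++ (2nd ed.)"));
console.log(JSON.stringify(searchMatchStage("laptop", 500, 2000)));
console.log(JSON.stringify(pageStages(3, 20))); // [{"$skip":40},{"$limit":20}]
```

The two stages returned by pageStages would replace the fixed { $skip: 0 }, { $limit: 20 } pair inside the "products" facet.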

Window Functions with $setWindowFields

Use window functions for advanced analytics like running totals and rankings.

🪟
Window Functions Pipeline
// $setWindowFields requires MongoDB 5.0 or later
db.sales.aggregate([
  // Stage 1: Apply window functions
  // ($setWindowFields orders each partition by sortBy, so no separate $sort stage is needed)
  {
    $setWindowFields: {
      partitionBy: "$region",
      sortBy: { date: 1 },
      output: {
        // Running total from the first document of the partition up to the current one
        runningTotal: {
          $sum: "$amount",
          window: { documents: ["unbounded", "current"] }
        },
        // $rank takes no window; it ranks documents by the sortBy field within the partition
        monthlyRank: { $rank: {} },
        // Amount of the previous document in the partition (0 for the first one)
        previousAmount: {
          $shift: { output: "$amount", by: -1, default: 0 }
        }
      }
    }
  },

  // Stage 2: Calculate growth rate
  {
    $addFields: {
      growthRate: {
        $cond: {
          if: { $eq: ["$previousAmount", 0] },
          then: 0,
          else: {
            $multiply: [
              { $divide: [{ $subtract: ["$amount", "$previousAmount"] }, "$previousAmount"] },
              100
            ]
          }
        }
      }
    }
  },

  // Stage 3: Project final results
  {
    $project: {
      date: 1,
      region: 1,
      amount: 1,
      runningTotal: 1,
      monthlyRank: 1,
      growthRate: { $round: ["$growthRate", 2] },
      _id: 0
    }
  }
])
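To make the intended output fields concrete, here is a plain-JavaScript imitation of a per-region running total and a one-row lag, which is what runningTotal and previousAmount are meant to hold. The withWindowFields helper and the sample rows are hypothetical, and dates are kept as ISO strings for simplicity.

```javascript
// Imitation of a running total and a one-row lag per partition.
// withWindowFields() is a hypothetical helper for illustration only.
function withWindowFields(docs) {
  // Order rows by date; ISO date strings sort chronologically.
  const sorted = [...docs].sort((a, b) => a.date.localeCompare(b.date));
  const state = new Map(); // region -> { total, previous }

  return sorted.map((doc) => {
    const s = state.get(doc.region) || { total: 0, previous: 0 };
    const runningTotal = s.total + doc.amount;
    const row = { ...doc, runningTotal, previousAmount: s.previous };
    state.set(doc.region, { total: runningTotal, previous: doc.amount });
    return row;
  });
}

const rows = withWindowFields([
  { region: "EU", date: "2024-09-02", amount: 50 },
  { region: "EU", date: "2024-09-01", amount: 100 },
  { region: "US", date: "2024-09-01", amount: 70 },
]);

console.log(rows);
```

For the second EU row this yields runningTotal 150 and previousAmount 100, so the growth rate computed from those two fields would be (50 - 100) / 100 * 100 = -50.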

Performance Optimization Tips

🚀 Index Optimization

Create indexes on fields used in $match stages early in your pipeline:

// Create a compound index for efficient filtering
// (equality field first, then the range field)
db.orders.createIndex({ status: 1, orderDate: 1 })

// A leading $match on the same fields can then use the index
{ $match: { status: "completed", orderDate: { $gte: new Date("2024-08-25") } } }

⚡ Pipeline Optimization

  • Filter Early: Use $match as the first stage to reduce document count
  • Limit Results: Use $limit early when you don't need all results
  • Project Early: Use $project to reduce document size
  • Avoid Large Arrays: Use $unwind carefully with large arrays

🔍 Query Optimization

  • Use $expr for Complex Conditions: When a $match needs to compare two fields of the same document
  • Optimize $lookup: Use indexed foreign fields
  • Consider $facet: For multiple aggregations instead of separate queries
  • Use $sample: Instead of $limit for random sampling
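As a small illustration of two of the tips above, the stages below compare two fields with $expr and then draw a random sample with $sample. The field names (spent, budget) and the sample size are assumptions made up for this sketch.

```javascript
// Hypothetical field names (spent, budget) for illustration.
// $expr lets $match compare two fields of the same document.
const overBudgetStage = {
  $match: { $expr: { $gt: ["$spent", "$budget"] } },
};

// $sample picks documents at random; $limit would only truncate the stream.
const randomTenStage = { $sample: { size: 10 } };

const pipeline = [overBudgetStage, randomTenStage];
console.log(JSON.stringify(pipeline));
```

The resulting array can be passed directly to db.collection.aggregate.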

Common Use Cases

📊 Business Intelligence Dashboard

Create comprehensive dashboards with multiple metrics and KPIs using $facet for parallel aggregations.

// Dashboard metrics in one query
{
  $facet: {
    "revenue": [
      { $group: { _id: null, total: { $sum: "$amount" } } }
    ],
    "customers": [
      { $group: { _id: null, count: { $sum: 1 } } }
    ],
    "trends": [
      {
        $group: {
          _id: { $dateToString: { format: "%Y-%m", date: "$date" } },
          revenue: { $sum: "$amount" }
        }
      }
    ]
  }
}

🔍 Real-time Analytics

Process real-time data streams with time-based aggregations and moving averages.

// Real-time metrics: events per minute over the last hour
{ $match: { timestamp: { $gte: new Date(Date.now() - 3600000) } } },
{
  $group: {
    _id: { $dateTrunc: { date: "$timestamp", unit: "minute" } },
    count: { $sum: 1 }
  }
},
{ $sort: { "_id": -1 } }

📈 A/B Testing Analysis

Analyze A/B test results with statistical aggregations and conversion funnels.

// A/B test analysis
{
  $group: {
    _id: "$variant",
    totalUsers: { $sum: 1 },
    conversions: { $sum: { $cond: ["$converted", 1, 0] } },
    avgSessionTime: { $avg: "$sessionTime" }
  }
},
{
  $addFields: {
    conversionRate: {
      $multiply: [{ $divide: ["$conversions", "$totalUsers"] }, 100]
    }
  }
}

Best Practices

✅ Do's

  • Use $match early to filter documents
  • Create appropriate indexes for your pipeline stages
  • Use $project to limit fields early in the pipeline
  • Test with explain() to understand execution plans
  • Use $facet for multiple parallel aggregations
  • Consider memory usage with large datasets

❌ Don'ts

  • Don't use $unwind on large arrays without filtering first
  • Don't forget sortBy in $setWindowFields when using order-dependent window functions
  • Don't use $lookup without proper indexes
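  • Don't ignore the 100 MB per-stage memory limit; allowDiskUse lets large stages spill to disk
  • Don't run unanchored or case-insensitive $regex filters on large collections; they cannot use an index efficiently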
  • Don't forget to validate your aggregation results
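For the memory limit mentioned above, the usual remedy is the allowDiskUse option of aggregate. The sketch below wraps that call in a hypothetical helper, aggregateLarge, and runs it against a stub object so it works without a database. With a real mongosh or Node.js driver collection, the same aggregate(pipeline, options) call shape applies.

```javascript
// Sketch: pass allowDiskUse so memory-hungry stages may spill to disk.
// "collection" is any object exposing aggregate(pipeline, options);
// the stub below stands in for a real collection.
function aggregateLarge(collection, pipeline) {
  return collection.aggregate(pipeline, { allowDiskUse: true });
}

const stubCollection = {
  aggregate(pipeline, options) {
    // A real collection would return a cursor; the stub echoes its inputs.
    return { pipeline, options };
  },
};

const call = aggregateLarge(stubCollection, [
  { $match: { status: "completed" } },
  { $sort: { orderDate: -1 } },
]);

console.log(call.options); // { allowDiskUse: true }
```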

Summary

MongoDB aggregation pipelines are powerful tools for data analysis and processing:

  • Pipeline Stages: Use $match, $group, $lookup, $project, and others effectively
  • Performance: Optimize with proper indexing and pipeline structure
  • Real-world Applications: Perfect for analytics, reporting, and data transformation
  • Advanced Features: Leverage $facet, window functions, and complex expressions
