MongoDB Aggregation Pipeline Examples
Master MongoDB aggregation pipelines with real-world examples. Learn essential stages like $match, $group, $lookup, and advanced aggregation techniques for powerful data analysis.
Pipeline Flow
$match
→
$group
→
$sort
→
$project
Published: September 25, 2024 | Reading time: 20 minutes
🃠MongoDB Aggregation Quick Start
Pipeline Stages: Data flows through stages like a factory assembly line
Common Stages: $match → $group → $sort → $project → $limit
Performance: Use indexes on $match fields, limit early in pipeline
Understanding MongoDB Aggregation Pipelines
MongoDB aggregation pipelines are powerful tools for data processing and analysis. They work by passing documents through a series of stages, where each stage transforms the documents.
Raw Documents
$match
$group
$sort
$project
Results
Essential Aggregation Stages
$match Basic
Filters documents like WHERE clause in SQL
db.collection.aggregate([
{ $match: { status: "active" } }
])
$group Intermediate
Groups documents and performs aggregations
db.collection.aggregate([
{ $group: { _id: "$category", count: { $sum: 1 } } }
])
$lookup Intermediate
Performs left outer join between collections
db.collection.aggregate([
{ $lookup: { from: "users", localField: "userId", foreignField: "_id", as: "user" } }
])
$project Basic
Reshapes documents, similar to SELECT in SQL
db.collection.aggregate([
{ $project: { name: 1, email: 1, _id: 0 } }
])
Real-World Examples
Example 1: E-commerce Analytics
Analyze sales data to find top-selling products by category.
db.orders.aggregate([
// Stage 1: Filter orders from last 30 days
{
$match: {
orderDate: { $gte: new Date("2024-08-25") },
status: "completed"
}
},
// Stage 2: Unwind order items
{ $unwind: "$items" },
// Stage 3: Lookup product details
{
$lookup: {
from: "products",
localField: "items.productId",
foreignField: "_id",
as: "product"
}
},
// Stage 4: Unwind product array
{ $unwind: "$product" },
// Stage 5: Group by category and calculate totals
{
$group: {
_id: "$product.category",
totalRevenue: { $sum: { $multiply: ["$items.quantity", "$items.price"] } },
totalQuantity: { $sum: "$items.quantity" },
orderCount: { $sum: 1 }
}
},
// Stage 6: Sort by revenue descending
{ $sort: { totalRevenue: -1 } },
// Stage 7: Project final results
{
$project: {
category: "$_id",
totalRevenue: 1,
totalQuantity: 1,
orderCount: 1,
averageOrderValue: { $divide: ["$totalRevenue", "$orderCount"] },
_id: 0
}
}
])
Expected Output:
[
{
"category": "Electronics",
"totalRevenue": 125000,
"totalQuantity": 250,
"orderCount": 45,
"averageOrderValue": 2777.78
},
{
"category": "Clothing",
"totalRevenue": 89000,
"totalQuantity": 445,
"orderCount": 67,
"averageOrderValue": 1328.36
}
]
Example 2: User Engagement Analysis
Analyze user activity patterns and engagement metrics.
db.userActivities.aggregate([
// Stage 1: Match activities from last 7 days
{
$match: {
timestamp: { $gte: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000) }
}
},
// Stage 2: Lookup user information
{
$lookup: {
from: "users",
localField: "userId",
foreignField: "_id",
as: "user"
}
},
// Stage 3: Unwind user array
{ $unwind: "$user" },
// Stage 4: Group by user and calculate metrics
{
$group: {
_id: "$userId",
userEmail: { $first: "$user.email" },
totalActivities: { $sum: 1 },
uniqueDays: { $addToSet: { $dateToString: { format: "%Y-%m-%d", date: "$timestamp" } } },
activityTypes: { $addToSet: "$activityType" },
lastActivity: { $max: "$timestamp" }
}
},
// Stage 5: Calculate engagement score
{
$addFields: {
engagementScore: {
$add: [
{ $multiply: ["$totalActivities", 0.1] },
{ $multiply: [{ $size: "$uniqueDays" }, 0.5] },
{ $multiply: [{ $size: "$activityTypes" }, 0.3] }
]
}
}
},
// Stage 6: Filter active users
{ $match: { totalActivities: { $gte: 5 } } },
// Stage 7: Sort by engagement score
{ $sort: { engagementScore: -1 } },
// Stage 8: Project final results
{
$project: {
userId: "$_id",
email: "$userEmail",
totalActivities: 1,
activeDays: { $size: "$uniqueDays" },
activityTypes: { $size: "$activityTypes" },
engagementScore: { $round: ["$engagementScore", 2] },
lastActivity: 1,
_id: 0
}
},
// Stage 9: Limit to top 100 users
{ $limit: 100 }
])
Example 3: Time Series Analysis
Analyze website traffic patterns by hour and day of week.
db.pageViews.aggregate([
// Stage 1: Match recent page views
{
$match: {
timestamp: { $gte: new Date("2024-09-01") }
}
},
// Stage 2: Extract time components
{
$addFields: {
hour: { $hour: "$timestamp" },
dayOfWeek: { $dayOfWeek: "$timestamp" },
date: { $dateToString: { format: "%Y-%m-%d", date: "$timestamp" } }
}
},
// Stage 3: Group by hour and day of week
{
$group: {
_id: {
hour: "$hour",
dayOfWeek: "$dayOfWeek"
},
totalViews: { $sum: 1 },
uniqueUsers: { $addToSet: "$userId" },
uniquePages: { $addToSet: "$page" }
}
},
// Stage 4: Calculate additional metrics
{
$addFields: {
uniqueUserCount: { $size: "$uniqueUsers" },
uniquePageCount: { $size: "$uniquePages" },
avgViewsPerUser: { $divide: ["$totalViews", { $size: "$uniqueUsers" }] }
}
},
// Stage 5: Group by hour for daily patterns
{
$group: {
_id: "$_id.hour",
hourlyStats: {
$push: {
dayOfWeek: "$_id.dayOfWeek",
totalViews: "$totalViews",
uniqueUserCount: "$uniqueUserCount",
avgViewsPerUser: { $round: ["$avgViewsPerUser", 2] }
}
},
totalHourlyViews: { $sum: "$totalViews" }
}
},
// Stage 6: Sort by hour
{ $sort: { "_id": 1 } },
// Stage 7: Project final results
{
$project: {
hour: "$_id",
totalHourlyViews: 1,
dailyBreakdown: "$hourlyStats",
_id: 0
}
}
])
Advanced Aggregation Techniques
Faceted Search with $facet
Perform multiple aggregations in a single pipeline for faceted search results.
db.products.aggregate([
// Stage 1: Match products based on search criteria
{
$match: {
$or: [
{ name: { $regex: "laptop", $options: "i" } },
{ description: { $regex: "laptop", $options: "i" } }
],
price: { $gte: 500, $lte: 2000 }
}
},
// Stage 2: Use $facet for multiple aggregations
{
$facet: {
// Facet 1: Products with pagination
"products": [
{ $sort: { price: 1 } },
{ $skip: 0 },
{ $limit: 20 },
{
$project: {
name: 1,
price: 1,
category: 1,
brand: 1,
rating: 1,
image: 1
}
}
],
// Facet 2: Price ranges
"priceRanges": [
{
$bucket: {
groupBy: "$price",
boundaries: [0, 500, 1000, 1500, 2000, 3000],
default: "3000+",
output: { count: { $sum: 1 } }
}
}
],
// Facet 3: Categories
"categories": [
{
$group: {
_id: "$category",
count: { $sum: 1 }
}
},
{ $sort: { count: -1 } }
],
// Facet 4: Brands
"brands": [
{
$group: {
_id: "$brand",
count: { $sum: 1 },
avgPrice: { $avg: "$price" }
}
},
{ $sort: { count: -1 } },
{ $limit: 10 }
],
// Facet 5: Total count
"totalCount": [
{ $count: "count" }
]
}
}
])
Window Functions with $setWindowFields
Use window functions for advanced analytics like running totals and rankings.
db.sales.aggregate([
// Stage 1: Sort by date for window functions
{ $sort: { date: 1, amount: -1 } },
// Stage 2: Apply window functions
{
$setWindowFields: {
partitionBy: "$region",
sortBy: { date: 1 },
output: {
runningTotal: {
$sum: "$amount",
window: {
range: ["unboundedPreceding", "currentRow"]
}
},
monthlyRank: {
$rank: {},
window: {
range: ["unboundedPreceding", "unboundedFollowing"]
}
},
previousAmount: {
$shift: {
output: "$amount",
by: -1,
default: 0
}
}
}
}
},
// Stage 3: Calculate growth rate
{
$addFields: {
growthRate: {
$cond: {
if: { $eq: ["$previousAmount", 0] },
then: 0,
else: {
$multiply: [
{ $divide: [{ $subtract: ["$amount", "$previousAmount"] }, "$previousAmount"] },
100
]
}
}
}
}
},
// Stage 4: Project final results
{
$project: {
date: 1,
region: 1,
amount: 1,
runningTotal: 1,
monthlyRank: 1,
growthRate: { $round: ["$growthRate", 2] },
_id: 0
}
}
])
Performance Optimization Tips
Common Use Cases
📊 Business Intelligence Dashboard
Create comprehensive dashboards with multiple metrics and KPIs using $facet for parallel aggregations.
// Dashboard metrics in one query
{ $facet: {
"revenue": [{ $group: { _id: null, total: { $sum: "$amount" } } }],
"customers": [{ $group: { _id: null, count: { $sum: 1 } } }],
"trends": [{ $group: { _id: { $dateToString: { format: "%Y-%m", date: "$date" } }, revenue: { $sum: "$amount" } } }]
}}
🔠Real-time Analytics
Process real-time data streams with time-based aggregations and moving averages.
// Real-time metrics
{ $match: { timestamp: { $gte: new Date(Date.now() - 3600000) } } },
{ $group: { _id: { $dateTrunc: { date: "$timestamp", unit: "minute" } }, count: { $sum: 1 } } },
{ $sort: { "_id": -1 } }
📈 A/B Testing Analysis
Analyze A/B test results with statistical aggregations and conversion funnels.
// A/B test analysis
{ $group: {
_id: "$variant",
totalUsers: { $sum: 1 },
conversions: { $sum: { $cond: ["$converted", 1, 0] } },
avgSessionTime: { $avg: "$sessionTime" }
}},
{ $addFields: {
conversionRate: { $multiply: [{ $divide: ["$conversions", "$totalUsers"] }, 100] }
}}
Best Practices
Summary
MongoDB aggregation pipelines are powerful tools for data analysis and processing:
- Pipeline Stages: Use $match, $group, $lookup, $project, and others effectively
- Performance: Optimize with proper indexing and pipeline structure
- Real-world Applications: Perfect for analytics, reporting, and data transformation
- Advanced Features: Leverage $facet, window functions, and complex expressions
Need MongoDB Help?
Our MongoDB experts can help you design efficient aggregation pipelines and optimize your database performance.
Get MongoDB Help