MongoDB aggregation pipeline 2026
Пошаговое руководство по использованию MongoDB Aggregation Pipeline: от базового $match/$group до $lookup, $facet и оптимизаций. Примерное время выполнения: 45–90 минут.
Статья была полезной?
Пошаговое руководство по использованию MongoDB Aggregation Pipeline: от базового $match/$group до $lookup, $facet и оптимизаций. Примерное время выполнения: 45–90 минут.
Статья была полезной?
$match, $group, $lookup, $facet, $sort, $limit, $project.mongo:7.2 (~550 MB).К концу руководства вы сможете собрать рабочие aggregation pipeline для типичных аналитических задач и оптимизировать их под MongoDB 7.2. Время выполнения полного руководства: примерно 45–90 минут в зависимости от практики.

Скриншот: схема aggregation pipeline с этапами $match,$group,$lookup
Aggregation pipeline удобнее SQL в сценариях, где данные денормализованы, требуется потоковая трансформация документов или многослойная агрегация на стороне БД без лишних round-trip к приложению. В 2025–2026 годах команды всё чаще используют pipeline для временных рядов, событийной аналитики и ETL-процессов, когда нужно выполнить несколько последовательных преобразований одного документа: фильтрацию, проекции, вычисления, джойны и фасеты. Типичный пример — подсчёт сессионной аналитики по событиям клиента, где один документ содержит массив событий.
Преимущества pipeline перед SQL: встроенные этапы (например, $setWindowFields для оконных функций, появившиеся и стабилизированные к 2025), возможность работать с вложенными массивами без JOIN таблиц и гибкость при изменении схемы. Недостаток — сложность отладки длинных конвейеров и чувствительность к объёму RAM при группировках и фасетах.
Задача: посчитать количество событий по типу и по пользователю за последний месяц. Используем коллекцию events, документы формата {userId, type, ts}.
Команда в mongosh 1.11 (выполнение ~10–50 ms локально на тестовой выборке 100k):
db.events.aggregate([
{ $match: { ts: { $gte: new Date(new Date()-1000*60*60*24*30) } } },
{ $group: { _id: { userId: "$userId", type: "$type" }, count: { $sum: 1 } } },
{ $sort: { "count": -1 } }
])Пояснение: $match должен быть первым для использования индексного сканирования по полю ts. $group агрегирует по составному ключу.
Ожидаемый вывод (пример):
{ "_id" : { "userId" : "user123", "type" : "click" }, "count" : 542 }
{ "_id" : { "userId" : "user456", "type" : "view" }, "count" : 431 }Типичная ошибка: отсутствие индекса по ts приводит к полному скану коллекции и OOM при больших данных.
Ошибка: planExecutor error during aggregation :: caused by :: Cannot allocate memoryФикс: создать индекс и уменьшить рабочую выборку или использовать $limit и периодизацию.
db.events.createIndex({ ts: 1 })
// Создание индекса: ~12s на 1M документов на SSDЕсли у вас ограничено RAM, добавьте $project перед $group, чтобы сократить поля, и используйте опцию allowDiskUse: true:
db.events.aggregate(pipeline, { allowDiskUse: true })Ожидаемое поведение: aggregation сохранит промежуточные данные на диск; производительность упадёт, но OOM уйдёт.
Задача: приписать к каждому событию профиль пользователя из коллекции users по userId. Коллекция users содержит до 500k документов.
db.events.aggregate([
{ $match: { ts: { $gte: ISODate("2026-01-01T00:00:00Z") } } },
{ $lookup: {
from: "users",
localField: "userId",
foreignField: "_id",
as: "user"
} },
{ $unwind: "$user" },
{ $project: { user: { name: 1, email: 1 }, type: 1, ts: 1 } }
])Пояснение: $lookup выполняет left-join эквивалент; предпочтительно, чтобы users._id был индексирован (обычно это primary key).
Ожидаемый вывод (пример):
{ "_id" : ObjectId("..."), "type" : "purchase", "ts" : ISODate("2026-02-10T12:00:00Z"), "user" : { "name" : "Anna", "email" : "anna@example.com" } }Типичная ошибка: если users._id не индексирован, lookup выполняет COLLSCAN по users и время запроса растёт экспоненциально.
Проблема: high latency, план показывает $lookup -> unindexed COLLSCANФикс: убедитесь, что внешний ключ индексирован — обычно это _id. Для кастомного поля создайте индекс:
db.users.createIndex({ _id: 1 }) // обычно уже есть
// или для кастомного ключа
db.users.createIndex({ userId: 1 })Дополнительный приём: если связка много событий -> один пользователь и коллекция users небольшая (<=100MB), можно загрузить users в память приложения и делать join на уровне приложения. См. материалы по кешированию в devops.
Задача: одновременно получить 3 результата — топ событий по типу, распределение по часам и общую статистику — в одном запросе. Используем $facet для параллельных веток в pipeline.
db.events.aggregate([
{ $match: { ts: { $gte: new Date(new Date()-1000*60*60*24*7) } } },
{ $facet: {
topTypes: [ { $group: { _id: "$type", c: { $sum: 1 } } }, { $sort: { c: -1 } }, { $limit: 10 } ],
hourly: [ { $project: { hour: { $hour: "$ts" } } }, { $group: { _id: "$hour", c: { $sum: 1 } } }, { $sort: { _id: 1 } } ],
stats: [ { $group: { _id: null, total: { $sum: 1 }, uniqueUsers: { $addToSet: "$userId" } } }, { $project: { total: 1, uniqueCount: { $size: "$uniqueUsers" } } } ]
} }
])Пояснение: $facet создаёт независимые под-пайплайны и возвращает результаты в одном документе. Это удобно для dashboard-метрик, но потребляет память: сумма промежуточных наборов может быть значительной.
Ожидаемый вывод (упрощённый):
{
"topTypes" : [ { "_id" : "click", "c" : 12450 }, ... ],
"hourly" : [ { "_id" : 0, "c" : 512 }, ... ],
"stats" : [ { "total" : 543210, "uniqueCount" : 12345 } ]
}Типичная ошибка: OOM при сборе больших поднаборов в facet.
Error: command failed: command aggregate failed: Exceeded memory limit for $group stageФикс: использовать allowDiskUse: true, сократить поля в ранних этапах ($project), или разбить задачу на несколько запросов по времени.
Пример с опцией дисковой поддержки:
db.events.aggregate(pipeline, { allowDiskUse: true })
// Опция может увеличить время выполнения до 2–5x, но снимает ограничение RAM.Задача: получить топ-20 пользователей по количеству событий. Правильный порядок этапов критичен: сначала $group, затем $sort, затем $limit. Если есть индекс покрытия, стоит использовать его до группировки.
db.events.aggregate([
{ $match: { ts: { $gte: ISODate("2026-03-01T00:00:00Z") } } },
{ $group: { _id: "$userId", events: { $sum: 1 } } },
{ $sort: { events: -1 } },
{ $limit: 20 }
])Ожидаемый результат:
{ "_id" : "user789", "events" : 12433 }
{ "_id" : "user123", "events" : 11220 }
... 18 строк ...Типичная ошибка: если выполнить $sort на непроиндексированном большом множестве, MongoDB тратит память на сортировку.
Проблема: Sort exceeded memory limit, consider adding index or increasing RAMФикс: использовать индекс по полю, участвующему в сортировке, либо применять allowDiskUse: true и предварительно уменьшать набор через $match или $project. Альтернатива: поддерживать агрегированные счётчики в отдельной коллекции обновляемой через change streams или application layer.
Задача: сформировать итоговую проекцию с вычислением стоимости заказа и приведением дат в нужную зону времени. Используем $project и выражения.
db.orders.aggregate([
{ $match: { status: "paid", createdAt: { $gte: ISODate("2026-01-01T00:00:00Z") } } },
{ $project: {
orderId: 1,
totalCents: { $round: [ { $multiply: ["$unitPrice", "$quantity"] }, 0 ] },
createdLocal: { $dateToString: { date: "$createdAt", timezone: "Europe/Moscow", format: "%Y-%m-%d %H:%M:%S" } }
} },
{ $limit: 50 }
])Ожидаемый вывод:
{ "orderId" : "ord_001", "totalCents" : 1299, "createdLocal" : "2026-03-10 16:20:05" }Типичная ошибка: использование операций с нечисловыми типами приводит к NaN или ошибкам типов.
Ошибка: Cannot apply $multiply to types: string and intФикс: привести типы заранее: $toDouble или хранить цены в числовом поле. Пример приведения:
{ $project: { unitPrice: { $toDouble: "$unitPrice" } } }
Скриншот: пример $project с $dateToString и вычислением total
Оптимизация aggregation pipeline включает индексирование, минимизацию передаваемых полей, использование allowDiskUse осознанно и применение кеширования. Конкретные шаги и рекомендации:
$match (например, {ts:1}). Индексная селективность важна: индекс по {userId:1, ts:-1} уменьшит нагрузку при запросах по пользователю за период.$match → $project → $group → $sort/$limit. Это уменьшает объём данных до тяжёлых операций.allowDiskUse:true для больших группировок; ожидаемое увеличение времени: 2–5x, но RAM не будет переполняться.explain("executionStats") и анализируйте планы. Пример: db.events.aggregate(pipeline).explain("executionStats"). Ожидаемая информация: docsExamined, totalKeysExamined, executionTimeMillis.Пример анализа плана (выдержка):
"executionStats" : {
"executionTimeMillis" : 78,
"totalKeysExamined" : 1200,
"totalDocsExamined" : 1500
}Если totalDocsExamined значительно выше ожидаемого, проверьте отсутствие COLLSCAN и улучшите индекс.
Дополнительно: используйте Redis 7 (2025) как кэш для готовых результатных наборов dashboard-метрик. См. статью в категории database для практик кеширования.
Список распространённых антипаттернов при использовании aggregation pipeline и способы их избегать.
$facet для большого объёма данных без allowDiskUse. Последствие: OOM. Решение: разбить запросы, выполнить параллельно или включить дисковую поддержку.Если итоговые метрики нужны часто и рассчитаны по одним и тем же ключам — поддерживайте предагрегированные коллекции и обновляйте их инкрементально через change streams.
Выбор зависит от характера данных и частоты обновлений. Если связанные данные редко меняются и читаются вместе с основными документами, денормализация уменьшит задержки и уберёт необходимость в join на чтении. Если же отношение «один-ко-многим» с большим количеством связанных записей или частыми изменениями в связанной сущности, выгоднее хранить отдельную коллекцию и использовать $lookup. Для high-read сценариев предпочтительнее денормализация с механизмом обновления данных через батч-процессы или change streams. При выборе учитывайте размер документов: MongoDB ограничивает BSON до 16MB, поэтому нельзя релизовать полную денормализацию для очень больших связанных массивов.
Первое действие — включить allowDiskUse: true для переноса промежуточных массивов на диск. Второй шаг — сократить объём данных перед $group с помощью $match и $project. Третье — разбить агрегацию по временным интервалам и затем объединить результаты. В production-инфраструктуре хорошее решение — поддержание предагрегатов (daily/hourly) и шардирование коллекции, если объём превышает 100–200 GB. Наконец, анализируйте план через .explain("executionStats"), чтобы понять, где именно потребление памяти критично.
Даже маленькая коллекция может вызвать задержки, если джойны выполняются миллионами раз без индекса по внешнему полю. Проверьте, индексирован ли foreignField. Дополнительно убедитесь, что размер документов небольш, и что сеть/IO на сервере не перегружены. На тестовой машине Docker образ mongo:7.2 (~550 MB) загружается за ~8–20 секунд на типичном SSD; проблемы с IO напрямую влияют на время $lookup. Если latency остаётся высокой, рассмотрите кеширование связанной коллекции в RAM или на стороне приложения.
$setWindowFields полезен для расчёта скользящих средних, ранжирования и других оконных функций без необходимости клиента аггрегировать данные. Используйте его для аналитики по времени, когда нужна информация о предыдущих значениях (lag/lead) или округлённые скользящие метрики. Поскольку это относительно тяжёлая операция, убедитесь, что входной набор предварительно отфильтрован и что вы ограничиваете объем обработанных документов — например, применяйте $match по диапазону времени.
Минимальная рекомендация для серьёзных фасетов: 8 GB RAM на ноду при обработке десятков миллионов документов; для production-аналитики — 16–32 GB RAM и SSD NVMe. Если вы планируете использовать sharded cluster для аналитики, общий объём RAM и IO должен масштабироваться пропорционально объёму данных и требуемой параллельности. Всегда тестируйте с реалистичными данными: на локальной машине с 8 GB одна фасет-операция по 10M документов может завершаться за минуты с включённым дисковым режимом.
Комментарии (0)
Войдите или зарегистрируйтесь, чтобы оставить комментарий
Загрузка комментариев…