Streaming аналитика на ClickHouse + Kafka
Реализуете потоковую аналитику на ClickHouse с использованием Kafka Engine и materialized views за 60–90 минут. Пошаговая инструкция с командами, ожидаемым выводом и типовыми ошибками.
Статья была полезной?
Реализуете потоковую аналитику на ClickHouse с использованием Kafka Engine и materialized views за 60–90 минут. Пошаговая инструкция с командами, ожидаемым выводом и типовыми ошибками.
Статья была полезной?
clickhouse/clickhouse-server:25.0, ~260 MB.confluentinc/cp-kafka:7.6, ~450 MB.Типичная архитектура включает продьюсеров событий (API, микросервисы), кластер Kafka для доставки и буферизации, ClickHouse как движок аналитики с таблицами Kafka Engine и materialized views, которые записывают агрегаты в MergeTree. Ожидаемая задержка от события до агрегата — 0.5–2 секунды при правильной настройке.

Схема архитектуры ClickHouse и Kafka: продьюсер → Kafka topic → ClickHouse Kafka Engine → Materialized View → MergeTree
Причины такой схемы: Kafka гарантирует порядок и повторную доставку, ClickHouse обеспечивает быстрые агрегации и хранение исторических данных. Для кратковременной агрегации используют Materialized View с BufferTable или SummingMergeTree.

Таймлайн сообщений в Kafka и потребление ClickHouse: offset, partition
Цель шага: создать таблицу в ClickHouse, которая напрямую читает сообщения из Kafka-топика. Подойдёт для JSON- или Avro-схем. Команды ниже рассчитаны на ClickHouse 25.0 (релиз 2025) и Kafka 3.7 (релиз 2025).
-- Создать внешний Kafka engine используя HTTP-интерфейс ClickHouse
curl -sS -X POST 'http://localhost:8123/' --data-binary $'CREATE TABLE default.events_kafka (
event_time DateTime64(3),
user_id UInt64,
action String,
value Float64
) ENGINE = Kafka()
SETTINGS kafka_broker_list = 'kafka:9092',
kafka_topic_list = 'events',
kafka_group_name = 'clickhouse-consumer-1',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 1;'Пояснение: таблица с ENGINE = Kafka() не хранит данные постоянно — она лишь читает последние сообщения из топика и предоставляет их как временное представление. Параметр kafka_format определяет декодер (здесь JSONEachRow).
Ожидаемый успешный вывод:
Ok.Типовая ошибка и фикс:
Ошибка: Code: 1001. DB::Exception: Cannot parse input: bad JSON in row
Фикс: Проверьте формат сообщений в Kafka (ключ/значение). Для JSONEachRow каждое сообщение должно быть валидным JSON с полями, соответствующими схеме таблицы. Запустите producer с примером:
cat sample.json | kafka-console-producer --broker-list localhost:9092 --topic eventsЦель шага: автоматически переносить данные из Kafka Engine в столбцовую таблицу ClickHouse для постоянного хранения и последующих агрегаций. Здесь используем MergeTree с партицированием по дате.
-- Создаём целевую таблицу
curl -sS -X POST 'http://localhost:8123/' --data-binary $'CREATE TABLE default.events_mt (
event_date Date,
event_time DateTime64(3),
user_id UInt64,
action String,
value Float64
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, user_id);
-- Создаём materialized view, который читает из Kafka-таблицы и вставляет в MergeTree
curl -sS -X POST 'http://localhost:8123/' --data-binary $'CREATE MATERIALIZED VIEW default.events_mv TO default.events_mt AS
SELECT
toDate(event_time) AS event_date,
event_time,
user_id,
action,
value
FROM default.events_kafka;'Пояснение: materialized view делает INSERT в events_mt при появлении новых строк в events_kafka. Это типовой способ держать стабильное хранилище с low-latency записью.
Ожидаемый успешный вывод:
Ok.
Ok.Типовая ошибка и фикс:
Ошибка: Code: 57. DB::Exception: Unknown table default.events_kafka
Фикс: Проверьте, что таблица Kafka создана в том же database и что ClickHouse-server запущен. Также убедитесь, что вы не создаёте materialized view до Kafka-таблицы.Цель шага: написать агрегаты, которые уменьшают объем данных и позволяют получать ответы в реальном времени. В этом примере считаем активность по пользователям и суммарное значение по action за 1 минутный интервал. Для инкрементальной агрегации используем AggregatingMergeTree или SummingMergeTree в зависимости от требований.
-- Создаём таблицу с pre-aggregates (1-minute buckets)
curl -sS -X POST 'http://localhost:8123/' --data-binary $'CREATE TABLE default.events_1m_agg (
bucket_start DateTime64(3),
action String,
users_count UInt64,
value_sum Float64
) ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(bucket_start)
ORDER BY (bucket_start, action);
-- Materialized view обновляет агрегат в режиме near-real-time
curl -sS -X POST 'http://localhost:8123/' --data-binary $'CREATE MATERIALIZED VIEW default.events_1m_mv TO default.events_1m_agg AS
SELECT
toStartOfMinute(event_time) AS bucket_start,
action,
uniqExactState(user_id) AS users_count_state,
sumState(value) AS value_sum_state
FROM default.events_mt
GROUP BY bucket_start, action;
-- Для SummingMergeTree нужно развернуть state, используем функцию finalizeAggregates при чтении
'Пояснение: здесь демонстрация подхода: materialized view заполняет агрегатные состояния. В production используют AggregatingMergeTree и хранение state-колонок или дополнительно finalize при селекте.
Ожидаемый успешный вывод:
Ok.
Ok.Типовая ошибка и фикс:
Ошибка: Code: 49. DB::Exception: Aggregate function uniqExactState is not supported in this context
Фикс: Убедитесь, что используете AggregatingMergeTree как движок или храните агрегатные state-колонки. Альтернатива: заменить на approxAggregate(uniqExactState -> uniqState) или использовать simple counts для приемлемой погрешности.Цель шага: сгенерировать тестовые события в Kafka и проверить, что ClickHouse корректно их читает и агрегирует. Для генерации используйте kafka-console-producer или любой producer на языке приложения.
# Генерация 1000 событий за 5 секунд (пример для bash + kafka-console-producer)
for i in $(seq 1 1000); do
ts=$(date -u +"%Y-%m-%dT%H:%M:%S.%3NZ")
echo "{\"event_time\": \"$ts\", \"user_id\": $((RANDOM%5000+1)), \"action\": \"click\", \"value\": $(awk "BEGIN{srand(); printf \"%.2f\", rand()*10}") }" \
| kafka-console-producer --broker-list localhost:9092 --topic events > /dev/null
done
# Проверить вставку в ClickHouse
curl -sS 'http://localhost:8123/?query=SELECT+count%281%29+FROM+default.events_mt' | sed -n '1p'Ожидаемый успешный вывод:
1000Типичная ошибка и фикс:
Ошибка при продьюсе: ERROR: Failed to connect to kafka:9092
Фикс: Проверьте, что broker доступен по названию/хосту из контейнера. При Docker Compose используйте алиасы сервисов или пробросьте порты. Тестируйте изнутри контейнера ClickHouse:
docker exec -it clickhouse-server ping -c 3 kafkaЦель шага: организовать production-деплой с systemd (опционально), мониторингом и alerting. Минимальный стек: Prometheus + Grafana + ClickHouse exporter + Kafka exporter. Время развертывания для базовой конфигурации — 30–60 минут на однородном хосте с 8 CPU/32 GB RAM.
# Пример systemd unit для ClickHouse (файл /etc/systemd/system/clickhouse.service)
[Unit]
Description=ClickHouse Server
After=network.target
[Service]
Type=simple
User=clickhouse
ExecStart=/usr/bin/clickhouse-server --config-file=/etc/clickhouse-server/config.xml
LimitNOFILE=262144
Restart=on-failure
[Install]
WantedBy=multi-user.target
# После добавления:
systemctl daemon-reload
systemctl enable --now clickhouse
systemctl status clickhouse --no-pagerОжидаемый вывод команды systemctl status:
● clickhouse.service - ClickHouse Server
Loaded: loaded (/etc/systemd/system/clickhouse.service; enabled)
Active: active (running) since Thu 2026-03-12 12:34:56 UTC; 2min 10s agoТиповая ошибка и фикс:
Ошибка: Failed at step USER spawning /usr/bin/clickhouse-server: No such user
Фикс: Создайте пользователя clickhouse: useradd -r -s /sbin/nologin clickhouse, проверьте права на каталоги данных и логов.Список наиболее распространённых проблем и как их избегать:
merge_max_size, партиционирование и регулярно запускайте OPTIMIZE для маленьких партиций.Масштабирование предполагает масштабирование трёх компонент: Kafka (ингест), ClickHouse (хранение/вычисления) и сети между ними. Привожу рекомендации с конкретикой.
Distributed-таблицы и балансировку запросов через remote и cluster конфигурации.Практический пример: масштабирование агрегатов. Если один MergeTree не успевает выполнять merges, добавьте шардинг по hash(user_id) и распределите агрегаты по шардовой схеме; используйте Distributed view для запросов.
Мониторинг: соберите метрики по latency end-to-end (producer → Kafka → ClickHouse ingestion → materialized view latency). Установите alert при latencies > 2s. Примеры метрик: consumer_lag, commit_latency, merges_in_queue, memory_usage.
Дублирование происходит, если materialized view повторно читает одни и те же сообщения: это случается при неправильной конфигурации групп или при ручном реимпорте. Решение — использовать контролируемые kafka_group_name и idempotent inserts (например, уникальный ключ в MergeTree и INSERT с ON DUPLICATE SKIP или использовать ReplacingMergeTree с ключом). Также полезно хранить offset-позиции вне ClickHouse, если нужна сложная логика повторной доставки.
Avro с Schema Registry предпочтительнее в продах из-за строгой версии схем и совместимости (backward/forward). JSON проще для быстрых прототипов, но при изменении полей требуется дополнительная обработка на стороне парсера. Для ClickHouse+Kafka production-рекомендуется Avro/Protobuf + Schema Registry (Confluent), особенно если несколько сервисов публикуют события.
Проверьте несколько вещей: совпадает ли database у Kafka-таблицы и materialized view; не произошло ли исключение при парсинге; есть ли ошибки в логах ClickHouse (обычно в /var/log/clickhouse-server/clickhouse-server.log). Также убедитесь, что Kafka consumer group корректна и нет зависимости от security (SASL/SSL). Частая причина — mismatch полей/типов между Kafka-сообщением и схемой таблицы.
При использовании AggregatingMergeTree для state-агрегатов оценка зависит от типа state. При хранении approx-агрегатов (HyperLogLog, approx count) требуется ~1–5 KB на state, итого 100 млн * 1 KB = ~100 GB. Для точных state (uniqExactState) требуются десятки раз больше. Планируйте 2×–3× для overhead и индексов, используйте внешнее хранение state или approximate-агрегаты, если RAM ограничена.
При локальной сетевой среде и оптимальной конфигурации достигается end-to-end latency 0.5–2 секунд: Kafka доставляет событие обычно <1 s, ClickHouse consumer читает и materialized view применяет вставку в течении долей секунды. Ключевые факторы — размер партиций, число consumer'ов, скорость disk I/O и настройка merge'ов. Для сокращения latency уменьшите batch-сборку сообщений и увеличьте частоту flush.
Дополнительные материалы доступны в разделах DevOps и Базы данных на сайте, где есть примеры Docker Compose и production-настроек для ClickHouse и Kafka.
Комментарии (0)
Войдите или зарегистрируйтесь, чтобы оставить комментарий
Загрузка комментариев…