Стек Big Data — это совокупность технологий и инструментов, организованных в логичные уровни, которые вместе обеспечивают сбор, хранение, обработку, анализ и визуализацию очень больших, разнообразных и быстро меняющихся объёмов данных.
Ключевые характеристики Big Data (5 «V»):
- Volume (объём) – хранение и обработка петабайт–экзабайт данных;
- Velocity (скорость) – приём и обработка потоковых данных в реальном времени;
- Variety (разнообразие) – структурированные, полуструктурированные и неструктурированные источники;
- Veracity (достоверность) – обеспечение качества, чистоты и целостности данных;
- Value (ценность) – быстрое извлечение бизнес‑инсайтов.
Основные слои стека Big Data:
- Sources (источники): реляционные/NoSQL‑БД, API, логи, IoT‑датчики, файлы, очереди сообщений.
- Ingest (погружение): пакетная (Spark, Hadoop MapReduce) и потоковая (Spark Streaming, Kafka Streams, Flink) загрузка.
- Storage (хранилище): «озеро» (Data Lake) на S3/Object Storage + табличные форматы (Hudi, Iceberg, Delta), специализированные базы (HBase, Cassandra).
- Compute (вычисления): распределённый SQL‑движок (Trino/Presto), фреймворки для ML и ETL (Spark, Flink), графовые движки.
- Serving / Speed Layer: быстрые OLAP‑решения (ClickHouse, Druid) для низколатентных запросов.
- BI & Visualization: дашборды и отчёты (Tableau, Looker, Metabase, Yandex DataLens).
- Orchestration & Infra: планирование и автоматизация (Airflow, Kubernetes, Terraform), мониторинг (Prometheus, Grafana), каталог метаданных (Glue, OpenMetadata).
Каждый уровень стека тесно интегрируется по следующим принципам:
- Масштабируемость: линейное добавление ресурсов.
- Устойчивость: отказоустойчивые кластеры и репликация.
- Гибкость: возможность подменять компоненты без глобального рефакторинга.
- Автоматизация: единые пайплайны CI/CD для кода и инфраструктуры.
Такой модульный подход позволяет строить конвейеры от источника до отчёта, адаптируя стек под бизнес‑задачи и технологические требования компании.
Архитектурные слои типового Big Data‑стека
Ниже мы разберём основные слои, через которые проходит типичный поток данных в компании. Представьте себе процесс, как конвейер на фабрике: в начале приходят «сырьё» (данные), дальше оно обрабатывается, хранится, рассчитывается аналитика и, наконец, покупатель (или аналитик) видит готовый «товар» (отчёт или дашборд).
1. Источники (Sources)
Что это?
Все точки, где данные изначально генерируются.
Пример:
- Пользователь заходит на сайт или в мобильное приложение и кликает на товар.
- Загрузка фотографий товаров от поставщиков.
- Интеграция с логистическими партнёрами (API для статуса отправлений).
- Системы оплаты (банк‑эквайринг).
Простой язык: это как разные краны на кухне – один даёт воду из колодца (сайт), другой – фильтрованную воду (партнёрские API), третий – газированную (сервисы платежей).
2. Приём и сбор данных (Ingest)
Что это?
Компоненты, которые «ловят» эти данные в реальном времени или пакетно и доставляют дальше.
Как бывает устроено:
- Stream‑инжест (Kafka, AWS Kinesis, Google Pub/Sub) – для кликов пользователей и логов.
- Batch‑ингест (Airflow + Sqoop, Spark‑job, AWS Glue) – для периодического импорта ценников из ERP-системы.
Жизненный пример: представьте, что вы ставите сетчатое сито под краны: мелкие частицы (нужные данные) проходят через сито дальше, а всё лишнее отбрасывается.
3. Хранилище (Storage)
Что это?
Где данные «ждут», пока их не возьмут для обработки.
Основные виды:
- Data Lake (HDFS, S3, Google Cloud Storage) – «сырой» склад: хранятся всё подряд (логи, json‑файлы, csv).
- Data Warehouse (Amazon Redshift, Snowflake, Google BigQuery) – «чистый склад»: данные уже очищены и структурированы для аналитики.
Аналогия: похож на два склада: на одном – мешки с необработанным зерном (Data Lake), на другом – отборная мука в мешочках (Data Warehouse).
4. Обработка и вычисления (Compute)
Что это?
Где данные готовят: чистят, трансформируют, агрегируют, запускают машинное обучение.
Типовые инструменты:
- Batch‑вычисления: Apache Spark, Hadoop MapReduce.
- Stream‑вычисления: Flink, Spark Streaming, Kafka Streams.
- ML‑платформы: MLflow, SageMaker, TFX.
Пример:
- Расчёт рекомендаций: «вам может понравиться…» на основе истории просмотров.
- Подсчёт продаж по категориям за день.
- Обучение моделей прогнозирования спроса на украшения к празднику.
Упрощённо: это кухня, где из муки и других ингредиентов (данных) пекут хлеб (аналитику, модели).
5. Инструменты презентации и BI (Presentation/BI)
Что это?
Инструменты, в которых конечный пользователь (аналитик, маркетолог, менеджер) видит готовые отчёты, дашборды, визуализации.
Популярные BI‑решения:
- Tableau, Power BI, Looker, Superset.
- Self‑service‑платформы внутри компании.
Пример:
- Дашборд «Конверсия сайта»: показывает, сколько человек добавили товар в корзину и оформили заказ.
- Отчёт «Самые продаваемые товары по регионам» для руководителя по продажам.
Жизненная аналогия: это витрина магазина, где покупатель (пользователь отчёта) видит уже упакованные и красиво разложенные товары.
6. Оркестрация, инфраструктура и мониторинг (Orchestration/Infra/Monitoring)
Что это?
Набор сервисов и процессов, которые управляют всем этим «конвейером», следят, чтобы он не остановился и работал без сбоев.
- Оркестрация: Apache Airflow, Prefect, Dagster – отвечает за расписание и зависимость задач.
- Инфраструктура: кластерные менеджеры (YARN, Kubernetes), облачные сервисы (AWS EMR, GCP Dataproc).
- Мониторинг: Prometheus, Grafana, ELK‑стек – собирают метрики (время работы задач, ошибки, задержки).
Пример:
- Ежедневно в 3:00 запускается ETL‑пайплайн – Airflow проверяет успешность каждого шага, в случае сбоя отправляет уведомление в Slack.
- Kubernetes автоматически создаёт новые поды для Spark‑задач, если нагрузка растёт.
- Grafana показывает, что задержка доставки кликов в Kafka выросла выше порога, и команда SRE сразу получает оповещение.
Как мы заботимся о конвейере: представьте диспетчерскую, где операторы следят за всеми линиями, перенастраивают оборудование и мгновенно реагируют на аварии.
Краткая схема потока
Описание технологического процесса
Какие события (events) мы собираем?
- Типы событий
- Click – любое нажатие (кнопка «Добавить в корзину», «Купить», навигация по категориям).
- View (просмотр) – открытие карточки товара, просмотр баннера, пролистывание ленты.
- Add to cart (добавление в корзину) – момент, когда товар попал в корзину.
- Purchase (покупка) – успешное завершение заказа и оплата.
- Метаданные каждого события
Поле Описание Пример user_id Уникальный идентификатор пользователя u_12345session_id Сессия – группа действий за один заход на сайт sess_20250417_987timestamp Временная метка в формате ISO 8601 2025-04-17T14:22:05Zproduct_id Идентификатор товара или баннера p_67890additional Доп. поля: device (mobile/desktop), referrer (откуда пришли) mobile,google.com
Пример события в JSON:
{ "event_type": "add_to_cart", "user_id": "u_12345", "session_id": "sess_20250417_987", "timestamp": "2025-04-17T14:22:05Z", "product_id": "p_67890", "device": "mobile" }
Где и как фиксируются эти события?
- Клиентская сторона (JS‑трекинг)
- В браузере или мобильном приложении встроен код, который “подслушивает” клики/просмотры и отправляет события на сервер.
- Инструменты: Google Tag Manager, собственные SDK.
- Лог‑сервисы
- Сервер приложения тоже пишет логи (NGINX, backend‑сервисы) на дисковый кластер.
- Логи собираются через Filebeat/Logstash и сразу “продавливаются” в очередь.
- Очередь сообщений (Kafka)
- Все события попадают в топики Kafka:
clicks,views,purchases.
- Плюсы: масштабируемость, гарантированная доставка, возможность “проиграть” логи заново.
- Все события попадают в топики Kafka:
Жизненный пример:
JS‑скрипт на вашем сайте – это будильник, который мгновенно “звонит” при каждом клике и диктует его детали в Kafka.
Как из сырых логов формируется «сырое» хранилище?
- Парсинг
- Читаем сообщения из Kafka или файлов лога, разбираем JSON/строку на поля.
- Валидация
- Проверяем, что обязательные поля (user_id, timestamp, event_type) есть и валидны.
- Отбрасываем “битые” или подозрительные записи (например, события с будущей датой).
- Первичная агрегация
- По необходимости объединяем мелкие события в “сессию” или считаем счётчики на лету (например, пустые клики фильтруем).
- Запись в Data Lake
- Сохраняем чистый, но необработанный датасет в формате Parquet/ORC (каталог по дате и топику).
Какие метрики и KPI нужны бизнес‑пользователям?
- Фундаментальные показатели
- Просмотры (Views) → CTR (Click‑Through Rate) = clicks / views
- Конверсия = purchases / sessions или purchases / clicks
- ARPU (Average Revenue Per User) = total_revenue / number_of_users
- LTV (Lifetime Value) – суммарная выручка с одного клиента за всё время
- Разрезы отчётов
- Ассортимент: какие категории или SKU приносят больше кликов/продаж.
- География: регионы с наибольшей активностью.
- Время: часы, дни недели, сезонность.
Пример:
Если в понедельник на мобильном CTR упал до 1 %, это может сигнализировать о неисправности трекинга или изменении в дизайне.
Какие этапы ETL/ELT‑пайплайна?
- Ingestion – сбор и загрузка сырых логов (Kafka → Data Lake).
- Чистка (Cleaning)
- Удаление дубликатов.
- Приведение типов (строка → дата, число).
- Удаление аномалий (нелогичные timestamps).
- Обогащение (Enrichment)
- Привязка product_id → категория, бренд, цена (склеивание со справочником).
- Привязка user_id → сегмент пользователя (гость, зарегистрированный, VIP).
- Агрегация
- Считаем daily/hourly показатели: клики, просмотры, добавления в корзину.
- Сохраняем агрегаты в Speed Layer или Data Warehouse.
Где и в каком формате хранятся результирующие таблицы?
- Data Lake (долговременное хранение)
- Форматы: Apache Hudi, Iceberg, Delta Lake на S3/HDFS.
- Идеально для “исторических” данных и large‑scale анализов.
- Speed Layer (горячее хранилище)
- ClickHouse, Druid или Elasticsearch для быстрых OLAP‑запросов.
- Отвечает на вопросы “real‑time”: сколько кликов за последние 5 минут.
Как строятся дашборды и отчёты?
- Выбор источника
- OLAP‑хранилище (Snowflake, BigQuery) – для исторических отчётов.
- Speed Layer (ClickHouse) – для near‑real‑time мониторинга.
- Визуализация
- Таблицы, графики временных рядов, воронки (CTR, конверсия).
- Фильтры: по дате, региону, категории.
- Расписание обновлений
- Batch‑дашборды — раз в день в 01:00.
- Real‑time мониторинг — каждые 5 минут.
Как организовать проверку качества данных?
- Контрольные выборки
- Случайным образом сверять сырые логи с данными в хранилище.
- Мониторинг тайм‑серий
- Автоматически отслеживать разрывы или резкие скачки (меньше 10 кликов за минуту → тревога).
- Алерты
- Интеграция с Grafana/Prometheus → уведомления в Slack/почту.
Данные для выполнения заданий.
Вот Python-скрипт для генерации тестовых логов пользовательских событий с учётом:
- разных вероятностей для каждого типа события;
- добавления категорий товаров, основанных на ассортименте Lamoda;
- сохранения данных в форматах JSON и CSV.
Используйте его для генерации фейковых логов и выполнения тренировочных заданий.
📦 Установка зависимостей
Убедитесь, что установлены необходимые библиотеки
pip install faker pandas
🐍 Обновлённый скрипт генерации логов
import random
import json
import pandas as pd
from faker import Faker
from datetime import datetime, timedelta
fake = Faker()
# Типы событий с соответствующими вероятностями
EVENT_TYPES = ["click", "view", "add_to_cart", "purchase"]
EVENT_WEIGHTS = [0.4, 0.3, 0.2, 0.1] # click: 40%, view: 30%, add_to_cart: 20%, purchase: 10%
# Устройства и источники перехода
DEVICES = ["mobile", "desktop"]
REFERRERS = ["google.com", "yandex.ru", "facebook.com", "instagram.com", "direct", "email"]
# Категории товаров на основе ассортимента Lamoda
CATEGORIES = [
"Одежда", "Обувь", "Аксессуары", "Красота", "Дом", "Спорт", "Premium", "Resale"
]
# Генерация списка товаров с уникальными идентификаторами и категориями
PRODUCTS = [
{"product_id": f"p_{i}", "category": random.choice(CATEGORIES)}
for i in range(1000, 1100)
]
# Список пользователей
USERS = [f"u_{i}" for i in range(10000, 10100)]
def generate_event():
user_id = random.choice(USERS)
session_id = f"sess_{datetime.utcnow().strftime('%Y%m%d')}_{random.randint(100, 999)}"
timestamp = (datetime.utcnow() - timedelta(seconds=random.randint(0, 86400))).isoformat() + "Z"
product = random.choice(PRODUCTS)
event_type = random.choices(EVENT_TYPES, weights=EVENT_WEIGHTS, k=1)[0]
device = random.choice(DEVICES)
referrer = random.choice(REFERRERS)
event = {
"event_type": event_type,
"user_id": user_id,
"session_id": session_id,
"timestamp": timestamp,
"product_id": product["product_id"],
"category": product["category"],
"device": device,
"referrer": referrer
}
return event
# Генерация N событий
def generate_events(n=1000):
return [generate_event() for _ in range(n)]
# Сохранение в JSON
def save_to_json(events, filename="events.json"):
with open(filename, "w", encoding="utf-8") as f:
json.dump(events, f, ensure_ascii=False, indent=2)
# Сохранение в CSV
def save_to_csv(events, filename="events.csv"):
df = pd.DataFrame(events)
df.to_csv(filename, index=False)
if __name__ == "__main__":
events = generate_events(1000)
save_to_json(events)
save_to_csv(events)
print("✔ Логи успешно сгенерированы и сохранены в файлы 'events.json' и 'events.csv'")
### 📁 Пример строки в `CSV`
event_type,user_id,session_id,timestamp,product_id,category,device,referrer
click,u_10023,sess_20250417_442,2025-04-17T13:45:23Z,p_1035,Обувь,desktop,yandexru
Задания (низкий порог входа)
- SQL‑челлендж на готовом CSV‑журнале кликов
- Задача 1: посчитать общее число кликов и уникальных сессий.
SELECT COUNT(*) AS total_clicks, COUNT(DISTINCT session_id) AS unique_sessions FROM clicks_csv; - Задача 2: найти топ‑5 самых популярных товаров по кликам.
SELECT product_id, COUNT(*) AS clicks FROM clicks_csv GROUP BY product_id ORDER BY clicks DESC LIMIT 5;
- Задача 1: посчитать общее число кликов и уникальных сессий.
- Построить простой отчёт в Metabase/Excel
- Что сделать:
- Построить график «клики и продажи по дням».
- Добавить интерактивный фильтр по категории товара.
- Польза: научиться работать с дашборд‑инструментом, увидеть, как одни и те же данные можно представить разными способами.
- Что сделать:
- Распределить KPI‑метрики на листе бумаги/доске
- Задача: нарисовать схему, какие поля из событий нужны для расчёта:
- CTR:
viewsиclicks - Конверсия:
clicksиpurchases
- CTR:
- Цель: понять взаимосвязь данных и метрик до погружения в код.
- Задача: нарисовать схему, какие поля из событий нужны для расчёта:
Роли и зоны ответственности в процессе работы с Big Data
Data Engineer
Задачи
- Сбор данных
- Настройка потоков — Kafka, API-коннекторы, файловые загрузки.
- Пример: пишете коннектор на Python, который раз в час вычитывает CSV от логистов и шлёт в Kafka.
- Очистка и трансформация
- Парсинг сырых событий, удаление дубликатов, приведение типов.
- Пример: фильтруете «битые» JSON-события с некорректным timestamp.
- Построение ETL/ELT‑пайплайнов
- Пишете DAG в Airflow: ingestion → cleaning → enrichment → загрузка в DW.
- Параметризуете расписание (ежедневно в 2:00), retry-политику, уведомления.
- Деплой и поддержка
- Конфигурируете Kubernetes‑Job или EMR‑кластеры, чтобы скрипты запускались автоматически.
- Следите, чтобы при росте данных масштабирование работало без ручных правок.
Инструменты
- Spark/Flink — крупномасштабная обработка.
- Trino — интерактивные ad‑hoc‑запросы по Data Lake.
- Airflow/Dagster — оркестрация.
- Kubernetes/EMR/Dataproc — инфраструктура выполнения.
Артефакты
- Таблицы в Data Lake (Parquet/Hudi/Iceberg).
- Скрипты ETL на Python/Scala/SQL.
- DAG‑файлы (Python-модули или YAML).
- Документация: READMEs, схемы полей, инструкции по запуску.
Взаимодействие
- с DevOps — согласование CI/CD‑pipeline, контейнеризация, секреты и учётные данные.
- с Data Analyst — контракт на формат таблиц, SLA свежести данных, обзор бизнес‑терминов.
- с Data Steward — описание полей в каталоге, управление версионированием схем.
Data Analyst
Основные обязанности
- Анализ и отчёты
- Строит дашборды по ключевым метрикам (CTR, конверсия, ARPU).
- Пишет ad‑hoc‑запросы для маркетологов или продукта: «Как изменился ARPU после рассылки?»
- Визуализация
- Настраивает графики временных рядов, воронки, картограммы регионов.
- Пример: в Metabase делает дашборд «Динамика продаж по городам» с фильтрами.
Инструменты
- ClickHouse, Snowflake, BigQuery — для быстрых OLAP‑запросов.
- Metabase, Tableau, Power BI — BI‑конструкторы.
- Excel/Google Sheets, Jupyter — для ad‑hoc‑отчётов.
- SQL (+ Python/R при необходимости).
Какие метрики и дашборды строит
- Конверсия: users→sessions→purchases.
- Воронки: просмотры→клики→добавления в корзину→покупки.
- ARPU и LTV: анализ распределения выручки по когорте.
Где берёт данные и как проверяет качество
- Источник: таблицы в DW или Speed Layer, подготовленные Data Engineer.
- Проверка:
- Сравнивает итоговые суммы с «сырыми» логами.
- Строит контрольные выборки (результаты за вчера vs за неделю назад).
- Настраивает простые правила (например, если кликов < 100/минуту → алерт).
DevOps
Зона ответственности
- Инфраструктура
- Кластеры для Spark, Trino, ClickHouse, Airflow, Kubernetes.
- CI/CD
- Собирает образы контейнеров, пишет пайплайны в Terraform/Helm charts.
- Автоматизирует тестирование ETL‑скриптов (unit и интеграционные).
- Мониторинг и авто‑скейлинг
- Настраивает Prometheus + Grafana → собирает метрики задач, узлов кластеров.
- Пишет правила автоскейлинга: при CPU > 70 % добавлять ноды.
Инструменты и практики
- Terraform, Helm — инфраструктура как код.
- GitLab CI, Jenkins, Argo CD — деплой.
- Prometheus, Grafana, Alertmanager — мониторинг и оповещения.
- SRE‑подход: SLO/SLI, run‑books, post‑mortems.
Взаимодействие с Data Engineer
- Утверждают формат Docker‑образов и переменных окружения.
- Настраивают секреты и доступы (IAM, Vault).
- Организуют уведомления о падении пайплайнов (Slack, Email).
Data Steward
Ключевые задачи
- Метаданные и каталог
- Поддерживает бизнес‑глоссарий, описания полей, схемы в OpenMetadata/Glue.
- Data Governance
- Управляет правами доступа, классифицирует данные (PII, GDPR).
- Качество данных
- Запускает профилирование (частота NULL, статистики распределения).
- Фиксирует SLA на задержку обновления.
Инструменты
- OpenMetadata, Glue Data Catalog, Alation — хранение метаданных и lineage.
- Great Expectations, Deequ — автоматические проверки качества.
- Collibra, Erwin — корпоративное управление политиками.
Lineage и коммуникация
- Строит граф: от сырых логов к конечным дашбордам.
- Вовлекает Data Engineer (изменение схем), Data Analyst (новые поля) и юридический отдел (регуляции).
- Обрабатывает запросы на доступ: кто, зачем, на какой срок.
Задание 1. Короткий устный опрос (5 минут)
Разбейтесь на пары. Один задаёт вопросы, другой отвечает, затем меняетесь ролями. По очереди задайте друг другу:
- «Кто настроит, чтобы по ночам автоматически приходили новые данные?»
- «Кто нарисует график продаж в Metabase?»
- «Кому написать, если в описании таблицы пропали значения?»
- «Кто включит новые серверы, если нагрузка сильно выросла?»
Цель: прогреть память и проверить понимание ролей.
Задание 2. Практика: RACI‑таблица для простого процесса (20 минут)
Что такое RACI? Это таблица, где для каждого шага процесса мы указываем:
- R (Responsible) — кто выполняет работу;
- A (Accountable) — кто отвечает за итог;
- C (Consulted) — с кем советуются;
- I (Informed) — кого ставят в известность.
Сценарий:
Процесс «1) Сбор кликов → 2) Чистка и подготовка → 3) Загрузка в хранилище → 4) Публикация дашборда».
Ваша задача: заполнить таблицу, выбрав роли из списка: Data Engineer, Data Analyst, DevOps, Data Steward.
| Шаг процесса | R | A | C | I |
|---|---|---|---|---|
| 1. Сбор кликов из сайта | ||||
| 2. Чистка и проверка данных | ||||
| 3. Загрузка готовых таблиц в хранилище | ||||
| 4. Обновление дашборда в Metabase |
После выполнения обсудите в группе: всё ли однозначно, кому что назначить.
Задание 3. Практика: карточки‑ролей (15 минут)
- Каждому участнику выдаётся карточка с описанием одной роли (Data Engineer, Data Analyst, DevOps, Data Steward).
- Внутри группы (4 – 5 человек) обменяйтесь карточками и прочитайте чужие описания.
- Постройте в группе «цепочку передачи данных» — кто после кого работает:
- Карточку «Data Engineer» кладёте первой, затем «Data Analyst», и т.д.
- Объясните друг другу, почему именно такой порядок, и какие точки взаимодействия между ролями.
Задание 4. Мини‑тест «Кто за что?» (10 минут)
Ответьте письменно на 5 вопросов (каждый — один из четырёх вариантов):
- Кто отвечает за настройку ночного обновления данных?
A) Data Engineer
B) Data Analyst
C) DevOps
D) Data Steward - Кто строит графики и дашборды для руководителя?
A) Data Engineer
B) Data Analyst
C) DevOps
D) Data Steward - Кто следит, чтобы в таблицах не было «пустых» или неверных значений?
A) Data Engineer
B) Data Analyst
C) DevOps
D) Data Steward - Кто автоматически добавит новые серверы при росте нагрузки?
A) Data Engineer
B) Data Analyst
C) DevOps
D) Data Steward - Кто согласует изменения в описании колонок (metadata)?
A) Data Engineer
B) Data Analyst
C) DevOps
D) Data Steward
Цель: самостоятельно проверить и сразу обсудить правильные ответы.
Источники данных
Object Storage
Принцип «ключ–значение»
- Объект = блоб (blob) данных + его ключ (уникальное имя) + метаданные.
- Представьте: вы храните семейные фото не в папках, а в наборе пронумерованных ячеек в шкафу. Ячейка имеет свой номер (ключ), а внутри – любой файл (объект).
Чем отличается от файловых и блочных хранилищ
| Хранилище | Структура | Когда удобно |
|---|---|---|
| Файловая система | Файлы и папки | Когда нужна иерархия папок, блокировка файлов |
| Блочное хранилище | Блоки одинакового размера | Для виртуальных дисков, где операционная система формирует файловую систему поверх блока |
| Объектное хранилище | Ключ–объект (нет папок!) | Для больших объёмов: логи, бэкапы, статические файлы сайта |
Архитектура и сравнение Yandex Cloud Object Storage vs AWS S3
| Параметр | AWS S3 | Yandex Cloud Object Storage |
|---|---|---|
| Совместимость API | Полная S3‑совместимость | Почти полная S3‑совместимость |
| Региональность (regions) | 25+ географических зон (например, us-east-1, eu-central-1) |
Зоны в России и Европе (например, ru-central1, eu-central-1) |
| SLA (гарантии доступности) | 99.9 % (Standard), 99.99 % (Standard‑IA) | 99.9 % (Basic), 99.95 % (Enhanced) |
| Ценообразование | плата за хранение, за PUT/GET‑запросы, за исходящий трафик | аналогично, но цены и пороги могут отличаться по регионам |
Совет новичку: выбирайте регион ближе к вашим серверам и пользователям — так будет меньше задержка и дешевле трафик.
Основные концепции и операции
- Bucket (контейнер)
- Как «коробка» для объектов. Имя должно быть уникальным в пределах провайдера.
- Object
- Состоит из:
- ключа (например,
logs/2025-04-17/events.csv), - тела (сам файл),
- метаданных (дата создания, размер, собственные теги).
- ключа (например,
- Состоит из:
- Versioning (версии)
- Если включить, старые версии объекта не удаляются автоматически, можно откатиться к предыдущему состоянию.
- CRUD‑операции
- PUT – загрузить объект;
- GET – скачать объект;
- DELETE – удалить;
- LIST – получить список ключей (с поддержкой префиксов и пагинации).
- Префиксы и «псевдодиректории»
- Хотя папок нет, ключи с общим началом (
images/,logs/) позволяют группировать объекты.
- Хотя папок нет, ключи с общим началом (
- Lifecycle‑правила
- Автоматизация: через настройки можно переводить старые объекты в «холодное» хранение или удалять их по расписанию.
Интеграция с ETL/SQL‑движками
- Spark
- Через
spark-hadoop-connectorчитаем и пишем объекты S3/YCS:df = spark.read.csv("s3a://my-bucket/logs/2025-*.csv")
- Через
- Trino / Hive
- Подключаем S3‑коннектор в конфигурации сервера, работаем как с любой таблицей.
- Табличные форматы
- Храним данные в Parquet или ORC, а ещё в дата‑лейках типа Iceberg и Hudi для версионирования и оптимизации.
Безопасность и доступ
- IAM‑политики и роли
- Задают, кто может читать/писать/удалять объекты.
- Подписи запросов (SigV4)
- При прямом HTTP‑доступе к S3 каждый запрос подписывается ключом и секретом.
- Временные токены (STS)
- Вместо постоянных ключей выдаём короткоживущие креденшалы (удобно для сервисов).
- Шифрование
- На сервере: SSE‑S3 (ключ провайдера), SSE‑KMS (ваш ключ в KMS).
- На клиенте: вы шифруете данные перед отправкой.
Best Practices
- Префиксы для равномерной нагрузки
- Разбивайте ключи:
logs/2025/04/17/…вместоlogs/20250417…, чтобы запросы не «накапливались» на одном месте.
- Разбивайте ключи:
- Lifecycle‑правила
- Горячие данные (последние 30 дней) оставляйте в Standard.
- Старше – переводите в «холодное» хранилище или архив (Glacier/Cold HDD).
- Оптимизация загрузок
- Small‑objects (< 5 МБ): мелкие файлы лучше группировать перед загрузкой.
- Large‑objects (> 100 МБ): используйте multipart‑upload, чтобы параллельно загружать части.
Практические задания (низкий порог входа)
Выберите среду: AWS или Yandex Cloud. Везде используется единый S3‑совместимый интерфейс.
Задание 1. Работа через CLI/консоль
- Создать bucket:
- AWS:
aws s3 mb s3://my-test-bucket-$(date +%s) - Yandex Cloud:
yc storage bucket create --name my-test-bucket --default-storage-class standard
- AWS:
- Загрузить набор CSV‑логов (скачайте примеры из репозитория курса):
aws s3 cp logs/ s3://my-test-bucket/logs/ --recursive - Перечислить объекты:
aws s3 ls s3://my-test-bucket/logs/ - Скачать один файл локально:
aws s3 cp s3://my-test-bucket/logs/2025-04-17.csv ./2025-04-17.csv
Задание 2. Мини‑скрипт на Python (boto3 или yc‑sdk)
- Установите библиотеку:
pip install boto3 pandas - Напишите
script.py:import boto3 import pandas as pd # Настройка клиента (AWS или Yandex) s3 = boto3.client('s3', aws_access_key_id='ВАШ_КЛЮЧ', aws_secret_access_key='ВАШ_СЕКРЕТ', endpoint_url='https://storage.yandexcloud.net' # для Yandex ) bucket = 'my-test-bucket' prefix = 'logs/' # Получить список объектов objs = s3.list_objects_v2(Bucket=bucket, Prefix=prefix) keys = [item['Key'] for item in objs.get('Contents', [])] print("Найдено объектов:", len(keys)) # Прочитать первый CSV в pandas first_key = keys[0] obj = s3.get_object(Bucket=bucket, Key=first_key) df = pd.read_csv(obj['Body']) print(df.head()) - Запустите и убедитесь, что данные считались.
Задание 3. Spark‑бомбардировка
- В окружении с PySpark (локально или в кластере):
from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("ReadS3") \ .config("spark.hadoop.fs.s3a.endpoint", "https://storage.yandexcloud.net") \ .config("spark.hadoop.fs.s3a.access.key", "ВАШ_КЛЮЧ") \ .config("spark.hadoop.fs.s3a.secret.key", "ВАШ_СЕКРЕТ") \ .getOrCreate() # Чтение CSV прямо из bucket df = spark.read.csv("s3a://my-test-bucket/logs/*.csv", header=True, inferSchema=True) print("Всего записей:", df.count()) - Проверьте, что Spark видит все файлы и возвращает общее число строк.
Реляционные базы данных
Представьте себе обычную табличку в Excel:
| id | name | age |
|---|---|---|
| 1 | Анна | 28 |
| 2 | Борис | 35 |
| 3 | Виктория | 22 |
- Таблица – это как лист Excel.
- Строка – одна запись, как строка в таблице («Анна, 28»).
- Столбец – одно свойство, например «name» или «age».
- СУБД (система управления базами данных) – программа, которая хранит такие таблицы и позволяет быстро находить, изменять и связывать их между собой.
Почему «реляционная»?
«Реляционная» означает, что таблицы связаны друг с другом через специальные поля (ключи). Например, есть две таблицы:
customers
| customer_id | name |
|---|---|
| 1 | Анна |
| 2 | Борис |
orders
| order_id | customer_id | total |
|---|---|---|
| 10 | 1 | 2500 |
| 11 | 2 | 1800 |
| 12 | 1 | 3900 |
Здесь поле customer_id связывает заказы с покупателями: заказ №10 принадлежит Анне.
Зачем нужны реляционные СУБД?
- Надёжность транзакций (ACID)
– Если вы переводите деньги, важно, чтобы либо обе части операции прошли, либо ни одна. Аналогично, в базе «всё или ничего». - Целостность данных
– База сама следит, чтобы вы не вставили заказ с несуществующимcustomer_id. - OLTP‑нагрузки
– Быстрые операции: вставить новый заказ, обновить статус, проверить баланс. - Лёгкие OLAP‑выборки
– Небольшие отчёты: сколько заказов было вчера, средний чек и т.п.
Коротко о трёх популярных СУБД
| СУБД | Простыми словами |
|---|---|
| PostgreSQL | «Умная» база: можно хранить в ячейках даже целые документы (JSON), добавлять собственные функции. |
| MySQL | «Быстрая» база: отличная для сайтов, где важно быстро читать информацию (например, списки товаров). |
| Oracle | «Тяжёлая артиллерия»: поддерживает сразу несколько серверов в кластере, очень защищена, но сложнее в настройке. |
Как правильно проектировать таблицы (моделирование данных)
- Нормализация (1NF–3NF)
– Не повторяйте одни и те же данные в разных местах.
– Пример: вместо того чтобы в таблице заказов дублировать имя покупателя, хранитеcustomer_id, а имя берите из таблицыcustomers. - Денормализация
– Иногда копия данных ускоряет чтение. Например, вы можете сразу добавитьcustomer_nameвorders, чтобы не делать JOIN, но тогда надо помнить об обновлениях. - Схема «звезда» для отчётов
- Таблица фактов – основная, где хранятся числа (суммы заказов).
- Таблицы-измерения – вокруг неё как лучи звезды:
dim_customers,dim_date,dim_products.
Как реляционные БД вписываются в Big Data‑стек
- CDC‑стриминг (Debezium → Kafka)
– Каждый раз, когда в таблице меняются строки, появляется сообщение в очереди. - Доступ из Spark/Trino
– Через JDBC (как подключиться к СУБД из Java‑программы) читаем таблицы прямо в большой кластер. - Дампы (резервные копии)
–pg_dumpилиmysqldumpсохраняют всю базу в файл, который потом можно восстановить.
Ежедневные операции с базами
- Запуск и настройка
– Локально: легко через Docker, чтобы потренироваться.
– В облаке: сервис сам поддерживает базу (AWS RDS, Yandex Managed Service). - Бэкапы и восстановление
– Делаем полную копию раз в день и журналы изменений, чтобы откатиться на любой момент. - Мониторинг
– Смотрим: сколько сейчас подключений, не слишком ли долго выполняются запросы, нет ли «узких мест» (locks).
Безопасность и права доступа
- Пользователи и роли
– Как раздавать ключи от дверей: кто может читать данные, кто добавлять, кто изменять структуру. - Шифрование
– Данные «на диске» и в пути (между вашим компьютером и сервером) шифруются, чтобы никто не подсмотрел. - Аудит
– Запись того, кто и когда поменял что в базе.
Задание 1. Локальное развертывание СУБД (Docker, 10 минут)
- Выберите СУБД: PostgreSQL или MySQL.
- Запустите контейнер:
# PostgreSQL docker run -d --name pg-test -e POSTGRES_PASSWORD=secret -p 5432:5432 postgres # MySQL docker run -d --name mysql-test -e MYSQL_ROOT_PASSWORD=secret -p 3306:3306 mysql - Подключитесь через командную строку или DBeaver.
- Проверьте: создайте базу
testdbи выполнитеSELECT version();.
Задание 2. Базовые SQL‑операции (15 минут)
- Создайте таблицы:
-- для PostgreSQL CREATE TABLE customers ( id SERIAL PRIMARY KEY, name TEXT, signup_date DATE ); CREATE TABLE orders ( id SERIAL PRIMARY KEY, customer_id INT REFERENCES customers(id), total NUMERIC(10,2), order_date DATE ); - Заполните 5–10 строк командой
INSERT. - Напишите и выполните запрос с
JOIN, например:SELECT c.name, o.total FROM customers c JOIN orders o ON c.id = o.customer_id WHERE o.order_date >= '2025-04-01'; - Проверьте план через
EXPLAIN(одно нажатие), чтобы увидеть, используются ли индексы.
Задание 3. Интеграция через JDBC в Spark (10 минут)
- В файле Spark‑сессии (
pysparkилиspark-shell) добавьте:df = spark.read \ .format("jdbc") \ .option("url", "jdbc:postgresql://localhost:5432/testdb") \ .option("dbtable", "orders") \ .option("user", "postgres") \ .option("password", "secret") \ .load() print("Записей в orders:", df.count()) - Запустите и убедитесь, что Spark видит таблицу и возвращает число строк.
Задание 4 (опционально для продвинутых). CDC → Kafka с Debezium (20 минут)
- Запустите Debezium‑коннектор в Docker, подключив его к вашему PostgreSQL/MySQL.
- Вставьте новую строку через SQL.
- Проверьте в топике Kafka: должно появиться сообщение с данными вставки.
NoSQL & Search: Elasticsearch, HBase, ClickHouse
Зачем нужны NoSQL и поисковые движки?
- Когда реляционных БД мало
– Очень быстро записывать тысячи сообщений лога в секунду;
– Делать свободный поиск по тексту (например, найти все товары, где в описании встречается «хлопок»);
– Запускать аналитические запросы по большим объёмам данных ближе к реальному времени. - Паттерны доступа
- Full‑text search (поиск по словам и фразам).
- Fast writes (быстрые вставки новых записей).
- OLAP‑запросы (сводные отчёты, агрегации).
Как выбрать NoSQL‑хранилище?
- Документное (Elasticsearch)
– Хранит JSON‑документы, хорошо ищет по тексту. - Ширококолоночное (HBase)
– Таблицы с «супер‑столбцами» для огромных наборов строк, оптимально для time‑series. - Колоночное OLAP (ClickHouse)
– Хранит данные по столбцам, очень быстро считает агрегаты.
Кратко про каждый инструмент
Elasticsearch
- Что внутри
- Индекс = аналог папки, где лежат документы.
- Шард = кусок индекса (для масштабирования).
- Реплика = копия, чтобы выдерживать сбои.
- Inverted index = список «в каких документах встречается слово».
- Сценарии
- Лог‑аналитика (смотрим, какие ошибки выпали в приложении).
- Поиск по товарным карточкам.
- Быстрые сводки (агрегации по категориям, регионам).
- Как подключить
- Filebeat или Logstash → Kafka → Elasticsearch.
- Дашборды в Kibana.
HBase
- Что внутри
- Хранит данные в HDFS (как большой файловый склад).
- RegionServer = процесс, который обслуживает часть таблицы.
- Master = координирует RegionServer‑ы.
- Когда использовать
- Огромные таблицы (миллиарды строк).
- Чтение/запись по ключу (например, история кликов пользователя).
- Как подключить
- Spark‑коннектор читает/пишет прямо в HBase.
- Trino может читать таблицы HBase.
ClickHouse
- Что внутри
- Хранение по столбцам → быстрее читать отдельные поля.
- Движки MergeTree: автоматически сортируют и сливают файлы.
- Партиции по дате (разбитие данных на месяцы/дни).
- Сценарии
- Speed Layer: real‑time сводки за последние минуты/часы.
- OLAP‑отчёты с миллионами строк.
- Как подключить
- Запись из Airflow (batch) или Kafka (stream).
- BI‑инструменты через JDBC.
Задание 1. Elasticsearch Quick Start
- Запустить Docker:
docker run -d --name es -p 9200:9200 -e "discovery.type=single-node" elasticsearch:8.8.1 docker run -d --name kibana -p 5601:5601 --link es kibana:8.8.1 - Создать индекс:
curl -X PUT "localhost:9200/products?pretty" - Добавить 5 JSON‑документов:
curl -X POST "localhost:9200/products/_doc/1" -H 'Content-Type: application/json' -d' { "id": 1, "name": "Футболка хлопок", "category": "Одежда" }'(повторить для id=2…5)
- Поиск по тексту:
curl -X GET "localhost:9200/products/_search?pretty" -H 'Content-Type: application/json' -d' { "query": { "match": { "name": "хлопок" } } }' - Агрегация по категории:
curl -X GET "localhost:9200/products/_search?pretty" -H 'Content-Type: application/json' -d' { "size": 0, "aggs": { "by_category": { "terms": { "field": "category.keyword" } } } }'
Задание 2. HBase Hello‑World
- Запустить Docker:
docker run -d --name hbase -p 16010:16010 harisekhon/hbase - Открыть shell:
docker exec -it hbase bash hbase shell - Создать таблицу:
create 'events:clicks', 'cf' - Вставить строки:
put 'events:clicks', 'row1', 'cf:data', 'user1,2025-04-17T10:00' put 'events:clicks', 'row2', 'cf:data', 'user2,2025-04-17T10:05' - Прочитать данные:
scan 'events:clicks'
Задание 3. ClickHouse Simple OLAP
- Запустить Docker:
docker run -d --name clickhouse -p 8123:8123 clickhouse/clickhouse-server - Создать таблицу:
CREATE TABLE default.sales ( sale_date Date, product String, amount UInt32 ) ENGINE = MergeTree() PARTITION BY toYYYYMM(sale_date) ORDER BY (sale_date); - Загрузить CSV (файл
sales.csv):curl 'http://localhost:8123/?query=INSERT%20INTO%20default.sales%20FORMAT%20CSV' --data-binary @sales.csv - Посчитать продажи по дате:
SELECT sale_date, sum(amount) AS total FROM default.sales GROUP BY sale_date ORDER BY sale_date;
Structured explanation for BigQuery
Для раздела про «Data‑warehouse/SaaS: Google BigQuery», мне нужно создать детальную теоретическую часть с простыми объяснениями. Я начну с введения: что такое хранилище данных и что такое BigQuery. Каждую тему (компоненты, загрузка данных, SQL Engine и т.п.) нужно описать понятным языком, с примерами и аналогиями. Готов добавить простые схемы или фрагменты кода с объяснениями. Важно подобрать практические задания и мини-задачи, с реальными действиями, а не рекомендациями, чтобы сделать процесс обучения более наглядным.
Data‑warehouse/SaaS: Google BigQuery
Зачем BigQuery?
- Serverless‑архитектура
Представьте, что вам не нужно покупать и настраивать серверы — вы просто платите за обработанные данные. Как такси вместо личного автомобиля: не нужно думать о ремонте и парковке, платите только за поездку. - Отдельное хранилище и вычисления
Хранить данные можно дешево и надёжно, а за вычисления (запросы) — платить отдельно.- Pay‑per‑query: платите, когда запускаете запрос, по объёму прочитанных терабайт.
- Flat‑rate slots: фиксированная ежемесячная плата за «бронь» определённого числа параллельных вычислительных юнитов (slots), если у вас много запросов.
Основные компоненты BigQuery
- Dataset
Как папка в Google Drive — внутри неё лежат таблицы. - Table
Как файл Excel — набранная строка за строкой таблица с названиями колонок. - Partition
Разбиение таблицы на «куски» по дате (или другому полю).- Преимущество: запрос читает только нужный «кусок», а не всю таблицу.
- Cluster
Сортировка внутри партиции по выбранным колонкам, как индекс, помогает ещё быстрее искать. - Slots и Reservation
- Slot — единица параллельной обработки (как повар, который готовит один заказ).
- Reservation — бронирование определённого числа «поваров» под ваши задачи, чтобы они не ушли к чужим «заказам».
Загрузка данных
- Batch‑import (пакетное)
- Загрузить CSV/JSON в Google Cloud Storage (GCS) или прямо из локального файла через веб‑интерфейс.
- В UI или с помощью
bq loadуказать, в какую таблицу и с какими настройками (разделитель, кодировка) загрузить.
- Streaming inserts (поток)
– По одной строке через API (например, из приложения), чтобы сразу можно было запросить свежие данные без ожидания пакета.
SQL‑движок BigQuery
- ANSI SQL
Полный набор стандартных команд (SELECT, JOIN, GROUP BY). - Аналитические функции
– WINDOW: например,ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY event_time)нумерует события внутри каждого пользователя.
– ARRAY: можно собирать список значений в одну ячейку, например все продукты в одном заказе. - Federated queries (внешние таблицы)
– Запросы прямо к данным в GCS или Cloud SQL без предварительной загрузки в BigQuery.
Стоимость и оптимизация
- Хранение
– Около $0.02 за GB в месяц.
– Оплата за фактический объём: если таблица 100 GB, это $2 в месяц. - Обработка
– $5 за TB прочитанных данных при on‑demand.
– Сократить стоимость помогут:- Partitioned tables (читаем только нужный период).
- Clustered tables (читаете только нужные столбцы быстрее).
- Materialized views (предварительно посчитанные результаты).
- Query optimizer автоматически выбирает лучший план выполнения.
Безопасность и управление доступом
- IAM‑роли
– Наборы прав: кто может читать таблицы, кто — писать или администрировать. - Authorized views
– Представления, через которые пользователи видят только разрешённые колонки или строки. - VPC Service Controls
– Ограничивают доступ к BigQuery из заданных сетей и сервисов. - Шифрование
– На стороне Google (по‑умолчанию) и возможность «принести свой ключ» (BYOK) для шифрования данных.
Интеграция с остальным стеком
- ELT‑пайплайны
– Dataflow или Airflow запускают загрузку и трансформации, результат пишет в BigQuery. - BI‑коннекторы
– Google Data Studio, Looker или Metabase напрямую читают таблицы и строят дашборды. - Дополнительные инструменты
– Dataproc/Spark может читать из BigQuery и писать обратно.
Задание 1. Console Quickstart
- Создать проект в Google Cloud (если ещё нет).
- Перейти в BigQuery UI: нажать «Создать Dataset», дать ему имя
my_dataset. - Загрузить CSV
- В разделе
my_datasetнажать «Create table» → Upload, выбрать файлsample.csvиз репозитория. - Указать разделитель (запятая), в первом ряду заголовки.
- В разделе
- Выполнить запрос:
SELECT category, COUNT(*) AS cnt FROM `PROJECT_ID.my_dataset.sample` WHERE event_date >= '2025-04-01' GROUP BY category ORDER BY cnt DESC; - Посмотрите объём прочитанных данных — это и будет ваша плата за запрос (MB, GB).
Задание 2. Работа через bq CLI
- Установить и инициализировать CLI:
pip install google-cloud-bigquery gcloud init gcloud components install bq - Создать dataset:
bq mk my_dataset - Загрузить CSV:
bq load --autodetect --source_format=CSV \ my_dataset.sample_cli gs://my-bucket/sample.csv - Показать схему:
bq show --schema --format=prettyjson my_dataset.sample_cli
Задание 3. Python + pandas
- Установить пакет:
pip install google-cloud-bigquery pandas - Написать
query.py:from google.cloud import bigquery import pandas as pd client = bigquery.Client() sql = """ SELECT category, COUNT(*) AS cnt FROM `PROJECT_ID.my_dataset.sample` GROUP BY category ORDER BY cnt DESC LIMIT 5 """ df = client.query(sql).to_dataframe() print(df.head()) - Запустить и убедиться, что вы видите первые пять строк результата в виде DataFrame.
Change Data Capture: Debezium (CDC‑коннекторы к Kafka)
Что такое CDC и зачем оно нужно?
- CDC (Change Data Capture) – это механизм, который следит за тем, что происходит в базе (вставки, обновления, удаления) и моментально передаёт эти изменения дальше.
- Зачем:
- Не нужно каждую ночь полностью перекачивать таблицы – изменения приходят по кусочкам.
- Минимизируется задержка («latency») от момента, когда кто‑то нажал INSERT, до того, как данные стали доступны в других системах (аналитика, мониторинг).
Архитектура Debezium + Kafka
- Kafka Connect cluster – набор сервисов, которые запускают коннекторы.
- Debezium Connector – специальный плагин, который читает журнал изменений (binlog/WAL) из СУБД.
- Source Task – рабочий поток внутри коннектора, от которого зависит количество параллельных задач.
- Kafka topics – тема
dbserver1.ordersилиdbserver1.customers, куда летят события.
Поддерживаемые источники и механизмы
- СУБД: MySQL, PostgreSQL, Oracle, SQL Server, MongoDB.
- Как читаем изменения:
- MySQL: binlog;
- PostgreSQL: WAL;
- Коннектор запоминает, до какого места («offset») он дочитал, и при перезапуске продолжает с этого же места.
Формат сообщений и схемы
- JSON vs Avro/Protobuf + Schema Registry (хранилище схем)
- Поля сообщения:
- before – то, что было до изменения;
- after – что стало после;
- key – значение первичного ключа (например,
order_id); - transaction metadata – время, ID транзакции.
Обработка изменений схемы
- Если вы добавили новый столбец в таблицу, Schema Registry может автоматически обновить схему или потребует ручного согласования.
- downstream‑сервисы (которым приходят события) должны уметь пропускать неизвестные поля или получать уведомление об изменении.
Гарантии и отказоустойчивость
- At‑least‑once: каждое изменение придёт минимум один раз (иногда может дублироваться).
- Exactly‑once (труднее настраивать): приходит ровно одна копия.
- Масштабирование: каждый Source Task обслуживает свой набор шардов базы (или таблиц), можно увеличить число задач, чтобы читать быстрее.
Среда: локальная машина с Docker и Docker Compose.
Задание 1. Поднять Docker Compose‑стек
- Создать
docker-compose.ymlв пустой папке:version: '3.8' services: zookeeper: image: confluentinc/cp-zookeeper:7.4.0 environment: ZOOKEEPER_CLIENT_PORT: 2181 kafka: image: confluentinc/cp-kafka:7.4.0 depends_on: [zookeeper] environment: KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 connect: image: debezium/connect:2.4 depends_on: [kafka] environment: BOOTSTRAP_SERVERS: 'kafka:9092' GROUP_ID: '1' CONFIG_STORAGE_TOPIC: 'connect-configs' OFFSET_STORAGE_TOPIC: 'connect-offsets' STATUS_STORAGE_TOPIC: 'connect-status' KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter' VALUE_CONVERTER: 'io.confluent.connect.avro.AvroConverter' VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081' PLUGIN_PATH: '/kafka/connect/debezium-connector-mysql' volumes: - ./connect-plugins:/kafka/connect/debezium-connector-mysql schema-registry: image: confluentinc/cp-schema-registry:7.4.0 depends_on: [kafka] environment: SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:9092' SCHEMA_REGISTRY_HOST_NAME: 'schema-registry' SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8081' mysql: image: mysql:8.0 environment: MYSQL_ROOT_PASSWORD: root MYSQL_USER: debezium MYSQL_PASSWORD: dbz MYSQL_DATABASE: testdb ports: - '3306:3306' - Запустить стек:
docker-compose up -d - Проверить, что все сервисы поднялись:
docker-compose ps
Задание 2. Настроить Debezium‑коннектор для MySQL
- Создать файл
connector.jsonрядом сdocker-compose.yml:{ "name": "mysql-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.name": "dbserver1", "database.include.list": "testdb", "table.include.list": "testdb.orders", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "schema-changes.testdb" } } - Подать конфигурацию в Connect API:
curl -X POST -H "Content-Type: application/json" \ --data @connector.json \ http://localhost:8083/connectors
Задание 3. Генерация и чтение событий
- Войти в MySQL:
docker exec -it $(docker-compose ps -q mysql) mysql -udebezium -pdbz testdb - Создать таблицу и генерить данные:
CREATE TABLE orders ( order_id INT PRIMARY KEY, total DECIMAL(10,2), status VARCHAR(20) ); INSERT INTO orders VALUES (1, 100.00, 'NEW'); UPDATE orders SET status='PAID' WHERE order_id=1; DELETE FROM orders WHERE order_id=1; - Читать события из Kafka:
docker exec -it $(docker-compose ps -q kafka) \ kafka-console-consumer --bootstrap-server kafka:9092 \ --topic dbserver1.testdb.orders \ --from-beginning \ --property print.key=true \ --property value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer \ --property schema.registry.url=http://schema-registry:8081
Задание 4. Python‑читатель Avro‑сообщений (опционально)
- Установить зависимости:
pip install kafka-python fastavro requests - Написать
cdc_reader.py:from kafka import KafkaConsumer from fastavro import schemaless_reader import io, requests # Получаем схему из Schema Registry resp = requests.get('http://localhost:8081/subjects/dbserver1.testdb.orders-value/versions/latest') schema = resp.json()['schema'] consumer = KafkaConsumer( 'dbserver1.testdb.orders', bootstrap_servers='localhost:9092', value_deserializer=lambda m: schemaless_reader(io.BytesIO(m), schema) ) for msg in consumer: print("Key:", msg.key, "Value:", msg.value) - Запустить и убедиться, что вы видите словари Python с полями
beforeиafter.
Подготовка теории и практических задач
Нужно подготовить раздел «Событийные источники и очередь: Apache Kafka». Виктор просит продолжить в том же стиле, с простыми объяснениями и примерами. Мы начнем с основ: что такое Kafka, для чего она используется и как устроена. Чтобы было легче понять, будем использовать аналогии, такие как почта или конвейер.
Затем разберемся с такими вопросами, как архитектура Kafka, типы доставки и гарантии, а также масштабируемость и устойчивость.
Событийные источники и очередь: Apache Kafka
Зачем нужен Kafka?
- Развязка (decoupling)
Представьте конвейер на фабрике: производственные линии (микросервисы) могут класть посылки (события) в общие ящики (topics), а потребители (аналитика, отчёты) – забирать их в своём темпе, не мешая друг другу. - Высокая пропускная способность
Kafka легко выдерживает десятки тысяч сообщений в секунду, как скоростная автомагистраль для данных. - Устойчивость к сбоям
Дублирование сообщений на разных серверах (реплики) позволяет не потерять посылки, даже если один сервер упал.
Архитектура Kafka
- Broker – отдельный сервер Kafka.
- Topic – «ящик» для посылок‑сообщений.
- Partition – внутри каждого ящика несколько секций, чтобы писать и читать параллельно.
- Replica – копии каждой секции на других брокерах для надёжности.
- Zookeeper vs KRaft – «дирижёр» кластера, который следит за тем, кто главный (leader) и кто подчинён (followers). Современные версии могут работать без Zookeeper, используя встроенный KRaft‑контроллер.
- Producer – тот, кто отправляет сообщения.
- Consumer – тот, кто читает.
- Connect API – готовые коннекторы к базам и другим системам, чтобы не писать свой код.
- Streams API – библиотека для обработки событий на лету (фильтрация, агрегация).
Гарантии доставки сообщений
- at‑most‑once – сообщение может быть потеряно, но не продублировано.
- at‑least‑once – сообщение придёт минимум один раз, может дублироваться.
- exactly‑once (EOS) – ровно одна копия, без пропусков и дубликатов.
- Offset – номер последнего прочитанного сообщения в разделе. Представьте счётчик писем в почтовом ящике: вы запомнили, сколько взяли, и при следующем визите начинаете с того же места.
Ретеншн и дедупликация
- Политики хранения (retention)
– по времени (например, держим сообщения 7 дней)
– по размеру (например, не более 100 GB) - Compacted topics
– автоматически оставляют только последнее сообщение для каждого ключа (что удобно для CDC‑сценариев: хранится текущее состояние объекта, а не вся история).
Масштабирование и отказоустойчивость
- Добавление брокеров и партиций
– больше партиций = больше параллельных «дорожек» для чтения и записи. - ISR (In-Sync Replicas)
– список реплик, которые в курсе последних изменений. - Leader election
– в каждой партиции выбирается главный «лидер», от которого другие (followers) получают копии.
Интеграция в стек
- Производители событий
– Микросервисы выкладывают события о заказах, кликах, статусах доставки в Kafka. - Downstream‑потребители
- Spark Streaming, Flink – для real‑time‑аналитики.
- Debezium Sink – записывает CDC‑события в базы.
- Custom‑приложения – мониторинг, алерты, архивация в хранилище.
Задание 1. Поднять Docker Compose‑кластер
- Создать
docker-compose.yml:version: '3.8' services: zookeeper: image: confluentinc/cp-zookeeper:7.4.0 environment: ZOOKEEPER_CLIENT_PORT: 2181 kafka: image: confluentinc/cp-kafka:7.4.0 depends_on: [zookeeper] environment: KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - Запустить:
docker-compose up -d - Убедиться, что всё работает:
docker-compose ps
Задание 2. Консольные операции
- Создать topic:
docker exec -it $(docker-compose ps -q kafka) \ kafka-topics --create --topic events --bootstrap-server kafka:9092 --partitions 3 --replication-factor 1 - Отправить сообщения:
echo -e "msg1\nmsg2\nmsg3" | \ docker exec -i $(docker-compose ps -q kafka) \ kafka-console-producer --topic events --bootstrap-server kafka:9092 - Прочитать сообщения:
docker exec -it $(docker-compose ps -q kafka) \ kafka-console-consumer --topic events --bootstrap-server kafka:9092 --from-beginning
Задание 3. Простой код‑продьюсер и консьюмер на Python
- Установить:
pip install kafka-python producer.py:from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='localhost:9092') for i in range(10): msg = f"message-{i}".encode('utf-8') producer.send('events', msg) producer.flush() print("Отправили 10 сообщений")consumer.py:from kafka import KafkaConsumer consumer = KafkaConsumer('events', bootstrap_servers='localhost:9092', auto_offset_reset='earliest', enable_auto_commit=True) for msg in consumer: print("Прочитали:", msg.value.decode('utf-8'))- Запустить оба скрипта и убедиться в обмене сообщениями.
Задание 4. Изучение производительности
- Измерить время отправки и чтения 1 000 сообщений в тех же скриптах (используйте
time.time()вокруг цикла). - Поэкспериментировать с числом партиций (1 → 3 → 6) и сравнить время.
Артефакты и файлы: JFrog Artifactory, файловая система, внешние API, выгрузки Power BI
JFrog Artifactory
- Зачем нужен репозиторий артефактов?
Представьте склад готовых деталей: вместо того, чтобы каждый раз собирать программу заново, вы храните готовые «библиотеки» (JAR, Python‑whl), контейнеры (Docker‑образы) и скрипты в одном месте. - Типы артефактов
- JAR / WAR – библиотеки Java
- Python‑wheel (.whl)
- Docker‑образы
- SQL‑скрипты и другие сборки
- Организация репозиториев
- Локальный – хранит ваши собственные сборки
- Удалённый – «проксирует» внешние репозитории (Maven Central, PyPI)
- Виртуальный – объединяет локальный и удалённый в один «витринный» URL
- Управление версиями и чистка
- Каждая сборка хранится с номером версии:
my-lib-1.0.0.whl,my-lib-1.0.1.whl. - Retention policy и Garbage collection автоматически удаляют старые неиспользуемые артефакты.
- Каждая сборка хранится с номером версии:
Файловая система
- Когда нужен NFS/SMB
– Если несколько машин должны одновременно читать/писать файлы так же, как на локальном диске. - Object Storage как «файловый» источник
– S3‑совместимый «шкаф» тоже можно «прикрутить» как диск, но там нет POSIX‑блокировок. - Паттерны доступа
- Монтирование NFS: все видят одинаковые пути
/mnt/data - SMB на Windows: сетевые папки
\\server\share
- Монтирование NFS: все видят одинаковые пути
- Отказоустойчивость и бэкапы
– Для NFS ставят RAID, репликацию и регулярные резервные копии на другой сервер.
Внешние API
- Типы API‑источников
- REST (URL + методы GET/POST)
- GraphQL (запросы описывают, какие поля нужны)
- Аутентификация
- API‑key – простой ключ в заголовке или параметре
- OAuth – токены с ограниченным сроком жизни
- Пагинация и rate‑limit
- Большие списки «на порции»:
?page=2&per_page=100 - Ограничение запросов в минуту / час: не больше 1000 вызовов в минуту
- Большие списки «на порции»:
- Форматы ответа
- Чаще всего JSON, иногда XML
- Сериализация/десериализация
- Библиотеки на Python:
requestsили встроенныйhttp.client - Преобразование JSON в словарь:
data = response.json()
- Библиотеки на Python:
Выгрузки Power BI
- Как получить данные из отчёта
- В веб‑интерфейсе Power BI: Export → CSV или XLSX
- Через Power BI REST API: настроить сервис-приложение, получить токен и вызвать эндпоинт
/reports/{reportId}/export
- Расписание и автоматизация
- В самом Power BI можно запланировать обновление и выгрузку
- Через PowerShell или Python можно по расписанию скачивать готовые файлы
- Ограничения
- Максимальный объём одного экспортируемого файла (~100 MB)
- Формат CSV не всегда сохраняет формат дат и чисел — проверяйте локальные настройки
Задание 1. Работа с Artifactory (CLI)
- Войти:
jfrog rt config # ввести URL вашего Artifactory, имя пользователя и API‑ключ - Залить Python‑whl:
jfrog rt u my_package-0.1-py3-none-any.whl my-python-local/ - Залить Docker‑образ:
docker tag my-app:latest your.artifactory.local/my-docker-local/my-app:latest docker push your.artifactory.local/my-docker-local/my-app:latest - Скачать и проверить:
jfrog rt dl my-python-local/my_package-0.1-py3-none-any.whl . sha256sum my_package-0.1-py3-none-any.whl # сверить с исходным
Задание 2. Чтение из внешнего API (Python)
- Установить:
pip install requests pandas - Скрипт
fetch_api.py:import requests, pandas as pd url = 'https://api.example.com/data' headers = {'Authorization': 'Bearer YOUR_API_KEY'} params = {'page': 1, 'per_page': 10} resp = requests.get(url, headers=headers, params=params) data = resp.json() # список словарей df = pd.DataFrame(data[:10]) # первые 10 записей df.to_csv('api_data.csv', index=False) print("Сохранено 10 строк в api_data.csv")
Задание 3. Загрузка выгрузки Power BI
- Скачать вручную из Power BI Service:
- Открыть отчёт → File → Export → CSV
- PowerShell‑скрипт (Windows):
# Установить Power BI cmdlets Install-Module -Name MicrosoftPowerBIMgmt.Profile Connect-PowerBIServiceAccount Export-PowerBIReport -Scope Organization -Id REPORT_ID -OutFile report.csv - Открыть в Excel/Sheets и выполнить:
- Подсчитать общее число строк (стрелка вниз или COUNTA).
- Среднее по полю (FORMULA:
=AVERAGE(B2:B101)).
Задание 4. Файловая система
- Смонтировать NFS (Linux):
sudo mkdir /mnt/data sudo mount -t nfs server:/export/data /mnt/data - Скопировать JSON:
cp ~/local_jsons/*.json /mnt/data/ - Проверить:
ls /mnt/data | wc -l # убедиться, что число файлов совпадает с локальным
Ingest данных
Пакетная обработка: Apache Spark (YARN/Kubernetes)
Что такое пакетная обработка и зачем Spark?
- Batch vs Stream
– Batch (пакетная): собираем большой набор сырых данных за период (например, логи за сутки) и обрабатываем их разом.
– Stream (потоковая): обрабатываем каждое событие сразу, по мере поступления. - Почему Spark?
– Многопоточные вычисления на сотнях машин: как фабрика, где каждый работник делает часть работы.
– Ленивые трансформации: вы описываете, что хотите сделать, но реальная работа начинается только при «действительном запуске» (action).
– API DataFrame/RDD:- RDD (Resilient Distributed Dataset) – «цепочки данных», которые восстанавливаются при сбое.
- DataFrame – таблица с именованными колонками, как Excel на кластере.
– Catalyst и Tungsten: внутренние оптимизаторы, которые переписывают ваш код и упаковывают данные для максимальной скорости.
Аналогия: вы составляете рецепт (последовательность шагов обработки), но кухня (Spark) ждёт, когда вы соберёте весь рецепт, и готовит партию блюд одним заходом, оптимизируя маршруты поваров и использование духовок.
Режимы запуска
a) YARN (Hadoop)
- Интеграция: Spark работает как приложение в экосистеме Hadoop.
- Очереди (queues): как разные залы в ресторане — заказы (job’ы) могут ждать своей очереди.
- Scheduler (Capacity/Fair) распределяет ресурсы (ядра и память) между разными командами.
b) Kubernetes
- Spark Operator: «электронный менеджер», который запускает нужное количество «поваров» (executors) в виде Pod’ов.
- Dynamic allocation: Spark сам просит у Kubernetes больше или меньше ресурсов в зависимости от нагрузки.
- Sidecars: дополнительные «помощники» в Pod‑ах для логирования и мониторинга.
Пример:
- На YARN вы сдаёте «заказ» кухне Hadoop;
- На Kubernetes ваша «кухня» разворачивается в изолированных контейнерах прямо в облаке.
Архитектура кластера
- Driver: главный процесс, который читает ваш код и распределяет задачи.
- Executors: рабочие процессы на узлах, которые выполняют конкретные операции (map, filter, reduce).
- Fault‑tolerance: если один executor упал, Driver заново отправит этот кусок работы другому — никакой строки не потеряется.
Настройка ресурсов:
--executor-cores 2 # сколько «поваров» одновременно работает в одном executor --executor-memory 4G # сколько «ингредиентов» (памяти) выделено на executor
Чтение и запись данных
- Коннекторы
- S3/HDFS: читаем файлы прямо из облака или распределённого хранилища.
- JDBC: берём данные из любой реляционной БД.
- Kafka: используем
spark.readStream/writeStreamдля получения/отправки сообщений.
- Форматы
- Parquet/ORC: колоночные форматы с метаданными, очень эффективны для аналитики.
- Iceberg/Hudi: «лейк‑сторы» с версионностью и возможностью UPDATE/DELETE.
- Шардирование
df.write.partitionBy("date")— разбиваем большие файлы по датам, чтобы одновременные запросы не читали лишнее.
- Сжатие
- При записи указываем
compression="snappy"или"gzip"для экономии диска и сети.
- При записи указываем
Оптимизация и отладка
- Spark UI: веб‑интерфейс по адресу
http://driver:4040показывает, сколько времени заняли стадии и задачи. - Настройки шифла:
--conf spark.sql.shuffle.partitions=200– контролирует число файлов после shuffle;
- Join‑стратегии:
- Broadcast‑join: маленькую таблицу раздаём всем executors (быстро, без shuffle).
- Shuffle‑join: оба набора данных перегоняются по сети и пересортировываются (медленнее, но подходит для больших таблиц).
- Кеширование:
df.cache()– хранит DataFrame в памяти между несколькими действиями, как охлаждённая заготовка в холодильнике.
- Устный опрос (5 мин)
- Что делает Driver?
- Чем RDD отличается от DataFrame?
- Что такое ленивые трансформации?
- Мини‑практика
Обязательно: локальный пробный запуск помогает «ощутить» распределённые вычисления.
Задание 1. Локальный Spark Quickstart
- Запустить Docker:
docker run -d --name spark-master -p 7077:7077 -p 8080:8080 bitnami/spark:latest - Открыть spark-shell:
docker exec -it spark-master spark-shell --master spark://localhost:7077
Задание 2. Простой PySpark‑скрипт
- Создать файл
clicks_stats.py:from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("ClicksCount") \ .master("local[*]") \ .getOrCreate() # Считаем CSV (можно заменить на локальный путь или s3a://…) df = spark.read.csv("data/clicks.csv", header=True, inferSchema=True) total = df.count() unique_users = df.select("user_id").distinct().count() print(f"Всего записей: {total}, уникальных пользователей: {unique_users}") # Сохраняем результат в Parquet stats_df = spark.createDataFrame( [(total, unique_users)], ["total_clicks", "unique_users"] ) stats_df.write.mode("overwrite").parquet("output/stats.parquet") spark.stop() - Запустить:
spark-submit --master local[*] clicks_stats.py
Задание 3. Запуск на YARN или Kubernetes (опционально)
- YARN:
spark-submit --master yarn --deploy-mode cluster clicks_stats.py– зайдите в YARN UI (обычно
http://<rm-host>:8088) и найдите ваш job. - Kubernetes:
spark-submit \ --master k8s://https://<k8s-api>:6443 \ --deploy-mode cluster \ --conf spark.kubernetes.container.image=bitnami/spark:latest \ clicks_stats.py– посмотрите логи в
kubectl get pods.
Задание 4. Тюнинг‑челлендж
- Изменить параметры:
spark-submit \ --master local[*] \ --conf spark.sql.shuffle.partitions=10 \ --conf spark.executor.cores=2 \ clicks_stats.py - Замерить время выполнения (до и после).
- Определить, какая конфигурация быстрее на ваших данных, и пояснить, почему.
Потоковая обработка: Spark Streaming
Что такое потоковая обработка и чем она отличается от пакетной?
- Batch (пакетная): собираем «кучу» записей (например, логи за сутки) и обрабатываем всех сразу.
- Streaming (потоковая): обрабатываем события по мере их поступления, чуть задерживая их в «микропакетах» (micro‑batches) или «непрерывно» (Continuous Processing).
- Плюс низкой задержки: результаты появляются почти сразу, как в реальном времени.
Основы Structured Streaming
- Spark Session → StreamingQuery
- Вы создаёте
SparkSession. - Определяете источник (
readStream) и приёмник (writeStream). - Запускаете
query.start()и он работает «до кнопки стоп».
- Вы создаёте
- Event time vs Processing time
- Processing time — когда Spark получил событие.
- Event time — когда событие реально произошло (по полю
timestamp).
- Watermarks
Позволяют игнорировать очень «запоздавшие» события: вы говорите «ждать до 1 минуды» (максимальный лаг), и более старые события не портят результаты. - Triggers и Checkpointing
- Trigger задаёт интервал между микробатчами, например
Trigger.ProcessingTime("30 seconds"). - Checkpoint сохраняет состояние (offset’ы, агрегаты) в папку, чтобы после сбоя возобновить работу без дубликатов.
- Trigger задаёт интервал между микробатчами, например
- Гарантии доставки
Spark Structured Streaming может обеспечить exactly‑once, если правильно настроить источник/приёмник и чекпойнты.
- Устный опрос (5 мин):
- Что такое микробатч?
- Чем event time отличается от processing time?
- Зачем нужны watermarks?
- Как Spark помнит, что уже обработал (checkpoint)?
Задание 1. File‑stream Quickstart
- Создайте папку в вашем проекте:
mkdir input - Запустите скрипт
file_stream.py:from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("FileStreamDemo") \ .master("local[*]") \ .getOrCreate() df = spark.readStream \ .option("header", "true") \ .csv("input/") # читает новые CSV-файлы query = df.writeStream \ .format("console") \ .option("truncate", False) \ .start() query.awaitTermination() - В другом терминале скопируйте туда тестовый CSV:
cp sample1.csv input/ sleep 5 cp sample2.csv input/ - Увидьте в консоли, как Spark немедленно выводит строки из каждого нового файла.
Задание 2. Kafka‑stream Starter
- Поднимите Kafka (Docker‑Compose или локально).
- Producer отправляет:
echo -e "user1,click\nuser2,view" | \ kafka-console-producer --topic events --bootstrap-server localhost:9092 - Скрипт
kafka_stream.py:from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("KafkaStreamDemo") \ .master("local[*]") \ .getOrCreate() df = spark.readStream \ .format("kafka") \ .option("kafka.bootstrap.servers", "localhost:9092") \ .option("subscribe", "events") \ .load() # предполагаем, что value — строка "user,event" parsed = df.selectExpr("CAST(value AS STRING) as v") \ .selectExpr("split(v,',')[0] as user", "split(v,',')[1] as evt") counts = parsed.groupBy("evt").count() query = counts.writeStream \ .outputMode("complete") \ .format("console") \ .start() query.awaitTermination() - Запустите скрипт и наблюдайте, как меняются счётчики для
clickиview.
Задание 3. Windowed‑aggregation
- Расширьте
kafka_stream.py, добавив имитацию поля времени:from pyspark.sql.functions import current_timestamp, window # после parsed: withTime = parsed.withColumn("ts", current_timestamp()) windowed = withTime \ .withWatermark("ts", "1 minute") \ .groupBy(window("ts", "1 minute", "30 seconds"), "evt") \ .count() - Запустите и отправляйте новые сообщения в топик, наблюдая, как каждые 30 секунд Spark выводит число событий за последнюю минуту.
Задание 4. Checkpoint и exactly‑once
- Добавьте в
writeStreamопцию:.option("checkpointLocation", "chkpoint/") - Запустите скрипт, отправьте несколько сообщений, остановите (Ctrl+C) и перезапустите.
- Убедитесь, что при перезапуске Spark не выводит уже обработанные сообщения повторно.
Ad‑hoc SQL: Trino (распределённый SQL‑движок)
Что такое Trino и зачем он нужен?
- Trino — это сервис, который позволяет писать обычный SQL и получать данные сразу из разных хранилищ (S3‑хранилище, базы, Elasticsearch и т. д.).
- Почему он полезен:
- Не нужно копировать всю информацию в одну базу — Trino подаёт вам данные на лету.
- Вы можете делать быстрые запросы по большим таблицам без разворачивания тяжёлых кластеров.
Как он устроен?
- Coordinator («координатор»)
– Принимает ваш SQL, разбивает его на маленькие задачи и раздаёт их рабочим. - Workers («рабочие узлы»)
– Выполняют часть работы: читают файлы или таблицы и пересылают промежуточные результаты обратно. - Shuffle / pipelines
– Если нужно соединить данные из разных workers, они обмениваются кусочками «по сети» и продолжают счёт.
Каталоги и коннекторы
- Каталог — файл настроек, в котором прописано, где и как лежат данные.
Примеры:- Hive — для таблиц на S3 с метаданными в Glue или Hive Metastore.
- Iceberg/Hudi — версии таблиц и быстрые обновления.
- JDBC — подключение к обычным базам (PostgreSQL, MySQL).
- Kafka/Elastic/MongoDB — другие хранилища подключаем «на лету».
Возможности SQL в Trino
- Полный ANSI SQL, плюс:
- Оконные функции:
ROW_NUMBER(),SUM() OVER (PARTITION BY …) - ROLLUP, GROUPING SETS для расширенных отчётов
- Массивы (ARRAY) и структуры (ROW) для сложных данных
- Оконные функции:
Оптимизация запросов
- Pushdown
– Trino старается отправить фильтр и выборку колонок как можно ближе к данным, чтобы меньше передавать по сети. - Партиционирование
– Если таблица разбита по дате или другому полю, Trino читает только нужные папки. - Параметры
query.max-memory— сколько памяти может занять запрос.spill-to-disk— если памяти не хватает, Trino сбрасывает часть данных на диск.task.concurrency— сколько параллельных потоков у каждого worker.
Безопасность и доступ
- Аутентификация: LDAP, Kerberos или JWT — вы доказываете, кто вы, и Trino пускает вас дальше.
- RBAC и ACL
– Можно дать права «только читать таблицы в каталоге analytics» или «писать только в каталог staging».
Когда использовать Trino?
- Ad‑hoc‑аналитика
– Если нужно быстро проверить гипотезу «сколько товаров продали в марте», без настройки ETL. - Federated queries
– Соединить данные из S3 (логи кликов) и из базы заказов одним запросом.
- Устный опрос (5 мин)
- «Кто расписывает план выполнения SQL‑запроса?»
- «Для чего нужны коннекторы?»
- «Что такое pushdown?»
- Мини‑практика
Непременно запускаем Trino, пробуем свои SQL и смотрим результаты «живьём».
Задание 1. Запуск Trino в Docker
- Старт:
docker run -d --name trino -p 8080:8080 trinodb/trino - Подключение через CLI:
docker exec -it trino trino \ --server localhost:8080 \ --catalog tpch \ --schema tiny
Задание 2. Простые аналитические запросы
В CLI или в UI Trino выполните:
- Посчитайте число стран:
SELECT nationkey, COUNT(*) FROM tpch.tiny.nation GROUP BY nationkey; - Вычислите выручку по датам:
SELECT l_shipdate AS ship_date, SUM(l_extendedprice * (1 - l_discount)) AS revenue FROM tpch.tiny.lineitem WHERE l_shipdate >= DATE '1995-01-01' GROUP BY ship_date ORDER BY ship_date;
Задание 3. Федерация источников
- Настройте Hive‑каталог над папкой с локальными CSV или Parquet.
- Создайте файл
catalog/hive.properties:connector.name=hive-hadoop2 hive.metastore.uri=thrift://<ваш-metastore>:9083
- Создайте файл
- Выполните JOIN:
SELECT n.name, c.value FROM tpch.tiny.nation n JOIN hive.default.local_table c ON n.nationkey = c.nationkey LIMIT 10;
Задание 4. Изучение производительности
- Сравните один и тот же запрос на
tpch.tiny(малый объём) и наtpch.sf1(scale-factor 1):EXPLAIN (TYPE DISTRIBUTED) SELECT COUNT(*) FROM tpch.sf1.lineitem WHERE l_shipdate >= DATE '1995-01-01'; - Добавьте фильтр по статусу и снова выполните
EXPLAIN:EXPLAIN (TYPE DISTRIBUTED) SELECT COUNT(*) FROM tpch.sf1.lineitem WHERE l_shipdate >= DATE '1995-01-01' AND l_returnflag = 'R'; - Сравните планы и отметьте, как pushdown сокращает обмен данными.
Хранилище (озеро данных)
Общая инфраструктура: Yandex Object Storage (или S3)
Роль Object Storage в Data Lake
- Почему S3‑совместимое?
– Большие объёмы «сырых» данных (логи, экспорт из БД) удобно держать не в базе, а в «облачном шкафу», который масштабируется от гигабайт до петабайт.
– Простой HTTP‑интерфейс (S3 API) понятен многим инструментам: Spark, Trino, Airflow. - Характеристики
– Масштабируемость: автоматически растёт по мере добавления данных.
– Долговечность: данные хранятся в нескольких копиях внутри региона.
– Консистентность: после успешной записи вы гарантированно увидите свежий объект.
Архитектурные принципы
- Flat‑namespace vs псевдодиректории
– На самом деле у бакета нет папок — есть ключиlogs/2025/04/17.csv. Мы просто используем слеши/для группировки. - Префиксы
– Чтобы распределить запросы равномерно по «шкафам», ключи строят с учётом хэша или разбивки по дате:logs/2025/04/17/events.csv backup/2025/04/daily.parquet - Regions и AZ
– Данные реплицируются внутри зоны доступности (AZ).
– Для повышенной надёжности можно настроить кросс‑региональную репликацию.
Управление данными и стоимость
- Классы хранения
- Standard – горячие данные (часто читаются/пишутся).
- Infrequent Access – редко читаются (например, старые логи).
- Archive – почти не читаются, только для долгосрочных бэкапов.
- Модель оплаты
- Хранение – платите за объём GB⋅мес.
- Запросы – за количество операций PUT/GET.
- Исходящий трафик – за каждый гигабайт, вышедший из региона.
Подключение и доступ
- S3‑API и CLI/SDK
# Yandex: yc storage bucket create --name my-bucket # AWS: aws s3 mb s3://my-bucket - Права
– IAM‑роли и политики на уровне бакета/ключа: кто может читать, кто писать. - Шифрование
– SSE‑S3 (ключ провайдера) или SSE‑KMS (ваш ключ в KMS).
– Client‑side: шифруете данные до отправки.
Надёжность и отказоустойчивость
- SLA
– Гарантия доступности (обычно 99.9 %). - Репликация
– Внутри региона (несколько копий), опционально между регионами. - Versioning и MFA Delete
– Включаете версионирование, и при удалении старые версии сохраняются.
– MFA Delete требует двухфакторную авторизацию при удалении версий.
Устный опрос vs практика
- Устный опрос
– Проверяем ключевые термины: «что такое префикс?», «какие есть классы хранения?», «зачем versioning?». - Мини‑практика
– Обязательно: только при личном опыте работы через CLI/SDK вы почувствуете разницу между классами и научитесь быстро управлять бакетом.
Задание 1. Создание и настройка бакета
- Через CLI
# Yandex Cloud yc storage bucket create --name my-data-lake \ --default-storage-class standard # AWS S3 aws s3 mb s3://my-data-lake - Включить версионирование
# Yandex Cloud yc storage bucket versioning enable --name my-data-lake # AWS aws s3api put-bucket-versioning \ --bucket my-data-lake \ --versioning-configuration Status=Enabled - Добавить lifecycle‑правило для перевода старых объектов в архив:
# Пример для AWS: aws s3api put-bucket-lifecycle-configuration \ --bucket my-data-lake \ --lifecycle-configuration '{ "Rules":[ { "ID":"ArchiveOld", "Filter": {"Prefix":""}, "Status":"Enabled", "Transitions":[ {"Days":30, "StorageClass":"STANDARD_IA"}, {"Days":365, "StorageClass":"GLACIER"} ] } ] }'
Задание 2. Загрузка и чтение данных
- Загрузить небольшой файл:
aws s3 cp sample.csv s3://my-data-lake/data/sample.csv - Скачать и проверить checksum:
aws s3 cp s3://my-data-lake/data/sample.csv ./downloaded.csv sha256sum sample.csv downloaded.csv
Задание 3. Эксперимент с классами хранения
- Перевести объект в Infrequent Access:
aws s3 cp s3://my-data-lake/data/sample.csv \ s3://my-data-lake/data/sample_ia.csv \ --storage-class STANDARD_IA - Перевести в архив (Glacier или Archive):
aws s3 cp s3://my-data-lake/data/sample.csv \ s3://my-data-lake/data/sample_archive.csv \ --storage-class GLACIER - Замерить время чтения каждого варианта:
time aws s3 cp s3://my-data-lake/data/sample_*.csv ./
Задание 4. Подключение из аналитики
- В Trino/Hive прописать каталог, например в
hive.properties:connector.name=hive-hadoop2 hive.s3.aws-access-key=YOUR_KEY hive.s3.aws-secret-key=YOUR_SECRET hive.s3.bucket=my-data-lake hive.s3.endpoint=s3.your-region.amazonaws.com - Запрос в Trino:
SELECT * FROM hive.default."data/sample.csv" LIMIT 10; - Убедитесь, что Trino читает строки из файла напрямую.
Табличные форматы: Apache Hudi, Delta Lake, Apache Iceberg
Зачем нужны табличные форматы в Data Lake?
Когда мы просто складываем файлы (CSV, Parquet) в «облачный шкаф» (S3, YCS), каждый раз при обновлении данных приходится перезаписывать целые файлы. Табличные форматы решают следующие задачи:
- ACID‑транзакции
– Все или ничего: при записи данных вы либо полностью сохраните новый вариант таблицы, либо оставите старый без «половинчатых» изменений (как банковские переводы). - Time‑travel (возврат в прошлое)
– Можно запросить, как выглядели данные «на вчерашний день» или в любой момент в прошлом. - Схемовая эволюция
– Добавлять или убирать колонки без ручного переписывания всех файлов. - Компакты
– Сливать мелкие файлы в крупные для скорости чтения.
Apache Hudi
- Merge On Read (MOR) vs Copy On Write (COW)
- COW: при каждом обновлении перезаписываем файлы Parquet целиком.
- MOR: изменения сначала пишем в «журнал» (лог), а затем периодически «сливаем» их с основными файлами.
- Инкрементальные upsert‑операции
– Добавляете новые строки и обновляете старые без переработки всей таблицы. - Встроенный индекс
– Hudi хранит метаданные о местоположении строк, чтобы быстро находить, что нужно обновить. - Кластеризация (Clustering)
– Позволяет группировать похожие по ключу записи в одни файлы для ускорения запросов.
Delta Lake
- Транзакционный log‑файл
_delta_log
– Каждое изменение (insert/update/delete) фиксируется в отдельном JSON‑журнале, а данные хранятся в Parquet. - Schema enforcement vs schema evolution
- Enforcement: не дадим записать данные, которые не соответствуют текущей таблице.
- Evolution: можно безопасно добавить новую колонку.
- OPTIMIZE / VACUUM
– OPTIMIZE объединяет мелкие файлы в крупные «сгустки».
– VACUUM удаляет устаревшие файлы после того, как их версии вам больше не нужны.
Apache Iceberg
- Manifest‑ и метафайлы (snapshot‑based)
– Iceberg хранит списки файлов (manifests) и снимки (snapshots), что упрощает управление версиями. - Hidden partitioning
– Вы можете задать правила разбиения данных без «захардкоженных» директорий. - Плавающие типы колонок
– Возможность изменять типы колонок (например,int→long) без переезда таблицы.
Каталог метаданных
Чтобы Spark, Trino или другие движки «видели» ваши таблицы, нужно хранить информацию о них в Hive Metastore или AWS Glue Catalog. Каталог содержит:
- Имя таблицы и её расположение в S3/YCS
- Схему (список колонок и их типы)
- Партитции (если есть)
Сравнение и случаи применения
| Формат | Обновления | Time‑travel | Интеграция | Когда выбирать |
|---|---|---|---|---|
| Hudi | MOR/COW | Да | Spark, Flink, Hive, Trino | Частые инкременты и быстрые upsert‑задачи |
| Delta | COW + журнал | Да | Databricks, Spark, Trino | Активное редактирование и time‑travel |
| Iceberg | Snapshot‑based | Да | Spark, Trino, Flink, Hive | Большие таблицы, сложные эволюции схем |
Устный опрос vs мини‑практика
- Устный опрос (5 мин)
– Сравнить ключевые характеристики:- Как Hudi понимает, где обновлять строку?
- Чем журнал Delta отличается от манифестов Iceberg?
- Как задать новую колонку без ошибок?
- Мини‑практика
Обязательно – только hands‑on покажет, как делаются upsert, compaction и time‑travel на самом деле.
Задание 1. Delta Lake Quickstart
- Локально запустить Spark с поддержкой Delta (загрузите
delta-coreJAR). - Создать Delta‑таблицу:
from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("DeltaDemo") \ .config("spark.jars.packages", "io.delta:delta-core_2.12:2.2.0") \ .getOrCreate() data = spark.createDataFrame([(1,"Alice"),(2,"Bob")], ["id","name"]) data.write.format("delta").save("/tmp/delta-table") - Вставить и обновить:
spark.read.format("delta").load("/tmp/delta-table") \ .createOrReplaceTempView("t") spark.sql("INSERT INTO delta.`/tmp/delta-table` VALUES (3,'Carol')") spark.sql("UPDATE delta.`/tmp/delta-table` SET name='Alice Cooper' WHERE id=1") - Time‑travel:
history = spark.sql("DESCRIBE HISTORY delta.`/tmp/delta-table`") history.show() # Предположим, предыдущий version = 0 df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table") df.show()
Задание 2. Apache Hudi Hello‑World
- Spark + Hudi‑коннектор (добавьте
hudi-spark-bundle):from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("HudiDemo") \ .config("spark.jars.packages","org.apache.hudi:hudi-spark3-bundle_2.12:0.12.0") \ .getOrCreate() data = spark.createDataFrame([(1,"x")], ["id","value"]) data.write.format("hudi") \ .option("hoodie.table.name","hudi_table") \ .option("hoodie.datasource.write.recordkey.field","id") \ .option("hoodie.datasource.write.table.name","hudi_table") \ .option("hoodie.datasource.write.operation","insert") \ .save("/tmp/hudi-table") - Upsert:
updated = spark.createDataFrame([(1,"y")], ["id","value"]) updated.write.format("hudi") \ .option("hoodie.datasource.write.operation","upsert") \ ... \ .save("/tmp/hudi-table") - Посмотреть коммиты через Hoodie CLI:
hudi-cli> connect --path /tmp/hudi-table hudi-cli> commits show
Задание 3. Iceberg Simple Table
- Spark + Iceberg (добавьте
iceberg-spark3-runtime):from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("IcebergDemo") \ .config("spark.jars.packages","org.apache.iceberg:iceberg-spark3-runtime:1.2.0") \ .config("spark.sql.catalog.local","org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.local.type","hadoop") \ .config("spark.sql.catalog.local.warehouse","/tmp/iceberg-warehouse") \ .getOrCreate() # Создать таблицу spark.sql("CREATE TABLE local.db.ice_test (id INT, name STRING) USING iceberg") # Вставить данные spark.sql("INSERT INTO local.db.ice_test VALUES (1,'A'), (2,'B')") - Time‑travel:
# Получить snapshot-id snaps = spark.sql("CALL local.system.snapshots('db.ice_test')").show() # Предположим snapshot_id = 1 df_old = spark.read \ .format("iceberg") \ .option("snapshot-id", 1) \ .load("local.db.ice_test") df_old.show()
Задание 4. Сравнительный SELECT
- Для одного набора данных (например, изменение записей за последние 24 ч) выполните аналогичный запрос в каждой таблице:
SELECT COUNT(*) FROM <format>.table WHERE ts >= current_timestamp() - interval '1' day - Сравните
- Время выполнения
- Объём прочитанных данных (см. Spark UI или
df.queryExecution.logicalStats)
- Сделайте вывод, какой формат на ваших данных работает быстрее и почему.
Метаданные: Hive Metastore / AWS Glue Catalog
Зачем нужен единый каталог метаданных?
Представьте библиотеку, где книжки (таблицы) лежат на полках (S3/файловые хранилища), но без картотеки вы не найдёте нужную книгу быстро. Каталог метаданных — это как картотека: в ней хранится информация, где лежит каждая таблица, какие у неё колонки, как она разбита на части (партиции). Spark, Trino, Athena и другие инструменты читают эту картотеку, а не «гадывают» по именам файлов.
Hive Metastore
- Как устроен
– Сервис на Thrift, который отвечает на запросы «где эта таблица?» и «какая у неё схема?»
– Сам Metastore хранит данные в реляционной базе (MySQL или Postgres). - Основные объекты
- Database: логическая группа таблиц (как папка).
- Table: имя, список колонок и типы.
- Partition: подтаблицы по ключам (например, дата).
- StorageDescriptor: где лежат файлы, в каком формате, какая сериализация (SerDe).
- Регистрация таблиц
– Internal: данные лежат в стандартном каталоге Metastore.
– External: вы сами указываете путь в S3/на диск — Metastore только хранит мета‑информацию.
AWS Glue Catalog
- Serverless‑альтернатива
– Вместо своего Metastore вы используете управляемый AWS‑сервис — не нужно ставить и обновлять свой сервер. - Интеграция
– Athena, Redshift Spectrum, EMR, Glue ETL автоматически читают и пишут в Glue Catalog. - Glue Crawlers
– Специальные «ползунки», которые ходят по вашему S3‑бакету, находят файлы CSV/Parquet и сами создают таблицы в каталоге.
Управление схемами и эволюция
- Добавление колонок
– Можно дописать новую колонку без изменения старых файлов — инструменты будут её «видеть». - Удаление или изменение типа
– Требует осторожности: некоторые движки не поддерживают «сужение» типа (например, изstringвint).
Безопасность и права доступа
- IAM‑права
– Кто может читать каталог, кто — писать новые таблицы. - Lake Formation (AWS)
– Дополнительный уровень контроля: кто из пользователей может видеть определённые строки или колонки.
Надёжность и производительность
- SLA
– У Glue Catalog высокая доступность, вам не нужно «держать» свой Metastore. - Кэширование
– Некоторые движки (Spark, Trino) могут кэшировать мета‑информацию, чтобы не спрашивать каждый раз каталог.
Устный опрос и мини‑практика
Комбинируем оба подхода: сначала короткий устный опрос, чтобы проверить основные понятия, а затем практические упражнения.
- Устный опрос (5 мин)
- Где хранится информация о физическом расположении файлов таблицы?
- В чём отличие Hive Metastore от Glue Catalog?
- Как добавить новую колонку без потери старых данных?
- Мини‑практика
Обязательно: только на реальных примерах вы почувствуете, как движки находят и читают таблицы через каталог.
Задание 1. Hive Metastore локально
- Запустите Docker‑Metastore (например, с Postgres):
# Запустить Postgres docker run -d --name hive-metastore-db -e POSTGRES_PASSWORD=pass -p 5432:5432 postgres # Запустить простой Hive Metastore docker run -d --name hive-metastore \ -e METASTORE_DB_HOST=host.docker.internal \ -e METASTORE_DB_NAME=postgres \ -e METASTORE_DB_PASSWORD=pass \ -p 9083:9083 \ your-hive-metastore-image - Через Beeline или Spark SQL:
-- Подключитесь к Metastore !connect jdbc:hive2://localhost:9083/default CREATE DATABASE IF NOT EXISTS demo; CREATE EXTERNAL TABLE demo.events ( id STRING, event_time TIMESTAMP, payload STRING ) STORED AS PARQUET LOCATION 's3a://your-bucket/demo/events/'; SHOW TABLES IN demo;
Задание 2. AWS Glue Catalog
- Создайте базу
demo_glueчерез AWS CLI:aws glue create-database --database-input '{"Name":"demo_glue"}' - Запустите Glue Crawler на S3‑префиксе
s3://your-bucket/demo/events/:aws glue create-crawler \ --name demo-crawler \ --role AWSGlueServiceRole \ --database-name demo_glue \ --targets '{"S3Targets":[{"Path":"s3://your-bucket/demo/events/"}]}' aws glue start-crawler --name demo-crawler - Проверьте в AWS Console → Glue → Databases → demo_glue → Tables.
Задание 3. Чтение через Trino и Spark
- В Trino в
catalog/hive.propertiesилиglue.propertiesпропишите доступ к вашему Metastore/Glue:connector.name=hive-hadoop2 hive.metastore.uri=thrift://<hive-host>:9083 # или для Glue: connector.name=glue aws.region=us-east-1 - Запрос в Trino:
SELECT * FROM demo.events LIMIT 10; - В Spark:
df = spark.table("demo.events") df.show(10)
Задание 4. Эволюция схемы
- Добавьте колонку:
ALTER TABLE demo.events ADD COLUMNS (user_id STRING); - Запишите пару новых строк с этим полем:
INSERT INTO demo.events VALUES ('evt1', CURRENT_TIMESTAMP, 'data', 'user123'); - Убедитесь, что обе колонки (
payload,user_id) отображаются:SELECT id, payload, user_id FROM demo.events LIMIT 5;
Вычисления и Ad-Hoc
Apache Spark: архитектура кластеров в Yandex Cloud, оптимизация job’ов (Tungsten, Catalyst, tuning)
Архитектура Spark‑кластера в Yandex Cloud
- Data Proc vs свой Spark на Kubernetes
- Yandex Data Proc: готовый «облачный сервис» — вы указываете размер кластера, а всё остальное (установка Spark, мониторинг, авто‑скейлинг) берёт на себя Yandex.
- Spark на Kubernetes: вы разворачиваете Spark‑Operator в своём кластере K8s, сами управляете подами, конфигурацией и ресурсами.
- Роли узлов
- Мастер (Driver) — главный процесс, который распределяет задачи.
- Воркеры (Executors) — рабочие «повара», которые обрабатывают кусочки данных.
- При HA‑настройке могут использовать Zookeeper для согласования, но в Data Proc это уже готово «из коробки».
- Интеграция с облаком
- Object Storage (
s3a://илиstorage.yandexcloud.net) для чтения/записи данных. - IAM‑роли: Spark‑узлы получают временные токены и могут обращаться к бакетам без передачи ключей.
- VPC: весь трафик между узлами и хранилищем остаётся в приватной сети.
- Object Storage (
Механизмы оптимизации внутри Spark
- Catalyst Optimizer
- Логический план: Spark смотрит, что вы написали в SQL/DataFrame.
- Применяет правила (rule‑based) — объединяет фильтры, убирает лишние шаги.
- Cost‑based оптимизации — выбирает лучший способ выполнить join или агрегацию, опираясь на статистику.
- Tungsten Execution Engine
- Off‑heap память: хранит данные вне «кучи» Java, чтобы сборщик мусора летал реже.
- Код‑генерация (whole‑stage codegen): Spark сам «печатает» Java‑байт‑код, который очень быстро работает.
- Cache‑aware структуры: данные упаковываются так, чтобы быстро загружаться в кэш процессора и экономить память.
Основные параметры тюнинга
spark.sql.shuffle.partitions— сколько кусочков (partitions) создаётся при shuffle (пересортировке).spark.executor.memoryиspark.executor.cores— сколько «пищи» (памяти) и сколько «поваров» (ядер) на каждого executor.spark.driver.memory— сколько памяти у главного процесса.- Сериализация:
- Java (по‑умолчанию) — медленнее.
- Kryo — быстрее и компактнее, рекомендуется для больших объёмов.
- Join‑стратегии:
- Broadcast — маленькую таблицу раздаём всем executors (быстро).
- Shuffle‑join — пересортировка больших таблиц, но иногда неизбежна.
- Dynamic Allocation и checkpointing — сами регулируют число executors и защищают от потерь состояния.
Мониторинг и отладка
- Spark UI
- Stages: этапы вычисления, видно, какие tasks дольше всего.
- Storage: какие RDD/DataFrame закешированы и сколько памяти занимают.
- SQL: план выполнения SQL‑запросов.
- Environment: все настройки вашего job’а.
- Prometheus/Grafana
- Собирают метрики: нагрузку CPU, задержку, число spill’ов на диск.
- Типичные проблемы
- Skew: когда один executor получает слишком много данных и работает медленно.
- Spill to disk: когда не хватает памяти, данные сливаются на диск, и job тормозит.
Устный опрос vs мини‑практика
- Устный опрос (5 мин)
- Что делает Driver?
- Чем Catalyst отличается от Tungsten?
- Зачем уменьшать
shuffle.partitions?
- Мини‑практика
Обязательно: только запустив job и изменив параметры, вы почувствуете разницу в скорости и ресурсоёмкости.
Задание 1. Поднятие кластера Data Proc
- Через UI Yandex Cloud → Data Proc → Create cluster
- 1 мастер + 2 воркера, стандартный образ Spark.
- Проверить в разделе YARN ResourceManager или Spark History Server, что узлы «забиты» и готовы к задачам.
Задание 2. Выполнение простого Spark‑job
- Подготовьте
word_count.py:from pyspark.sql import SparkSession spark = SparkSession.builder.appName("WordCount").getOrCreate() text = spark.read.text("s3a://your-bucket/sample.txt") counts = text.rdd.flatMap(lambda r: r[0].split()) \ .map(lambda w: (w,1)).reduceByKey(lambda a,b: a+b) counts.show() spark.stop() - Запустить:
spark-submit \ --master yarn \ --deploy-mode cluster \ word_count.py - Посмотреть логи и Spark UI — убедиться, что задача выполнилась успешно.
Задание 3. Тюнинг‑челлендж
- Сменить
spark.sql.shuffle.partitions:--conf spark.sql.shuffle.partitions=50 - Переключить сериализацию:
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer - Сравнить
- Время выполнения (см. Spark UI или логи).
- Число spill’ов и загрузку памяти.
Задание 4. Анализ плана
- Откройте Spark UI → SQL Tab → DAG Visualization.
- Найдите шаги shuffle и узкие места (большие стадии, spill).
- Скриншот части плана, где Catalyst применил push‑down фильтра или где видно много shuffle‑tasks.
- Запишите, какие шаги можно оптимизировать (уменьшить partitions, добавить broadcast).
Trino: подключение к разным хранилищам, push‑down, federated queries
Конфигурация каталогов
- Trino хранит настройки соединений в файлах
catalog/*.properties. - Каждый файл описывает один «каталог» — источник данных, например, PostgreSQL или S3.
- Внутри свойства, самые важные:
connector.name=postgresql # тип коннектора connection-url=jdbc:postgresql://… # куда подключаться username=… # кто подключается password=… # пароль или токен - Аналогично для S3/Hive:
connector.name=hive-hadoop2 hive.metastore.uri=thrift://…
Типы коннекторов
- Hive: таблицы на S3/Object Storage + метаданные в Metastore.
- JDBC: подключение к реляционным БД (PostgreSQL, MySQL, Oracle).
- Табличные форматы: Iceberg, Delta, Hudi — хранятся как файлы, но видны как таблицы.
- NoSQL и поиск: Kafka, Elasticsearch — можно читать логи и документы напрямую.
Push‑down фильтров и проекций
- Что уходит в источник
– Если вы пишетеWHERE id < 10, Trino постарается передать этот фильтр самому источнику (Postgres или S3), чтобы читать меньше данных. - Проекции
– SELECT только нужных колонок (SELECT name, dateвместоSELECT *) тоже «толкается» вниз. - Настройки
predicate-pushdown.enabled=true pushdown-filter-enabled=true
Federated queries (федерация)
- JOIN между разными источниками
– Можно объединить, например, таблицу из Postgres и таблицу из S3 одним запросом:SELECT u.id, f.info FROM postgres.public.users u JOIN hive.default.files f ON u.id = f.user_id; - Согласование типов
– Trino автоматически приведёт, например,INTEGERв Postgres кBIGINTв файле, если потребуется. - Производительность
– Для небольших объёмов federated‑JOIN удобен; если таблицы огромные, лучше сначала перенести данные (ETL) в единое место.
Безопасность и разграничение доступа
- Аутентификация: LDAP, JWT, встроенный файл
password-authenticator. - ACL
– Вaccess-control.propertiesможно запретить или разрешить чтение/запись по каталогам, схемам и таблицам.
Устный опрос vs мини‑практика
Устный опрос (5 мин)
- «Где лежат файлы с настройками коннекторов?»
- «Что такое push‑down и зачем он нужен?»
- «Когда полезны federated‑запросы?»
Мини‑практика
Обязательна: только на практике вы научитесь настраивать *.properties, видеть в EXPLAIN, как Trino отсылает фильтры и как объединяет разные базы.
Задание 1. Запуск Trino + Docker‑Postgres
- Два контейнера:
docker run -d --name postgres -e POSTGRES_PASSWORD=pass -p 5432:5432 postgres:13 docker run -d --name trino -p 8080:8080 trinodb/trino - Создайте файл
catalog/postgres.propertiesрядом с запуском Trino:connector.name=postgresql connection-url=jdbc:postgresql://postgres:5432/postgres username=postgres password=pass - Перезапустите Trino (или добавьте каталог и сделайте
docker restart trino).
Задание 2. Simple push‑down
- В PostgreSQL:
CREATE TABLE public.users (id INT PRIMARY KEY, name TEXT); INSERT INTO public.users VALUES (1,'A'),(2,'B'),…(100,'Z'); - В Trino CLI:
docker exec -it trino trino --server localhost:8080 --catalog postgres --schema public - Выполните:
EXPLAIN (TYPE DISTRIBUTED) SELECT * FROM users WHERE id < 10; - Убедитесь, что в плане видно
Filter[ id < 10 ]внутриTableScan, то есть условие «пушнено» в Postgres.
Задание 3. Federated JOIN
- Подключите Hive‑каталог над локальными CSV:
- Создайте
catalog/hive.properties:connector.name=hive-hadoop2 hive.metastore.uri=thrift://<адрес-metastore>:9083 - В метасторе зарегистрируйте внешний каталог, например
hive.default.meta, с CSV-файлами, где есть колонкаuser_id.
- Создайте
- В Trino:
EXPLAIN (TYPE DISTRIBUTED) SELECT u.id, m.info FROM postgres.public.users u JOIN hive.default.meta m ON u.id = m.user_id; - Замерьте время выполнения
SELECT …, потом добавьте фильтрWHERE u.id < 50и снова замерьте.
Задание 4. Анализ и оптимизация
- Отключите в
postgres.properties:predicate-pushdown.enabled=false - Повторите запрос из Задания 2 и замерьте время.
- Включите снова и сравните: сколько времени с push‑down против без.
- Обсудите, в каком случае federated JOIN оправдан, а когда лучше сначала сделать ETL.
Ad‑hoc‑слой: когда использовать Spark vs Trino
Когда выбирают Spark, а когда Trino?
Характер нагрузки
- Trino идеально подходит для быстрых, «интерактивных» запросов (< 1 сек) по небольшим или средним фрагментам данных (десятки–сотни мегабайт).
- Spark — выбор для тяжёлых джобов, обрабатывающих сотни гига‑ и терабайт: сложные трансформации, ML‑пайплайны, большие компакты.
Язык и удобство
- Trino
– чистый SQL‑интерфейс, знакомый большинству аналитиков.
– нет гибкости кода — только SQL. - Spark
– DataFrame/DSL на Python, Scala или SQL.
– можно писать любую логику: UDF, сложные функции, обращаться к ML‑библиотекам.
Как устроен «движок»
- Trino
– обрабатывает запросы потоком, пушит фильтры и проекции вниз, минимизирует копирование. - Spark
– строит ленивый DAG, оптимизирует его Catalyst‑ом, исполняет через Tungsten‑движок, может кешировать промежуточные данные.
Источники данных
- Trino отлично для federated–JOIN’ов: берет куски из разных источников (S3, базы, Elasticsearch).
- Spark удобнее, когда всё «сырьё» уже лежит в Data Lake (Parquet/Iceberg), и нужно сделать серию сложных шагов ETL.
Стоимость и масштаб
- Trino
– маленький CPU, быстрый старт, платите за короткое время работы.
– невыгоден для джобов на сотни гига из‑за накладных расходов плана. - Spark
– гибко масштабируется, можно использовать spot‑инстансы, чинить параметры памяти/числа ядер.
– на мелких задачах долго запускается и жрёт ресурсы.
Гарантии и устойчивость
- Trino
– каждая задача — самостоятельный запрос, нет перезапуска внутри.
– при сбое нужно перезапустить весь запрос. - Spark
– поддерживает checkpoint и восстановление DAG’ов.
– умеет перезапускать упавшие куски (tasks) и продолжать с точки остановки.
Устный опрос vs мини‑практика
Устный опрос
- «Нужно раз в час посчитать 10 GB новых логов → Spark или Trino?»
- «Срочно посмотреть top‑10 продуктов за вчера — Spark или Trino?»
- Мини‑практика (рекомендуется сочетать оба)
- Покажите на одном наборе данных оба инструмента: измерьте время, почувствуйте удобство и накладные расходы.
Задание 1. Сравнение простого фильтра
- Подготовьте файл
events.csv(~100 MB) с колонкамиuser_id,status,ts. - Spark (pyspark):
from pyspark.sql import SparkSession spark = SparkSession.builder.appName("FilterDemo").getOrCreate() df = spark.read.csv("events.csv", header=True, inferSchema=True) import time; t0 = time.time() cnt = df.filter(df.status == 'OK').count() print("Count:", cnt, "Time:", time.time()-t0) spark.stop() - Trino:
- Создайте внешний каталог
hiveи таблицу над тем жеevents.csv. - В CLI выполните:
SELECT COUNT(*) FROM hive.default.events WHERE status = 'OK'; - Засеките время (Trino UI или shell).
- Создайте внешний каталог
- Сравните времена и обсудите: где был накладной старт, где — быстрая интерактивность.
Задание 2. Агрегация и группировка
- Spark:
stats = df.groupBy("user_id") \ .agg({"value":"avg", "value":"max"}) stats.show() - Trino:
SELECT user_id, AVG(value), MAX(value) FROM hive.default.events GROUP BY user_id; - Сравните удобство: сколько строк кода vs SQL‑запрос, и производительность в секундах.
Задание 3. Принятие решений по сценарию
| Сценарий | Выбор |
|---|---|
| 1. Ежечасный отчёт по 10 GB новых логов | Spark или Trino? |
| 2. Разовый ad‑hoc‑анализ исторических данных в 1 TB | Spark или Trino? |
| 3. Быстрый federated‑JOIN между MySQL и S3 | Spark или Trino? |
Speed‑layer & Presentation
ClickHouse: колонковое хранилище под низколатентные запросы
Зачем ClickHouse в Speed‑layer?
- Колонковое хранение и сжатие
– Данные записаны «столбцами», а не «строками», поэтому при запросе нужных полей читаются лишь соответствующие фрагменты диска.
– Дополнительное сжатие (LZ4) уменьшает объём I/O и ускоряет чтение. - OLAP‑запросы в реальном времени
– Суммы, средние, группировки по миллионам строк выполняются за миллисекунды, идеально для дашбордов.
Архитектура хранения и исполнения
- MergeTree‑движок
– Партиционирование: данные разбиваются по дате (или другому полю), чтобы при запросе читать только нужный диапазон.
– Слияние частей: фоновые процессы объединяют мелкие файлы в большие для эффективности.
– TTL: автоматически удаляет старые данные по расписанию. - Индексы
– Primary key даёт sparse index: ClickHouse знает, на каких файлах искать нужные диапазоны.
– Skip‑indexes (minmax, bloom‑filter) помогают ещё быстрее пропустить ненужные данные. - Векторное исполнение
– Операции над данными выполняются целыми «батчами» значений (векторы), это эффективнее циклов по отдельным строкам.
Кластер и отказоустойчивость
- Шардинг и репликация
– Таблица разбивается на «шарды» (части), каждый имеет копии‑реплики для надёжности.
– ZooKeeper координирует лидеров‑реплик и синхронизацию. - Read/write‑splitting
– Чтение можно направлять на любую реплику, запись — только на лидера. - Placement rules и quorum‑replication
– Гибкие правила, на каких узлах какие данные хранятся, и запись ждёт подтверждения от кворума реплик.
Materialized Views и агрегаты
- Pre‑aggregations
– Можно создать материализованное представление, где данные уже агрегированы по дате/категории и отвечают мгновенно. - CollapsingMergeTree
– Позволяет накапливать события «добавить»/«удалить» и в результате получить конечное состояние.
Инструменты доступа
- clickhouse-client — консольный клиент.
- HTTP‑API — простой REST‐интерфейс, удобно скриптовать.
- JDBC/ODBC — интеграция с BI (Metabase, Grafana, Tableau).
Безопасность и ресурсы
- Пользователи и пароли, network policies — ограничение доступа по ролям и сетям.
- Профили ресурсов
–max_memory_usage,max_threads, квоты на конкретные таблицы или пользователей.
Устный опрос vs мини‑практика
- Устный опрос (5 мин)
– «Что даёт MergeTree‑движок?»
– «Как работают skip‑indexes?»
– «Когда используют materialized view?» - Мини‑практика обязательна: только на практике вы прочувствуете скорость ClickHouse и увидите, как векторы данных обрабатываются мгновенно.
Задание 1. Quickstart на Docker
- Запустить:
docker run -d --name clickhouse-server -p 9000:9000 -p 8123:8123 clickhouse/clickhouse-server docker run -it --rm --link clickhouse-server:clickhouse-server clickhouse/clickhouse-client --host clickhouse-server - Подключиться:
-- В клиенте SELECT version();
Задание 2. Создание и наполнение таблицы
- Создать таблицу:
CREATE TABLE default.events ( event_date Date, user_id UInt64, amount Float64 ) ENGINE = MergeTree() PARTITION BY toYYYYMM(event_date) ORDER BY (event_date, user_id); - Вставить 1000 строк:
INSERT INTO default.events VALUES ('2025-04-01', 1, 12.5), ('2025-04-01', 2, 7.3), …; -- повторите или сгенерируйте
Задание 3. Низколатентный запрос
- Выполнить:
SELECT event_date, sum(amount) AS total FROM default.events WHERE event_date = today() GROUP BY event_date; - Измерить время:
\watch 1 -- обновлять каждую секунду
Задание 4. Materialized View для pre‑aggregate
- Создать MV:
CREATE MATERIALIZED VIEW default.daily_sales ENGINE = SummingMergeTree() PARTITION BY event_date ORDER BY event_date AS SELECT event_date, sum(amount) AS total FROM default.events GROUP BY event_date; - Сравнить:
-- Мгновенный: SELECT * FROM default.daily_sales; -- На лету: SELECT event_date, sum(amount) FROM default.events GROUP BY event_date;
Задание 5. TTL‑политика
- Установить TTL:
ALTER TABLE default.events MODIFY TTL event_date + INTERVAL 7 DAY; - Проверить после 7 дней (или вручную смоделировать дату) удаление старых партиций:
SHOW PARTITIONS FROM default.events;
Архитектура Speed Layer: конвейер в ClickHouse и подходы Lambda vs Kappa
Что такое Speed Layer и зачем он нужен?
Представьте, что в вашем интернет‑магазине каждую секунду происходят события (клики, заказы, статусы). Для отчётов в реальном времени нам нужно не ждать, пока ночью большой пакет данных обработается, а показывать свежие цифры почти сразу — с задержкой в секунды, а не часы. Это и есть задача Speed Layer (“слоя скорости”).
- Low‑latency (низкая задержка) — данные доходят от события до отчёта за секунды–минуты, а не за часы.
Lambda vs Kappa: два способа «договориться» о Speed Layer
Lambda‑архитектура
- Batch Layer (пакетная часть)
– обрабатывает большие объёмы данных (например, логи за прошедший день) с задержкой (раз в час). - Speed Layer (быстрая часть)
– непрерывно получает новые события и пишет их в хранилище. - Serving Layer
– объединяет результаты batch и speed, отдаёт единый отчёт.
Плюс: при сбое Speed Layer есть резерв в Batch‑слое.
Минус: две раздельные обработки — две программы, два набора кода.
Kappa‑архитектура
- Только поток: нет отдельного batch и speed.
- Все данные (и старые, и новые) обрабатываются единым потоком:
- Исторические события “перематываются” через очередь (Kafka) как будто только что пришли.
- Новые события идут тем же путём.
Плюс: один конвейер, один код.
Минус: при очень большом объёме “перемотка” (replay) может занять много времени.
Как данные попадают в ClickHouse
Пакетная загрузка (batch)
- Кто? Airflow или Xenosphere + Spark.
- Что делает? Раз в час (или день) читает файлы (CSV/Parquet) и выполняет в ClickHouse:
INSERT INTO events FORMAT CSV - Когда? Для исторических данных, когда нужно “дозарядить” большие объёмы.
Потоковая загрузка (stream)
- Кто? Spark Streaming, Flink или Debezium CDC.
- Что делает? Принимает каждое событие из Kafka и сразу добавляет или обновляет строку в ClickHouse:
curl 'http://localhost:8123/?query=INSERT%20INTO%20events%20FORMAT%20JSONEachRow' \ --data-binary @event.json - Когда? Для свежих данных, чтобы отчёты сразу отражали изменения.
Объединение batch и stream
- В Lambda мы храним две версии таблицы:
- events_batch (из пакетной загрузки)
- events_stream (из потока)
Затем в Serving Layer объединяем их запросом:
SELECT * FROM events_batch UNION ALL SELECT * FROM events_stream; - В Kappa только events_stream, но мы периодически “перематываем” Kafka‑топик, чтобы заново внести старые данные.
Надёжность: дубликаты и согласованность
- Дубликаты
– При повторном запуске batch‑job или при перезапуске stream конвейера одни и те же события могут записаться несколько раз. - Как бороться
- Deduplication: храним уникальный ключ (например,
event_id) и вставляем upsert вместо insert. - CollapsingMergeTree: движок ClickHouse, который понимает “+” и “–” события и сам убирает дубли.
- Deduplication: храним уникальный ключ (например,
Формат закрепления
- Устный опрос (5 мин)
– «Что делает Speed Layer?»
– «Чем Kappa отличается от Lambda?» - Мини‑практика
– Необходимо: даже простейшие hands‑on покажут, как строятся оба конвейера.
Задание 1. Sketch‑тренинг
На бумаге или доске нарисуйте два потока:
Источник данных ──▶ Batch ETL ──▶ ClickHouse
│
└──▶ Stream ETL ──▶ ClickHouse
Отметьте на схеме, где могут появиться дубликаты и где «опоздавшие» записи.
Задание 2. Batch → ClickHouse
- Подготовьте CSV (~100 MB)
events.csv. - Запишите его в ClickHouse из командной строки:
clickhouse-client --query="INSERT INTO default.events FORMAT CSV" < events.csv - Проверьте:
SELECT count(*) FROM default.events;
Задание 3. Stream → ClickHouse
- Запустите Docker‑Kafka и Spark Streaming (локально или в Data Proc).
- Скрипт
stream_to_ch.py:from pyspark.sql import SparkSession spark = SparkSession.builder.appName("StreamToCH").getOrCreate() df = spark.readStream \ .format("kafka") \ .option("kafka.bootstrap.servers","localhost:9092") \ .option("subscribe","events") \ .load() \ .selectExpr("CAST(value AS STRING) as json") # Предполагаем, что json – строка CSV: "2025-04-17,user1,12.3" query = df.writeStream.foreachBatch(lambda batch, _: batch.write \ .format("jdbc") \ .option("url","jdbc:clickhouse://localhost:8123/default") \ .option("driver","ru.yandex.clickhouse.ClickHouseDriver") \ .option("dbtable","events") \ .mode("append") \ .save() ).start() query.awaitTermination() - Сгенерируйте 100 событий в Kafka:
seq 1 100 | xargs -I{} echo "2025-04-17,user{},5.0" | \ kafka-console-producer --topic events --bootstrap-server localhost:9092 - Проверьте в ClickHouse:
SELECT count(*) FROM default.events WHERE user_id >= 1 AND user_id <= 100;
Задание 4. Lambda vs Kappa анализ
- Lambda
- Запустите batch‑job (задание 2) и stream‑job (задание 3).
- Сгенерируйте дубли (повторите загрузку CSV) и посчитайте, сколько строк появится в таблице.
- Kappa
- Откатьте Kafka‑топик командой:
kafka-consumer-groups --bootstrap-server localhost:9092 \ --group StreamToCH --reset-offsets --to-earliest --execute --topic events - Перезапустите только stream‑job.
- Убедитесь, что ровно те же 100 событий попали в ClickHouse (без batch).
- Откатьте Kafka‑топик командой:
Presentation: схемы таблиц, материализованные представления, агрегаты
Как проектировать схемы таблиц
Star Schema («звезда»)
- Представьте центр (факт) и лучи (справочники).
- Fact-таблица хранит «сырьё» — каждую продажу, клик или событие.
- Dimension-таблицы (справочники) описывают товары, пользователей, даты.
- Почему «звезда»? Все справочники «смотрят» на факт, как лучи на солнце.
- Плюс: просто писать запросы — JOIN факт↔справочник один раз.
- Минус: при очень сложных справочниках может быть много колонок.
Snowflake Schema («снежинка»)
- То же, что «звезда», но справочники сами могут ссылаться друг на друга.
- Например, таблица «товары» содержит
category_id, а отдельный справочник «категории» хранит название категории.
- Например, таблица «товары» содержит
- Плюс: меньше дублирования данных.
- Минус: сложнее JOIN’ить, потому что JOIN’ов больше.
Гранулярность фактов
- По событию (самая мелкая): каждая строка = один клик или одна продажа.
- По дню/часу (более грубая): скатываете все события за час в одну строку.
- По сессии: объединяете все действия одного пользователя за визит.
Materialized Views (материализованные представления)
- Что это?
– Это заранее готовая таблица с результатами тяжёлой агрегации, например ежедневной суммы продаж. - Зачем
– Не каждый раз пересчитывать миллионы строк — просто читаем готовое число. - Отличие от обычного VIEW
- Обычный VIEW: «картинка» поверх фактов, пересчитывается на каждый запрос.
- MATERIALIZED VIEW: данные записаны и обновляются по расписанию или при появлении новых фактов.
- Где
- В СУБД (Postgres, Oracle): требует явного
REFRESH MATERIALIZED VIEW. - В ClickHouse/Trino/BigQuery: могут обновляться автоматически при вставке в исходную таблицу.
- В СУБД (Postgres, Oracle): требует явного
Агрегированные таблицы
- Уровни агрегации
- По дате: сумма продаж в день.
- По часу: почасовая динамика.
- По категории: продажи на разные группы товаров.
- По региону: продажи по городам.
- Баланс
– Чем больше разных уровней агрегации, тем быстрее отчёты, но дольше загрузка и больше места. - Slowly Changing Aggregates
– Инкрементально дополняете только новую порцию данных (например, час за часом), а не всю таблицу.
Как обновлять представления и агрегаты
- Инкрементально (stream/CDC)
– Когда поступает новый заказ, сразу же обновляете нужные MV или агрегатную строку. - Полная перезагрузка (batch)
– Раз в ночь считываете всё с нуля (или за последние сутки) и полностью перезаписываете агрегаты. - Без простоев
– Создаёте новую таблицу/представление с тем же именем на лету или используете swap‑операции (ALTER TABLE … RENAME).
Интеграция с BI
- Как дашборды видят схемы
– В Metabase, DataLens и других инструментах вы указываете базу и схему (например,analytics.fact_sales).
– Далее выбираете таблицы и поля из справочников и фактов. - Рекомендации по названиям
– Факт-таблицы называйтеfact_<имя>, справочникиdim_<имя>.
– MV —mv_<описание>, агрегаты —agg_<уровень>_<что>(например,agg_daily_revenue). - Метаданные
– В описании поля (comment) указывайте единицы измерения, формат даты, возможные значения.
Устный опрос vs мини‑практика
-
- «В чём отличие star schema от snowflake?»
- «Что делает materialized view?»
- «Когда использовать инкрементальную агрегацию?»
- Мини‑практика
Обязательно: только в практике на SQL/DDL убедитесь, как легко или сложно запустить MV и агрегатные таблицы.
Задание 1. Проектирование star schema
- Задача: есть сырые события продаж
-- Факт CREATE TABLE fact_sales ( order_id String, order_date Date, product_key Int, user_key Int, total_amount Decimal(10,2) ); -- Справочник товаров CREATE TABLE dim_product ( product_key Int PRIMARY KEY, product_name String, category String ); -- Справочник пользователей CREATE TABLE dim_user ( user_key Int PRIMARY KEY, user_name String, region String );
Задание 2. Создание materialized view
- В ClickHouse или Trino:
CREATE MATERIALIZED VIEW mv_daily_revenue TO agg_daily_revenue AS SELECT order_date AS dt, sum(total_amount) AS revenue FROM fact_sales GROUP BY dt; - Проверка:
INSERT INTO fact_sales VALUES ('o1','2025-04-18',1,1,100.0); SELECT * FROM agg_daily_revenue WHERE dt = '2025-04-18';
Задание 3. Инкрементальный aggregate table
- Создать таблицу:
CREATE TABLE agg_hourly_sales ( hour DateTime, product_key Int, amount Decimal(10,2) ); - SQL‑скрипт (запуск раз в час):
INSERT INTO agg_hourly_sales SELECT toStartOfHour(order_date) AS hour, product_key, sum(total_amount) AS amount FROM fact_sales WHERE order_date >= yesterday() GROUP BY hour, product_key; - Airflow‑DAG: настройте запуск этого скрипта каждый час.
Задание 4. Сравнение производительности
- Прямой запрос:
SELECT sum(total_amount) FROM fact_sales WHERE order_date = today(); - Через MV:
SELECT revenue FROM agg_daily_revenue WHERE dt = today(); - Замеры:
– Засеките время в SQL‑клиенте или ClickHouse UI для обоих запросов.
– Обсудите, насколько быстрее выступает MV по сравнению с пересчётом «на лету».
BI & Visualization
Yandex DataLens: коннекторы, дашборды, управление доступом
Коннекторы: «трубопровод» данных
DataLens работает как радио‑приёмник: ему нужно «настроиться» на источник сигнала.
- Что подключаем
- ClickHouse, PostgreSQL — ваши быстрые базы
- S3/CSV — файлы в облачном хранилище
- HTTP API — данные, которые приходят по Интернету
- Как настраивать
- Указываете адрес (URL), логин/пароль или ключ, иногда путь к SSL‑сертификату.
- Проверяете, что DataLens может «достучаться» до сервера: в VPC и firewall разрешены нужные порты.
- После подключения автоматом подтягиваются схемы — таблицы и их колонки. При их изменении можно нажать «Обновить схему».
Построение дашбордов: «картинки из данных»
- Виджеты
- Линейные графики (динамика по времени)
- Столбчатые/круговые диаграммы (сравнение категорий)
- Таблицы с подведение итогов
- Карты (если есть геоданные)
- Параметры визуализации
- Фильтры: пусть пользователь выберет нужный товар или регион
- Drill‑down: клик по столбцу разворачивает детальный отчёт
- Drill‑through: ссылка на внешний отчёт (например, в другом дашборде)
- Layout‑дизайн
– Располагаете виджеты на сетке, подгоняете их размер, группируете по смыслу: динамика вверху, таблица снизу.
Пользовательский доступ и безопасность
- Роли
- Viewer — только смотреть
- Editor — может редактировать дашборды
- Admin — полные права
- Шеринг
- По группе или пользователю: даёте доступ только «Analysts»
- Публичная ссылка: всем в интернете можно видеть, без входа
- Row‑level security (RLS)
– Можно настроить, чтобы разные пользователи видели разные строки (например, только свой регион). - Data Masking
– Скрытие части значений (например, последние цифры кредитной карты).
Авто‑обновление и алерты
- Cache TTL
– DataLens кэширует результаты запросов: указываете «обновлять каждые N часов», чтобы не перегружать базу. - Алерты
– Задаёте правило, например: «если суммарные продажи за вчера < X, отправить email/Slack».
– DataLens проверит это условие по расписанию и оповестит нужных людей.
Задание 1. Подключение источника
- Откройте DataLens UI и перейдите в «Источники данных» → «Добавить».
- Выберите ClickHouse (или CSV):
- Если ClickHouse:
- Введите адрес
http://<host>:8123 - Укажите логин/пароль (или оставьте пустым, если нет).
- Введите адрес
- Если CSV:
- Загрузите файл или укажите S3‑путь
s3://bucket/path/data.csv.
- Загрузите файл или укажите S3‑путь
- Если ClickHouse:
- Проверьте соединение и сохраните. В списке таблиц появится ваш источник.
Задание 2. Создание простого дашборда
- Создайте новый дашборд «Продажи за месяц».
- Добавьте виджет:
- Тип «Линейный график».
- Источник: таблица
sales. - X‑ось:
order_date, Y‑ось:SUM(amount).
- Добавьте фильтр по полю
category. - Опубликуйте дашборд и посмотрите, как работает фильтр.
Задание 3. Настройка прав доступа
- Перейдите в «Пользователи и группы».
- Создайте группу «Analysts» и добавьте в неё тестового пользователя.
- Откройте настройки дашборда «Продажи за месяц» → «Доступ», дайте группе Analysts роль Viewer.
- Проверьте под учётной записью другого пользователя, что без доступа дашборд не виден.
Задание 4. Авто‑обновление и алерты
- В настройках дашборда установите Cache TTL = 1 час.
- Перейдите в раздел «Алерты» и создайте правило:
- Условие:
SUM(amount) < 1000за вчера. - Расписание: каждый день в 08:00.
- Получатели: ваша почта или Slack‑канал.
- Условие:
- Проверьте: выполните вручную «Проверить сейчас» и убедитесь, что уведомление отправилось (или увидели результат проверки).
Metabase: быстрые опросы, embedding, alerting
Что такое Metabase?
Metabase — это простой и доступный инструмент бизнес‑аналитики с веб‑интерфейсом. Представьте, что вам нужно быстро получить отчёт по продажам или кликам, и при этом вы не хотите писать сложные скрипты или настраивать дорогие BI‑системы.
Metabase:
- Устанавливается за пару минут на вашем сервере или локальной машине (есть Docker‑образ).
- Подключается к любым популярным базам (PostgreSQL, MySQL, ClickHouse и др.) или даже к CSV‑файлам.
- Позволяет «на лету» строить простые отчёты (Questions) щёлканиями мышки и более сложные — на чистом SQL (Native queries).
- Объединяет эти отчёты в дашборды, даёт возможность быстро фильтровать данные и переходить внутрь деталей.
- Поддерживает встраивание отчётов в ваши сайты и уведомления (alerts), если что‑то пошло не так (например, объём продаж упал ниже порога).
Metabase очень дружелюбен к новичкам: основную работу можно делать без единой строчки кода — достаточно выбрать таблицу, перетащить нужные поля и получить график или таблицу.
Основные возможности Metabase
- Быстрые опросы (Questions)
– Simple: выбор полей и автоматическая визуализация.
– Custom: графики и группировки через интерфейс без SQL.
– Native: полный контроль через SQL‑запросы.
– Все Questions можно сохранять, группировать по коллекциям и добавлять фильтры. - Дашборды
– Сборник виджетов (Questions) на одном экране.
– Общие фильтры, drill‑down (клик по точке графика ведёт к деталям).
– Настройка сетки, размера и порядка блоков. - Embedding (встраивание)
– Публичные ссылки для внешних пользователей.
– Защищённые iframe‑виджеты с signed JWT — можно встроить в ваш сайт или приложение, передавать параметры (например, ID пользователя). - Alerting (уведомления)
– Подписка на Question или весь Dashboard.
– Триггеры: пороговые условия (например,orders < 100).
– Доставка в email, Slack или на любой webhook. - Управление доступом
– Роли: Viewer (просмотр), Editor (редактирование), Admin (управление).
– Группы пользователей, row‑level security для ограничения видимых строк и data masking. - Коннекторы
– ClickHouse, PostgreSQL, MySQL, MongoDB, Google BigQuery и др.
– CSV/JSON‑файлы, HTTP API.
– Обновление схемы («обновить таблицу») при изменении структуры источника.
Быстрые опросы (Questions)
Metabase называет «опросом» любую единичную визуализацию или таблицу данных — это как “робкий мини‑отчёт”.
- Simple Question («Простой опрос»)
– Вы кликаете на таблицу, выбираете несколько полей и Metabase сам подбирает виджет (график или таблицу).
– Не требует писать код. - Custom Question («Настраиваемый опрос»)
– Вы вручную выбираете, какие колонки и агрегации показывать (сумма, среднее, число строк) и где их разместить.
– Можно добавлять фильтры и группировки, но всё ещё без SQL. - Native Query («Нативный запрос»)
– Полный SQL‑код, который вы пишете сами.
– Даёт максимальную гибкость: любые сложные выборки, JOIN‑ы и подзапросы.
Организация
- Коллекции — папки, куда складываете вопросы по темам (
Sales Analytics,Marketing,Support). - Фильтры и ползунки — вынесенные наружу элементы управления: например, на дашборде есть слайдер по датам, и он влияет на все вложенные вопросы.
Embedding (встраивание отчётов)
Как показать свой отчёт не только в Metabase, но и на вашем сайте или в приложении:
- Публичная ссылка
– Простая URL‑ссылка, чтобы любой, у кого есть адрес, мог видеть отчёт.
– Минус — нет защиты, любой с ссылкой просмотрит данные. - Iframe с signed JWT
– Metabase генерирует специальный токен (JWT), который подписан и действителен ограниченное время.
– В вашем HTML-коде помещаете:<iframe src="https://metabase.company.com/embed/question/<QUESTION_ID>?jwt=<SIGNED_TOKEN>" width="800" height="600"></iframe>– Безопасность:
- Ограничение домена (только ваш сайт).
- Время жизни токена (например, 5 минут).
– Параметры: можно передать в токене фильтр по пользователю или дате.
Alerting (уведомления)
Metabase умеет следить за изменением чисел в вопросе или дашборде и рассылать уведомления.
- Подписка
– Можно подписаться на отдельный Question или весь Dashboard. - Триггеры
– Условие: например, «если количество заказов за сегодня < 100».
– Частота: раз в час, раз в день, раз в 10 минут. - Каналы доставки
- Email — письмо на почту.
- Slack — сообщение в нужный канал.
- Webhook — отправить POST‑запрос в ваше приложение.
Устный опрос vs мини‑практика
- Устный опрос (5 мин)
- Как в Metabase создать Native query?
- В чём разница между Question и Dashboard?
- Какие есть каналы для alerting?
- Мини‑практика
Обязательно: Metabase интуитивен, но только вживую вы увидите, как работают конструктор вопросов, встраивание и алерты.
Задание 1. Создание быстрого опроса
- Войдите в Metabase и нажмите «New → Question».
- Выберите таблицу
orders. - Настройте:
- Metric:
SUM(amount) - Breakout:
product_name
– получите «топ‑5 товаров по продажам».
- Metric:
- Сохраните вопрос в коллекцию Sales Analytics.
Задание 2. Native SQL‑запрос
- Нажмите «New → Native query».
- Впишите SQL:
SELECT date_trunc('day', order_date) AS day, COUNT(*) AS orders FROM orders WHERE order_date >= current_date - interval '7' day GROUP BY 1 ORDER BY 1; - Запустите и сохраните как «Weekly Orders».
Задание 3. Embedding в HTML‑страницу
- Найдите ваш Question и нажмите «Embed → Embed this question».
- Выберите вариант Signed embedding и скопируйте iframe‑код.
- Создайте простой файл
embed.html:<!DOCTYPE html> <html><body> <h1>Embedded Report</h1> <!-- вставьте сюда iframe‑код --> </body></html> - Откройте
embed.htmlв браузере и убедитесь, что отчёт отображается.
Задание 4. Настройка алерта
- Откройте Question «Weekly Orders».
- Нажмите «Subscribe» → «Alert».
- Условие:
orders < 100 AND day = current_date. - Частота: раз в час.
- Канал: email или Slack (введите тестовый адрес/канал).
- Нажмите «Save» и «Test now», чтобы убедиться, что уведомление уходит.
Оркестрация данных
Apache Airflow: DAG‑ы, операторы, XCom, SLA, RBAC
Что такое Airflow и где он нужен?
Представьте, что у вас есть набор шагов, которые нужно выполнить с определённым расписанием и в чётком порядке: например, каждую ночь собирать данные, обрабатывать их, загружать отчёты. Airflow — это как диспетчер задач для таких конвейеров (ETL, ML‑pipeline и т. д.):
- Работает по расписанию (как cron),
- Видит зависимости между задачами (если шаг A упал, не пиши в базу шаг B),
- Имеет удобный веб‑интерфейс для мониторинга.
Архитектура Airflow
- Scheduler («планировщик») — запускает задачи точно по расписанию и следит за зависимостями.
- Webserver — веб‑панель, где вы видите свои DAG’и, логи и состояние.
- Metadata DB — база (Postgres/SQLite), где Airflow хранит информацию о запусках.
- Executors — механизмы выполнения:
- LocalExecutor — всё на одной машине,
- CeleryExecutor — задачи раздаются воркерам через очередь (Redis/RabbitMQ),
- KubernetesExecutor — каждая задача — свой Pod в Kubernetes.
DAG: Directed Acyclic Graph
- DAG (ориентированный ациклический граф) — ваша программа в Airflow.
- Узлы — задачи (tasks),
- Ребра — зависимости (
task1 >> task2).
- Как определить:
from airflow import DAG from datetime import datetime, timedelta default_args = { 'owner': 'analyst', 'email': ['you@example.com'], 'retries': 1, 'retry_delay': timedelta(minutes=5), } dag = DAG( 'my_first_dag', default_args=default_args, start_date=datetime(2025,4,18), schedule_interval='@daily' ) - Best practices
– Храните DAG’и в отдельных файлах по смыслу,
– Используйте шаблоны (TaskGroups),
– Версионируйте в Git.
Операторы и сенсоры
- Операторы — атомарные шаги:
BashOperator— выполнить команду Linux,PythonOperator— вызвать функцию на Python,SparkSubmitOperator— запустить Spark‑задачу.
- Сенсоры — ждут какого‑то события:
HttpSensor— пока не вернётся HTTP 200,S3KeySensor— пока в бакете не появится нужный файл.
XCom: обмен данными между тасками
- XCom (cross‑communication) — небольшие сообщения, которые одна задача push делает, а другая pull читает.
- Пример:
def producer(**ctx): ctx['ti'].xcom_push(key='result', value=42) def consumer(**ctx): val = ctx['ti'].xcom_pull(task_ids='task_producer', key='result') print("Got:", val)
SLA и оповещения
- SLA (Service Level Agreement) — максимальное время выполнения задачи.
- Задаётся в
default_argsили у оператора:task = BashOperator( task_id='task_2', bash_command='echo world', sla=timedelta(minutes=1), dag=dag ) - on_sla_miss_callback позволяет отправить письмо или записать в лог, если задача не уложилась.
RBAC и безопасность
- RBAC (Role‑Based Access Control) — по ролям в UI:
- Admin — полные права,
- User — выполнять и просматривать свои DAG’и,
- Op — операционные действия (например, перезапуск задач).
- Интеграция с LDAP или OAuth для единого входа.
Устный опрос vs мини‑практика
- Устный опрос
– «Что такое оператор и чем он отличается от сенсора?»
– «Как задать SLA для задачи?»
– «Для чего нужен XCom?»
– «Как включить RBAC в Airflow?» - Мини‑практика
– Обязательно: только создав свой первый DAG и прогнав задачи, вы поймёте, как всё работает на самом деле.
Задание 1. Локальный Airflow через Docker‑Compose
- Скопируйте готовый
docker-compose.ymlс сервисами:airflow-webserver,airflow-scheduler,postgres(Metadata DB),redis(для CeleryExecutor).
- Запустите:
docker-compose up -d - Откройте UI:
http://localhost:8080
Задание 2. Простейший DAG из двух тасков
- В папке
dags/создайте файлhello_dag.py:from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator from datetime import datetime, timedelta default_args = {'start_date': datetime(2025,4,18)} dag = DAG('hello_dag', default_args=default_args, schedule_interval=None) task_1 = BashOperator(task_id='say_hello', bash_command='echo "hello"', dag=dag) task_2 = PythonOperator(task_id='say_world', python_callable=lambda: print("world"), dag=dag) task_1 >> task_2 - В UI нажмите Trigger DAG, посмотрите логи.
Задание 3. Использование XCom
- Измените
say_helloна Python‑таску:def push_fn(ti): ti.xcom_push(key='msg', value='hello!') push = PythonOperator(task_id='push', python_callable=push_fn, dag=dag) - В
say_worldдостаньтеmsgи выведите:def pull_fn(ti): val = ti.xcom_pull(task_ids='push', key='msg') print("Pulled:", val) pull = PythonOperator(task_id='pull', python_callable=pull_fn, dag=dag) push >> pull
Задание 4. Настройка SLA и callback
- В
default_argsдобавьте:'sla': timedelta(minutes=1), 'on_sla_miss_callback': lambda context: print("SLA missed!", context) - Сделайте
say_worldзадержку:bash_command='sleep 90' - Запустите — в логах должно появиться «SLA missed!».
Задание 5. Включение RBAC и создание ролей
- В
airflow.cfgустановите:rbac = True - Перезапустите webserver.
- В UI Security → List Roles создайте роли analyst (Viewer) и engineer (Editor).
- Импортируйте тестовых пользователей и назначьте им роли.
- Войдите за «analyst» — убедитесь, что можно смотреть, но нельзя редактировать DAG’и.
Kubernetes: управление Spark‑, Trino‑, ClickHouse‑кластерами, Helm‑чарты, auto‑scaling
Почему Kubernetes для Big Data‑кластеров?
- Единая платформа
– Вместо каждого сервиса под свои виртуалки вы получаете общий «пул» ресурсов, где запускаются и Spark‑задачи, и Trino, и ClickHouse. - Декларативность
– Вы описываете желаемое состояние в манифестах (YAML), а Kubernetes сам следит, чтобы оно было выполнено («self‑healing»). - Масштабирование
– Легко добавить реплики или ноды, а потом так же быстро их убрать.
CRD и операторы
- CRD (Custom Resource Definition)
– Расширяет Kubernetes, добавляя новые типы объектов. - Операторы
– Специальные контроллеры, которые знают, как запускать и управлять конкретным приложением:- Spark Operator
- Позволяет описать Spark‑задачу через ресурс
SparkApplication. - Автоматически создаёт поды‑driver и поды‑executor, следит за их логами и завершением.
- Позволяет описать Spark‑задачу через ресурс
- Trino Operator / Helm‑чарт
- Создаёт StatefulSet для coordinator и Deployment для workers, настраивает кластер в один командой.
- ClickHouse Operator
- Управляет StatefulSet’ами, шардированием и репликами, создаёт сервисы и PVC.
- Spark Operator
Helm‑чарты и values.yaml
- Helm‑чарт — это «шаблон» Kubernetes‑манифестов с переменными.
- values.yaml — файл, где вы задаёте конкретные параметры (образ, число реплик, ресурсы).
- Override
– Для разных сред (dev,test,prod) вы можете иметь своиvalues-dev.yaml,values-prod.yamlи подставлять их при установке:helm install my-spark spark-operator/spark-operator \ --namespace spark --values values-prod.yaml - Зависимости
– Чарт может подтягивать другой (например, Spark Operator и вместе с ним Hive Metastore).
Хранение данных и конфигурация
- PersistentVolumeClaim (PVC)
– Часть диска кластера, чтобы ClickHouse мог хранить данные «вечно», даже если под умер. - ConfigMap
– Для хранения общих настроек (например,trino.properties). - Secret
– Для паролей и ключей (образы с учётными данными к базе).
Auto‑scaling
- Horizontal Pod Autoscaler (HPA)
– Следит за загрузкой CPU/памяти в подах (например, Trino‑coordinator) и автоматически увеличивает или уменьшает число реплик:kubectl autoscale deployment trino-coordinator \ --cpu-percent=50 --min=1 --max=3 -n trino - Spark dynamic allocation
–spark.kubernetes.executor.autoscaling.enabled=trueпозволяет Spark самому просить больше executors, когда нужно. - Cluster Autoscaler
– Подключается к облачному провайдеру (Yandex Cloud, GKE, EKS) и добавляет узлы в node pool, когда поды не помещаются из‑за ресурсов.
Устный опрос vs мини‑практика
-
- Что такое CRD и зачем он нужен?
- В чём отличие HPA и Cluster Autoscaler?
- Зачем ConfigMap и Secret?
- Мини‑практика
– Обязательно: только развёртывая реальные чарты и CRD‑объекты вы поймёте, как это всё работает вместе.
Задание 1. Установка Spark Operator
- Добавьте репозиторий и установите оператор:
helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator helm install spark-operator spark-operator/spark-operator \ --namespace spark-operator --create-namespace - Создайте манифест
spark-pi.yaml:apiVersion: sparkoperator.k8s.io/v1beta2 kind: SparkApplication metadata: name: spark-pi namespace: spark-operator spec: type: Scala mode: cluster image: bitnami/spark:latest mainClass: org.apache.spark.examples.SparkPi arguments: - "1000" driver: cores: 1 memory: "512m" executor: cores: 1 instances: 2 memory: "512m" - Запустите и проверьте статус:
kubectl apply -f spark-pi.yaml kubectl -n spark-operator get sparkapplications kubectl -n spark-operator logs spark-pi-driver
Задание 2. Деплой Trino через Helm
- Добавьте репозиторий и установите:
helm repo add trino https://trinodb.github.io/charts helm install trino trino/trino --namespace trino --create-namespace - Откройте UI (локальный форвард):
kubectl port-forward svc/trino-coordinator 8080:8080 -n trino - Измените
values.yaml(например, установитьworker.replicas: 2) и примените:helm upgrade trino trino/trino \ --namespace trino --values values.yaml
Задание 3. Развёртывание ClickHouse Operator
- Добавьте репозиторий и установите оператор:
helm repo add altinity https://altinity.github.io/altinity-helm-charts helm install clickhouse-operator altinity/clickhouse-operator \ --namespace clickhouse --create-namespace - Создайте CR манифест
chi.yaml:apiVersion: clickhouse.altinity.com/v1 kind: ClickHouseInstallation metadata: name: clickhouse-cluster namespace: clickhouse spec: configuration: clusters: - name: cluster1 layout: shardsCount: 3 replicasCount: 2 - Примените и проверьте StatefulSet’ы:
kubectl apply -f chi.yaml kubectl -n clickhouse get statefulsets
Задание 4. Настройка auto‑scaling для Trino
- Установите
metrics-serverв кластер (если ещё нет). - Создайте HPA:
kubectl autoscale deployment trino-coordinator \ --cpu-percent=60 --min=1 --max=3 -n trino - Сгенерируйте нагрузку:
hey -n 10000 -c 100 http://localhost:8080/v1/status - Просмотрите изменение числа реплик:
kubectl get hpa -n trino
Задание 5. Cluster Autoscaler в облаке
- Включите Cluster Autoscaler на уровне node pool (Yandex Cloud/GKE/EKS).
- Создайте нагрузку, которая требует больше ресурсов (например, запустите SparkApplication с 5 executors).
- Убедитесь, что добавились новые ноды в пул:
kubectl get nodes
Data Catalog & Governance
Подраздел «OpenMetadata: сбор метаданных, lineage, data‑stewardship»
Что такое OpenMetadata и зачем он нужен?
Представьте, что у вас в компании десятки баз данных, таблиц и отчётов. Никто не знает, где лежат нужные данные, как они связаны и кто за них отвечает. OpenMetadata — это единый каталог метаданных, который помогает:
- Централизовать описание всех таблиц, колонок, дашбордов и ETL‑джобов.
- Автоматически собирать структуру и профилировать качество (сколько пустых строк, типы полей).
- Видеть lineage — откуда данные пришли, какими преобразованиями они прошли и куда ушли дальше.
- Назначать ответственных (data stewards) за каждую сущность: кто владелец, кто куратор, кто проверяет качество.
Архитектура OpenMetadata
- Ingestion сервисы
- Коннекторы к разным источникам: PostgreSQL, Hive, BigQuery, S3 и др.
- Можно «тянуть» по расписанию (polling) или слушать события изменений (event‑driven).
- Metadata Store
- Обычно Postgres для сущностей и Elasticsearch для быстрого поиска.
- API‑Gateway
- REST API + Python‑SDK, через которые можно автоматизировать любую работу.
- UI
- Веб‑интерфейс, где виден граф lineage (связи между таблицами и скриптами), профили качества и атрибуты (теги, SLA‑метрики).
Сбор метаданных (Ingestion)
- Коннекторы
– Настраиваете «источник» с адресом базы и учётными данными. - Запуск сбора
– По расписанию (например, раз в день) подключается и собирает схему, статистику (количество строк, уникальных значений).
– Можно настроить реакцию на события, чтобы собирать изменения сразу после выпуска нового ETL‑джоба.
Lineage (прослеживаемость)
- Автоматическая
– OpenMetadata умеет встраиваться в SQL‑джобы и ETL‑пайплайны, видеть в них источники и цели. - Ручная коррекция
– В UI можно добавить связь, если коннектор что‑то не заметил. - Impact analysis
– По графу lineage вы сразу увидите, какие отчёты или downstream‑таблицы затронет изменение в исходной таблице.
Data Stewardship
- Роли
- Data Owner – «владелец» таблицы, отвечает за бизнес‑логику.
- Data Steward – куратор качества и терминологии, назначает теги, проверяет SLA.
- Процесс
- Engineer вносит новое описание или изменяет схему.
- Steward получает уведомление, проверяет соответствие стандартам.
- После одобрения изменения становятся «официальными» в каталоге.
API и интеграции
- REST API позволяет программно:
- запускать ingestion,
- получать список таблиц,
- читать и изменять lineage и теги.
- Python‑SDK для удобной работы в скриптах.
- Webhooks — оповещают внешние системы при изменении метаданных.
Устный опрос vs мини‑практика
- Устный опрос (5 мин)
- Что такое сущность (Entity) и связь (Relationship) в OpenMetadata?
- Чем отличается lineage от impact analysis?
- Кто такой Data Steward и какие у него задачи?
- Мини‑практика
– Обязательно: только зайдя в UI, настроив коннектор и увидев граф lineage, слушатели поймут, как каталог оживает.
Мини‑задания
- Развёртывание OpenMetadata
- Склонировать пример Docker‑Compose из репозитория OpenMetadata.
- Запустить команды
docker-compose up. - Открыть UI на
http://localhost:8585.
- Ingestion метаданных
- Настроить коннектор к тестовой базе PostgreSQL (адрес, логин, пароль).
- Запустить сбор (Ingestion) и убедиться, что таблицы и колонки появились в UI.
- Просмотр и анализ lineage
- В тестовой БД создать VIEW, который объединяет две таблицы:
CREATE VIEW orders_users AS SELECT o.order_id, o.amount, u.region FROM orders o JOIN users u ON o.user_id = u.id; - В OpenMetadata найти граф lineage этого VIEW и проследить связь до таблиц
ordersиusers.
- В тестовой БД создать VIEW, который объединяет две таблицы:
- Назначение Data Steward
- В UI создать пользователя с ролью Data Steward.
- Перейти к таблице
orders, добавить тег „Critical” и в комментарии описать политику качества (например, «sales_amount > 0»).
Роли и политики: кто меняет схему и проверяет качество
Важные роли в Data Governance
| Роль | Основные задачи |
|---|---|
| Data Owner | Определяет смысл таблицы, принимает решения по её изменению |
| Data Steward | Следит за терминологией, качеством, SLA, назначает теги |
| Data Custodian | Обеспечивает техническое хранение и безопасность данных |
| Data Engineer | Реализует изменения схем, пишет ETL, обеспечивает доставку данных |
Процесс изменения схемы
- Инициатор (Analyst или Engineer) предлагает изменение (документ, Pull Request).
- Ревью (Steward/Architect) проверяет корректность, совместимость и бизнес‑логику.
- Одобрение (Change Board) согласовывает сроки и уведомляет Custodian.
- Внедрение (Engineer) деплоит новую схему в систему.
- Мониторинг (Steward) отслеживает метрики качества и SLA.
Политики качества данных
- SLA‑метрики
- Полнота: доля пустых значений < 1 %
- Актуальность: задержка обновления < 5 мин
- Корректность: валидация бизнес‑правил (например, суммы > 0)
- Threshold’ы и алерты
– Если метрика выходит за порог, Steward получает уведомление и начинает расследование.
Коммуникация и эскалация
- Уведомления
– При нарушении SLA автоматом рассылаются письма/Slack‑сообщения Steward и Owner. - Data Council
– Регулярная встреча (раз в месяц) ответственных по данным: обсуждение инцидентов, планов изменений.
Устный опрос vs мини‑практика
- Устный опрос
- Кто отвечает за изменение схемы и кто за её хранение?
- Как выглядит процесс одобрения в Change Board?
- Какая метрика качества данных важнее всего?
- Мини‑практика
– Рекомендуется: разыграть ролевую игру Change Board, чтобы прочувствовать сложность согласования и важность коммуникации.
Мини‑задания
- RACI‑матрица для изменения схемы
- Составить таблицу, где по шагам процесса (предложение→ревью→деплой→мониторинг) указать:
- Responsible (кто делает),
- Accountable (кто отвечает),
- Consulted (кого нужно спросить),
- Informed (кого уведомить).
- Составить таблицу, где по шагам процесса (предложение→ревью→деплой→мониторинг) указать:
- Шаблон политики качества данных
- Опишите метрику “потеря записей”:
- Как её считать (разница ожиданий и факта),
- Пороговые значения,
- Процесс реагирования при нарушении.
- Опишите метрику “потеря записей”:
- Ролевая игра «Change Board»
- Раздайте участникам роли: Engineer, Analyst, Steward, Architect.
- Предложите реальный кейс изменения схемы (например, добавление новой колонки).
- Проведите обсуждение и голосование: утвердить или отклонить изменение, определив сроки и условия.
Monitoring & Alerting
Grafana: сбор метрик из Spark, Trino, ClickHouse, Airflow
Что такое Grafana?
Grafana — это веб‑инструмент для визуализации метрик и логов.
Вы подключаете к ней источник метрик (Prometheus, ClickHouse, Elasticsearch и др.), а Grafana рисует графики, таблицы и шильдики в удобных дашбордах.
Data Sources
- Prometheus: собирает метрики с экспортеров.
- Graphite: старая, но распространённая система.
- ClickHouse/Elasticsearch: можно записывать метрики сразу в колонковое хранилище.
- Как добавить: в UI → Configuration → Data Sources → Add → выбираете тип, указываете URL и аутентификацию, нажимаете Save & Test.
Панели и дашборды
- Панель (Panel) — единичная визуализация: time series, gauge, table.
- Дашборд (Dashboard) — набор панелей, расположенных на сетке.
- Построение:
- Создаёте дашборд → Add Panel.
- Пишете запрос к источнику (PromQL, SQL, Elasticsearch DSL).
- Выбираете тип графика и настраиваете оси, легенду, цвета.
Variables & Templating
- Переменные (Variables) дают интерактивность:
- Например, список кластеров Spark или namespaces в Kubernetes.
- В UI дашборда → Variables, выбираете источник и запрос, Grafana подтянет варианты.
- Использование: в запросе пишете
$clusterили${cluster}, и при смене переменной все панели меняются под выбранный кластер.
Alerting в Grafana
- Alert‑rule создаётся прямо на панели.
- В редакторе панели переходим на вкладку Alert.
- Задаём условие:
WHEN avg() OF query(A, 5m, now) IS ABOVE 80. - Настраиваем Notification channel (email, Slack, webhook).
- Отличие от Prometheus‑rule: Grafana умеет делать более «красивые» дашборд‑алерты и объединять несколько запросов в одном правиле.
Пользовательский доступ
- Роли:
- Viewer — только просмотр.
- Editor — может править панели и дашборды.
- Admin — полные права.
- Папки и права: дашборды группируются в папки, и к каждой папке можно задать права для группы пользователей.
Задание 1. Docker Compose‑стек
- Подготовьте
docker-compose.ymlс серверами Grafana и Prometheus:version: '3' services: prometheus: image: prom/prometheus ports: ['9090:9090'] volumes: - ./prometheus.yml:/etc/prometheus/prometheus.yml grafana: image: grafana/grafana ports: ['3000:3000'] - Запустите:
docker-compose up -d - Откройте Grafana:
http://localhost:3000(логин по умолчанию admin/admin).
Задание 2. Добавление Data Source
- В Grafana UI → Configuration → Data Sources → Add data source.
- Выберите Prometheus, введите URL
http://prometheus:9090, нажмите Save & Test. - Убедитесь, что статус «Data source is working».
Задание 3. Построение дашборда
- Создайте новый дашборд → Add Panel → Prometheus.
- Введите запрос
spark_executor_cpu_seconds_total(или аналог вашей метрики). - Перейдите во вкладку Transform → Add variable → создайте переменную
cluster, тип Query, запросlabel_values(spark_executor_cpu_seconds_total, cluster)→ Save. - В панели замените часть запроса на
$cluster, чтобы выбирать кластер из выпадающего списка.
Задание 4. Настройка Alert‑rule
- Перейдите в панель CPU и выберите Alert → Create Alert Rule.
- Условие:
- WHEN avg() OF query(A, 5m, now) IS ABOVE 80
- Добавьте Notification channel (email или webhook).
- Simulate load:
- На Prometheus-экспортер
node_exporter - Запустите
stressилиheyпо API, чтобы CPU вырос.
- На Prometheus-экспортер
- Дождитесь срабатывания алерта и проверьте уведомление.
Prometheus: экспортёры и alert‑rules
Что такое Prometheus?
Prometheus — это система мониторинга с базой временных рядов. Он scrape‑ит (собирает) метрики с экспортёров и хранит их в своей базе, а потом по этим метрикам можно строить отчёты и alert‑rules.
Exporters
- node_exporter: метрики ОС (CPU, память, диск).
- jmx_exporter: метрики Java‑приложений (Spark, Trino, Airflow) через JMX.
- clickhouse_exporter, airflow-exporter: специфичные для сервисов.
Scrape configuration
В prometheus.yml описываем, откуда брать метрики:
scrape_configs:
- job_name: 'node'
static_configs:
- targets: ['host.docker.internal:9100']
- job_name: 'spark'
static_configs:
- targets: ['spark-driver:8080']
- job_name — имя группы.
- targets — список адресов
host:port. - relabeling и discovery помогают автоматически находить поды в Kubernetes.
Alerting rules
В отдельном файле, например alerts.yml:
groups:
- name: spark.rules
rules:
- alert: SparkExecutorDown
expr: up{job="spark"} == 0
for: 2m
labels:
severity: critical
annotations:
summary: "Executor {{ $labels.instance }} is down"
- expr — PromQL выражение.
- for — время, в течение которого условие должно держаться, прежде чем сработает alert.
- labels и annotations — метаданные уведомления.
Alertmanager получает эти срабатывания, маршрутизирует по routes и отправляет в receivers (email, Slack, webhook).
Metric naming & labels
- snake_case:
spark_executor_cpu_seconds_total. - labels:
instance,job,cluster,namespace. - Яркое имя + понятные метки облегчают фильтрацию и агрегацию.
Задание 1. Подключение node_exporter
- Запустите:
docker run -d -p 9100:9100 prom/node-exporter - Добавьте в
prometheus.yml:scrape_configs: - job_name: 'node' static_configs: - targets: ['host.docker.internal:9100'] - Перезапустите Prometheus и проверьте на
http://localhost:9090/targets, что jobnodeактивен.
Задание 2. JMX‑exporter для Spark
- Скачайте
jmx_prometheus_javaagent.jarиconfig.yml(пример из репозитория). - Запустите Spark‑приложение с опцией:
--jvm-opts="-javaagent:/opt/jmx_prometheus.jar=8080:/opt/config.yml" - В
prometheus.ymlдобавьте:- job_name: 'spark' static_configs: - targets: ['spark-driver-host:8080'] - Убедитесь, что метрики видны в
http://localhost:9090/metrics.
Задание 3. Написание alert‑rule
- Создайте
alerts.ymlрядом сprometheus.yml:groups: - name: airflow.rules rules: - alert: AirflowSchedulerDown expr: up{job="airflow-scheduler"} == 0 for: 2m labels: severity: warning annotations: summary: "Airflow scheduler is down on {{ $labels.instance }}" - В
prometheus.ymlподключите:rule_files: - 'alerts.yml' - Перезапустите Prometheus и в UI проверьте Alerts — должна быть правило
AirflowSchedulerDown.
Задание 4. Интеграция с Alertmanager
- Запустите Alertmanager (через Docker).
- В
alertmanager.ymlдобавьте ресивер (email или Slack). - В Prometheus
prometheus.ymlукажите:alerting: alertmanagers: - static_configs: - targets: ['alertmanager:9093'] - Убедитесь, что при проверке правила (
ALERTSв UI) уведомления уходят в указанный канал.
Итоги и лучшие практики
CI/CD для ETL/SQL
CI/CD (Continuous Integration / Continuous Deployment) — это автоматизация проверки и развёртывания вашего кода.
Для ETL‑процессов и SQL‑скриптов это значит:
- Хранить весь код (DAG‑и, SQL, конфиги) в системе контроля версий (Git).
- Проверять каждое изменение автоматически: линтеры для SQL, unit‑ и integration‑тесты для Python/SQL.
- Деплоить в тестовую среду и затем в production без ручного копирования.
Задания
- Инициализировать Git‑репо
git init git checkout -b feature/etl-pipeline # добавить пару файлов (dag.py, script.sql), сделать коммит git add . git commit -m "Начальный ETL‑пакет" git push origin feature/etl-pipeline # открыть pull request в GitHub - Настроить SQLFluff
- Установить:
pip install sqlfluff sqlfluff-postgres - Создать
.sqlfluff:[sqlfluff] dialect = postgres - Запустить линтинг:
sqlfluff lint scripts/transform.sql
- Установить:
- GitHub Actions для Python‑ETL
- В
.github/workflows/ci.yml:name: CI on: [pull_request] jobs: lint: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - name: SQLFluff Lint run: sqlfluff lint scripts/transform.sql test: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - name: Run pytest run: pytest tests/ - Сделать PR и убедиться, что workflow запускается.
- В
- dbt‑модель и тест
- В проекте dbt создайте модель
models/my_model.sql:select * from {{ ref('raw_events') }} - В
schema.yml:models: - name: my_model columns: - name: id tests: - not_null - unique - Добавьте шаг в CI:
- name: dbt test run: | pip install dbt dbt deps dbt seed dbt run dbt test
- В проекте dbt создайте модель
Стоимость и оптимизация (compute, storage, network)
Ваш счёт в облаке складывается из трёх частей:
- Compute: VM‑часы, слоты BigQuery, CPU/Spark executors.
- Storage: гигабайты в S3 или в Data Lake, операции PUT/GET.
- Network: исходящий трафик (egress) и межрегиональные передачи.
Оптимизировать можно так:
- Правильно размерить кластер и включить auto‑scaling.
- Хранить старые данные в более дешёвом классе (Infrequent/Archive).
- Использовать lifecycle‑правила и partition pruning, чтобы читать и платить только за нужное.
Задания
- Анализ billing‑report
- Скачать CSV отчёт за последний месяц из cloud console.
- Открыть в Excel/Google Sheets:
- Найти топ‑3 самых дорогих cost centers (Team, Project).
- Настройка lifecycle‑правила в S3/YCS
- В консоли S3/YCS создайте правило:
- Перевод в Infrequent Access через 30 дней.
- Переход в Archive через 180 дней.
- В консоли S3/YCS создайте правило:
- Проверка auto‑scaling
- Запустите на кластере Spark‑job, постепенно увеличивающий нагрузку.
- Убедитесь, что количество executors растёт и уменьшается автоматически.
- Теги ресурсов
- Добавьте к VM/кластеру теги
team=analytics,env=prod. - Постройте отчёт по расходам в billing dashboard по тегам.
- Добавьте к VM/кластеру теги
Безопасность и управление доступом (IAM, RBAC, шифрование)
- IAM‑принцип — «минимальные права»: даёте пользователю только те доступы, что ему нужны.
- RBAC — доступ к интерфейсам Airflow, Trino, ClickHouse и BI‑сервисам по ролям (чтение, правка, админ).
- Шифрование
- At‑rest: SSE‑KMS или свой ключ (BYOK).
- In‑transit: TLS/SSL между клиентами и сервисами.
- Аудит: храните логи доступа в Cloud Audit Logs или SIEM, следите за изменениями через OpenMetadata.
Конкретные задания
- Создать роль
trino_readonlyв IAM- Дать минимальные права на чтение схемы и таблиц в S3/Glue.
- Включить RBAC в Airflow
- В
airflow.cfg:rbac = True. - Перезапустить сервис, создать группу
analystс ролью Viewer, проверить ограничения.
- В
- Настроить SSE‑KMS для бакета
- В консоли S3/YCS включить шифрование на уровне бакета с использованием KMS‑ключа.
- Просмотреть аудит в OpenMetadata
- В UI найти недавние события изменения схемы и доступа к таблицам.
Как выбирать инструмент под задачу
При выборе между Spark, Trino, ClickHouse и другими, учитывайте:
| Критерий | Spark | Trino | ClickHouse |
|---|---|---|---|
| Объём данных | > 100 GB (batch/ML) | < 100 GB (interactive) | < 10 TB (real‑time OLAP) |
| Задержка | минуты–часы | < 1 сек | миллисекунды–секунды |
| Язык | Python/Scala + SQL | SQL only | SQL only |
| Управление | свой кластер / managed | lightweight (serverless) | managed/Operator в K8s |
| Стоимость | spot‑инстансы, auto‑scale | pay‑per‑query | resource‑based |
Задания
- Матрица оценки инструментов
- Составить таблицу по приведённым критериям для вашего проекта (скопируйте шаблон выше и заполните).
- PoC Spark vs Trino
- На наборе данных ~100 GB:
- Spark: запустить
spark-sqlсоcount(*). - Trino: подключить тот же файл через Hive‑каталог и выполнить
SELECT COUNT(*).
- Spark: запустить
- Сравнить время и ресурсы.
- На наборе данных ~100 GB:
- Mini‑workshop
- Разделить участников на команды, дать кейс и варианты инструментов, собраться и защитить своё решение (5 мин на команду).
- Документирование
- В Wiki/Confluence или в репозитории создать страницу «Выбор Big Data инструментов» с вашими выводами и обоснованиями.



