Типовой Big Data стек на примере компании-маркетплейса

Стек Big Data

Стек Big Data — это совокупность технологий и инструментов, организованных в логичные уровни, которые вместе обеспечивают сбор, хранение, обработку, анализ и визуализацию очень больших, разнообразных и быстро меняющихся объёмов данных.

Ключевые характеристики Big Data (5 «V»):

  • Volume (объём) – хранение и обработка петабайт–экзабайт данных;
  • Velocity (скорость) – приём и обработка потоковых данных в реальном времени;
  • Variety (разнообразие) – структурированные, полуструктурированные и неструктурированные источники;
  • Veracity (достоверность) – обеспечение качества, чистоты и целостности данных;
  • Value (ценность) – быстрое извлечение бизнес‑инсайтов.

Основные слои стека Big Data:

  1. Sources (источники): реляционные/NoSQL‑БД, API, логи, IoT‑датчики, файлы, очереди сообщений.
  2. Ingest (погружение): пакетная (Spark, Hadoop MapReduce) и потоковая (Spark Streaming, Kafka Streams, Flink) загрузка.
  3. Storage (хранилище): «озеро» (Data Lake) на S3/Object Storage + табличные форматы (Hudi, Iceberg, Delta), специализированные базы (HBase, Cassandra).
  4. Compute (вычисления): распределённый SQL‑движок (Trino/Presto), фреймворки для ML и ETL (Spark, Flink), графовые движки.
  5. Serving / Speed Layer: быстрые OLAP‑решения (ClickHouse, Druid) для низколатентных запросов.
  6. BI & Visualization: дашборды и отчёты (Tableau, Looker, Metabase, Yandex DataLens).
  7. Orchestration & Infra: планирование и автоматизация (Airflow, Kubernetes, Terraform), мониторинг (Prometheus, Grafana), каталог метаданных (Glue, OpenMetadata).

Каждый уровень стека тесно интегрируется по следующим принципам:

  • Масштабируемость: линейное добавление ресурсов.
  • Устойчивость: отказоустойчивые кластеры и репликация.
  • Гибкость: возможность подменять компоненты без глобального рефакторинга.
  • Автоматизация: единые пайплайны CI/CD для кода и инфраструктуры.

Такой модульный подход позволяет строить конвейеры от источника до отчёта, адаптируя стек под бизнес‑задачи и технологические требования компании.

Содержание

Архитектурные слои типового Big Data‑стека

Ниже мы разберём основные слои, через которые проходит типичный поток данных в компании. Представьте себе процесс, как конвейер на фабрике: в начале приходят «сырьё» (данные), дальше оно обрабатывается, хранится, рассчитывается аналитика и, наконец, покупатель (или аналитик) видит готовый «товар» (отчёт или дашборд).


1. Источники (Sources)

Что это?
Все точки, где данные изначально генерируются.

Пример:

  • Пользователь заходит на сайт или в мобильное приложение и кликает на товар.
  • Загрузка фотографий товаров от поставщиков.
  • Интеграция с логистическими партнёрами (API для статуса отправлений).
  • Системы оплаты (банк‑эквайринг).

Простой язык: это как разные краны на кухне – один даёт воду из колодца (сайт), другой – фильтрованную воду (партнёрские API), третий – газированную (сервисы платежей).


2. Приём и сбор данных (Ingest)

Что это?
Компоненты, которые «ловят» эти данные в реальном времени или пакетно и доставляют дальше.

Как бывает устроено:

  • Stream‑инжест (Kafka, AWS Kinesis, Google Pub/Sub) – для кликов пользователей и логов.
  • Batch‑ингест (Airflow + Sqoop, Spark‑job, AWS Glue) – для периодического импорта ценников из ERP-системы.

Жизненный пример: представьте, что вы ставите сетчатое сито под краны: мелкие частицы (нужные данные) проходят через сито дальше, а всё лишнее отбрасывается.


3. Хранилище (Storage)

Что это?
Где данные «ждут», пока их не возьмут для обработки.

Основные виды:

  1. Data Lake (HDFS, S3, Google Cloud Storage) – «сырой» склад: хранятся всё подряд (логи, json‑файлы, csv).
  2. Data Warehouse (Amazon Redshift, Snowflake, Google BigQuery) – «чистый склад»: данные уже очищены и структурированы для аналитики.

Аналогия: похож на два склада: на одном – мешки с необработанным зерном (Data Lake), на другом – отборная мука в мешочках (Data Warehouse).


4. Обработка и вычисления (Compute)

Что это?
Где данные готовят: чистят, трансформируют, агрегируют, запускают машинное обучение.

Типовые инструменты:

  • Batch‑вычисления: Apache Spark, Hadoop MapReduce.
  • Stream‑вычисления: Flink, Spark Streaming, Kafka Streams.
  • ML‑платформы: MLflow, SageMaker, TFX.

Пример:

  • Расчёт рекомендаций: «вам может понравиться…» на основе истории просмотров.
  • Подсчёт продаж по категориям за день.
  • Обучение моделей прогнозирования спроса на украшения к празднику.

Упрощённо: это кухня, где из муки и других ингредиентов (данных) пекут хлеб (аналитику, модели).


5. Инструменты презентации и BI (Presentation/BI)

Что это?
Инструменты, в которых конечный пользователь (аналитик, маркетолог, менеджер) видит готовые отчёты, дашборды, визуализации.

Популярные BI‑решения:

  • Tableau, Power BI, Looker, Superset.
  • Self‑service‑платформы внутри компании.

Пример:

  • Дашборд «Конверсия сайта»: показывает, сколько человек добавили товар в корзину и оформили заказ.
  • Отчёт «Самые продаваемые товары по регионам» для руководителя по продажам.

Жизненная аналогия: это витрина магазина, где покупатель (пользователь отчёта) видит уже упакованные и красиво разложенные товары.


6. Оркестрация, инфраструктура и мониторинг (Orchestration/Infra/Monitoring)

Что это?
Набор сервисов и процессов, которые управляют всем этим «конвейером», следят, чтобы он не остановился и работал без сбоев.

  • Оркестрация: Apache Airflow, Prefect, Dagster – отвечает за расписание и зависимость задач.
  • Инфраструктура: кластерные менеджеры (YARN, Kubernetes), облачные сервисы (AWS EMR, GCP Dataproc).
  • Мониторинг: Prometheus, Grafana, ELK‑стек – собирают метрики (время работы задач, ошибки, задержки).

Пример:

  • Ежедневно в 3:00 запускается ETL‑пайплайн – Airflow проверяет успешность каждого шага, в случае сбоя отправляет уведомление в Slack.
  • Kubernetes автоматически создаёт новые поды для Spark‑задач, если нагрузка растёт.
  • Grafana показывает, что задержка доставки кликов в Kafka выросла выше порога, и команда SRE сразу получает оповещение.

Как мы заботимся о конвейере: представьте диспетчерскую, где операторы следят за всеми линиями, перенастраивают оборудование и мгновенно реагируют на аварии.


Краткая схема потока

Краткая схема потока

Описание технологического процесса

Какие события (events) мы собираем?

  1. Типы событий
    • Click – любое нажатие (кнопка «Добавить в корзину», «Купить», навигация по категориям).
    • View (просмотр) – открытие карточки товара, просмотр баннера, пролистывание ленты.
    • Add to cart (добавление в корзину) – момент, когда товар попал в корзину.
    • Purchase (покупка) – успешное завершение заказа и оплата.
  2. Метаданные каждого события
    Поле Описание Пример
    user_id Уникальный идентификатор пользователя u_12345
    session_id Сессия – группа действий за один заход на сайт sess_20250417_987
    timestamp Временная метка в формате ISO 8601 2025-04-17T14:22:05Z
    product_id Идентификатор товара или баннера p_67890
    additional Доп. поля: device (mobile/desktop), referrer (откуда пришли) mobile, google.com

Пример события в JSON:

{
  "event_type": "add_to_cart",
  "user_id": "u_12345",
  "session_id": "sess_20250417_987",
  "timestamp": "2025-04-17T14:22:05Z",
  "product_id": "p_67890",
  "device": "mobile"
}

Где и как фиксируются эти события?

  1. Клиентская сторона (JS‑трекинг)
    • В браузере или мобильном приложении встроен код, который “подслушивает” клики/просмотры и отправляет события на сервер.
    • Инструменты: Google Tag Manager, собственные SDK.
  2. Лог‑сервисы
    • Сервер приложения тоже пишет логи (NGINX, backend‑сервисы) на дисковый кластер.
    • Логи собираются через Filebeat/Logstash и сразу “продавливаются” в очередь.
  3. Очередь сообщений (Kafka)
    • Все события попадают в топики Kafka:
      • clicks, views, purchases.
    • Плюсы: масштабируемость, гарантированная доставка, возможность “проиграть” логи заново.

Жизненный пример:
JS‑скрипт на вашем сайте – это будильник, который мгновенно “звонит” при каждом клике и диктует его детали в Kafka.


Как из сырых логов формируется «сырое» хранилище?

  1. Парсинг
    • Читаем сообщения из Kafka или файлов лога, разбираем JSON/строку на поля.
  2. Валидация
    • Проверяем, что обязательные поля (user_id, timestamp, event_type) есть и валидны.
    • Отбрасываем “битые” или подозрительные записи (например, события с будущей датой).
  3. Первичная агрегация
    • По необходимости объединяем мелкие события в “сессию” или считаем счётчики на лету (например, пустые клики фильтруем).
  4. Запись в Data Lake
    • Сохраняем чистый, но необработанный датасет в формате Parquet/ORC (каталог по дате и топику).

Какие метрики и KPI нужны бизнес‑пользователям?

  1. Фундаментальные показатели
    • Просмотры (Views)CTR (Click‑Through Rate) = clicks / views
    • Конверсия = purchases / sessions или purchases / clicks
    • ARPU (Average Revenue Per User) = total_revenue / number_of_users
    • LTV (Lifetime Value) – суммарная выручка с одного клиента за всё время
  2. Разрезы отчётов
    • Ассортимент: какие категории или SKU приносят больше кликов/продаж.
    • География: регионы с наибольшей активностью.
    • Время: часы, дни недели, сезонность.

Пример:
Если в понедельник на мобильном CTR упал до 1 %, это может сигнализировать о неисправности трекинга или изменении в дизайне.


Какие этапы ETL/ELT‑пайплайна?

  1. Ingestion – сбор и загрузка сырых логов (Kafka → Data Lake).
  2. Чистка (Cleaning)
    • Удаление дубликатов.
    • Приведение типов (строка → дата, число).
    • Удаление аномалий (нелогичные timestamps).
  3. Обогащение (Enrichment)
    • Привязка product_id → категория, бренд, цена (склеивание со справочником).
    • Привязка user_id → сегмент пользователя (гость, зарегистрированный, VIP).
  4. Агрегация
    • Считаем daily/hourly показатели: клики, просмотры, добавления в корзину.
    • Сохраняем агрегаты в Speed Layer или Data Warehouse.

Где и в каком формате хранятся результирующие таблицы?

  • Data Lake (долговременное хранение)
    • Форматы: Apache Hudi, Iceberg, Delta Lake на S3/HDFS.
    • Идеально для “исторических” данных и large‑scale анализов.
  • Speed Layer (горячее хранилище)
    • ClickHouse, Druid или Elasticsearch для быстрых OLAP‑запросов.
    • Отвечает на вопросы “real‑time”: сколько кликов за последние 5 минут.

Как строятся дашборды и отчёты?

  1. Выбор источника
    • OLAP‑хранилище (Snowflake, BigQuery) – для исторических отчётов.
    • Speed Layer (ClickHouse) – для near‑real‑time мониторинга.
  2. Визуализация
    • Таблицы, графики временных рядов, воронки (CTR, конверсия).
    • Фильтры: по дате, региону, категории.
  3. Расписание обновлений
    • Batch‑дашборды — раз в день в 01:00.
    • Real‑time мониторинг — каждые 5 минут.

Как организовать проверку качества данных?

  1. Контрольные выборки
    • Случайным образом сверять сырые логи с данными в хранилище.
  2. Мониторинг тайм‑серий
    • Автоматически отслеживать разрывы или резкие скачки (меньше 10 кликов за минуту → тревога).
  3. Алерты
    • Интеграция с Grafana/Prometheus → уведомления в Slack/почту.

Данные для выполнения заданий.

Вот Python-скрипт для генерации тестовых логов пользовательских событий с учётом:

  • разных вероятностей для каждого типа события;
  • добавления категорий товаров, основанных на ассортименте Lamoda;
  • сохранения данных в форматах JSON и CSV.

Используйте его для генерации фейковых логов и выполнения тренировочных заданий.


📦 Установка зависимостей

Убедитесь, что установлены необходимые библиотеки


pip install faker pandas


🐍 Обновлённый скрипт генерации логов

import random
import json
import pandas as pd
from faker import Faker
from datetime import datetime, timedelta

fake = Faker()

# Типы событий с соответствующими вероятностями
EVENT_TYPES = ["click", "view", "add_to_cart", "purchase"]
EVENT_WEIGHTS = [0.4, 0.3, 0.2, 0.1]  # click: 40%, view: 30%, add_to_cart: 20%, purchase: 10%

# Устройства и источники перехода
DEVICES = ["mobile", "desktop"]
REFERRERS = ["google.com", "yandex.ru", "facebook.com", "instagram.com", "direct", "email"]

# Категории товаров на основе ассортимента Lamoda
CATEGORIES = [
    "Одежда", "Обувь", "Аксессуары", "Красота", "Дом", "Спорт", "Premium", "Resale"
]

# Генерация списка товаров с уникальными идентификаторами и категориями
PRODUCTS = [
    {"product_id": f"p_{i}", "category": random.choice(CATEGORIES)}
    for i in range(1000, 1100)
]

# Список пользователей
USERS = [f"u_{i}" for i in range(10000, 10100)]

def generate_event():
    user_id = random.choice(USERS)
    session_id = f"sess_{datetime.utcnow().strftime('%Y%m%d')}_{random.randint(100, 999)}"
    timestamp = (datetime.utcnow() - timedelta(seconds=random.randint(0, 86400))).isoformat() + "Z"
    product = random.choice(PRODUCTS)
    event_type = random.choices(EVENT_TYPES, weights=EVENT_WEIGHTS, k=1)[0]
    device = random.choice(DEVICES)
    referrer = random.choice(REFERRERS)

    event = {
        "event_type": event_type,
        "user_id": user_id,
        "session_id": session_id,
        "timestamp": timestamp,
        "product_id": product["product_id"],
        "category": product["category"],
        "device": device,
        "referrer": referrer
    }

    return event

# Генерация N событий
def generate_events(n=1000):
    return [generate_event() for _ in range(n)]

# Сохранение в JSON
def save_to_json(events, filename="events.json"):
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(events, f, ensure_ascii=False, indent=2)

# Сохранение в CSV
def save_to_csv(events, filename="events.csv"):
    df = pd.DataFrame(events)
    df.to_csv(filename, index=False)

if __name__ == "__main__":
    events = generate_events(1000)
    save_to_json(events)
    save_to_csv(events)
    print("✔ Логи успешно сгенерированы и сохранены в файлы 'events.json' и 'events.csv'")

### 📁 Пример строки в `CSV`

event_type,user_id,session_id,timestamp,product_id,category,device,referrer
click,u_10023,sess_20250417_442,2025-04-17T13:45:23Z,p_1035,Обувь,desktop,yandexru

Задания (низкий порог входа)

  1. SQL‑челлендж на готовом CSV‑журнале кликов
    • Задача 1: посчитать общее число кликов и уникальных сессий.
      SELECT COUNT(*) AS total_clicks,
             COUNT(DISTINCT session_id) AS unique_sessions
      FROM clicks_csv;
      
    • Задача 2: найти топ‑5 самых популярных товаров по кликам.
      SELECT product_id,
             COUNT(*) AS clicks
      FROM clicks_csv
      GROUP BY product_id
      ORDER BY clicks DESC
      LIMIT 5;
      
  2. Построить простой отчёт в Metabase/Excel
    • Что сделать:
      1. Построить график «клики и продажи по дням».
      2. Добавить интерактивный фильтр по категории товара.
    • Польза: научиться работать с дашборд‑инструментом, увидеть, как одни и те же данные можно представить разными способами.
  3. Распределить KPI‑метрики на листе бумаги/доске
    • Задача: нарисовать схему, какие поля из событий нужны для расчёта:
      • CTR: views и clicks
      • Конверсия: clicks и purchases
    • Цель: понять взаимосвязь данных и метрик до погружения в код.

Роли и зоны ответственности в процессе работы с Big Data

Data Engineer

Задачи

  1. Сбор данных
    • Настройка потоков — Kafka, API-коннекторы, файловые загрузки.
    • Пример: пишете коннектор на Python, который раз в час вычитывает CSV от логистов и шлёт в Kafka.
  2. Очистка и трансформация
    • Парсинг сырых событий, удаление дубликатов, приведение типов.
    • Пример: фильтруете «битые» JSON-события с некорректным timestamp.
  3. Построение ETL/ELT‑пайплайнов
    • Пишете DAG в Airflow: ingestion → cleaning → enrichment → загрузка в DW.
    • Параметризуете расписание (ежедневно в 2:00), retry-политику, уведомления.
  4. Деплой и поддержка
    • Конфигурируете Kubernetes‑Job или EMR‑кластеры, чтобы скрипты запускались автоматически.
    • Следите, чтобы при росте данных масштабирование работало без ручных правок.

Инструменты

  • Spark/Flink — крупномасштабная обработка.
  • Trino — интерактивные ad‑hoc‑запросы по Data Lake.
  • Airflow/Dagster — оркестрация.
  • Kubernetes/EMR/Dataproc — инфраструктура выполнения.

Артефакты

  • Таблицы в Data Lake (Parquet/Hudi/Iceberg).
  • Скрипты ETL на Python/Scala/SQL.
  • DAG‑файлы (Python-модули или YAML).
  • Документация: READMEs, схемы полей, инструкции по запуску.

Взаимодействие

  • с DevOps — согласование CI/CD‑pipeline, контейнеризация, секреты и учётные данные.
  • с Data Analyst — контракт на формат таблиц, SLA свежести данных, обзор бизнес‑терминов.
  • с Data Steward — описание полей в каталоге, управление версионированием схем.

Data Analyst

Основные обязанности

  1. Анализ и отчёты
    • Строит дашборды по ключевым метрикам (CTR, конверсия, ARPU).
    • Пишет ad‑hoc‑запросы для маркетологов или продукта: «Как изменился ARPU после рассылки?»
  2. Визуализация
    • Настраивает графики временных рядов, воронки, картограммы регионов.
    • Пример: в Metabase делает дашборд «Динамика продаж по городам» с фильтрами.

Инструменты

  • ClickHouse, Snowflake, BigQuery — для быстрых OLAP‑запросов.
  • Metabase, Tableau, Power BI — BI‑конструкторы.
  • Excel/Google Sheets, Jupyter — для ad‑hoc‑отчётов.
  • SQL (+ Python/R при необходимости).

Какие метрики и дашборды строит

  • Конверсия: users→sessions→purchases.
  • Воронки: просмотры→клики→добавления в корзину→покупки.
  • ARPU и LTV: анализ распределения выручки по когорте.

Где берёт данные и как проверяет качество

  • Источник: таблицы в DW или Speed Layer, подготовленные Data Engineer.
  • Проверка:
    • Сравнивает итоговые суммы с «сырыми» логами.
    • Строит контрольные выборки (результаты за вчера vs за неделю назад).
    • Настраивает простые правила (например, если кликов < 100/минуту → алерт).

DevOps

Зона ответственности

  1. Инфраструктура
    • Кластеры для Spark, Trino, ClickHouse, Airflow, Kubernetes.
  2. CI/CD
    • Собирает образы контейнеров, пишет пайплайны в Terraform/Helm charts.
    • Автоматизирует тестирование ETL‑скриптов (unit и интеграционные).
  3. Мониторинг и авто‑скейлинг
    • Настраивает Prometheus + Grafana → собирает метрики задач, узлов кластеров.
    • Пишет правила автоскейлинга: при CPU > 70 % добавлять ноды.

Инструменты и практики

  • Terraform, Helm — инфраструктура как код.
  • GitLab CI, Jenkins, Argo CD — деплой.
  • Prometheus, Grafana, Alertmanager — мониторинг и оповещения.
  • SRE‑подход: SLO/SLI, run‑books, post‑mortems.

Взаимодействие с Data Engineer

  • Утверждают формат Docker‑образов и переменных окружения.
  • Настраивают секреты и доступы (IAM, Vault).
  • Организуют уведомления о падении пайплайнов (Slack, Email).

Data Steward

Ключевые задачи

  1. Метаданные и каталог
    • Поддерживает бизнес‑глоссарий, описания полей, схемы в OpenMetadata/Glue.
  2. Data Governance
    • Управляет правами доступа, классифицирует данные (PII, GDPR).
  3. Качество данных
    • Запускает профилирование (частота NULL, статистики распределения).
    • Фиксирует SLA на задержку обновления.

Инструменты

  • OpenMetadata, Glue Data Catalog, Alation — хранение метаданных и lineage.
  • Great Expectations, Deequ — автоматические проверки качества.
  • Collibra, Erwin — корпоративное управление политиками.

Lineage и коммуникация

  • Строит граф: от сырых логов к конечным дашбордам.
  • Вовлекает Data Engineer (изменение схем), Data Analyst (новые поля) и юридический отдел (регуляции).
  • Обрабатывает запросы на доступ: кто, зачем, на какой срок.

Задание 1. Короткий устный опрос (5 минут)

Разбейтесь на пары. Один задаёт вопросы, другой отвечает, затем меняетесь ролями. По очереди задайте друг другу:

  1. «Кто настроит, чтобы по ночам автоматически приходили новые данные?»
  2. «Кто нарисует график продаж в Metabase?»
  3. «Кому написать, если в описании таблицы пропали значения?»
  4. «Кто включит новые серверы, если нагрузка сильно выросла?»

Цель: прогреть память и проверить понимание ролей.


Задание 2. Практика: RACI‑таблица для простого процесса (20 минут)

Что такое RACI? Это таблица, где для каждого шага процесса мы указываем:

  • R (Responsible) — кто выполняет работу;
  • A (Accountable) — кто отвечает за итог;
  • C (Consulted) — с кем советуются;
  • I (Informed) — кого ставят в известность.

Сценарий:
Процесс «1) Сбор кликов → 2) Чистка и подготовка → 3) Загрузка в хранилище → 4) Публикация дашборда».
Ваша задача: заполнить таблицу, выбрав роли из списка: Data Engineer, Data Analyst, DevOps, Data Steward.

Шаг процесса R A C I
1. Сбор кликов из сайта
2. Чистка и проверка данных
3. Загрузка готовых таблиц в хранилище
4. Обновление дашборда в Metabase

После выполнения обсудите в группе: всё ли однозначно, кому что назначить.


Задание 3. Практика: карточки‑ролей (15 минут)

  1. Каждому участнику выдаётся карточка с описанием одной роли (Data Engineer, Data Analyst, DevOps, Data Steward).
  2. Внутри группы (4 – 5 человек) обменяйтесь карточками и прочитайте чужие описания.
  3. Постройте в группе «цепочку передачи данных» — кто после кого работает:
    1. Карточку «Data Engineer» кладёте первой, затем «Data Analyst», и т.д.
  4. Объясните друг другу, почему именно такой порядок, и какие точки взаимодействия между ролями.

Задание 4. Мини‑тест «Кто за что?» (10 минут)

Ответьте письменно на 5 вопросов (каждый — один из четырёх вариантов):

  1. Кто отвечает за настройку ночного обновления данных?
    A) Data Engineer
    B) Data Analyst
    C) DevOps
    D) Data Steward
  2. Кто строит графики и дашборды для руководителя?
    A) Data Engineer
    B) Data Analyst
    C) DevOps
    D) Data Steward
  3. Кто следит, чтобы в таблицах не было «пустых» или неверных значений?
    A) Data Engineer
    B) Data Analyst
    C) DevOps
    D) Data Steward
  4. Кто автоматически добавит новые серверы при росте нагрузки?
    A) Data Engineer
    B) Data Analyst
    C) DevOps
    D) Data Steward
  5. Кто согласует изменения в описании колонок (metadata)?
    A) Data Engineer
    B) Data Analyst
    C) DevOps
    D) Data Steward

Цель: самостоятельно проверить и сразу обсудить правильные ответы.

Источники данных

Object Storage

Принцип «ключ–значение»

  • Объект = блоб (blob) данных + его ключ (уникальное имя) + метаданные.
  • Представьте: вы храните семейные фото не в папках, а в наборе пронумерованных ячеек в шкафу. Ячейка имеет свой номер (ключ), а внутри – любой файл (объект).

Чем отличается от файловых и блочных хранилищ

Хранилище Структура Когда удобно
Файловая система Файлы и папки Когда нужна иерархия папок, блокировка файлов
Блочное хранилище Блоки одинакового размера Для виртуальных дисков, где операционная система формирует файловую систему поверх блока
Объектное хранилище Ключ–объект (нет папок!) Для больших объёмов: логи, бэкапы, статические файлы сайта

Архитектура и сравнение Yandex Cloud Object Storage vs AWS S3

Параметр AWS S3 Yandex Cloud Object Storage
Совместимость API Полная S3‑совместимость Почти полная S3‑совместимость
Региональность (regions) 25+ географических зон (например, us-east-1, eu-central-1) Зоны в России и Европе (например, ru-central1, eu-central-1)
SLA (гарантии доступности) 99.9 % (Standard), 99.99 % (Standard‑IA) 99.9 % (Basic), 99.95 % (Enhanced)
Ценообразование плата за хранение, за PUT/GET‑запросы, за исходящий трафик аналогично, но цены и пороги могут отличаться по регионам

Совет новичку: выбирайте регион ближе к вашим серверам и пользователям — так будет меньше задержка и дешевле трафик.


Основные концепции и операции

  1. Bucket (контейнер)
    • Как «коробка» для объектов. Имя должно быть уникальным в пределах провайдера.
  2. Object
    • Состоит из:
      • ключа (например, logs/2025-04-17/events.csv),
      • тела (сам файл),
      • метаданных (дата создания, размер, собственные теги).
  3. Versioning (версии)
    • Если включить, старые версии объекта не удаляются автоматически, можно откатиться к предыдущему состоянию.
  4. CRUD‑операции
    • PUT – загрузить объект;
    • GET – скачать объект;
    • DELETE – удалить;
    • LIST – получить список ключей (с поддержкой префиксов и пагинации).
  5. Префиксы и «псевдодиректории»
    • Хотя папок нет, ключи с общим началом (images/, logs/) позволяют группировать объекты.
  6. Lifecycle‑правила
    • Автоматизация: через настройки можно переводить старые объекты в «холодное» хранение или удалять их по расписанию.

Интеграция с ETL/SQL‑движками

  • Spark
    • Через spark-hadoop-connector читаем и пишем объекты S3/YCS:
      df = spark.read.csv("s3a://my-bucket/logs/2025-*.csv")
      
  • Trino / Hive
    • Подключаем S3‑коннектор в конфигурации сервера, работаем как с любой таблицей.
  • Табличные форматы
    • Храним данные в Parquet или ORC, а ещё в дата‑лейках типа Iceberg и Hudi для версионирования и оптимизации.

Безопасность и доступ

  1. IAM‑политики и роли
    • Задают, кто может читать/писать/удалять объекты.
  2. Подписи запросов (SigV4)
    • При прямом HTTP‑доступе к S3 каждый запрос подписывается ключом и секретом.
  3. Временные токены (STS)
    • Вместо постоянных ключей выдаём короткоживущие креденшалы (удобно для сервисов).
  4. Шифрование
    • На сервере: SSE‑S3 (ключ провайдера), SSE‑KMS (ваш ключ в KMS).
    • На клиенте: вы шифруете данные перед отправкой.

Best Practices

  • Префиксы для равномерной нагрузки
    • Разбивайте ключи: logs/2025/04/17/… вместо logs/20250417…, чтобы запросы не «накапливались» на одном месте.
  • Lifecycle‑правила
    • Горячие данные (последние 30 дней) оставляйте в Standard.
    • Старше – переводите в «холодное» хранилище или архив (Glacier/Cold HDD).
  • Оптимизация загрузок
    • Small‑objects (< 5 МБ): мелкие файлы лучше группировать перед загрузкой.
    • Large‑objects (> 100 МБ): используйте multipart‑upload, чтобы параллельно загружать части.

Практические задания (низкий порог входа)

Выберите среду: AWS или Yandex Cloud. Везде используется единый S3‑совместимый интерфейс.


Задание 1. Работа через CLI/консоль
  1. Создать bucket:
    • AWS:
      aws s3 mb s3://my-test-bucket-$(date +%s)
      
    • Yandex Cloud:
      yc storage bucket create --name my-test-bucket --default-storage-class standard
      
  2. Загрузить набор CSV‑логов (скачайте примеры из репозитория курса):
    aws s3 cp logs/ s3://my-test-bucket/logs/ --recursive
    
  3. Перечислить объекты:
    aws s3 ls s3://my-test-bucket/logs/
    
  4. Скачать один файл локально:
    aws s3 cp s3://my-test-bucket/logs/2025-04-17.csv ./2025-04-17.csv
    

Задание 2. Мини‑скрипт на Python (boto3 или yc‑sdk)
  1. Установите библиотеку:
    pip install boto3 pandas
    
  2. Напишите script.py:
    import boto3
    import pandas as pd
    
    # Настройка клиента (AWS или Yandex)
    s3 = boto3.client('s3',
        aws_access_key_id='ВАШ_КЛЮЧ',
        aws_secret_access_key='ВАШ_СЕКРЕТ',
        endpoint_url='https://storage.yandexcloud.net'  # для Yandex
    )
    
    bucket = 'my-test-bucket'
    prefix = 'logs/'
    
    # Получить список объектов
    objs = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
    keys = [item['Key'] for item in objs.get('Contents', [])]
    print("Найдено объектов:", len(keys))
    
    # Прочитать первый CSV в pandas
    first_key = keys[0]
    obj = s3.get_object(Bucket=bucket, Key=first_key)
    df = pd.read_csv(obj['Body'])
    print(df.head())
    
  3. Запустите и убедитесь, что данные считались.

Задание 3. Spark‑бомбардировка
  1. В окружении с PySpark (локально или в кластере):
    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
        .appName("ReadS3") \
        .config("spark.hadoop.fs.s3a.endpoint", "https://storage.yandexcloud.net") \
        .config("spark.hadoop.fs.s3a.access.key", "ВАШ_КЛЮЧ") \
        .config("spark.hadoop.fs.s3a.secret.key", "ВАШ_СЕКРЕТ") \
        .getOrCreate()
    
    # Чтение CSV прямо из bucket
    df = spark.read.csv("s3a://my-test-bucket/logs/*.csv", header=True, inferSchema=True)
    print("Всего записей:", df.count())
    
  2. Проверьте, что Spark видит все файлы и возвращает общее число строк.

Реляционные базы данных

Представьте себе обычную табличку в Excel:

id name age
1 Анна 28
2 Борис 35
3 Виктория 22
  • Таблица – это как лист Excel.
  • Строка – одна запись, как строка в таблице («Анна, 28»).
  • Столбец – одно свойство, например «name» или «age».
  • СУБД (система управления базами данных) – программа, которая хранит такие таблицы и позволяет быстро находить, изменять и связывать их между собой.

Почему «реляционная»?

«Реляционная» означает, что таблицы связаны друг с другом через специальные поля (ключи). Например, есть две таблицы:

customers

customer_id name
1 Анна
2 Борис

orders

order_id customer_id total
10 1 2500
11 2 1800
12 1 3900

Здесь поле customer_id связывает заказы с покупателями: заказ №10 принадлежит Анне.


Зачем нужны реляционные СУБД?

  1. Надёжность транзакций (ACID)
    – Если вы переводите деньги, важно, чтобы либо обе части операции прошли, либо ни одна. Аналогично, в базе «всё или ничего».
  2. Целостность данных
    – База сама следит, чтобы вы не вставили заказ с несуществующим customer_id.
  3. OLTP‑нагрузки
    – Быстрые операции: вставить новый заказ, обновить статус, проверить баланс.
  4. Лёгкие OLAP‑выборки
    – Небольшие отчёты: сколько заказов было вчера, средний чек и т.п.

Коротко о трёх популярных СУБД

СУБД Простыми словами
PostgreSQL «Умная» база: можно хранить в ячейках даже целые документы (JSON), добавлять собственные функции.
MySQL «Быстрая» база: отличная для сайтов, где важно быстро читать информацию (например, списки товаров).
Oracle «Тяжёлая артиллерия»: поддерживает сразу несколько серверов в кластере, очень защищена, но сложнее в настройке.

Как правильно проектировать таблицы (моделирование данных)

  1. Нормализация (1NF–3NF)
    – Не повторяйте одни и те же данные в разных местах.
    – Пример: вместо того чтобы в таблице заказов дублировать имя покупателя, храните customer_id, а имя берите из таблицы customers.
  2. Денормализация
    – Иногда копия данных ускоряет чтение. Например, вы можете сразу добавить customer_name в orders, чтобы не делать JOIN, но тогда надо помнить об обновлениях.
  3. Схема «звезда» для отчётов
    • Таблица фактов – основная, где хранятся числа (суммы заказов).
    • Таблицы-измерения – вокруг неё как лучи звезды: dim_customers, dim_date, dim_products.

Как реляционные БД вписываются в Big Data‑стек

  1. CDC‑стриминг (Debezium → Kafka)
    – Каждый раз, когда в таблице меняются строки, появляется сообщение в очереди.
  2. Доступ из Spark/Trino
    – Через JDBC (как подключиться к СУБД из Java‑программы) читаем таблицы прямо в большой кластер.
  3. Дампы (резервные копии)
    pg_dump или mysqldump сохраняют всю базу в файл, который потом можно восстановить.

Ежедневные операции с базами

  • Запуск и настройка
    – Локально: легко через Docker, чтобы потренироваться.
    – В облаке: сервис сам поддерживает базу (AWS RDS, Yandex Managed Service).
  • Бэкапы и восстановление
    – Делаем полную копию раз в день и журналы изменений, чтобы откатиться на любой момент.
  • Мониторинг
    – Смотрим: сколько сейчас подключений, не слишком ли долго выполняются запросы, нет ли «узких мест» (locks).

Безопасность и права доступа

  • Пользователи и роли
    – Как раздавать ключи от дверей: кто может читать данные, кто добавлять, кто изменять структуру.
  • Шифрование
    – Данные «на диске» и в пути (между вашим компьютером и сервером) шифруются, чтобы никто не подсмотрел.
  • Аудит
    – Запись того, кто и когда поменял что в базе.

Задание 1. Локальное развертывание СУБД (Docker, 10 минут)

  1. Выберите СУБД: PostgreSQL или MySQL.
  2. Запустите контейнер:
    # PostgreSQL
    docker run -d --name pg-test -e POSTGRES_PASSWORD=secret -p 5432:5432 postgres
    
    # MySQL
    docker run -d --name mysql-test -e MYSQL_ROOT_PASSWORD=secret -p 3306:3306 mysql
    
  3. Подключитесь через командную строку или DBeaver.
  4. Проверьте: создайте базу testdb и выполните SELECT version();.

Задание 2. Базовые SQL‑операции (15 минут)

  1. Создайте таблицы:
    -- для PostgreSQL
    CREATE TABLE customers (
      id SERIAL PRIMARY KEY,
      name TEXT,
      signup_date DATE
    );
    CREATE TABLE orders (
      id SERIAL PRIMARY KEY,
      customer_id INT REFERENCES customers(id),
      total NUMERIC(10,2),
      order_date DATE
    );
    
  2. Заполните 5–10 строк командой INSERT.
  3. Напишите и выполните запрос с JOIN, например:
    SELECT c.name, o.total
    FROM customers c
    JOIN orders o ON c.id = o.customer_id
    WHERE o.order_date >= '2025-04-01';
    
  4. Проверьте план через EXPLAIN (одно нажатие), чтобы увидеть, используются ли индексы.

Задание 3. Интеграция через JDBC в Spark (10 минут)

  1. В файле Spark‑сессии (pyspark или spark-shell) добавьте:
    df = spark.read \
      .format("jdbc") \
      .option("url", "jdbc:postgresql://localhost:5432/testdb") \
      .option("dbtable", "orders") \
      .option("user", "postgres") \
      .option("password", "secret") \
      .load()
    print("Записей в orders:", df.count())
    
  2. Запустите и убедитесь, что Spark видит таблицу и возвращает число строк.

Задание 4 (опционально для продвинутых). CDC → Kafka с Debezium (20 минут)

  1. Запустите Debezium‑коннектор в Docker, подключив его к вашему PostgreSQL/MySQL.
  2. Вставьте новую строку через SQL.
  3. Проверьте в топике Kafka: должно появиться сообщение с данными вставки.

NoSQL & Search: Elasticsearch, HBase, ClickHouse

Зачем нужны NoSQL и поисковые движки?

  • Когда реляционных БД мало
    – Очень быстро записывать тысячи сообщений лога в секунду;
    – Делать свободный поиск по тексту (например, найти все товары, где в описании встречается «хлопок»);
    – Запускать аналитические запросы по большим объёмам данных ближе к реальному времени.
  • Паттерны доступа
    1. Full‑text search (поиск по словам и фразам).
    2. Fast writes (быстрые вставки новых записей).
    3. OLAP‑запросы (сводные отчёты, агрегации).

Как выбрать NoSQL‑хранилище?

  • Документное (Elasticsearch)
    – Хранит JSON‑документы, хорошо ищет по тексту.
  • Ширококолоночное (HBase)
    – Таблицы с «супер‑столбцами» для огромных наборов строк, оптимально для time‑series.
  • Колоночное OLAP (ClickHouse)
    – Хранит данные по столбцам, очень быстро считает агрегаты.

Кратко про каждый инструмент

Elasticsearch
  • Что внутри
    • Индекс = аналог папки, где лежат документы.
    • Шард = кусок индекса (для масштабирования).
    • Реплика = копия, чтобы выдерживать сбои.
    • Inverted index = список «в каких документах встречается слово».
  • Сценарии
    • Лог‑аналитика (смотрим, какие ошибки выпали в приложении).
    • Поиск по товарным карточкам.
    • Быстрые сводки (агрегации по категориям, регионам).
  • Как подключить
    • Filebeat или Logstash → Kafka → Elasticsearch.
    • Дашборды в Kibana.
HBase
  • Что внутри
    • Хранит данные в HDFS (как большой файловый склад).
    • RegionServer = процесс, который обслуживает часть таблицы.
    • Master = координирует RegionServer‑ы.
  • Когда использовать
    • Огромные таблицы (миллиарды строк).
    • Чтение/запись по ключу (например, история кликов пользователя).
  • Как подключить
    • Spark‑коннектор читает/пишет прямо в HBase.
    • Trino может читать таблицы HBase.
ClickHouse
  • Что внутри
    • Хранение по столбцам → быстрее читать отдельные поля.
    • Движки MergeTree: автоматически сортируют и сливают файлы.
    • Партиции по дате (разбитие данных на месяцы/дни).
  • Сценарии
    • Speed Layer: real‑time сводки за последние минуты/часы.
    • OLAP‑отчёты с миллионами строк.
  • Как подключить
    • Запись из Airflow (batch) или Kafka (stream).
    • BI‑инструменты через JDBC.

Задание 1. Elasticsearch Quick Start

  1. Запустить Docker:
    docker run -d --name es -p 9200:9200 -e "discovery.type=single-node" elasticsearch:8.8.1
    docker run -d --name kibana -p 5601:5601 --link es kibana:8.8.1
    
  2. Создать индекс:
    curl -X PUT "localhost:9200/products?pretty"
    
  3. Добавить 5 JSON‑документов:
    curl -X POST "localhost:9200/products/_doc/1" -H 'Content-Type: application/json' -d'
    { "id": 1, "name": "Футболка хлопок", "category": "Одежда" }'
    

    (повторить для id=2…5)

  4. Поиск по тексту:
    curl -X GET "localhost:9200/products/_search?pretty" -H 'Content-Type: application/json' -d'
    { "query": { "match": { "name": "хлопок" } } }'
    
  5. Агрегация по категории:
    curl -X GET "localhost:9200/products/_search?pretty" -H 'Content-Type: application/json' -d'
    {
      "size": 0,
      "aggs": {
        "by_category": { "terms": { "field": "category.keyword" } }
      }
    }'
    

Задание 2. HBase Hello‑World

  1. Запустить Docker:
    docker run -d --name hbase -p 16010:16010 harisekhon/hbase
    
  2. Открыть shell:
    docker exec -it hbase bash
    hbase shell
    
  3. Создать таблицу:
    create 'events:clicks', 'cf'
    
  4. Вставить строки:
    put 'events:clicks', 'row1', 'cf:data', 'user1,2025-04-17T10:00'
    put 'events:clicks', 'row2', 'cf:data', 'user2,2025-04-17T10:05'
    
  5. Прочитать данные:
    scan 'events:clicks'
    

Задание 3. ClickHouse Simple OLAP

  1. Запустить Docker:
    docker run -d --name clickhouse -p 8123:8123 clickhouse/clickhouse-server
    
  2. Создать таблицу:
    CREATE TABLE default.sales (
      sale_date Date,
      product String,
      amount UInt32
    ) ENGINE = MergeTree()
    PARTITION BY toYYYYMM(sale_date)
    ORDER BY (sale_date);
    
  3. Загрузить CSV (файл sales.csv):
    curl 'http://localhost:8123/?query=INSERT%20INTO%20default.sales%20FORMAT%20CSV' --data-binary @sales.csv
    
  4. Посчитать продажи по дате:
    SELECT sale_date, sum(amount) AS total
    FROM default.sales
    GROUP BY sale_date
    ORDER BY sale_date;
    

Structured explanation for BigQuery

Для раздела про «Data‑warehouse/SaaS: Google BigQuery», мне нужно создать детальную теоретическую часть с простыми объяснениями. Я начну с введения: что такое хранилище данных и что такое BigQuery. Каждую тему (компоненты, загрузка данных, SQL Engine и т.п.) нужно описать понятным языком, с примерами и аналогиями. Готов добавить простые схемы или фрагменты кода с объяснениями. Важно подобрать практические задания и мини-задачи, с реальными действиями, а не рекомендациями, чтобы сделать процесс обучения более наглядным.

Data‑warehouse/SaaS: Google BigQuery


Зачем BigQuery?

  • Serverless‑архитектура
    Представьте, что вам не нужно покупать и настраивать серверы — вы просто платите за обработанные данные. Как такси вместо личного автомобиля: не нужно думать о ремонте и парковке, платите только за поездку.
  • Отдельное хранилище и вычисления
    Хранить данные можно дешево и надёжно, а за вычисления (запросы) — платить отдельно.
    • Pay‑per‑query: платите, когда запускаете запрос, по объёму прочитанных терабайт.
    • Flat‑rate slots: фиксированная ежемесячная плата за «бронь» определённого числа параллельных вычислительных юнитов (slots), если у вас много запросов.

Основные компоненты BigQuery

  • Dataset
    Как папка в Google Drive — внутри неё лежат таблицы.
  • Table
    Как файл Excel — набранная строка за строкой таблица с названиями колонок.
  • Partition
    Разбиение таблицы на «куски» по дате (или другому полю).
    • Преимущество: запрос читает только нужный «кусок», а не всю таблицу.
  • Cluster
    Сортировка внутри партиции по выбранным колонкам, как индекс, помогает ещё быстрее искать.
  • Slots и Reservation
    • Slot — единица параллельной обработки (как повар, который готовит один заказ).
    • Reservation — бронирование определённого числа «поваров» под ваши задачи, чтобы они не ушли к чужим «заказам».

Загрузка данных

  • Batch‑import (пакетное)
    1. Загрузить CSV/JSON в Google Cloud Storage (GCS) или прямо из локального файла через веб‑интерфейс.
    2. В UI или с помощью bq load указать, в какую таблицу и с какими настройками (разделитель, кодировка) загрузить.
  • Streaming inserts (поток)
    – По одной строке через API (например, из приложения), чтобы сразу можно было запросить свежие данные без ожидания пакета.

SQL‑движок BigQuery

  • ANSI SQL
    Полный набор стандартных команд (SELECT, JOIN, GROUP BY).
  • Аналитические функции
    WINDOW: например, ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY event_time) нумерует события внутри каждого пользователя.
    ARRAY: можно собирать список значений в одну ячейку, например все продукты в одном заказе.
  • Federated queries (внешние таблицы)
    – Запросы прямо к данным в GCS или Cloud SQL без предварительной загрузки в BigQuery.

Стоимость и оптимизация

  • Хранение
    – Около $0.02 за GB в месяц.
    – Оплата за фактический объём: если таблица 100 GB, это $2 в месяц.
  • Обработка
    – $5 за TB прочитанных данных при on‑demand.
    – Сократить стоимость помогут:
    1. Partitioned tables (читаем только нужный период).
    2. Clustered tables (читаете только нужные столбцы быстрее).
    3. Materialized views (предварительно посчитанные результаты).
    4. Query optimizer автоматически выбирает лучший план выполнения.

Безопасность и управление доступом

  • IAM‑роли
    – Наборы прав: кто может читать таблицы, кто — писать или администрировать.
  • Authorized views
    – Представления, через которые пользователи видят только разрешённые колонки или строки.
  • VPC Service Controls
    – Ограничивают доступ к BigQuery из заданных сетей и сервисов.
  • Шифрование
    – На стороне Google (по‑умолчанию) и возможность «принести свой ключ» (BYOK) для шифрования данных.

Интеграция с остальным стеком

  • ELT‑пайплайны
    – Dataflow или Airflow запускают загрузку и трансформации, результат пишет в BigQuery.
  • BI‑коннекторы
    – Google Data Studio, Looker или Metabase напрямую читают таблицы и строят дашборды.
  • Дополнительные инструменты
    – Dataproc/Spark может читать из BigQuery и писать обратно.

Задание 1. Console Quickstart

  1. Создать проект в Google Cloud (если ещё нет).
  2. Перейти в BigQuery UI: нажать «Создать Dataset», дать ему имя my_dataset.
  3. Загрузить CSV
    • В разделе my_dataset нажать «Create table» → Upload, выбрать файл sample.csv из репозитория.
    • Указать разделитель (запятая), в первом ряду заголовки.
  4. Выполнить запрос:
    SELECT category, COUNT(*) AS cnt
    FROM `PROJECT_ID.my_dataset.sample`
    WHERE event_date >= '2025-04-01'
    GROUP BY category
    ORDER BY cnt DESC;
    
  5. Посмотрите объём прочитанных данных — это и будет ваша плата за запрос (MB, GB).

Задание 2. Работа через bq CLI

  1. Установить и инициализировать CLI:
    pip install google-cloud-bigquery
    gcloud init
    gcloud components install bq
    
  2. Создать dataset:
    bq mk my_dataset
    
  3. Загрузить CSV:
    bq load --autodetect --source_format=CSV \
      my_dataset.sample_cli gs://my-bucket/sample.csv
    
  4. Показать схему:
    bq show --schema --format=prettyjson my_dataset.sample_cli
    

Задание 3. Python + pandas

  1. Установить пакет:
    pip install google-cloud-bigquery pandas
    
  2. Написать query.py:
    from google.cloud import bigquery
    import pandas as pd
    
    client = bigquery.Client()
    
    sql = """
      SELECT category, COUNT(*) AS cnt
      FROM `PROJECT_ID.my_dataset.sample`
      GROUP BY category
      ORDER BY cnt DESC
      LIMIT 5
    """
    
    df = client.query(sql).to_dataframe()
    print(df.head())
    
  3. Запустить и убедиться, что вы видите первые пять строк результата в виде DataFrame.

Change Data Capture: Debezium (CDC‑коннекторы к Kafka)

Что такое CDC и зачем оно нужно?

  • CDC (Change Data Capture) – это механизм, который следит за тем, что происходит в базе (вставки, обновления, удаления) и моментально передаёт эти изменения дальше.
  • Зачем:
    • Не нужно каждую ночь полностью перекачивать таблицы – изменения приходят по кусочкам.
    • Минимизируется задержка («latency») от момента, когда кто‑то нажал INSERT, до того, как данные стали доступны в других системах (аналитика, мониторинг).

Архитектура Debezium + Kafka

Архитектура Debezium + Kafka


  1. Kafka Connect cluster – набор сервисов, которые запускают коннекторы.
  2. Debezium Connector – специальный плагин, который читает журнал изменений (binlog/WAL) из СУБД.
  3. Source Task – рабочий поток внутри коннектора, от которого зависит количество параллельных задач.
  4. Kafka topics – тема dbserver1.orders или dbserver1.customers, куда летят события.

Поддерживаемые источники и механизмы

  • СУБД: MySQL, PostgreSQL, Oracle, SQL Server, MongoDB.
  • Как читаем изменения:
    • MySQL: binlog;
    • PostgreSQL: WAL;
    • Коннектор запоминает, до какого места («offset») он дочитал, и при перезапуске продолжает с этого же места.

Формат сообщений и схемы

  • JSON vs Avro/Protobuf + Schema Registry (хранилище схем)
  • Поля сообщения:
    • before – то, что было до изменения;
    • after – что стало после;
    • key – значение первичного ключа (например, order_id);
    • transaction metadata – время, ID транзакции.

Обработка изменений схемы

  • Если вы добавили новый столбец в таблицу, Schema Registry может автоматически обновить схему или потребует ручного согласования.
  • downstream‑сервисы (которым приходят события) должны уметь пропускать неизвестные поля или получать уведомление об изменении.

Гарантии и отказоустойчивость

  • At‑least‑once: каждое изменение придёт минимум один раз (иногда может дублироваться).
  • Exactly‑once (труднее настраивать): приходит ровно одна копия.
  • Масштабирование: каждый Source Task обслуживает свой набор шардов базы (или таблиц), можно увеличить число задач, чтобы читать быстрее.

Среда: локальная машина с Docker и Docker Compose.


Задание 1. Поднять Docker Compose‑стек

  1. Создать docker-compose.yml в пустой папке:
    version: '3.8'
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:7.4.0
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
    
      kafka:
        image: confluentinc/cp-kafka:7.4.0
        depends_on: [zookeeper]
        environment:
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    
      connect:
        image: debezium/connect:2.4
        depends_on: [kafka]
        environment:
          BOOTSTRAP_SERVERS: 'kafka:9092'
          GROUP_ID: '1'
          CONFIG_STORAGE_TOPIC: 'connect-configs'
          OFFSET_STORAGE_TOPIC: 'connect-offsets'
          STATUS_STORAGE_TOPIC: 'connect-status'
          KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
          VALUE_CONVERTER: 'io.confluent.connect.avro.AvroConverter'
          VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
          PLUGIN_PATH: '/kafka/connect/debezium-connector-mysql'
        volumes:
          - ./connect-plugins:/kafka/connect/debezium-connector-mysql
    
      schema-registry:
        image: confluentinc/cp-schema-registry:7.4.0
        depends_on: [kafka]
        environment:
          SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:9092'
          SCHEMA_REGISTRY_HOST_NAME: 'schema-registry'
          SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8081'
    
      mysql:
        image: mysql:8.0
        environment:
          MYSQL_ROOT_PASSWORD: root
          MYSQL_USER: debezium
          MYSQL_PASSWORD: dbz
          MYSQL_DATABASE: testdb
        ports:
          - '3306:3306'
    
  2. Запустить стек:
    docker-compose up -d
    
  3. Проверить, что все сервисы поднялись:
    docker-compose ps
    

Задание 2. Настроить Debezium‑коннектор для MySQL

  1. Создать файл connector.json рядом с docker-compose.yml:
    {
      "name": "mysql-connector",
      "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.name": "dbserver1",
        "database.include.list": "testdb",
        "table.include.list": "testdb.orders",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.testdb"
      }
    }
    
  2. Подать конфигурацию в Connect API:
    curl -X POST -H "Content-Type: application/json" \
      --data @connector.json \
      http://localhost:8083/connectors
    

Задание 3. Генерация и чтение событий

  1. Войти в MySQL:
    docker exec -it $(docker-compose ps -q mysql) mysql -udebezium -pdbz testdb
    
  2. Создать таблицу и генерить данные:
    CREATE TABLE orders (
      order_id INT PRIMARY KEY,
      total DECIMAL(10,2),
      status VARCHAR(20)
    );
    
    INSERT INTO orders VALUES (1, 100.00, 'NEW');
    UPDATE orders SET status='PAID' WHERE order_id=1;
    DELETE FROM orders WHERE order_id=1;
    
  3. Читать события из Kafka:
    docker exec -it $(docker-compose ps -q kafka) \
      kafka-console-consumer --bootstrap-server kafka:9092 \
      --topic dbserver1.testdb.orders \
      --from-beginning \
      --property print.key=true \
      --property value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer \
      --property schema.registry.url=http://schema-registry:8081
    

Задание 4. Python‑читатель Avro‑сообщений (опционально)

  1. Установить зависимости:
    pip install kafka-python fastavro requests
    
  2. Написать cdc_reader.py:
    from kafka import KafkaConsumer
    from fastavro import schemaless_reader
    import io, requests
    
    # Получаем схему из Schema Registry
    resp = requests.get('http://localhost:8081/subjects/dbserver1.testdb.orders-value/versions/latest')
    schema = resp.json()['schema']
    
    consumer = KafkaConsumer(
        'dbserver1.testdb.orders',
        bootstrap_servers='localhost:9092',
        value_deserializer=lambda m: schemaless_reader(io.BytesIO(m), schema)
    )
    
    for msg in consumer:
        print("Key:", msg.key, "Value:", msg.value)
    
  3. Запустить и убедиться, что вы видите словари Python с полями before и after.

Подготовка теории и практических задач

Нужно подготовить раздел «Событийные источники и очередь: Apache Kafka». Виктор просит продолжить в том же стиле, с простыми объяснениями и примерами. Мы начнем с основ: что такое Kafka, для чего она используется и как устроена. Чтобы было легче понять, будем использовать аналогии, такие как почта или конвейер.

Затем разберемся с такими вопросами, как архитектура Kafka, типы доставки и гарантии, а также масштабируемость и устойчивость.

Событийные источники и очередь: Apache Kafka

Зачем нужен Kafka?

  • Развязка (decoupling)
    Представьте конвейер на фабрике: производственные линии (микросервисы) могут класть посылки (события) в общие ящики (topics), а потребители (аналитика, отчёты) – забирать их в своём темпе, не мешая друг другу.
  • Высокая пропускная способность
    Kafka легко выдерживает десятки тысяч сообщений в секунду, как скоростная автомагистраль для данных.
  • Устойчивость к сбоям
    Дублирование сообщений на разных серверах (реплики) позволяет не потерять посылки, даже если один сервер упал.

Архитектура Kafka

Архитектура Kafka

  • Broker – отдельный сервер Kafka.
  • Topic – «ящик» для посылок‑сообщений.
  • Partition – внутри каждого ящика несколько секций, чтобы писать и читать параллельно.
  • Replica – копии каждой секции на других брокерах для надёжности.
  • Zookeeper vs KRaft – «дирижёр» кластера, который следит за тем, кто главный (leader) и кто подчинён (followers). Современные версии могут работать без Zookeeper, используя встроенный KRaft‑контроллер.
  • Producer – тот, кто отправляет сообщения.
  • Consumer – тот, кто читает.
  • Connect API – готовые коннекторы к базам и другим системам, чтобы не писать свой код.
  • Streams API – библиотека для обработки событий на лету (фильтрация, агрегация).

Гарантии доставки сообщений

  • at‑most‑once – сообщение может быть потеряно, но не продублировано.
  • at‑least‑once – сообщение придёт минимум один раз, может дублироваться.
  • exactly‑once (EOS) – ровно одна копия, без пропусков и дубликатов.
  • Offset – номер последнего прочитанного сообщения в разделе. Представьте счётчик писем в почтовом ящике: вы запомнили, сколько взяли, и при следующем визите начинаете с того же места.

Ретеншн и дедупликация

  • Политики хранения (retention)
    – по времени (например, держим сообщения 7 дней)
    – по размеру (например, не более 100 GB)
  • Compacted topics
    – автоматически оставляют только последнее сообщение для каждого ключа (что удобно для CDC‑сценариев: хранится текущее состояние объекта, а не вся история).

Масштабирование и отказоустойчивость

  • Добавление брокеров и партиций
    – больше партиций = больше параллельных «дорожек» для чтения и записи.
  • ISR (In-Sync Replicas)
    – список реплик, которые в курсе последних изменений.
  • Leader election
    – в каждой партиции выбирается главный «лидер», от которого другие (followers) получают копии.

Интеграция в стек

  1. Производители событий
    – Микросервисы выкладывают события о заказах, кликах, статусах доставки в Kafka.
  2. Downstream‑потребители
    • Spark Streaming, Flink – для real‑time‑аналитики.
    • Debezium Sink – записывает CDC‑события в базы.
    • Custom‑приложения – мониторинг, алерты, архивация в хранилище.

Задание 1. Поднять Docker Compose‑кластер

  1. Создать docker-compose.yml:
    version: '3.8'
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:7.4.0
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
    
      kafka:
        image: confluentinc/cp-kafka:7.4.0
        depends_on: [zookeeper]
        environment:
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    
  2. Запустить:
    docker-compose up -d
    
  3. Убедиться, что всё работает:
    docker-compose ps
    

Задание 2. Консольные операции

  1. Создать topic:
    docker exec -it $(docker-compose ps -q kafka) \
      kafka-topics --create --topic events --bootstrap-server kafka:9092 --partitions 3 --replication-factor 1
    
  2. Отправить сообщения:
    echo -e "msg1\nmsg2\nmsg3" | \
      docker exec -i $(docker-compose ps -q kafka) \
        kafka-console-producer --topic events --bootstrap-server kafka:9092
    
  3. Прочитать сообщения:
    docker exec -it $(docker-compose ps -q kafka) \
      kafka-console-consumer --topic events --bootstrap-server kafka:9092 --from-beginning
    

Задание 3. Простой код‑продьюсер и консьюмер на Python

  1. Установить:
    pip install kafka-python
    
  2. producer.py:
    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    for i in range(10):
        msg = f"message-{i}".encode('utf-8')
        producer.send('events', msg)
    producer.flush()
    print("Отправили 10 сообщений")
    
  3. consumer.py:
    from kafka import KafkaConsumer
    consumer = KafkaConsumer('events', bootstrap_servers='localhost:9092',
                             auto_offset_reset='earliest', enable_auto_commit=True)
    for msg in consumer:
        print("Прочитали:", msg.value.decode('utf-8'))
    
  4. Запустить оба скрипта и убедиться в обмене сообщениями.

Задание 4. Изучение производительности

  1. Измерить время отправки и чтения 1 000 сообщений в тех же скриптах (используйте time.time() вокруг цикла).
  2. Поэкспериментировать с числом партиций (1 → 3 → 6) и сравнить время.

Артефакты и файлы: JFrog Artifactory, файловая система, внешние API, выгрузки Power BI

JFrog Artifactory

  • Зачем нужен репозиторий артефактов?
    Представьте склад готовых деталей: вместо того, чтобы каждый раз собирать программу заново, вы храните готовые «библиотеки» (JAR, Python‑whl), контейнеры (Docker‑образы) и скрипты в одном месте.
  • Типы артефактов
    • JAR / WAR – библиотеки Java
    • Python‑wheel (.whl)
    • Docker‑образы
    • SQL‑скрипты и другие сборки
  • Организация репозиториев
    • Локальный – хранит ваши собственные сборки
    • Удалённый – «проксирует» внешние репозитории (Maven Central, PyPI)
    • Виртуальный – объединяет локальный и удалённый в один «витринный» URL
  • Управление версиями и чистка
    • Каждая сборка хранится с номером версии: my-lib-1.0.0.whl, my-lib-1.0.1.whl.
    • Retention policy и Garbage collection автоматически удаляют старые неиспользуемые артефакты.

Файловая система

  • Когда нужен NFS/SMB
    – Если несколько машин должны одновременно читать/писать файлы так же, как на локальном диске.
  • Object Storage как «файловый» источник
    – S3‑совместимый «шкаф» тоже можно «прикрутить» как диск, но там нет POSIX‑блокировок.
  • Паттерны доступа
    • Монтирование NFS: все видят одинаковые пути /mnt/data
    • SMB на Windows: сетевые папки \\server\share
  • Отказоустойчивость и бэкапы
    – Для NFS ставят RAID, репликацию и регулярные резервные копии на другой сервер.

Внешние API

  • Типы API‑источников
    • REST (URL + методы GET/POST)
    • GraphQL (запросы описывают, какие поля нужны)
  • Аутентификация
    • API‑key – простой ключ в заголовке или параметре
    • OAuth – токены с ограниченным сроком жизни
  • Пагинация и rate‑limit
    • Большие списки «на порции»: ?page=2&per_page=100
    • Ограничение запросов в минуту / час: не больше 1000 вызовов в минуту
  • Форматы ответа
    • Чаще всего JSON, иногда XML
  • Сериализация/десериализация
    • Библиотеки на Python: requests или встроенный http.client
    • Преобразование JSON в словарь: data = response.json()

Выгрузки Power BI

  • Как получить данные из отчёта
    • В веб‑интерфейсе Power BI: Export → CSV или XLSX
    • Через Power BI REST API: настроить сервис-приложение, получить токен и вызвать эндпоинт /reports/{reportId}/export
  • Расписание и автоматизация
    • В самом Power BI можно запланировать обновление и выгрузку
    • Через PowerShell или Python можно по расписанию скачивать готовые файлы
  • Ограничения
    • Максимальный объём одного экспортируемого файла (~100 MB)
    • Формат CSV не всегда сохраняет формат дат и чисел — проверяйте локальные настройки

Задание 1. Работа с Artifactory (CLI)

  1. Войти:
    jfrog rt config
    # ввести URL вашего Artifactory, имя пользователя и API‑ключ
    
  2. Залить Python‑whl:
    jfrog rt u my_package-0.1-py3-none-any.whl my-python-local/
    
  3. Залить Docker‑образ:
    docker tag my-app:latest your.artifactory.local/my-docker-local/my-app:latest
    docker push your.artifactory.local/my-docker-local/my-app:latest
    
  4. Скачать и проверить:
    jfrog rt dl my-python-local/my_package-0.1-py3-none-any.whl .
    sha256sum my_package-0.1-py3-none-any.whl
    # сверить с исходным
    

Задание 2. Чтение из внешнего API (Python)

  1. Установить:
    pip install requests pandas
    
  2. Скрипт fetch_api.py:
    import requests, pandas as pd
    
    url = 'https://api.example.com/data'
    headers = {'Authorization': 'Bearer YOUR_API_KEY'}
    params = {'page': 1, 'per_page': 10}
    
    resp = requests.get(url, headers=headers, params=params)
    data = resp.json()  # список словарей
    df = pd.DataFrame(data[:10])  # первые 10 записей
    df.to_csv('api_data.csv', index=False)
    print("Сохранено 10 строк в api_data.csv")
    

Задание 3. Загрузка выгрузки Power BI

  1. Скачать вручную из Power BI Service:
    • Открыть отчёт → File → Export → CSV
  2. PowerShell‑скрипт (Windows):
    # Установить Power BI cmdlets
    Install-Module -Name MicrosoftPowerBIMgmt.Profile
    Connect-PowerBIServiceAccount
    Export-PowerBIReport -Scope Organization -Id REPORT_ID -OutFile report.csv
    
  3. Открыть в Excel/Sheets и выполнить:
    • Подсчитать общее число строк (стрелка вниз или COUNTA).
    • Среднее по полю (FORMULA: =AVERAGE(B2:B101)).

Задание 4. Файловая система

  1. Смонтировать NFS (Linux):
    sudo mkdir /mnt/data
    sudo mount -t nfs server:/export/data /mnt/data
    
  2. Скопировать JSON:
    cp ~/local_jsons/*.json /mnt/data/
    
  3. Проверить:
    ls /mnt/data | wc -l
    # убедиться, что число файлов совпадает с локальным
    

Ingest данных

Пакетная обработка: Apache Spark (YARN/Kubernetes)

Что такое пакетная обработка и зачем Spark?

  • Batch vs Stream
    Batch (пакетная): собираем большой набор сырых данных за период (например, логи за сутки) и обрабатываем их разом.
    Stream (потоковая): обрабатываем каждое событие сразу, по мере поступления.
  • Почему Spark?
    Многопоточные вычисления на сотнях машин: как фабрика, где каждый работник делает часть работы.
    Ленивые трансформации: вы описываете, что хотите сделать, но реальная работа начинается только при «действительном запуске» (action).
    API DataFrame/RDD:
    • RDD (Resilient Distributed Dataset) – «цепочки данных», которые восстанавливаются при сбое.
    • DataFrame – таблица с именованными колонками, как Excel на кластере.
      Catalyst и Tungsten: внутренние оптимизаторы, которые переписывают ваш код и упаковывают данные для максимальной скорости.

Аналогия: вы составляете рецепт (последовательность шагов обработки), но кухня (Spark) ждёт, когда вы соберёте весь рецепт, и готовит партию блюд одним заходом, оптимизируя маршруты поваров и использование духовок.


Режимы запуска

a) YARN (Hadoop)
  • Интеграция: Spark работает как приложение в экосистеме Hadoop.
  • Очереди (queues): как разные залы в ресторане — заказы (job’ы) могут ждать своей очереди.
  • Scheduler (Capacity/Fair) распределяет ресурсы (ядра и память) между разными командами.
b) Kubernetes
  • Spark Operator: «электронный менеджер», который запускает нужное количество «поваров» (executors) в виде Pod’ов.
  • Dynamic allocation: Spark сам просит у Kubernetes больше или меньше ресурсов в зависимости от нагрузки.
  • Sidecars: дополнительные «помощники» в Pod‑ах для логирования и мониторинга.

Пример:

  • На YARN вы сдаёте «заказ» кухне Hadoop;
  • На Kubernetes ваша «кухня» разворачивается в изолированных контейнерах прямо в облаке.

Архитектура кластера

  • Driver: главный процесс, который читает ваш код и распределяет задачи.
  • Executors: рабочие процессы на узлах, которые выполняют конкретные операции (map, filter, reduce).
  • Fault‑tolerance: если один executor упал, Driver заново отправит этот кусок работы другому — никакой строки не потеряется.

Настройка ресурсов:

--executor-cores 2      # сколько «поваров» одновременно работает в одном executor
--executor-memory 4G    # сколько «ингредиентов» (памяти) выделено на executor

Чтение и запись данных

  1. Коннекторы
    • S3/HDFS: читаем файлы прямо из облака или распределённого хранилища.
    • JDBC: берём данные из любой реляционной БД.
    • Kafka: используем spark.readStream/writeStream для получения/отправки сообщений.
  2. Форматы
    • Parquet/ORC: колоночные форматы с метаданными, очень эффективны для аналитики.
    • Iceberg/Hudi: «лейк‑сторы» с версионностью и возможностью UPDATE/DELETE.
  3. Шардирование
    • df.write.partitionBy("date") — разбиваем большие файлы по датам, чтобы одновременные запросы не читали лишнее.
  4. Сжатие
    • При записи указываем compression="snappy" или "gzip" для экономии диска и сети.

Оптимизация и отладка

  • Spark UI: веб‑интерфейс по адресу http://driver:4040 показывает, сколько времени заняли стадии и задачи.
  • Настройки шифла:
    --conf spark.sql.shuffle.partitions=200  
    

    – контролирует число файлов после shuffle;

  • Join‑стратегии:
    • Broadcast‑join: маленькую таблицу раздаём всем executors (быстро, без shuffle).
    • Shuffle‑join: оба набора данных перегоняются по сети и пересортировываются (медленнее, но подходит для больших таблиц).
  • Кеширование:
    df.cache()
    

    – хранит DataFrame в памяти между несколькими действиями, как охлаждённая заготовка в холодильнике.


  • Устный опрос (5 мин)
    1. Что делает Driver?
    2. Чем RDD отличается от DataFrame?
    3. Что такое ленивые трансформации?
  • Мини‑практика
    Обязательно: локальный пробный запуск помогает «ощутить» распределённые вычисления.

Задание 1. Локальный Spark Quickstart

  1. Запустить Docker:
    docker run -d --name spark-master -p 7077:7077 -p 8080:8080 bitnami/spark:latest
    
  2. Открыть spark-shell:
    docker exec -it spark-master spark-shell --master spark://localhost:7077
    

Задание 2. Простой PySpark‑скрипт

  1. Создать файл clicks_stats.py:
    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
        .appName("ClicksCount") \
        .master("local[*]") \
        .getOrCreate()
    
    # Считаем CSV (можно заменить на локальный путь или s3a://…)
    df = spark.read.csv("data/clicks.csv", header=True, inferSchema=True)
    total = df.count()
    unique_users = df.select("user_id").distinct().count()
    
    print(f"Всего записей: {total}, уникальных пользователей: {unique_users}")
    
    # Сохраняем результат в Parquet
    stats_df = spark.createDataFrame(
        [(total, unique_users)], ["total_clicks", "unique_users"]
    )
    stats_df.write.mode("overwrite").parquet("output/stats.parquet")
    
    spark.stop()
    
  2. Запустить:
    spark-submit --master local[*] clicks_stats.py
    

Задание 3. Запуск на YARN или Kubernetes (опционально)

  1. YARN:
    spark-submit --master yarn --deploy-mode cluster clicks_stats.py
    

    – зайдите в YARN UI (обычно http://<rm-host>:8088) и найдите ваш job.

  2. Kubernetes:
    spark-submit \
      --master k8s://https://<k8s-api>:6443 \
      --deploy-mode cluster \
      --conf spark.kubernetes.container.image=bitnami/spark:latest \
      clicks_stats.py
    

    – посмотрите логи в kubectl get pods.

Задание 4. Тюнинг‑челлендж

  1. Изменить параметры:
    spark-submit \
      --master local[*] \
      --conf spark.sql.shuffle.partitions=10 \
      --conf spark.executor.cores=2 \
      clicks_stats.py
    
  2. Замерить время выполнения (до и после).
  3. Определить, какая конфигурация быстрее на ваших данных, и пояснить, почему.

Потоковая обработка: Spark Streaming

Что такое потоковая обработка и чем она отличается от пакетной?

  • Batch (пакетная): собираем «кучу» записей (например, логи за сутки) и обрабатываем всех сразу.
  • Streaming (потоковая): обрабатываем события по мере их поступления, чуть задерживая их в «микропакетах» (micro‑batches) или «непрерывно» (Continuous Processing).
  • Плюс низкой задержки: результаты появляются почти сразу, как в реальном времени.

Основы Structured Streaming

  • Spark Session → StreamingQuery
    1. Вы создаёте SparkSession.
    2. Определяете источник (readStream) и приёмник (writeStream).
    3. Запускаете query.start() и он работает «до кнопки стоп».
  • Event time vs Processing time
    • Processing time — когда Spark получил событие.
    • Event time — когда событие реально произошло (по полю timestamp).
  • Watermarks
    Позволяют игнорировать очень «запоздавшие» события: вы говорите «ждать до 1 минуды» (максимальный лаг), и более старые события не портят результаты.
  • Triggers и Checkpointing
    • Trigger задаёт интервал между микробатчами, например Trigger.ProcessingTime("30 seconds").
    • Checkpoint сохраняет состояние (offset’ы, агрегаты) в папку, чтобы после сбоя возобновить работу без дубликатов.
  • Гарантии доставки
    Spark Structured Streaming может обеспечить exactly‑once, если правильно настроить источник/приёмник и чекпойнты.

  • Устный опрос (5 мин):
    1. Что такое микробатч?
    2. Чем event time отличается от processing time?
    3. Зачем нужны watermarks?
    4. Как Spark помнит, что уже обработал (checkpoint)?

Задание 1. File‑stream Quickstart

  1. Создайте папку в вашем проекте:
    mkdir input
    
  2. Запустите скрипт file_stream.py:
    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
        .appName("FileStreamDemo") \
        .master("local[*]") \
        .getOrCreate()
    
    df = spark.readStream \
        .option("header", "true") \
        .csv("input/")  # читает новые CSV-файлы
    
    query = df.writeStream \
        .format("console") \
        .option("truncate", False) \
        .start()
    
    query.awaitTermination()
    
  3. В другом терминале скопируйте туда тестовый CSV:
    cp sample1.csv input/
    sleep 5
    cp sample2.csv input/
    
  4. Увидьте в консоли, как Spark немедленно выводит строки из каждого нового файла.

Задание 2. Kafka‑stream Starter

  1. Поднимите Kafka (Docker‑Compose или локально).
  2. Producer отправляет:
    echo -e "user1,click\nuser2,view" | \
    kafka-console-producer --topic events --bootstrap-server localhost:9092
    
  3. Скрипт kafka_stream.py:
    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
        .appName("KafkaStreamDemo") \
        .master("local[*]") \
        .getOrCreate()
    
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "events") \
        .load()
    
    # предполагаем, что value — строка "user,event"
    parsed = df.selectExpr("CAST(value AS STRING) as v") \
        .selectExpr("split(v,',')[0] as user", "split(v,',')[1] as evt")
    
    counts = parsed.groupBy("evt").count()
    
    query = counts.writeStream \
        .outputMode("complete") \
        .format("console") \
        .start()
    
    query.awaitTermination()
    
  4. Запустите скрипт и наблюдайте, как меняются счётчики для click и view.

Задание 3. Windowed‑aggregation

  1. Расширьте kafka_stream.py, добавив имитацию поля времени:
    from pyspark.sql.functions import current_timestamp, window
    
    # после parsed:
    withTime = parsed.withColumn("ts", current_timestamp())
    
    windowed = withTime \
        .withWatermark("ts", "1 minute") \
        .groupBy(window("ts", "1 minute", "30 seconds"), "evt") \
        .count()
    
  2. Запустите и отправляйте новые сообщения в топик, наблюдая, как каждые 30 секунд Spark выводит число событий за последнюю минуту.

Задание 4. Checkpoint и exactly‑once

  1. Добавьте в writeStream опцию:
    .option("checkpointLocation", "chkpoint/")
    
  2. Запустите скрипт, отправьте несколько сообщений, остановите (Ctrl+C) и перезапустите.
  3. Убедитесь, что при перезапуске Spark не выводит уже обработанные сообщения повторно.

Ad‑hoc SQL: Trino (распределённый SQL‑движок)

Что такое Trino и зачем он нужен?

  • Trino — это сервис, который позволяет писать обычный SQL и получать данные сразу из разных хранилищ (S3‑хранилище, базы, Elasticsearch и т. д.).
  • Почему он полезен:
    • Не нужно копировать всю информацию в одну базу — Trino подаёт вам данные на лету.
    • Вы можете делать быстрые запросы по большим таблицам без разворачивания тяжёлых кластеров.

Как он устроен?

  • Coordinator («координатор»)
    – Принимает ваш SQL, разбивает его на маленькие задачи и раздаёт их рабочим.
  • Workers («рабочие узлы»)
    – Выполняют часть работы: читают файлы или таблицы и пересылают промежуточные результаты обратно.
  • Shuffle / pipelines
    – Если нужно соединить данные из разных workers, они обмениваются кусочками «по сети» и продолжают счёт.

Каталоги и коннекторы

  • Каталог — файл настроек, в котором прописано, где и как лежат данные.
    Примеры:

    • Hive — для таблиц на S3 с метаданными в Glue или Hive Metastore.
    • Iceberg/Hudi — версии таблиц и быстрые обновления.
    • JDBC — подключение к обычным базам (PostgreSQL, MySQL).
    • Kafka/Elastic/MongoDB — другие хранилища подключаем «на лету».

Возможности SQL в Trino

  • Полный ANSI SQL, плюс:
    • Оконные функции: ROW_NUMBER(), SUM() OVER (PARTITION BY …)
    • ROLLUP, GROUPING SETS для расширенных отчётов
    • Массивы (ARRAY) и структуры (ROW) для сложных данных

Оптимизация запросов

  • Pushdown
    – Trino старается отправить фильтр и выборку колонок как можно ближе к данным, чтобы меньше передавать по сети.
  • Партиционирование
    – Если таблица разбита по дате или другому полю, Trino читает только нужные папки.
  • Параметры
    • query.max-memory — сколько памяти может занять запрос.
    • spill-to-disk — если памяти не хватает, Trino сбрасывает часть данных на диск.
    • task.concurrency — сколько параллельных потоков у каждого worker.

Безопасность и доступ

  • Аутентификация: LDAP, Kerberos или JWT — вы доказываете, кто вы, и Trino пускает вас дальше.
  • RBAC и ACL
    – Можно дать права «только читать таблицы в каталоге analytics» или «писать только в каталог staging».

Когда использовать Trino?

  • Ad‑hoc‑аналитика
    – Если нужно быстро проверить гипотезу «сколько товаров продали в марте», без настройки ETL.
  • Federated queries
    – Соединить данные из S3 (логи кликов) и из базы заказов одним запросом.

  1. Устный опрос (5 мин)
    • «Кто расписывает план выполнения SQL‑запроса?»
    • «Для чего нужны коннекторы?»
    • «Что такое pushdown?»
  2. Мини‑практика
    Непременно запускаем Trino, пробуем свои SQL и смотрим результаты «живьём».

Задание 1. Запуск Trino в Docker

  1. Старт:
    docker run -d --name trino -p 8080:8080 trinodb/trino
    
  2. Подключение через CLI:
    docker exec -it trino trino \
      --server localhost:8080 \
      --catalog tpch \
      --schema tiny
    

Задание 2. Простые аналитические запросы

В CLI или в UI Trino выполните:

  1. Посчитайте число стран:
    SELECT nationkey, COUNT(*) 
    FROM tpch.tiny.nation 
    GROUP BY nationkey;
    
  2. Вычислите выручку по датам:
    SELECT l_shipdate AS ship_date, 
           SUM(l_extendedprice * (1 - l_discount)) AS revenue 
    FROM tpch.tiny.lineitem 
    WHERE l_shipdate >= DATE '1995-01-01' 
    GROUP BY ship_date 
    ORDER BY ship_date;
    

Задание 3. Федерация источников

  1. Настройте Hive‑каталог над папкой с локальными CSV или Parquet.
    • Создайте файл catalog/hive.properties:
      connector.name=hive-hadoop2
      hive.metastore.uri=thrift://<ваш-metastore>:9083
      
  2. Выполните JOIN:
    SELECT n.name, c.value 
    FROM tpch.tiny.nation n 
    JOIN hive.default.local_table c 
      ON n.nationkey = c.nationkey
    LIMIT 10;
    

Задание 4. Изучение производительности

  1. Сравните один и тот же запрос на tpch.tiny (малый объём) и на tpch.sf1 (scale-factor 1):
    EXPLAIN (TYPE DISTRIBUTED) 
    SELECT COUNT(*) 
    FROM tpch.sf1.lineitem 
    WHERE l_shipdate >= DATE '1995-01-01';
    
  2. Добавьте фильтр по статусу и снова выполните EXPLAIN:
    EXPLAIN (TYPE DISTRIBUTED)
    SELECT COUNT(*) 
    FROM tpch.sf1.lineitem 
    WHERE l_shipdate >= DATE '1995-01-01' 
      AND l_returnflag = 'R';
    
  3. Сравните планы и отметьте, как pushdown сокращает обмен данными.

Хранилище (озеро данных)

Общая инфраструктура: Yandex Object Storage (или S3)

Роль Object Storage в Data Lake

  • Почему S3‑совместимое?
    – Большие объёмы «сырых» данных (логи, экспорт из БД) удобно держать не в базе, а в «облачном шкафу», который масштабируется от гигабайт до петабайт.
    – Простой HTTP‑интерфейс (S3 API) понятен многим инструментам: Spark, Trino, Airflow.
  • Характеристики
    Масштабируемость: автоматически растёт по мере добавления данных.
    Долговечность: данные хранятся в нескольких копиях внутри региона.
    Консистентность: после успешной записи вы гарантированно увидите свежий объект.

Архитектурные принципы

  • Flat‑namespace vs псевдодиректории
    – На самом деле у бакета нет папок — есть ключи logs/2025/04/17.csv. Мы просто используем слеши / для группировки.
  • Префиксы
    – Чтобы распределить запросы равномерно по «шкафам», ключи строят с учётом хэша или разбивки по дате:
    logs/2025/04/17/events.csv
    backup/2025/04/daily.parquet
    
  • Regions и AZ
    – Данные реплицируются внутри зоны доступности (AZ).
    – Для повышенной надёжности можно настроить кросс‑региональную репликацию.

Управление данными и стоимость

  • Классы хранения
    1. Standard – горячие данные (часто читаются/пишутся).
    2. Infrequent Access – редко читаются (например, старые логи).
    3. Archive – почти не читаются, только для долгосрочных бэкапов.
  • Модель оплаты
    • Хранение – платите за объём GB⋅мес.
    • Запросы – за количество операций PUT/GET.
    • Исходящий трафик – за каждый гигабайт, вышедший из региона.

Подключение и доступ

  • S3‑API и CLI/SDK
    # Yandex:
    yc storage bucket create --name my-bucket
    # AWS:
    aws s3 mb s3://my-bucket
    
  • Права
    IAM‑роли и политики на уровне бакета/ключа: кто может читать, кто писать.
  • Шифрование
    SSE‑S3 (ключ провайдера) или SSE‑KMS (ваш ключ в KMS).
    Client‑side: шифруете данные до отправки.

Надёжность и отказоустойчивость

  • SLA
    – Гарантия доступности (обычно 99.9 %).
  • Репликация
    – Внутри региона (несколько копий), опционально между регионами.
  • Versioning и MFA Delete
    – Включаете версионирование, и при удалении старые версии сохраняются.
    – MFA Delete требует двухфакторную авторизацию при удалении версий.

Устный опрос vs практика

  • Устный опрос
    – Проверяем ключевые термины: «что такое префикс?», «какие есть классы хранения?», «зачем versioning?».
  • Мини‑практика
    – Обязательно: только при личном опыте работы через CLI/SDK вы почувствуете разницу между классами и научитесь быстро управлять бакетом.

Задание 1. Создание и настройка бакета

  1. Через CLI
    # Yandex Cloud
    yc storage bucket create --name my-data-lake \
      --default-storage-class standard
    
    # AWS S3
    aws s3 mb s3://my-data-lake
    
  2. Включить версионирование
    # Yandex Cloud
    yc storage bucket versioning enable --name my-data-lake
    
    # AWS
    aws s3api put-bucket-versioning \
      --bucket my-data-lake \
      --versioning-configuration Status=Enabled
    
  3. Добавить lifecycle‑правило для перевода старых объектов в архив:
    # Пример для AWS:
    aws s3api put-bucket-lifecycle-configuration \
      --bucket my-data-lake \
      --lifecycle-configuration '{
        "Rules":[
          {
            "ID":"ArchiveOld",
            "Filter": {"Prefix":""},
            "Status":"Enabled",
            "Transitions":[
              {"Days":30, "StorageClass":"STANDARD_IA"},
              {"Days":365, "StorageClass":"GLACIER"}
            ]
          }
        ]
      }'
    

Задание 2. Загрузка и чтение данных

  1. Загрузить небольшой файл:
    aws s3 cp sample.csv s3://my-data-lake/data/sample.csv
    
  2. Скачать и проверить checksum:
    aws s3 cp s3://my-data-lake/data/sample.csv ./downloaded.csv
    sha256sum sample.csv downloaded.csv
    

Задание 3. Эксперимент с классами хранения

  1. Перевести объект в Infrequent Access:
    aws s3 cp s3://my-data-lake/data/sample.csv \
      s3://my-data-lake/data/sample_ia.csv \
      --storage-class STANDARD_IA
    
  2. Перевести в архив (Glacier или Archive):
    aws s3 cp s3://my-data-lake/data/sample.csv \
      s3://my-data-lake/data/sample_archive.csv \
      --storage-class GLACIER
    
  3. Замерить время чтения каждого варианта:
    time aws s3 cp s3://my-data-lake/data/sample_*.csv ./ 
    

Задание 4. Подключение из аналитики

  1. В Trino/Hive прописать каталог, например в hive.properties:
    connector.name=hive-hadoop2
    hive.s3.aws-access-key=YOUR_KEY
    hive.s3.aws-secret-key=YOUR_SECRET
    hive.s3.bucket=my-data-lake
    hive.s3.endpoint=s3.your-region.amazonaws.com
    
  2. Запрос в Trino:
    SELECT * 
    FROM hive.default."data/sample.csv" 
    LIMIT 10;
    
  3. Убедитесь, что Trino читает строки из файла напрямую.

Табличные форматы: Apache Hudi, Delta Lake, Apache Iceberg

Зачем нужны табличные форматы в Data Lake?

Когда мы просто складываем файлы (CSV, Parquet) в «облачный шкаф» (S3, YCS), каждый раз при обновлении данных приходится перезаписывать целые файлы. Табличные форматы решают следующие задачи:

  1. ACID‑транзакции
    – Все или ничего: при записи данных вы либо полностью сохраните новый вариант таблицы, либо оставите старый без «половинчатых» изменений (как банковские переводы).
  2. Time‑travel (возврат в прошлое)
    – Можно запросить, как выглядели данные «на вчерашний день» или в любой момент в прошлом.
  3. Схемовая эволюция
    – Добавлять или убирать колонки без ручного переписывания всех файлов.
  4. Компакты
    – Сливать мелкие файлы в крупные для скорости чтения.

Apache Hudi

  • Merge On Read (MOR) vs Copy On Write (COW)
    • COW: при каждом обновлении перезаписываем файлы Parquet целиком.
    • MOR: изменения сначала пишем в «журнал» (лог), а затем периодически «сливаем» их с основными файлами.
  • Инкрементальные upsert‑операции
    – Добавляете новые строки и обновляете старые без переработки всей таблицы.
  • Встроенный индекс
    – Hudi хранит метаданные о местоположении строк, чтобы быстро находить, что нужно обновить.
  • Кластеризация (Clustering)
    – Позволяет группировать похожие по ключу записи в одни файлы для ускорения запросов.

Delta Lake

  • Транзакционный log‑файл _delta_log
    – Каждое изменение (insert/update/delete) фиксируется в отдельном JSON‑журнале, а данные хранятся в Parquet.
  • Schema enforcement vs schema evolution
    • Enforcement: не дадим записать данные, которые не соответствуют текущей таблице.
    • Evolution: можно безопасно добавить новую колонку.
  • OPTIMIZE / VACUUM
    OPTIMIZE объединяет мелкие файлы в крупные «сгустки».
    VACUUM удаляет устаревшие файлы после того, как их версии вам больше не нужны.

Apache Iceberg

  • Manifest‑ и метафайлы (snapshot‑based)
    – Iceberg хранит списки файлов (manifests) и снимки (snapshots), что упрощает управление версиями.
  • Hidden partitioning
    – Вы можете задать правила разбиения данных без «захардкоженных» директорий.
  • Плавающие типы колонок
    – Возможность изменять типы колонок (например, intlong) без переезда таблицы.

Каталог метаданных

Чтобы Spark, Trino или другие движки «видели» ваши таблицы, нужно хранить информацию о них в Hive Metastore или AWS Glue Catalog. Каталог содержит:

  • Имя таблицы и её расположение в S3/YCS
  • Схему (список колонок и их типы)
  • Партитции (если есть)

Сравнение и случаи применения

Формат Обновления Time‑travel Интеграция Когда выбирать
Hudi MOR/COW Да Spark, Flink, Hive, Trino Частые инкременты и быстрые upsert‑задачи
Delta COW + журнал Да Databricks, Spark, Trino Активное редактирование и time‑travel
Iceberg Snapshot‑based Да Spark, Trino, Flink, Hive Большие таблицы, сложные эволюции схем

Устный опрос vs мини‑практика

  • Устный опрос (5 мин)
    – Сравнить ключевые характеристики:
    1. Как Hudi понимает, где обновлять строку?
    2. Чем журнал Delta отличается от манифестов Iceberg?
    3. Как задать новую колонку без ошибок?
  • Мини‑практика
    Обязательно – только hands‑on покажет, как делаются upsert, compaction и time‑travel на самом деле.

Задание 1. Delta Lake Quickstart

  1. Локально запустить Spark с поддержкой Delta (загрузите delta-core JAR).
  2. Создать Delta‑таблицу:
    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
        .appName("DeltaDemo") \
        .config("spark.jars.packages", "io.delta:delta-core_2.12:2.2.0") \
        .getOrCreate()
    
    data = spark.createDataFrame([(1,"Alice"),(2,"Bob")], ["id","name"])
    data.write.format("delta").save("/tmp/delta-table")
    
  3. Вставить и обновить:
    spark.read.format("delta").load("/tmp/delta-table") \
        .createOrReplaceTempView("t")
    spark.sql("INSERT INTO delta.`/tmp/delta-table` VALUES (3,'Carol')")
    spark.sql("UPDATE delta.`/tmp/delta-table` SET name='Alice Cooper' WHERE id=1")
    
  4. Time‑travel:
    history = spark.sql("DESCRIBE HISTORY delta.`/tmp/delta-table`")
    history.show()
    # Предположим, предыдущий version = 0
    df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
    df.show()
    

Задание 2. Apache Hudi Hello‑World

  1. Spark + Hudi‑коннектор (добавьте hudi-spark-bundle):
    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
        .appName("HudiDemo") \
        .config("spark.jars.packages","org.apache.hudi:hudi-spark3-bundle_2.12:0.12.0") \
        .getOrCreate()
    
    data = spark.createDataFrame([(1,"x")], ["id","value"])
    data.write.format("hudi") \
        .option("hoodie.table.name","hudi_table") \
        .option("hoodie.datasource.write.recordkey.field","id") \
        .option("hoodie.datasource.write.table.name","hudi_table") \
        .option("hoodie.datasource.write.operation","insert") \
        .save("/tmp/hudi-table")
    
  2. Upsert:
    updated = spark.createDataFrame([(1,"y")], ["id","value"])
    updated.write.format("hudi") \
        .option("hoodie.datasource.write.operation","upsert") \
        ... \
        .save("/tmp/hudi-table")
    
  3. Посмотреть коммиты через Hoodie CLI:
    hudi-cli> connect --path /tmp/hudi-table
    hudi-cli> commits show
    

Задание 3. Iceberg Simple Table

  1. Spark + Iceberg (добавьте iceberg-spark3-runtime):
    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
        .appName("IcebergDemo") \
        .config("spark.jars.packages","org.apache.iceberg:iceberg-spark3-runtime:1.2.0") \
        .config("spark.sql.catalog.local","org.apache.iceberg.spark.SparkCatalog") \
        .config("spark.sql.catalog.local.type","hadoop") \
        .config("spark.sql.catalog.local.warehouse","/tmp/iceberg-warehouse") \
        .getOrCreate()
    
    # Создать таблицу
    spark.sql("CREATE TABLE local.db.ice_test (id INT, name STRING) USING iceberg")
    # Вставить данные
    spark.sql("INSERT INTO local.db.ice_test VALUES (1,'A'), (2,'B')")
    
  2. Time‑travel:
    # Получить snapshot-id
    snaps = spark.sql("CALL local.system.snapshots('db.ice_test')").show()
    # Предположим snapshot_id = 1
    df_old = spark.read \
        .format("iceberg") \
        .option("snapshot-id", 1) \
        .load("local.db.ice_test")
    df_old.show()
    

Задание 4. Сравнительный SELECT

  1. Для одного набора данных (например, изменение записей за последние 24 ч) выполните аналогичный запрос в каждой таблице:
    SELECT COUNT(*) FROM <format>.table WHERE ts >= current_timestamp() - interval '1' day
    
  2. Сравните
    • Время выполнения
    • Объём прочитанных данных (см. Spark UI или df.queryExecution.logicalStats)
  3. Сделайте вывод, какой формат на ваших данных работает быстрее и почему.

Метаданные: Hive Metastore / AWS Glue Catalog

Зачем нужен единый каталог метаданных?

Представьте библиотеку, где книжки (таблицы) лежат на полках (S3/файловые хранилища), но без картотеки вы не найдёте нужную книгу быстро. Каталог метаданных — это как картотека: в ней хранится информация, где лежит каждая таблица, какие у неё колонки, как она разбита на части (партиции). Spark, Trino, Athena и другие инструменты читают эту картотеку, а не «гадывают» по именам файлов.

Hive Metastore

  • Как устроен
    – Сервис на Thrift, который отвечает на запросы «где эта таблица?» и «какая у неё схема?»
    – Сам Metastore хранит данные в реляционной базе (MySQL или Postgres).
  • Основные объекты
    1. Database: логическая группа таблиц (как папка).
    2. Table: имя, список колонок и типы.
    3. Partition: подтаблицы по ключам (например, дата).
    4. StorageDescriptor: где лежат файлы, в каком формате, какая сериализация (SerDe).
  • Регистрация таблиц
    Internal: данные лежат в стандартном каталоге Metastore.
    External: вы сами указываете путь в S3/на диск — Metastore только хранит мета‑информацию.

AWS Glue Catalog

  • Serverless‑альтернатива
    – Вместо своего Metastore вы используете управляемый AWS‑сервис — не нужно ставить и обновлять свой сервер.
  • Интеграция
    – Athena, Redshift Spectrum, EMR, Glue ETL автоматически читают и пишут в Glue Catalog.
  • Glue Crawlers
    – Специальные «ползунки», которые ходят по вашему S3‑бакету, находят файлы CSV/Parquet и сами создают таблицы в каталоге.

Управление схемами и эволюция

  • Добавление колонок
    – Можно дописать новую колонку без изменения старых файлов — инструменты будут её «видеть».
  • Удаление или изменение типа
    – Требует осторожности: некоторые движки не поддерживают «сужение» типа (например, из string в int).

Безопасность и права доступа

  • IAM‑права
    – Кто может читать каталог, кто — писать новые таблицы.
  • Lake Formation (AWS)
    – Дополнительный уровень контроля: кто из пользователей может видеть определённые строки или колонки.

Надёжность и производительность

  • SLA
    – У Glue Catalog высокая доступность, вам не нужно «держать» свой Metastore.
  • Кэширование
    – Некоторые движки (Spark, Trino) могут кэшировать мета‑информацию, чтобы не спрашивать каждый раз каталог.

Устный опрос и мини‑практика

Комбинируем оба подхода: сначала короткий устный опрос, чтобы проверить основные понятия, а затем практические упражнения.

  • Устный опрос (5 мин)
    1. Где хранится информация о физическом расположении файлов таблицы?
    2. В чём отличие Hive Metastore от Glue Catalog?
    3. Как добавить новую колонку без потери старых данных?
  • Мини‑практика
    Обязательно: только на реальных примерах вы почувствуете, как движки находят и читают таблицы через каталог.

Задание 1. Hive Metastore локально

  1. Запустите Docker‑Metastore (например, с Postgres):
    # Запустить Postgres
    docker run -d --name hive-metastore-db -e POSTGRES_PASSWORD=pass -p 5432:5432 postgres
    
    # Запустить простой Hive Metastore
    docker run -d --name hive-metastore \
      -e METASTORE_DB_HOST=host.docker.internal \
      -e METASTORE_DB_NAME=postgres \
      -e METASTORE_DB_PASSWORD=pass \
      -p 9083:9083 \
      your-hive-metastore-image
    
  2. Через Beeline или Spark SQL:
    -- Подключитесь к Metastore
    !connect jdbc:hive2://localhost:9083/default
    
    CREATE DATABASE IF NOT EXISTS demo;
    CREATE EXTERNAL TABLE demo.events (
      id STRING,
      event_time TIMESTAMP,
      payload STRING
    )
    STORED AS PARQUET
    LOCATION 's3a://your-bucket/demo/events/';
    SHOW TABLES IN demo;
    

Задание 2. AWS Glue Catalog

  1. Создайте базу demo_glue через AWS CLI:
    aws glue create-database --database-input '{"Name":"demo_glue"}'
    
  2. Запустите Glue Crawler на S3‑префиксе s3://your-bucket/demo/events/:
    aws glue create-crawler \
      --name demo-crawler \
      --role AWSGlueServiceRole \
      --database-name demo_glue \
      --targets '{"S3Targets":[{"Path":"s3://your-bucket/demo/events/"}]}'
    aws glue start-crawler --name demo-crawler
    
  3. Проверьте в AWS Console → Glue → Databases → demo_glue → Tables.

Задание 3. Чтение через Trino и Spark

  1. В Trino в catalog/hive.properties или glue.properties пропишите доступ к вашему Metastore/Glue:
    connector.name=hive-hadoop2
    hive.metastore.uri=thrift://<hive-host>:9083
    # или для Glue:
    connector.name=glue
    aws.region=us-east-1
    
  2. Запрос в Trino:
    SELECT * 
    FROM demo.events 
    LIMIT 10;
    
  3. В Spark:
    df = spark.table("demo.events")
    df.show(10)
    

Задание 4. Эволюция схемы

  1. Добавьте колонку:
    ALTER TABLE demo.events 
      ADD COLUMNS (user_id STRING);
    
  2. Запишите пару новых строк с этим полем:
    INSERT INTO demo.events VALUES ('evt1', CURRENT_TIMESTAMP, 'data', 'user123');
    
  3. Убедитесь, что обе колонки (payload, user_id) отображаются:
    SELECT id, payload, user_id 
    FROM demo.events 
    LIMIT 5;
    

Вычисления и Ad-Hoc

Apache Spark: архитектура кластеров в Yandex Cloud, оптимизация job’ов (Tungsten, Catalyst, tuning)

Архитектура Spark‑кластера в Yandex Cloud

  • Data Proc vs свой Spark на Kubernetes
    • Yandex Data Proc: готовый «облачный сервис» — вы указываете размер кластера, а всё остальное (установка Spark, мониторинг, авто‑скейлинг) берёт на себя Yandex.
    • Spark на Kubernetes: вы разворачиваете Spark‑Operator в своём кластере K8s, сами управляете подами, конфигурацией и ресурсами.
  • Роли узлов
    • Мастер (Driver) — главный процесс, который распределяет задачи.
    • Воркеры (Executors) — рабочие «повара», которые обрабатывают кусочки данных.
    • При HA‑настройке могут использовать Zookeeper для согласования, но в Data Proc это уже готово «из коробки».
  • Интеграция с облаком
    • Object Storage (s3a:// или storage.yandexcloud.net) для чтения/записи данных.
    • IAM‑роли: Spark‑узлы получают временные токены и могут обращаться к бакетам без передачи ключей.
    • VPC: весь трафик между узлами и хранилищем остаётся в приватной сети.

Механизмы оптимизации внутри Spark

  • Catalyst Optimizer
    1. Логический план: Spark смотрит, что вы написали в SQL/DataFrame.
    2. Применяет правила (rule‑based) — объединяет фильтры, убирает лишние шаги.
    3. Cost‑based оптимизации — выбирает лучший способ выполнить join или агрегацию, опираясь на статистику.
  • Tungsten Execution Engine
    • Off‑heap память: хранит данные вне «кучи» Java, чтобы сборщик мусора летал реже.
    • Код‑генерация (whole‑stage codegen): Spark сам «печатает» Java‑байт‑код, который очень быстро работает.
    • Cache‑aware структуры: данные упаковываются так, чтобы быстро загружаться в кэш процессора и экономить память.

Основные параметры тюнинга

  • spark.sql.shuffle.partitions — сколько кусочков (partitions) создаётся при shuffle (пересортировке).
  • spark.executor.memory и spark.executor.cores — сколько «пищи» (памяти) и сколько «поваров» (ядер) на каждого executor.
  • spark.driver.memory — сколько памяти у главного процесса.
  • Сериализация:
    • Java (по‑умолчанию) — медленнее.
    • Kryo — быстрее и компактнее, рекомендуется для больших объёмов.
  • Join‑стратегии:
    • Broadcast — маленькую таблицу раздаём всем executors (быстро).
    • Shuffle‑join — пересортировка больших таблиц, но иногда неизбежна.
  • Dynamic Allocation и checkpointing — сами регулируют число executors и защищают от потерь состояния.

Мониторинг и отладка

  • Spark UI
    • Stages: этапы вычисления, видно, какие tasks дольше всего.
    • Storage: какие RDD/DataFrame закешированы и сколько памяти занимают.
    • SQL: план выполнения SQL‑запросов.
    • Environment: все настройки вашего job’а.
  • Prometheus/Grafana
    • Собирают метрики: нагрузку CPU, задержку, число spill’ов на диск.
  • Типичные проблемы
    • Skew: когда один executor получает слишком много данных и работает медленно.
    • Spill to disk: когда не хватает памяти, данные сливаются на диск, и job тормозит.

Устный опрос vs мини‑практика

  • Устный опрос (5 мин)
    1. Что делает Driver?
    2. Чем Catalyst отличается от Tungsten?
    3. Зачем уменьшать shuffle.partitions?
  • Мини‑практика
    Обязательно: только запустив job и изменив параметры, вы почувствуете разницу в скорости и ресурсоёмкости.

Задание 1. Поднятие кластера Data Proc

  1. Через UI Yandex Cloud → Data Proc → Create cluster
    • 1 мастер + 2 воркера, стандартный образ Spark.
  2. Проверить в разделе YARN ResourceManager или Spark History Server, что узлы «забиты» и готовы к задачам.

Задание 2. Выполнение простого Spark‑job

  1. Подготовьте word_count.py:
    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder.appName("WordCount").getOrCreate()
    text = spark.read.text("s3a://your-bucket/sample.txt")
    counts = text.rdd.flatMap(lambda r: r[0].split()) \
        .map(lambda w: (w,1)).reduceByKey(lambda a,b: a+b)
    counts.show()
    spark.stop()
    
  2. Запустить:
    spark-submit \
      --master yarn \
      --deploy-mode cluster \
      word_count.py
    
  3. Посмотреть логи и Spark UI — убедиться, что задача выполнилась успешно.

Задание 3. Тюнинг‑челлендж

  1. Сменить spark.sql.shuffle.partitions:
    --conf spark.sql.shuffle.partitions=50
    
  2. Переключить сериализацию:
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
    
  3. Сравнить
    • Время выполнения (см. Spark UI или логи).
    • Число spill’ов и загрузку памяти.

Задание 4. Анализ плана

  1. Откройте Spark UI → SQL Tab → DAG Visualization.
  2. Найдите шаги shuffle и узкие места (большие стадии, spill).
  3. Скриншот части плана, где Catalyst применил push‑down фильтра или где видно много shuffle‑tasks.
  4. Запишите, какие шаги можно оптимизировать (уменьшить partitions, добавить broadcast).

Trino: подключение к разным хранилищам, push‑down, federated queries

Конфигурация каталогов

  • Trino хранит настройки соединений в файлах catalog/*.properties.
  • Каждый файл описывает один «каталог» — источник данных, например, PostgreSQL или S3.
  • Внутри свойства, самые важные:
    connector.name=postgresql           # тип коннектора
    connection-url=jdbc:postgresql://…  # куда подключаться
    username=…                          # кто подключается
    password=…                          # пароль или токен
    
  • Аналогично для S3/Hive:
    connector.name=hive-hadoop2
    hive.metastore.uri=thrift://…
    

Типы коннекторов

  • Hive: таблицы на S3/Object Storage + метаданные в Metastore.
  • JDBC: подключение к реляционным БД (PostgreSQL, MySQL, Oracle).
  • Табличные форматы: Iceberg, Delta, Hudi — хранятся как файлы, но видны как таблицы.
  • NoSQL и поиск: Kafka, Elasticsearch — можно читать логи и документы напрямую.

Push‑down фильтров и проекций

  • Что уходит в источник
    – Если вы пишете WHERE id < 10, Trino постарается передать этот фильтр самому источнику (Postgres или S3), чтобы читать меньше данных.
  • Проекции
    – SELECT только нужных колонок (SELECT name, date вместо SELECT *) тоже «толкается» вниз.
  • Настройки
    predicate-pushdown.enabled=true
    pushdown-filter-enabled=true
    

Federated queries (федерация)

  • JOIN между разными источниками
    – Можно объединить, например, таблицу из Postgres и таблицу из S3 одним запросом:
    SELECT u.id, f.info
    FROM postgres.public.users u
    JOIN hive.default.files f
      ON u.id = f.user_id;
    
  • Согласование типов
    – Trino автоматически приведёт, например, INTEGER в Postgres к BIGINT в файле, если потребуется.
  • Производительность
    – Для небольших объёмов federated‑JOIN удобен; если таблицы огромные, лучше сначала перенести данные (ETL) в единое место.

Безопасность и разграничение доступа

  • Аутентификация: LDAP, JWT, встроенный файл password-authenticator.
  • ACL
    – В access-control.properties можно запретить или разрешить чтение/запись по каталогам, схемам и таблицам.

Устный опрос vs мини‑практика

Устный опрос (5 мин)

  • «Где лежат файлы с настройками коннекторов?»
  • «Что такое push‑down и зачем он нужен?»
  • «Когда полезны federated‑запросы?»

Мини‑практика
Обязательна: только на практике вы научитесь настраивать *.properties, видеть в EXPLAIN, как Trino отсылает фильтры и как объединяет разные базы.


Задание 1. Запуск Trino + Docker‑Postgres

  1. Два контейнера:
    docker run -d --name postgres -e POSTGRES_PASSWORD=pass -p 5432:5432 postgres:13
    docker run -d --name trino -p 8080:8080 trinodb/trino
    
  2. Создайте файл catalog/postgres.properties рядом с запуском Trino:
    connector.name=postgresql
    connection-url=jdbc:postgresql://postgres:5432/postgres
    username=postgres
    password=pass
    
  3. Перезапустите Trino (или добавьте каталог и сделайте docker restart trino).

Задание 2. Simple push‑down

  1. В PostgreSQL:
    CREATE TABLE public.users (id INT PRIMARY KEY, name TEXT);
    INSERT INTO public.users VALUES (1,'A'),(2,'B'),…(100,'Z');
    
  2. В Trino CLI:
    docker exec -it trino trino --server localhost:8080 --catalog postgres --schema public
    
  3. Выполните:
    EXPLAIN (TYPE DISTRIBUTED)
    SELECT * FROM users WHERE id < 10;
    
  4. Убедитесь, что в плане видно Filter[ id < 10 ] внутри TableScan, то есть условие «пушнено» в Postgres.

Задание 3. Federated JOIN

  1. Подключите Hive‑каталог над локальными CSV:
    • Создайте catalog/hive.properties:
      connector.name=hive-hadoop2
      hive.metastore.uri=thrift://<адрес-metastore>:9083
      
    • В метасторе зарегистрируйте внешний каталог, например hive.default.meta, с CSV-файлами, где есть колонка user_id.
  2. В Trino:
    EXPLAIN (TYPE DISTRIBUTED)
    SELECT u.id, m.info
    FROM postgres.public.users u
    JOIN hive.default.meta m
      ON u.id = m.user_id;
    
  3. Замерьте время выполнения SELECT …, потом добавьте фильтр WHERE u.id < 50 и снова замерьте.

Задание 4. Анализ и оптимизация

  1. Отключите в postgres.properties:
    predicate-pushdown.enabled=false
    
  2. Повторите запрос из Задания 2 и замерьте время.
  3. Включите снова и сравните: сколько времени с push‑down против без.
  4. Обсудите, в каком случае federated JOIN оправдан, а когда лучше сначала сделать ETL.

Ad‑hoc‑слой: когда использовать Spark vs Trino

Когда выбирают Spark, а когда Trino?

Характер нагрузки
  • Trino идеально подходит для быстрых, «интерактивных» запросов (< 1 сек) по небольшим или средним фрагментам данных (десятки–сотни мегабайт).
  • Spark — выбор для тяжёлых джобов, обрабатывающих сотни гига‑ и терабайт: сложные трансформации, ML‑пайплайны, большие компакты.
Язык и удобство
  • Trino
    – чистый SQL‑интерфейс, знакомый большинству аналитиков.
    – нет гибкости кода — только SQL.
  • Spark
    – DataFrame/DSL на Python, Scala или SQL.
    – можно писать любую логику: UDF, сложные функции, обращаться к ML‑библиотекам.
Как устроен «движок»
  • Trino
    – обрабатывает запросы потоком, пушит фильтры и проекции вниз, минимизирует копирование.
  • Spark
    – строит ленивый DAG, оптимизирует его Catalyst‑ом, исполняет через Tungsten‑движок, может кешировать промежуточные данные.
Источники данных
  • Trino отлично для federated–JOIN’ов: берет куски из разных источников (S3, базы, Elasticsearch).
  • Spark удобнее, когда всё «сырьё» уже лежит в Data Lake (Parquet/Iceberg), и нужно сделать серию сложных шагов ETL.
Стоимость и масштаб
  • Trino
    – маленький CPU, быстрый старт, платите за короткое время работы.
    – невыгоден для джобов на сотни гига из‑за накладных расходов плана.
  • Spark
    – гибко масштабируется, можно использовать spot‑инстансы, чинить параметры памяти/числа ядер.
    – на мелких задачах долго запускается и жрёт ресурсы.
Гарантии и устойчивость
  • Trino
    – каждая задача — самостоятельный запрос, нет перезапуска внутри.
    – при сбое нужно перезапустить весь запрос.
  • Spark
    – поддерживает checkpoint и восстановление DAG’ов.
    – умеет перезапускать упавшие куски (tasks) и продолжать с точки остановки.

Устный опрос vs мини‑практика

Устный опрос

 

  1. «Нужно раз в час посчитать 10 GB новых логов → Spark или Trino?»
  2. «Срочно посмотреть top‑10 продуктов за вчера — Spark или Trino?»
  3. Мини‑практика (рекомендуется сочетать оба)
  • Покажите на одном наборе данных оба инструмента: измерьте время, почувствуйте удобство и накладные расходы.

Задание 1. Сравнение простого фильтра

  1. Подготовьте файл events.csv (~100 MB) с колонками user_id,status,ts.
  2. Spark (pyspark):
    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder.appName("FilterDemo").getOrCreate()
    df = spark.read.csv("events.csv", header=True, inferSchema=True)
    import time; t0 = time.time()
    cnt = df.filter(df.status == 'OK').count()
    print("Count:", cnt, "Time:", time.time()-t0)
    spark.stop()
    
  3. Trino:
    • Создайте внешний каталог hive и таблицу над тем же events.csv.
    • В CLI выполните:
      SELECT COUNT(*) 
      FROM hive.default.events 
      WHERE status = 'OK';
      
    • Засеките время (Trino UI или shell).
  4. Сравните времена и обсудите: где был накладной старт, где — быстрая интерактивность.

Задание 2. Агрегация и группировка

  1. Spark:
    stats = df.groupBy("user_id") \
              .agg({"value":"avg", "value":"max"})
    stats.show()
    
  2. Trino:
    SELECT user_id, AVG(value), MAX(value)
    FROM hive.default.events
    GROUP BY user_id;
    
  3. Сравните удобство: сколько строк кода vs SQL‑запрос, и производительность в секундах.

Задание 3. Принятие решений по сценарию

Сценарий Выбор
1. Ежечасный отчёт по 10 GB новых логов Spark или Trino?
2. Разовый ad‑hoc‑анализ исторических данных в 1 TB Spark или Trino?
3. Быстрый federated‑JOIN между MySQL и S3 Spark или Trino?

Speed‑layer & Presentation

ClickHouse: колонковое хранилище под низколатентные запросы

Зачем ClickHouse в Speed‑layer?

  • Колонковое хранение и сжатие
    – Данные записаны «столбцами», а не «строками», поэтому при запросе нужных полей читаются лишь соответствующие фрагменты диска.
    – Дополнительное сжатие (LZ4) уменьшает объём I/O и ускоряет чтение.
  • OLAP‑запросы в реальном времени
    – Суммы, средние, группировки по миллионам строк выполняются за миллисекунды, идеально для дашбордов.

Архитектура хранения и исполнения

  • MergeTree‑движок
    Партиционирование: данные разбиваются по дате (или другому полю), чтобы при запросе читать только нужный диапазон.
    Слияние частей: фоновые процессы объединяют мелкие файлы в большие для эффективности.
    TTL: автоматически удаляет старые данные по расписанию.
  • Индексы
    Primary key даёт sparse index: ClickHouse знает, на каких файлах искать нужные диапазоны.
    Skip‑indexes (minmax, bloom‑filter) помогают ещё быстрее пропустить ненужные данные.
  • Векторное исполнение
    – Операции над данными выполняются целыми «батчами» значений (векторы), это эффективнее циклов по отдельным строкам.

Кластер и отказоустойчивость

  • Шардинг и репликация
    – Таблица разбивается на «шарды» (части), каждый имеет копии‑реплики для надёжности.
    – ZooKeeper координирует лидеров‑реплик и синхронизацию.
  • Read/write‑splitting
    – Чтение можно направлять на любую реплику, запись — только на лидера.
  • Placement rules и quorum‑replication
    – Гибкие правила, на каких узлах какие данные хранятся, и запись ждёт подтверждения от кворума реплик.

Materialized Views и агрегаты

  • Pre‑aggregations
    – Можно создать материализованное представление, где данные уже агрегированы по дате/категории и отвечают мгновенно.
  • CollapsingMergeTree
    – Позволяет накапливать события «добавить»/«удалить» и в результате получить конечное состояние.

Инструменты доступа

  • clickhouse-client — консольный клиент.
  • HTTP‑API — простой REST‐интерфейс, удобно скриптовать.
  • JDBC/ODBC — интеграция с BI (Metabase, Grafana, Tableau).

Безопасность и ресурсы

  • Пользователи и пароли, network policies — ограничение доступа по ролям и сетям.
  • Профили ресурсов
    max_memory_usage, max_threads, квоты на конкретные таблицы или пользователей.

Устный опрос vs мини‑практика

  • Устный опрос (5 мин)
    – «Что даёт MergeTree‑движок?»
    – «Как работают skip‑indexes?»
    – «Когда используют materialized view?»
  • Мини‑практика обязательна: только на практике вы прочувствуете скорость ClickHouse и увидите, как векторы данных обрабатываются мгновенно.

Задание 1. Quickstart на Docker

  1. Запустить:
    docker run -d --name clickhouse-server -p 9000:9000 -p 8123:8123 clickhouse/clickhouse-server
    docker run -it --rm --link clickhouse-server:clickhouse-server clickhouse/clickhouse-client --host clickhouse-server
    
  2. Подключиться:
    -- В клиенте
    SELECT version();
    

Задание 2. Создание и наполнение таблицы

  1. Создать таблицу:
    CREATE TABLE default.events (
      event_date Date,
      user_id    UInt64,
      amount     Float64
    ) ENGINE = MergeTree()
    PARTITION BY toYYYYMM(event_date)
    ORDER BY (event_date, user_id);
    
  2. Вставить 1000 строк:
    INSERT INTO default.events VALUES
      ('2025-04-01', 1, 12.5),
      ('2025-04-01', 2, 7.3),
      …;  -- повторите или сгенерируйте
    

Задание 3. Низколатентный запрос

  1. Выполнить:
    SELECT event_date, sum(amount) AS total
    FROM default.events
    WHERE event_date = today()
    GROUP BY event_date;
    
  2. Измерить время:
    \watch 1  -- обновлять каждую секунду
    

Задание 4. Materialized View для pre‑aggregate

  1. Создать MV:
    CREATE MATERIALIZED VIEW default.daily_sales
      ENGINE = SummingMergeTree()
      PARTITION BY event_date
      ORDER BY event_date
    AS
    SELECT event_date, sum(amount) AS total
    FROM default.events
    GROUP BY event_date;
    
  2. Сравнить:
    -- Мгновенный:
    SELECT * FROM default.daily_sales;
    -- На лету:
    SELECT event_date, sum(amount) FROM default.events GROUP BY event_date;
    

Задание 5. TTL‑политика

  1. Установить TTL:
    ALTER TABLE default.events
    MODIFY TTL event_date + INTERVAL 7 DAY;
    
  2. Проверить после 7 дней (или вручную смоделировать дату) удаление старых партиций:
    SHOW PARTITIONS FROM default.events;
    

Архитектура Speed Layer: конвейер в ClickHouse и подходы Lambda vs Kappa

Что такое Speed Layer и зачем он нужен?

Представьте, что в вашем интернет‑магазине каждую секунду происходят события (клики, заказы, статусы). Для отчётов в реальном времени нам нужно не ждать, пока ночью большой пакет данных обработается, а показывать свежие цифры почти сразу — с задержкой в секунды, а не часы. Это и есть задача Speed Layer (“слоя скорости”).

  • Low‑latency (низкая задержка) — данные доходят от события до отчёта за секунды–минуты, а не за часы.

Lambda vs Kappa: два способа «договориться» о Speed Layer

Lambda‑архитектура
  1. Batch Layer (пакетная часть)
    – обрабатывает большие объёмы данных (например, логи за прошедший день) с задержкой (раз в час).
  2. Speed Layer (быстрая часть)
    – непрерывно получает новые события и пишет их в хранилище.
  3. Serving Layer
    – объединяет результаты batch и speed, отдаёт единый отчёт.

Плюс: при сбое Speed Layer есть резерв в Batch‑слое.
Минус: две раздельные обработки — две программы, два набора кода.

Kappa‑архитектура
  • Только поток: нет отдельного batch и speed.
  • Все данные (и старые, и новые) обрабатываются единым потоком:
    1. Исторические события “перематываются” через очередь (Kafka) как будто только что пришли.
    2. Новые события идут тем же путём.

Плюс: один конвейер, один код.
Минус: при очень большом объёме “перемотка” (replay) может занять много времени.


Как данные попадают в ClickHouse

Пакетная загрузка (batch)
  • Кто? Airflow или Xenosphere + Spark.
  • Что делает? Раз в час (или день) читает файлы (CSV/Parquet) и выполняет в ClickHouse:
    INSERT INTO events FORMAT CSV
    
  • Когда? Для исторических данных, когда нужно “дозарядить” большие объёмы.
Потоковая загрузка (stream)
  • Кто? Spark Streaming, Flink или Debezium CDC.
  • Что делает? Принимает каждое событие из Kafka и сразу добавляет или обновляет строку в ClickHouse:
    curl 'http://localhost:8123/?query=INSERT%20INTO%20events%20FORMAT%20JSONEachRow' \
         --data-binary @event.json
    
  • Когда? Для свежих данных, чтобы отчёты сразу отражали изменения.
Объединение batch и stream
  • В Lambda мы храним две версии таблицы:
    • events_batch (из пакетной загрузки)
    • events_stream (из потока)
      Затем в Serving Layer объединяем их запросом:
    SELECT * FROM events_batch
    UNION ALL
    SELECT * FROM events_stream;
    
  • В Kappa только events_stream, но мы периодически “перематываем” Kafka‑топик, чтобы заново внести старые данные.

Надёжность: дубликаты и согласованность

  • Дубликаты
    – При повторном запуске batch‑job или при перезапуске stream конвейера одни и те же события могут записаться несколько раз.
  • Как бороться
    1. Deduplication: храним уникальный ключ (например, event_id) и вставляем upsert вместо insert.
    2. CollapsingMergeTree: движок ClickHouse, который понимает “+” и “–” события и сам убирает дубли.

Формат закрепления

  1. Устный опрос (5 мин)
    – «Что делает Speed Layer?»
    – «Чем Kappa отличается от Lambda?»
  2. Мини‑практика
    – Необходимо: даже простейшие hands‑on покажут, как строятся оба конвейера.

Задание 1. Sketch‑тренинг

На бумаге или доске нарисуйте два потока:

Источник данных ──▶ Batch ETL ──▶ ClickHouse
      │
      └──▶ Stream ETL ──▶ ClickHouse

Отметьте на схеме, где могут появиться дубликаты и где «опоздавшие» записи.


Задание 2. Batch → ClickHouse

  1. Подготовьте CSV (~100 MB) events.csv.
  2. Запишите его в ClickHouse из командной строки:
    clickhouse-client --query="INSERT INTO default.events FORMAT CSV" < events.csv
    
  3. Проверьте:
    SELECT count(*) FROM default.events;
    

Задание 3. Stream → ClickHouse

  1. Запустите Docker‑Kafka и Spark Streaming (локально или в Data Proc).
  2. Скрипт stream_to_ch.py:
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("StreamToCH").getOrCreate()
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers","localhost:9092") \
        .option("subscribe","events") \
        .load() \
        .selectExpr("CAST(value AS STRING) as json")
    # Предполагаем, что json – строка CSV: "2025-04-17,user1,12.3"
    query = df.writeStream.foreachBatch(lambda batch, _: batch.write \
        .format("jdbc") \
        .option("url","jdbc:clickhouse://localhost:8123/default") \
        .option("driver","ru.yandex.clickhouse.ClickHouseDriver") \
        .option("dbtable","events") \
        .mode("append") \
        .save()
    ).start()
    query.awaitTermination()
    
  3. Сгенерируйте 100 событий в Kafka:
    seq 1 100 | xargs -I{} echo "2025-04-17,user{},5.0" | \
      kafka-console-producer --topic events --bootstrap-server localhost:9092
    
  4. Проверьте в ClickHouse:
    SELECT count(*) FROM default.events WHERE user_id >= 1 AND user_id <= 100;
    

Задание 4. Lambda vs Kappa анализ

  1. Lambda
    • Запустите batch‑job (задание 2) и stream‑job (задание 3).
    • Сгенерируйте дубли (повторите загрузку CSV) и посчитайте, сколько строк появится в таблице.
  2. Kappa
    • Откатьте Kafka‑топик командой:
      kafka-consumer-groups --bootstrap-server localhost:9092 \
        --group StreamToCH --reset-offsets --to-earliest --execute --topic events
      
    • Перезапустите только stream‑job.
    • Убедитесь, что ровно те же 100 событий попали в ClickHouse (без batch).

Presentation: схемы таблиц, материализованные представления, агрегаты

Как проектировать схемы таблиц

Star Schema («звезда»)
  • Представьте центр (факт) и лучи (справочники).
    • Fact-таблица хранит «сырьё» — каждую продажу, клик или событие.
    • Dimension-таблицы (справочники) описывают товары, пользователей, даты.
  • Почему «звезда»? Все справочники «смотрят» на факт, как лучи на солнце.
  • Плюс: просто писать запросы — JOIN факт↔справочник один раз.
  • Минус: при очень сложных справочниках может быть много колонок.
Snowflake Schema («снежинка»)
  • То же, что «звезда», но справочники сами могут ссылаться друг на друга.
    • Например, таблица «товары» содержит category_id, а отдельный справочник «категории» хранит название категории.
  • Плюс: меньше дублирования данных.
  • Минус: сложнее JOIN’ить, потому что JOIN’ов больше.
Гранулярность фактов
  • По событию (самая мелкая): каждая строка = один клик или одна продажа.
  • По дню/часу (более грубая): скатываете все события за час в одну строку.
  • По сессии: объединяете все действия одного пользователя за визит.

Materialized Views (материализованные представления)

  • Что это?
    – Это заранее готовая таблица с результатами тяжёлой агрегации, например ежедневной суммы продаж.
  • Зачем
    – Не каждый раз пересчитывать миллионы строк — просто читаем готовое число.
  • Отличие от обычного VIEW
    • Обычный VIEW: «картинка» поверх фактов, пересчитывается на каждый запрос.
    • MATERIALIZED VIEW: данные записаны и обновляются по расписанию или при появлении новых фактов.
  • Где
    • В СУБД (Postgres, Oracle): требует явного REFRESH MATERIALIZED VIEW.
    • В ClickHouse/Trino/BigQuery: могут обновляться автоматически при вставке в исходную таблицу.

Агрегированные таблицы

  • Уровни агрегации
    • По дате: сумма продаж в день.
    • По часу: почасовая динамика.
    • По категории: продажи на разные группы товаров.
    • По региону: продажи по городам.
  • Баланс
    – Чем больше разных уровней агрегации, тем быстрее отчёты, но дольше загрузка и больше места.
  • Slowly Changing Aggregates
    – Инкрементально дополняете только новую порцию данных (например, час за часом), а не всю таблицу.

Как обновлять представления и агрегаты

  1. Инкрементально (stream/CDC)
    – Когда поступает новый заказ, сразу же обновляете нужные MV или агрегатную строку.
  2. Полная перезагрузка (batch)
    – Раз в ночь считываете всё с нуля (или за последние сутки) и полностью перезаписываете агрегаты.
  3. Без простоев
    – Создаёте новую таблицу/представление с тем же именем на лету или используете swap‑операции (ALTER TABLE … RENAME).

Интеграция с BI

  • Как дашборды видят схемы
    – В Metabase, DataLens и других инструментах вы указываете базу и схему (например, analytics.fact_sales).
    – Далее выбираете таблицы и поля из справочников и фактов.
  • Рекомендации по названиям
    – Факт-таблицы называйте fact_<имя>, справочники dim_<имя>.
    – MV — mv_<описание>, агрегаты — agg_<уровень>_<что> (например, agg_daily_revenue).
  • Метаданные
    – В описании поля (comment) указывайте единицы измерения, формат даты, возможные значения.

Устный опрос vs мини‑практика

    1. «В чём отличие star schema от snowflake?»
    2. «Что делает materialized view?»
    3. «Когда использовать инкрементальную агрегацию?»
  • Мини‑практика
    Обязательно: только в практике на SQL/DDL убедитесь, как легко или сложно запустить MV и агрегатные таблицы.

Задание 1. Проектирование star schema

  1. Задача: есть сырые события продаж
    -- Факт
    CREATE TABLE fact_sales (
      order_id       String,
      order_date     Date,
      product_key    Int,
      user_key       Int,
      total_amount   Decimal(10,2)
    );
    -- Справочник товаров
    CREATE TABLE dim_product (
      product_key    Int PRIMARY KEY,
      product_name   String,
      category       String
    );
    -- Справочник пользователей
    CREATE TABLE dim_user (
      user_key       Int PRIMARY KEY,
      user_name      String,
      region         String
    );
    

Задание 2. Создание materialized view

  1. В ClickHouse или Trino:
    CREATE MATERIALIZED VIEW mv_daily_revenue
      TO agg_daily_revenue
    AS
    SELECT
      order_date AS dt,
      sum(total_amount) AS revenue
    FROM fact_sales
    GROUP BY dt;
    
  2. Проверка:
    INSERT INTO fact_sales VALUES ('o1','2025-04-18',1,1,100.0);
    SELECT * FROM agg_daily_revenue WHERE dt = '2025-04-18';
    

Задание 3. Инкрементальный aggregate table

  1. Создать таблицу:
    CREATE TABLE agg_hourly_sales (
      hour         DateTime,
      product_key  Int,
      amount       Decimal(10,2)
    );
    
  2. SQL‑скрипт (запуск раз в час):
    INSERT INTO agg_hourly_sales
    SELECT
      toStartOfHour(order_date) AS hour,
      product_key,
      sum(total_amount) AS amount
    FROM fact_sales
    WHERE order_date >= yesterday()
    GROUP BY hour, product_key;
    
  3. Airflow‑DAG: настройте запуск этого скрипта каждый час.

Задание 4. Сравнение производительности

  1. Прямой запрос:
    SELECT sum(total_amount)
    FROM fact_sales
    WHERE order_date = today();
    
  2. Через MV:
    SELECT revenue
    FROM agg_daily_revenue
    WHERE dt = today();
    
  3. Замеры:
    – Засеките время в SQL‑клиенте или ClickHouse UI для обоих запросов.
    – Обсудите, насколько быстрее выступает MV по сравнению с пересчётом «на лету».

BI & Visualization

Yandex DataLens: коннекторы, дашборды, управление доступом

Коннекторы: «трубопровод» данных

DataLens работает как радио‑приёмник: ему нужно «настроиться» на источник сигнала.

  • Что подключаем
    • ClickHouse, PostgreSQL — ваши быстрые базы
    • S3/CSV — файлы в облачном хранилище
    • HTTP API — данные, которые приходят по Интернету
  • Как настраивать
    1. Указываете адрес (URL), логин/пароль или ключ, иногда путь к SSL‑сертификату.
    2. Проверяете, что DataLens может «достучаться» до сервера: в VPC и firewall разрешены нужные порты.
    3. После подключения автоматом подтягиваются схемы — таблицы и их колонки. При их изменении можно нажать «Обновить схему».

Построение дашбордов: «картинки из данных»

  • Виджеты
    • Линейные графики (динамика по времени)
    • Столбчатые/круговые диаграммы (сравнение категорий)
    • Таблицы с подведение итогов
    • Карты (если есть геоданные)
  • Параметры визуализации
    • Фильтры: пусть пользователь выберет нужный товар или регион
    • Drill‑down: клик по столбцу разворачивает детальный отчёт
    • Drill‑through: ссылка на внешний отчёт (например, в другом дашборде)
  • Layout‑дизайн
    – Располагаете виджеты на сетке, подгоняете их размер, группируете по смыслу: динамика вверху, таблица снизу.

Пользовательский доступ и безопасность

  • Роли
    • Viewer — только смотреть
    • Editor — может редактировать дашборды
    • Admin — полные права
  • Шеринг
    • По группе или пользователю: даёте доступ только «Analysts»
    • Публичная ссылка: всем в интернете можно видеть, без входа
  • Row‑level security (RLS)
    – Можно настроить, чтобы разные пользователи видели разные строки (например, только свой регион).
  • Data Masking
    – Скрытие части значений (например, последние цифры кредитной карты).

Авто‑обновление и алерты

  • Cache TTL
    – DataLens кэширует результаты запросов: указываете «обновлять каждые N часов», чтобы не перегружать базу.
  • Алерты
    – Задаёте правило, например: «если суммарные продажи за вчера < X, отправить email/Slack».
    – DataLens проверит это условие по расписанию и оповестит нужных людей.

Задание 1. Подключение источника

  1. Откройте DataLens UI и перейдите в «Источники данных» → «Добавить».
  2. Выберите ClickHouse (или CSV):
    • Если ClickHouse:
      • Введите адрес http://<host>:8123
      • Укажите логин/пароль (или оставьте пустым, если нет).
    • Если CSV:
      • Загрузите файл или укажите S3‑путь s3://bucket/path/data.csv.
  3. Проверьте соединение и сохраните. В списке таблиц появится ваш источник.

Задание 2. Создание простого дашборда

  1. Создайте новый дашборд «Продажи за месяц».
  2. Добавьте виджет:
    • Тип «Линейный график».
    • Источник: таблица sales.
    • X‑ось: order_date, Y‑ось: SUM(amount).
  3. Добавьте фильтр по полю category.
  4. Опубликуйте дашборд и посмотрите, как работает фильтр.

Задание 3. Настройка прав доступа

  1. Перейдите в «Пользователи и группы».
  2. Создайте группу «Analysts» и добавьте в неё тестового пользователя.
  3. Откройте настройки дашборда «Продажи за месяц» → «Доступ», дайте группе Analysts роль Viewer.
  4. Проверьте под учётной записью другого пользователя, что без доступа дашборд не виден.

Задание 4. Авто‑обновление и алерты

  1. В настройках дашборда установите Cache TTL = 1 час.
  2. Перейдите в раздел «Алерты» и создайте правило:
    • Условие: SUM(amount) < 1000 за вчера.
    • Расписание: каждый день в 08:00.
    • Получатели: ваша почта или Slack‑канал.
  3. Проверьте: выполните вручную «Проверить сейчас» и убедитесь, что уведомление отправилось (или увидели результат проверки).

Metabase: быстрые опросы, embedding, alerting

Что такое Metabase?

Metabase — это простой и доступный инструмент бизнес‑аналитики с веб‑интерфейсом. Представьте, что вам нужно быстро получить отчёт по продажам или кликам, и при этом вы не хотите писать сложные скрипты или настраивать дорогие BI‑системы.

 

Metabase:

  • Устанавливается за пару минут на вашем сервере или локальной машине (есть Docker‑образ).
  • Подключается к любым популярным базам (PostgreSQL, MySQL, ClickHouse и др.) или даже к CSV‑файлам.
  • Позволяет «на лету» строить простые отчёты (Questions) щёлканиями мышки и более сложные — на чистом SQL (Native queries).
  • Объединяет эти отчёты в дашборды, даёт возможность быстро фильтровать данные и переходить внутрь деталей.
  • Поддерживает встраивание отчётов в ваши сайты и уведомления (alerts), если что‑то пошло не так (например, объём продаж упал ниже порога).

Metabase очень дружелюбен к новичкам: основную работу можно делать без единой строчки кода — достаточно выбрать таблицу, перетащить нужные поля и получить график или таблицу.


Основные возможности Metabase

  1. Быстрые опросы (Questions)
    Simple: выбор полей и автоматическая визуализация.
    Custom: графики и группировки через интерфейс без SQL.
    Native: полный контроль через SQL‑запросы.
    – Все Questions можно сохранять, группировать по коллекциям и добавлять фильтры.
  2. Дашборды
    – Сборник виджетов (Questions) на одном экране.
    – Общие фильтры, drill‑down (клик по точке графика ведёт к деталям).
    – Настройка сетки, размера и порядка блоков.
  3. Embedding (встраивание)
    – Публичные ссылки для внешних пользователей.
    – Защищённые iframe‑виджеты с signed JWT — можно встроить в ваш сайт или приложение, передавать параметры (например, ID пользователя).
  4. Alerting (уведомления)
    – Подписка на Question или весь Dashboard.
    – Триггеры: пороговые условия (например, orders < 100).
    – Доставка в email, Slack или на любой webhook.
  5. Управление доступом
    – Роли: Viewer (просмотр), Editor (редактирование), Admin (управление).
    – Группы пользователей, row‑level security для ограничения видимых строк и data masking.
  6. Коннекторы
    – ClickHouse, PostgreSQL, MySQL, MongoDB, Google BigQuery и др.
    – CSV/JSON‑файлы, HTTP API.
    – Обновление схемы («обновить таблицу») при изменении структуры источника.

Быстрые опросы (Questions)

Metabase называет «опросом» любую единичную визуализацию или таблицу данных — это как “робкий мини‑отчёт”.

  • Simple Question («Простой опрос»)
    – Вы кликаете на таблицу, выбираете несколько полей и Metabase сам подбирает виджет (график или таблицу).
    – Не требует писать код.
  • Custom Question («Настраиваемый опрос»)
    – Вы вручную выбираете, какие колонки и агрегации показывать (сумма, среднее, число строк) и где их разместить.
    – Можно добавлять фильтры и группировки, но всё ещё без SQL.
  • Native Query («Нативный запрос»)
    – Полный SQL‑код, который вы пишете сами.
    – Даёт максимальную гибкость: любые сложные выборки, JOIN‑ы и подзапросы.

Организация

  • Коллекции — папки, куда складываете вопросы по темам (Sales Analytics, Marketing, Support).
  • Фильтры и ползунки — вынесенные наружу элементы управления: например, на дашборде есть слайдер по датам, и он влияет на все вложенные вопросы.

Embedding (встраивание отчётов)

Как показать свой отчёт не только в Metabase, но и на вашем сайте или в приложении:

  1. Публичная ссылка
    – Простая URL‑ссылка, чтобы любой, у кого есть адрес, мог видеть отчёт.
    – Минус — нет защиты, любой с ссылкой просмотрит данные.
  2. Iframe с signed JWT
    – Metabase генерирует специальный токен (JWT), который подписан и действителен ограниченное время.
    – В вашем HTML-коде помещаете:
    <iframe src="https://metabase.company.com/embed/question/<QUESTION_ID>?jwt=<SIGNED_TOKEN>"
            width="800" height="600"></iframe>
    

    Безопасность:

    • Ограничение домена (только ваш сайт).
    • Время жизни токена (например, 5 минут).
      Параметры: можно передать в токене фильтр по пользователю или дате.

Alerting (уведомления)

Metabase умеет следить за изменением чисел в вопросе или дашборде и рассылать уведомления.

  • Подписка
    – Можно подписаться на отдельный Question или весь Dashboard.
  • Триггеры
    – Условие: например, «если количество заказов за сегодня < 100».
    – Частота: раз в час, раз в день, раз в 10 минут.
  • Каналы доставки
    • Email — письмо на почту.
    • Slack — сообщение в нужный канал.
    • Webhook — отправить POST‑запрос в ваше приложение.

Устный опрос vs мини‑практика

  • Устный опрос (5 мин)
    1. Как в Metabase создать Native query?
    2. В чём разница между Question и Dashboard?
    3. Какие есть каналы для alerting?
  • Мини‑практика
    Обязательно: Metabase интуитивен, но только вживую вы увидите, как работают конструктор вопросов, встраивание и алерты.

Задание 1. Создание быстрого опроса

  1. Войдите в Metabase и нажмите «New → Question».
  2. Выберите таблицу orders.
  3. Настройте:
    • Metric: SUM(amount)
    • Breakout: product_name
      – получите «топ‑5 товаров по продажам».
  4. Сохраните вопрос в коллекцию Sales Analytics.

Задание 2. Native SQL‑запрос

  1. Нажмите «New → Native query».
  2. Впишите SQL:
    SELECT
      date_trunc('day', order_date) AS day,
      COUNT(*) AS orders
    FROM orders
    WHERE order_date >= current_date - interval '7' day
    GROUP BY 1
    ORDER BY 1;
    
  3. Запустите и сохраните как «Weekly Orders».

Задание 3. Embedding в HTML‑страницу

  1. Найдите ваш Question и нажмите «Embed → Embed this question».
  2. Выберите вариант Signed embedding и скопируйте iframe‑код.
  3. Создайте простой файл embed.html:
    <!DOCTYPE html>
    <html><body>
      <h1>Embedded Report</h1>
      <!-- вставьте сюда iframe‑код -->
    </body></html>
    
  4. Откройте embed.html в браузере и убедитесь, что отчёт отображается.

Задание 4. Настройка алерта

  1. Откройте Question «Weekly Orders».
  2. Нажмите «Subscribe»«Alert».
  3. Условие: orders < 100 AND day = current_date.
  4. Частота: раз в час.
  5. Канал: email или Slack (введите тестовый адрес/канал).
  6. Нажмите «Save» и «Test now», чтобы убедиться, что уведомление уходит.

Оркестрация данных

Apache Airflow: DAG‑ы, операторы, XCom, SLA, RBAC

Что такое Airflow и где он нужен?

Представьте, что у вас есть набор шагов, которые нужно выполнить с определённым расписанием и в чётком порядке: например, каждую ночь собирать данные, обрабатывать их, загружать отчёты. Airflow — это как диспетчер задач для таких конвейеров (ETL, ML‑pipeline и т. д.):

  • Работает по расписанию (как cron),
  • Видит зависимости между задачами (если шаг A упал, не пиши в базу шаг B),
  • Имеет удобный веб‑интерфейс для мониторинга.

Архитектура Airflow

  • Scheduler («планировщик») — запускает задачи точно по расписанию и следит за зависимостями.
  • Webserver — веб‑панель, где вы видите свои DAG’и, логи и состояние.
  • Metadata DB — база (Postgres/SQLite), где Airflow хранит информацию о запусках.
  • Executors — механизмы выполнения:
    • LocalExecutor — всё на одной машине,
    • CeleryExecutor — задачи раздаются воркерам через очередь (Redis/RabbitMQ),
    • KubernetesExecutor — каждая задача — свой Pod в Kubernetes.

DAG: Directed Acyclic Graph

  • DAG (ориентированный ациклический граф) — ваша программа в Airflow.
    • Узлы — задачи (tasks),
    • Ребра — зависимости (task1 >> task2).
  • Как определить:
    from airflow import DAG
    from datetime import datetime, timedelta
    
    default_args = {
        'owner': 'analyst',
        'email': ['you@example.com'],
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }
    
    dag = DAG(
        'my_first_dag',
        default_args=default_args,
        start_date=datetime(2025,4,18),
        schedule_interval='@daily'
    )
    
  • Best practices
    – Храните DAG’и в отдельных файлах по смыслу,
    – Используйте шаблоны (TaskGroups),
    – Версионируйте в Git.

Операторы и сенсоры

  • Операторы — атомарные шаги:
    • BashOperator — выполнить команду Linux,
    • PythonOperator — вызвать функцию на Python,
    • SparkSubmitOperator — запустить Spark‑задачу.
  • Сенсоры — ждут какого‑то события:
    • HttpSensor — пока не вернётся HTTP 200,
    • S3KeySensor — пока в бакете не появится нужный файл.

XCom: обмен данными между тасками

  • XCom (cross‑communication) — небольшие сообщения, которые одна задача push делает, а другая pull читает.
  • Пример:
    def producer(**ctx):
        ctx['ti'].xcom_push(key='result', value=42)
    
    def consumer(**ctx):
        val = ctx['ti'].xcom_pull(task_ids='task_producer', key='result')
        print("Got:", val)
    

SLA и оповещения

  • SLA (Service Level Agreement) — максимальное время выполнения задачи.
  • Задаётся в default_args или у оператора:
    task = BashOperator(
        task_id='task_2',
        bash_command='echo world',
        sla=timedelta(minutes=1),
        dag=dag
    )
    
  • on_sla_miss_callback позволяет отправить письмо или записать в лог, если задача не уложилась.

RBAC и безопасность

  • RBAC (Role‑Based Access Control) — по ролям в UI:
    • Admin — полные права,
    • User — выполнять и просматривать свои DAG’и,
    • Op — операционные действия (например, перезапуск задач).
  • Интеграция с LDAP или OAuth для единого входа.

Устный опрос vs мини‑практика

  • Устный опрос
    – «Что такое оператор и чем он отличается от сенсора?»
    – «Как задать SLA для задачи?»
    – «Для чего нужен XCom?»
    – «Как включить RBAC в Airflow?»
  • Мини‑практика
    – Обязательно: только создав свой первый DAG и прогнав задачи, вы поймёте, как всё работает на самом деле.

Задание 1. Локальный Airflow через Docker‑Compose

  1. Скопируйте готовый docker-compose.yml с сервисами:
    • airflow-webserver,
    • airflow-scheduler,
    • postgres (Metadata DB),
    • redis (для CeleryExecutor).
  2. Запустите:
    docker-compose up -d
    
  3. Откройте UI: http://localhost:8080

Задание 2. Простейший DAG из двух тасков

  1. В папке dags/ создайте файл hello_dag.py:
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.operators.python import PythonOperator
    from datetime import datetime, timedelta
    
    default_args = {'start_date': datetime(2025,4,18)}
    dag = DAG('hello_dag', default_args=default_args, schedule_interval=None)
    
    task_1 = BashOperator(task_id='say_hello', bash_command='echo "hello"', dag=dag)
    task_2 = PythonOperator(task_id='say_world', python_callable=lambda: print("world"), dag=dag)
    
    task_1 >> task_2
    
  2. В UI нажмите Trigger DAG, посмотрите логи.

Задание 3. Использование XCom

  1. Измените say_hello на Python‑таску:
    def push_fn(ti):
        ti.xcom_push(key='msg', value='hello!')
    push = PythonOperator(task_id='push', python_callable=push_fn, dag=dag)
    
  2. В say_world достаньте msg и выведите:
    def pull_fn(ti):
        val = ti.xcom_pull(task_ids='push', key='msg')
        print("Pulled:", val)
    pull = PythonOperator(task_id='pull', python_callable=pull_fn, dag=dag)
    push >> pull
    

Задание 4. Настройка SLA и callback

  1. В default_args добавьте:
    'sla': timedelta(minutes=1),
    'on_sla_miss_callback': lambda context: print("SLA missed!", context)
    
  2. Сделайте say_world задержку:
    bash_command='sleep 90'
    
  3. Запустите — в логах должно появиться «SLA missed!».

Задание 5. Включение RBAC и создание ролей

  1. В airflow.cfg установите:
    rbac = True
    
  2. Перезапустите webserver.
  3. В UI Security → List Roles создайте роли analyst (Viewer) и engineer (Editor).
  4. Импортируйте тестовых пользователей и назначьте им роли.
  5. Войдите за «analyst» — убедитесь, что можно смотреть, но нельзя редактировать DAG’и.

Kubernetes: управление Spark‑, Trino‑, ClickHouse‑кластерами, Helm‑чарты, auto‑scaling

Почему Kubernetes для Big Data‑кластеров?

  • Единая платформа
    – Вместо каждого сервиса под свои виртуалки вы получаете общий «пул» ресурсов, где запускаются и Spark‑задачи, и Trino, и ClickHouse.
  • Декларативность
    – Вы описываете желаемое состояние в манифестах (YAML), а Kubernetes сам следит, чтобы оно было выполнено («self‑healing»).
  • Масштабирование
    – Легко добавить реплики или ноды, а потом так же быстро их убрать.

CRD и операторы

  • CRD (Custom Resource Definition)
    – Расширяет Kubernetes, добавляя новые типы объектов.
  • Операторы
    – Специальные контроллеры, которые знают, как запускать и управлять конкретным приложением:
    1. Spark Operator
      • Позволяет описать Spark‑задачу через ресурс SparkApplication.
      • Автоматически создаёт поды‑driver и поды‑executor, следит за их логами и завершением.
    2. Trino Operator / Helm‑чарт
      • Создаёт StatefulSet для coordinator и Deployment для workers, настраивает кластер в один командой.
    3. ClickHouse Operator
      • Управляет StatefulSet’ами, шардированием и репликами, создаёт сервисы и PVC.

Helm‑чарты и values.yaml

  • Helm‑чарт — это «шаблон» Kubernetes‑манифестов с переменными.
  • values.yaml — файл, где вы задаёте конкретные параметры (образ, число реплик, ресурсы).
  • Override
    – Для разных сред (dev, test, prod) вы можете иметь свои values-dev.yaml, values-prod.yaml и подставлять их при установке:
    helm install my-spark spark-operator/spark-operator \
      --namespace spark --values values-prod.yaml
    
  • Зависимости
    – Чарт может подтягивать другой (например, Spark Operator и вместе с ним Hive Metastore).

Хранение данных и конфигурация

  • PersistentVolumeClaim (PVC)
    – Часть диска кластера, чтобы ClickHouse мог хранить данные «вечно», даже если под умер.
  • ConfigMap
    – Для хранения общих настроек (например, trino.properties).
  • Secret
    – Для паролей и ключей (образы с учётными данными к базе).

Auto‑scaling

  • Horizontal Pod Autoscaler (HPA)
    – Следит за загрузкой CPU/памяти в подах (например, Trino‑coordinator) и автоматически увеличивает или уменьшает число реплик:
    kubectl autoscale deployment trino-coordinator \
      --cpu-percent=50 --min=1 --max=3 -n trino
    
  • Spark dynamic allocation
    spark.kubernetes.executor.autoscaling.enabled=true позволяет Spark самому просить больше executors, когда нужно.
  • Cluster Autoscaler
    – Подключается к облачному провайдеру (Yandex Cloud, GKE, EKS) и добавляет узлы в node pool, когда поды не помещаются из‑за ресурсов.

Устный опрос vs мини‑практика

    1. Что такое CRD и зачем он нужен?
    2. В чём отличие HPA и Cluster Autoscaler?
    3. Зачем ConfigMap и Secret?
  • Мини‑практика
    – Обязательно: только развёртывая реальные чарты и CRD‑объекты вы поймёте, как это всё работает вместе.

Задание 1. Установка Spark Operator

  1. Добавьте репозиторий и установите оператор:
    helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
    helm install spark-operator spark-operator/spark-operator \
      --namespace spark-operator --create-namespace
    
  2. Создайте манифест spark-pi.yaml:
    apiVersion: sparkoperator.k8s.io/v1beta2
    kind: SparkApplication
    metadata:
      name: spark-pi
      namespace: spark-operator
    spec:
      type: Scala
      mode: cluster
      image: bitnami/spark:latest
      mainClass: org.apache.spark.examples.SparkPi
      arguments:
        - "1000"
      driver:
        cores: 1
        memory: "512m"
      executor:
        cores: 1
        instances: 2
        memory: "512m"
    
  3. Запустите и проверьте статус:
    kubectl apply -f spark-pi.yaml
    kubectl -n spark-operator get sparkapplications
    kubectl -n spark-operator logs spark-pi-driver
    

Задание 2. Деплой Trino через Helm

  1. Добавьте репозиторий и установите:
    helm repo add trino https://trinodb.github.io/charts
    helm install trino trino/trino --namespace trino --create-namespace
    
  2. Откройте UI (локальный форвард):
    kubectl port-forward svc/trino-coordinator 8080:8080 -n trino
    
  3. Измените values.yaml (например, установить worker.replicas: 2) и примените:
    helm upgrade trino trino/trino \
      --namespace trino --values values.yaml
    

Задание 3. Развёртывание ClickHouse Operator

  1. Добавьте репозиторий и установите оператор:
    helm repo add altinity https://altinity.github.io/altinity-helm-charts
    helm install clickhouse-operator altinity/clickhouse-operator \
      --namespace clickhouse --create-namespace
    
  2. Создайте CR манифест chi.yaml:
    apiVersion: clickhouse.altinity.com/v1
    kind: ClickHouseInstallation
    metadata:
      name: clickhouse-cluster
      namespace: clickhouse
    spec:
      configuration:
        clusters:
          - name: cluster1
            layout:
              shardsCount: 3
              replicasCount: 2
    
  3. Примените и проверьте StatefulSet’ы:
    kubectl apply -f chi.yaml
    kubectl -n clickhouse get statefulsets
    

Задание 4. Настройка auto‑scaling для Trino

  1. Установите metrics-server в кластер (если ещё нет).
  2. Создайте HPA:
    kubectl autoscale deployment trino-coordinator \
      --cpu-percent=60 --min=1 --max=3 -n trino
    
  3. Сгенерируйте нагрузку:
    hey -n 10000 -c 100 http://localhost:8080/v1/status
    
  4. Просмотрите изменение числа реплик:
    kubectl get hpa -n trino
    

Задание 5. Cluster Autoscaler в облаке

  1. Включите Cluster Autoscaler на уровне node pool (Yandex Cloud/GKE/EKS).
  2. Создайте нагрузку, которая требует больше ресурсов (например, запустите SparkApplication с 5 executors).
  3. Убедитесь, что добавились новые ноды в пул:
    kubectl get nodes
    

Data Catalog & Governance

Подраздел «OpenMetadata: сбор метаданных, lineage, data‑stewardship»

Что такое OpenMetadata и зачем он нужен?

Представьте, что у вас в компании десятки баз данных, таблиц и отчётов. Никто не знает, где лежат нужные данные, как они связаны и кто за них отвечает. OpenMetadata — это единый каталог метаданных, который помогает:

  • Централизовать описание всех таблиц, колонок, дашбордов и ETL‑джобов.
  • Автоматически собирать структуру и профилировать качество (сколько пустых строк, типы полей).
  • Видеть lineage — откуда данные пришли, какими преобразованиями они прошли и куда ушли дальше.
  • Назначать ответственных (data stewards) за каждую сущность: кто владелец, кто куратор, кто проверяет качество.

Архитектура OpenMetadata

  1. Ingestion сервисы
    • Коннекторы к разным источникам: PostgreSQL, Hive, BigQuery, S3 и др.
    • Можно «тянуть» по расписанию (polling) или слушать события изменений (event‑driven).
  2. Metadata Store
    • Обычно Postgres для сущностей и Elasticsearch для быстрого поиска.
  3. API‑Gateway
    • REST API + Python‑SDK, через которые можно автоматизировать любую работу.
  4. UI
    • Веб‑интерфейс, где виден граф lineage (связи между таблицами и скриптами), профили качества и атрибуты (теги, SLA‑метрики).

Сбор метаданных (Ingestion)

  • Коннекторы
    – Настраиваете «источник» с адресом базы и учётными данными.
  • Запуск сбора
    – По расписанию (например, раз в день) подключается и собирает схему, статистику (количество строк, уникальных значений).
    – Можно настроить реакцию на события, чтобы собирать изменения сразу после выпуска нового ETL‑джоба.

Lineage (прослеживаемость)

  • Автоматическая
    – OpenMetadata умеет встраиваться в SQL‑джобы и ETL‑пайплайны, видеть в них источники и цели.
  • Ручная коррекция
    – В UI можно добавить связь, если коннектор что‑то не заметил.
  • Impact analysis
    – По графу lineage вы сразу увидите, какие отчёты или downstream‑таблицы затронет изменение в исходной таблице.

Data Stewardship

  • Роли
    • Data Owner – «владелец» таблицы, отвечает за бизнес‑логику.
    • Data Steward – куратор качества и терминологии, назначает теги, проверяет SLA.
  • Процесс
    1. Engineer вносит новое описание или изменяет схему.
    2. Steward получает уведомление, проверяет соответствие стандартам.
    3. После одобрения изменения становятся «официальными» в каталоге.

API и интеграции

  • REST API позволяет программно:
    • запускать ingestion,
    • получать список таблиц,
    • читать и изменять lineage и теги.
  • Python‑SDK для удобной работы в скриптах.
  • Webhooks — оповещают внешние системы при изменении метаданных.

Устный опрос vs мини‑практика

  • Устный опрос (5 мин)
    1. Что такое сущность (Entity) и связь (Relationship) в OpenMetadata?
    2. Чем отличается lineage от impact analysis?
    3. Кто такой Data Steward и какие у него задачи?
  • Мини‑практика
    – Обязательно: только зайдя в UI, настроив коннектор и увидев граф lineage, слушатели поймут, как каталог оживает.

Мини‑задания

  1. Развёртывание OpenMetadata
    • Склонировать пример Docker‑Compose из репозитория OpenMetadata.
    • Запустить команды docker-compose up.
    • Открыть UI на http://localhost:8585.
  2. Ingestion метаданных
    • Настроить коннектор к тестовой базе PostgreSQL (адрес, логин, пароль).
    • Запустить сбор (Ingestion) и убедиться, что таблицы и колонки появились в UI.
  3. Просмотр и анализ lineage
    • В тестовой БД создать VIEW, который объединяет две таблицы:
      CREATE VIEW orders_users AS
        SELECT o.order_id, o.amount, u.region
        FROM orders o
        JOIN users u ON o.user_id = u.id;
      
    • В OpenMetadata найти граф lineage этого VIEW и проследить связь до таблиц orders и users.
  4. Назначение Data Steward
    • В UI создать пользователя с ролью Data Steward.
    • Перейти к таблице orders, добавить тег „Critical” и в комментарии описать политику качества (например, «sales_amount > 0»).

Роли и политики: кто меняет схему и проверяет качество

Важные роли в Data Governance

Роль Основные задачи
Data Owner Определяет смысл таблицы, принимает решения по её изменению
Data Steward Следит за терминологией, качеством, SLA, назначает теги
Data Custodian Обеспечивает техническое хранение и безопасность данных
Data Engineer Реализует изменения схем, пишет ETL, обеспечивает доставку данных

Процесс изменения схемы

  1. Инициатор (Analyst или Engineer) предлагает изменение (документ, Pull Request).
  2. Ревью (Steward/Architect) проверяет корректность, совместимость и бизнес‑логику.
  3. Одобрение (Change Board) согласовывает сроки и уведомляет Custodian.
  4. Внедрение (Engineer) деплоит новую схему в систему.
  5. Мониторинг (Steward) отслеживает метрики качества и SLA.

Политики качества данных

  • SLA‑метрики
    • Полнота: доля пустых значений < 1 %
    • Актуальность: задержка обновления < 5 мин
    • Корректность: валидация бизнес‑правил (например, суммы > 0)
  • Threshold’ы и алерты
    – Если метрика выходит за порог, Steward получает уведомление и начинает расследование.

Коммуникация и эскалация

  • Уведомления
    – При нарушении SLA автоматом рассылаются письма/Slack‑сообщения Steward и Owner.
  • Data Council
    – Регулярная встреча (раз в месяц) ответственных по данным: обсуждение инцидентов, планов изменений.

Устный опрос vs мини‑практика

  • Устный опрос
    1. Кто отвечает за изменение схемы и кто за её хранение?
    2. Как выглядит процесс одобрения в Change Board?
    3. Какая метрика качества данных важнее всего?
  • Мини‑практика
    – Рекомендуется: разыграть ролевую игру Change Board, чтобы прочувствовать сложность согласования и важность коммуникации.

Мини‑задания

  1. RACI‑матрица для изменения схемы
    • Составить таблицу, где по шагам процесса (предложение→ревью→деплой→мониторинг) указать:
      • Responsible (кто делает),
      • Accountable (кто отвечает),
      • Consulted (кого нужно спросить),
      • Informed (кого уведомить).
  2. Шаблон политики качества данных
    • Опишите метрику “потеря записей”:
      • Как её считать (разница ожиданий и факта),
      • Пороговые значения,
      • Процесс реагирования при нарушении.
  3. Ролевая игра «Change Board»
    • Раздайте участникам роли: Engineer, Analyst, Steward, Architect.
    • Предложите реальный кейс изменения схемы (например, добавление новой колонки).
    • Проведите обсуждение и голосование: утвердить или отклонить изменение, определив сроки и условия.

Monitoring & Alerting

Grafana: сбор метрик из Spark, Trino, ClickHouse, Airflow

Что такое Grafana?

Grafana — это веб‑инструмент для визуализации метрик и логов.

Вы подключаете к ней источник метрик (Prometheus, ClickHouse, Elasticsearch и др.), а Grafana рисует графики, таблицы и шильдики в удобных дашбордах.

Data Sources

  • Prometheus: собирает метрики с экспортеров.
  • Graphite: старая, но распространённая система.
  • ClickHouse/Elasticsearch: можно записывать метрики сразу в колонковое хранилище.
  • Как добавить: в UI → Configuration → Data Sources → Add → выбираете тип, указываете URL и аутентификацию, нажимаете Save & Test.

Панели и дашборды

  • Панель (Panel) — единичная визуализация: time series, gauge, table.
  • Дашборд (Dashboard) — набор панелей, расположенных на сетке.
  • Построение:
    1. Создаёте дашборд → Add Panel.
    2. Пишете запрос к источнику (PromQL, SQL, Elasticsearch DSL).
    3. Выбираете тип графика и настраиваете оси, легенду, цвета.

Variables & Templating

  • Переменные (Variables) дают интерактивность:
    • Например, список кластеров Spark или namespaces в Kubernetes.
    • В UI дашборда → Variables, выбираете источник и запрос, Grafana подтянет варианты.
  • Использование: в запросе пишете $cluster или ${cluster}, и при смене переменной все панели меняются под выбранный кластер.

Alerting в Grafana

  • Alert‑rule создаётся прямо на панели.
    1. В редакторе панели переходим на вкладку Alert.
    2. Задаём условие: WHEN avg() OF query(A, 5m, now) IS ABOVE 80.
    3. Настраиваем Notification channel (email, Slack, webhook).
  • Отличие от Prometheus‑rule: Grafana умеет делать более «красивые» дашборд‑алерты и объединять несколько запросов в одном правиле.

Пользовательский доступ

  • Роли:
    • Viewer — только просмотр.
    • Editor — может править панели и дашборды.
    • Admin — полные права.
  • Папки и права: дашборды группируются в папки, и к каждой папке можно задать права для группы пользователей.

Задание 1. Docker Compose‑стек

  1. Подготовьте docker-compose.yml с серверами Grafana и Prometheus:
    version: '3'
    services:
      prometheus:
        image: prom/prometheus
        ports: ['9090:9090']
        volumes:
          - ./prometheus.yml:/etc/prometheus/prometheus.yml
      grafana:
        image: grafana/grafana
        ports: ['3000:3000']
    
  2. Запустите:
    docker-compose up -d
    
  3. Откройте Grafana: http://localhost:3000 (логин по умолчанию admin/admin).

Задание 2. Добавление Data Source

  1. В Grafana UI → Configuration → Data Sources → Add data source.
  2. Выберите Prometheus, введите URL http://prometheus:9090, нажмите Save & Test.
  3. Убедитесь, что статус «Data source is working».

Задание 3. Построение дашборда

  1. Создайте новый дашборд → Add PanelPrometheus.
  2. Введите запрос spark_executor_cpu_seconds_total (или аналог вашей метрики).
  3. Перейдите во вкладку TransformAdd variable → создайте переменную cluster, тип Query, запрос label_values(spark_executor_cpu_seconds_total, cluster)Save.
  4. В панели замените часть запроса на $cluster, чтобы выбирать кластер из выпадающего списка.

Задание 4. Настройка Alert‑rule

  1. Перейдите в панель CPU и выберите AlertCreate Alert Rule.
  2. Условие:
    • WHEN avg() OF query(A, 5m, now) IS ABOVE 80
  3. Добавьте Notification channel (email или webhook).
  4. Simulate load:
    • На Prometheus-экспортер node_exporter
    • Запустите stress или hey по API, чтобы CPU вырос.
  5. Дождитесь срабатывания алерта и проверьте уведомление.

Prometheus: экспортёры и alert‑rules

Что такое Prometheus?

Prometheus — это система мониторинга с базой временных рядов. Он scrape‑ит (собирает) метрики с экспортёров и хранит их в своей базе, а потом по этим метрикам можно строить отчёты и alert‑rules.

Exporters

  • node_exporter: метрики ОС (CPU, память, диск).
  • jmx_exporter: метрики Java‑приложений (Spark, Trino, Airflow) через JMX.
  • clickhouse_exporter, airflow-exporter: специфичные для сервисов.

Scrape configuration

В prometheus.yml описываем, откуда брать метрики:

scrape_configs:
  - job_name: 'node'
    static_configs:
      - targets: ['host.docker.internal:9100']

  - job_name: 'spark'
    static_configs:
      - targets: ['spark-driver:8080']
  • job_name — имя группы.
  • targets — список адресов host:port.
  • relabeling и discovery помогают автоматически находить поды в Kubernetes.

Alerting rules

В отдельном файле, например alerts.yml:

groups:
- name: spark.rules
  rules:
  - alert: SparkExecutorDown
    expr: up{job="spark"} == 0
    for: 2m
    labels:
      severity: critical
    annotations:
      summary: "Executor {{ $labels.instance }} is down"
  • expr — PromQL выражение.
  • for — время, в течение которого условие должно держаться, прежде чем сработает alert.
  • labels и annotations — метаданные уведомления.
    Alertmanager получает эти срабатывания, маршрутизирует по routes и отправляет в receivers (email, Slack, webhook).

Metric naming & labels

  • snake_case: spark_executor_cpu_seconds_total.
  • labels: instance, job, cluster, namespace.
  • Яркое имя + понятные метки облегчают фильтрацию и агрегацию.

Задание 1. Подключение node_exporter

  1. Запустите:
    docker run -d -p 9100:9100 prom/node-exporter
    
  2. Добавьте в prometheus.yml:
    scrape_configs:
      - job_name: 'node'
        static_configs:
          - targets: ['host.docker.internal:9100']
    
  3. Перезапустите Prometheus и проверьте на http://localhost:9090/targets, что job node активен.

Задание 2. JMX‑exporter для Spark

  1. Скачайте jmx_prometheus_javaagent.jar и config.yml (пример из репозитория).
  2. Запустите Spark‑приложение с опцией:
    --jvm-opts="-javaagent:/opt/jmx_prometheus.jar=8080:/opt/config.yml"
    
  3. В prometheus.yml добавьте:
    - job_name: 'spark'
      static_configs:
        - targets: ['spark-driver-host:8080']
    
  4. Убедитесь, что метрики видны в http://localhost:9090/metrics.

Задание 3. Написание alert‑rule

  1. Создайте alerts.yml рядом с prometheus.yml:
    groups:
    - name: airflow.rules
      rules:
      - alert: AirflowSchedulerDown
        expr: up{job="airflow-scheduler"} == 0
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "Airflow scheduler is down on {{ $labels.instance }}"
    
  2. В prometheus.yml подключите:
    rule_files:
      - 'alerts.yml'
    
  3. Перезапустите Prometheus и в UI проверьте Alerts — должна быть правило AirflowSchedulerDown.

Задание 4. Интеграция с Alertmanager

  1. Запустите Alertmanager (через Docker).
  2. В alertmanager.yml добавьте ресивер (email или Slack).
  3. В Prometheus prometheus.yml укажите:
    alerting:
      alertmanagers:
        - static_configs:
            - targets: ['alertmanager:9093']
    
  4. Убедитесь, что при проверке правила (ALERTS в UI) уведомления уходят в указанный канал.

Итоги и лучшие практики

CI/CD для ETL/SQL

CI/CD (Continuous Integration / Continuous Deployment) — это автоматизация проверки и развёртывания вашего кода.

Для ETL‑процессов и SQL‑скриптов это значит:

  • Хранить весь код (DAG‑и, SQL, конфиги) в системе контроля версий (Git).
  • Проверять каждое изменение автоматически: линтеры для SQL, unit‑ и integration‑тесты для Python/SQL.
  • Деплоить в тестовую среду и затем в production без ручного копирования.

Задания

  1. Инициализировать Git‑репо
    git init
    git checkout -b feature/etl-pipeline
    # добавить пару файлов (dag.py, script.sql), сделать коммит
    git add .
    git commit -m "Начальный ETL‑пакет"
    git push origin feature/etl-pipeline
    # открыть pull request в GitHub
    
  2. Настроить SQLFluff
    • Установить: pip install sqlfluff sqlfluff-postgres
    • Создать .sqlfluff:
      [sqlfluff]
      dialect = postgres
      
    • Запустить линтинг:
      sqlfluff lint scripts/transform.sql
      
  3. GitHub Actions для Python‑ETL
    • В .github/workflows/ci.yml:
      name: CI
      on: [pull_request]
      jobs:
        lint:
          runs-on: ubuntu-latest
          steps:
          - uses: actions/checkout@v2
          - name: SQLFluff Lint
            run: sqlfluff lint scripts/transform.sql
        test:
          runs-on: ubuntu-latest
          steps:
          - uses: actions/checkout@v2
          - name: Run pytest
            run: pytest tests/
      
    • Сделать PR и убедиться, что workflow запускается.
  4. dbt‑модель и тест
    • В проекте dbt создайте модель models/my_model.sql:
      select * from {{ ref('raw_events') }}
      
    • В schema.yml:
      models:
        - name: my_model
          columns:
            - name: id
              tests:
                - not_null
                - unique
      
    • Добавьте шаг в CI:
      - name: dbt test
        run: |
          pip install dbt
          dbt deps
          dbt seed
          dbt run
          dbt test
      

Стоимость и оптимизация (compute, storage, network)

Ваш счёт в облаке складывается из трёх частей:

  1. Compute: VM‑часы, слоты BigQuery, CPU/Spark executors.
  2. Storage: гигабайты в S3 или в Data Lake, операции PUT/GET.
  3. Network: исходящий трафик (egress) и межрегиональные передачи.

Оптимизировать можно так:

  • Правильно размерить кластер и включить auto‑scaling.
  • Хранить старые данные в более дешёвом классе (Infrequent/Archive).
  • Использовать lifecycle‑правила и partition pruning, чтобы читать и платить только за нужное.

Задания

  1. Анализ billing‑report
    • Скачать CSV отчёт за последний месяц из cloud console.
    • Открыть в Excel/Google Sheets:
      • Найти топ‑3 самых дорогих cost centers (Team, Project).
  2. Настройка lifecycle‑правила в S3/YCS
    • В консоли S3/YCS создайте правило:
      • Перевод в Infrequent Access через 30 дней.
      • Переход в Archive через 180 дней.
  3. Проверка auto‑scaling
    • Запустите на кластере Spark‑job, постепенно увеличивающий нагрузку.
    • Убедитесь, что количество executors растёт и уменьшается автоматически.
  4. Теги ресурсов
    • Добавьте к VM/кластеру теги team=analytics, env=prod.
    • Постройте отчёт по расходам в billing dashboard по тегам.

Безопасность и управление доступом (IAM, RBAC, шифрование)

  • IAM‑принцип — «минимальные права»: даёте пользователю только те доступы, что ему нужны.
  • RBAC — доступ к интерфейсам Airflow, Trino, ClickHouse и BI‑сервисам по ролям (чтение, правка, админ).
  • Шифрование
    • At‑rest: SSE‑KMS или свой ключ (BYOK).
    • In‑transit: TLS/SSL между клиентами и сервисами.
  • Аудит: храните логи доступа в Cloud Audit Logs или SIEM, следите за изменениями через OpenMetadata.

Конкретные задания

  1. Создать роль trino_readonly в IAM
    • Дать минимальные права на чтение схемы и таблиц в S3/Glue.
  2. Включить RBAC в Airflow
    • В airflow.cfg: rbac = True.
    • Перезапустить сервис, создать группу analyst с ролью Viewer, проверить ограничения.
  3. Настроить SSE‑KMS для бакета
    • В консоли S3/YCS включить шифрование на уровне бакета с использованием KMS‑ключа.
  4. Просмотреть аудит в OpenMetadata
    • В UI найти недавние события изменения схемы и доступа к таблицам.

Как выбирать инструмент под задачу

При выборе между Spark, Trino, ClickHouse и другими, учитывайте:

Критерий Spark Trino ClickHouse
Объём данных > 100 GB (batch/ML) < 100 GB (interactive) < 10 TB (real‑time OLAP)
Задержка минуты–часы < 1 сек миллисекунды–секунды
Язык Python/Scala + SQL SQL only SQL only
Управление свой кластер / managed lightweight (serverless) managed/Operator в K8s
Стоимость spot‑инстансы, auto‑scale pay‑per‑query resource‑based

Задания

  1. Матрица оценки инструментов
    • Составить таблицу по приведённым критериям для вашего проекта (скопируйте шаблон выше и заполните).
  2. PoC Spark vs Trino
    • На наборе данных ~100 GB:
      • Spark: запустить spark-sql со count(*).
      • Trino: подключить тот же файл через Hive‑каталог и выполнить SELECT COUNT(*).
    • Сравнить время и ресурсы.
  3. Mini‑workshop
    • Разделить участников на команды, дать кейс и варианты инструментов, собраться и защитить своё решение (5 мин на команду).
  4. Документирование
    • В Wiki/Confluence или в репозитории создать страницу «Выбор Big Data инструментов» с вашими выводами и обоснованиями.
Понравилась статья? Поделиться с друзьями:
Школа Виктора Комлева
Добавить комментарий

Этот сайт использует Akismet для борьбы со спамом. Узнайте, как обрабатываются ваши данные комментариев.