Интеграция ClickHouse с Databricks
Коннектор ClickHouse Spark полностью совместим с Databricks. В этом руководстве рассматриваются настройка, установка и сценарии использования коннектора в Databricks с учетом особенностей платформы.
Выбор API для Databricks
По умолчанию Databricks использует Unity Catalog, который блокирует регистрацию каталога Spark. В этом случае вы должны использовать TableProvider API (доступ на основе формата).
Однако, если вы отключите Unity Catalog, создав кластер с режимом доступа No isolation shared, вы можете вместо этого использовать Catalog API. Catalog API обеспечивает централизованную конфигурацию и нативную интеграцию со Spark SQL.
| Статус Unity Catalog | Рекомендуемый API | Примечания |
|---|
| Enabled (по умолчанию) | TableProvider API (на основе формата) | Unity Catalog блокирует регистрацию каталога Spark |
| Disabled (No isolation shared) | Catalog API | Требуется кластер с режимом доступа «No isolation shared» |
Установка в Databricks
Вариант 1: загрузка JAR через интерфейс Databricks
-
Соберите или скачайте JAR-файл среды выполнения:
clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}-{{ stable_version }}.jar
-
Загрузите JAR в рабочее пространство Databricks:
- Перейдите в Workspace → откройте нужную папку
- Нажмите Upload → выберите JAR-файл
- JAR-файл будет сохранён в вашем рабочем пространстве
-
Установите библиотеку в кластер:
- Перейдите в Compute → выберите кластер
- Откройте вкладку Libraries
- Нажмите Install New
- Выберите DBFS или Workspace → укажите загруженный JAR-файл
- Нажмите Install
- Перезапустите кластер, чтобы загрузить библиотеку
Вариант 2: Установка через Databricks CLI
# Upload JAR to DBFS
databricks fs cp clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}-{{ stable_version }}.jar \
dbfs:/FileStore/jars/
# Install on cluster
databricks libraries install \
--cluster-id <your-cluster-id> \
--jar dbfs:/FileStore/jars/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}-{{ stable_version }}.jar
Вариант 3: Координаты Maven (рекомендуется)
-
Перейдите в рабочее пространство Databricks:
- Откройте раздел Compute → выберите кластер
- Перейдите на вкладку Libraries
- Нажмите Install New
- Выберите вкладку Maven
-
Добавьте координаты Maven:
com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}
- Нажмите Install и перезапустите кластер, чтобы загрузить библиотеку
Использование TableProvider API
Когда Unity Catalog включён (по умолчанию), вы должны использовать TableProvider API (доступ, основанный на формате), так как Unity Catalog блокирует регистрацию каталога Spark. Если вы отключили Unity Catalog, используя кластер с режимом доступа «No isolation shared», вы можете вместо этого использовать Catalog API.
Чтение данных
# Чтение из ClickHouse с использованием API TableProvider
df = spark.read \
.format("clickhouse") \
.option("host", "your-clickhouse-cloud-host.clickhouse.cloud") \
.option("protocol", "https") \
.option("http_port", "8443") \
.option("database", "default") \
.option("table", "events") \
.option("user", "default") \
.option("password", dbutils.secrets.get(scope="clickhouse", key="password")) \
.option("ssl", "true") \
.load()
# Схема определяется автоматически
df.display()
val df = spark.read
.format("clickhouse")
.option("host", "your-clickhouse-cloud-host.clickhouse.cloud")
.option("protocol", "https")
.option("http_port", "8443")
.option("database", "default")
.option("table", "events")
.option("user", "default")
.option("password", dbutils.secrets.get(scope="clickhouse", key="password"))
.option("ssl", "true")
.load()
df.show()
Запись данных
# Запись в ClickHouse — таблица будет автоматически создана, если она ещё не существует
df.write \
.format("clickhouse") \
.option("host", "your-clickhouse-cloud-host.clickhouse.cloud") \
.option("protocol", "https") \
.option("http_port", "8443") \
.option("database", "default") \
.option("table", "events_copy") \
.option("user", "default") \
.option("password", dbutils.secrets.get(scope="clickhouse", key="password")) \
.option("ssl", "true") \
.option("order_by", "id") \ # Обязательно: укажите ORDER BY при создании новой таблицы
.option("settings.allow_nullable_key", "1") \ # Обязательно для ClickHouse Cloud, если в ORDER BY есть столбцы типа Nullable
.mode("append") \
.save()
df.write
.format("clickhouse")
.option("host", "your-clickhouse-cloud-host.clickhouse.cloud")
.option("protocol", "https")
.option("http_port", "8443")
.option("database", "default")
.option("table", "events_copy")
.option("user", "default")
.option("password", dbutils.secrets.get(scope="clickhouse", key="password"))
.option("ssl", "true")
.option("order_by", "id") // Обязательно: укажите ORDER BY при создании новой таблицы
.option("settings.allow_nullable_key", "1") // Обязательно для ClickHouse Cloud, если в ORDER BY есть столбцы типа Nullable
.mode("append")
.save()
Особенности Databricks
Управление секретами
Используйте области секретов Databricks для безопасного хранения учетных данных ClickHouse:
# Access secrets
password = dbutils.secrets.get(scope="clickhouse", key="password")
Инструкции по настройке см. в документации Databricks по управлению секретами.
Подключение к ClickHouse Cloud
При подключении к ClickHouse Cloud из Databricks:
- Используйте HTTPS-протокол (
protocol: https, http_port: 8443)
- Включите SSL (
ssl: true)
Примеры
Пример полного рабочего процесса
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Инициализация Spark с коннектором для ClickHouse
spark = SparkSession.builder \
.config("spark.jars.packages", "com.clickhouse.spark:clickhouse-spark-runtime-3.4_2.12:0.9.0") \
.getOrCreate()
# Чтение из ClickHouse
df = spark.read \
.format("clickhouse") \
.option("host", "your-host.clickhouse.cloud") \
.option("protocol", "https") \
.option("http_port", "8443") \
.option("database", "default") \
.option("table", "source_table") \
.option("user", "default") \
.option("password", dbutils.secrets.get(scope="clickhouse", key="password")) \
.option("ssl", "true") \
.load()
# Преобразование данных
transformed_df = df.filter(col("status") == "active")
# Запись в ClickHouse
transformed_df.write \
.format("clickhouse") \
.option("host", "your-host.clickhouse.cloud") \
.option("protocol", "https") \
.option("http_port", "8443") \
.option("database", "default") \
.option("table", "target_table") \
.option("user", "default") \
.option("password", dbutils.secrets.get(scope="clickhouse", key="password")) \
.option("ssl", "true") \
.option("order_by", "id") \
.mode("append") \
.save()
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
// Инициализация Spark с коннектором для ClickHouse
val spark = SparkSession.builder
.config("spark.jars.packages", "com.clickhouse.spark:clickhouse-spark-runtime-3.4_2.12:0.9.0")
.getOrCreate()
// Чтение из ClickHouse
val df = spark.read
.format("clickhouse")
.option("host", "your-host.clickhouse.cloud")
.option("protocol", "https")
.option("http_port", "8443")
.option("database", "default")
.option("table", "source_table")
.option("user", "default")
.option("password", dbutils.secrets.get(scope="clickhouse", key="password"))
.option("ssl", "true")
.load()
// Преобразование данных
val transformedDF = df.filter(col("status") === "active")
// Запись в ClickHouse
transformedDF.write
.format("clickhouse")
.option("host", "your-host.clickhouse.cloud")
.option("protocol", "https")
.option("http_port", "8443")
.option("database", "default")
.option("table", "target_table")
.option("user", "default")
.option("password", dbutils.secrets.get(scope="clickhouse", key="password"))
.option("ssl", "true")
.option("order_by", "id")
.mode("append")
.save()