Dataproc 選用 Delta Lake 元件

當您使用選用元件功能建立 Dataproc 叢集時,可以安裝 Delta Lake 等其他元件。本頁說明如何在 Dataproc 叢集上選用安裝 Delta Lake 元件。

安裝在 Dataproc 叢集時,Delta Lake 元件會安裝 Delta Lake 程式庫,並設定叢集中的 Spark 和 Hive,以便與 Delta Lake 搭配使用。

相容的 Dataproc 映像檔版本

您可以在使用 Dataproc 映像檔 2.2.46 以上版本建立的 Dataproc 叢集上安裝 Delta Lake 元件。

請參閱「支援的 Dataproc 版本」,瞭解 Dataproc 映像檔版本中包含的 Delta Lake 元件版本。

建立 Dataproc 叢集並啟用 Delta Lake 元件時,系統會設定下列 Spark 屬性,以便與 Delta Lake 搭配使用。

設定檔 屬性 預設值
/etc/spark/conf/spark-defaults.conf spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
/etc/spark/conf/spark-defaults.conf spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog

安裝元件

使用 Google Cloud 控制台、Google Cloud CLI 或 Dataproc API 建立 Dataproc 叢集時,請安裝元件。

控制台

  1. 在 Google Cloud 控制台中,前往 Dataproc 的「Create a cluster」(建立叢集) 頁面。

    前往「建立叢集」

    系統會選取「設定叢集」面板。

  2. 在「元件」部分,選取「選用元件」下方的「Delta Lake」和其他選用元件,即可安裝至叢集。

gcloud CLI

如要建立包含 Delta Lake 元件的 Dataproc 叢集,請使用 gcloud dataproc clusters create 指令搭配 --optional-components 標記。

gcloud dataproc clusters create CLUSTER_NAME \
    --optional-components=DELTA \
    --region=REGION \
    ... other flags

注意:

REST API

您可以透過 Dataproc API,使用 SoftwareConfig.Component,做為 clusters.create 要求的一部分,指定 Delta Lake 元件。

使用範例

本節提供使用 Delta Lake 表格讀取及寫入資料的範例。

Delta Lake 資料表

寫入 Delta Lake 資料表

您可以使用 Spark DataFrame 將資料寫入 Delta Lake 資料表。下列範例會建立 DataFrame 和範例資料、在 Cloud Storage 中建立 my_delta_table Delta Lake 資料表,然後將資料寫入 Delta Lake 資料表。

PySpark

# Create a DataFrame with sample data.
data = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])

# Create a Delta Lake table in Cloud Storage.
spark.sql("""CREATE TABLE IF NOT EXISTS my_delta_table (
    id integer,
    name string)
USING delta
LOCATION 'gs://delta-gcs-demo/example-prefix/default/my_delta_table'""")

# Write the DataFrame to the Delta Lake table in Cloud Storage.
data.writeTo("my_delta_table").append()

Scala

// Create a DataFrame with sample data.
val data = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name")

// Create a Delta Lake table in Cloud Storage.
spark.sql("""CREATE TABLE IF NOT EXISTS my_delta_table (
    id integer,
    name string)
USING delta
LOCATION 'gs://delta-gcs-demo/example-prefix/default/my_delta_table'""")

// Write the DataFrame to the Delta Lake table in Cloud Storage.
data.write.format("delta").mode("append").saveAsTable("my_delta_table")

Spark SQL

CREATE TABLE IF NOT EXISTS my_delta_table (
    id integer,
    name string)
USING delta
LOCATION 'gs://delta-gcs-demo/example-prefix/default/my_delta_table';

INSERT INTO my_delta_table VALUES ("1", "Alice"), ("2", "Bob");

從 Delta Lake 資料表讀取

以下範例會讀取 my_delta_table 並顯示內容。

PySpark

# Read the Delta Lake table into a DataFrame.
df = spark.table("my_delta_table")

# Display the data.
df.show()

Scala

// Read the Delta Lake table into a DataFrame.
val df = spark.table("my_delta_table")

// Display the data.
df.show()

Spark SQL

SELECT * FROM my_delta_table;

Hive with Delta Lake

寫入 Hive 中的 Delta 資料表。

Dataproc Delta Lake 選用元件已預先設定為與 Hive 外部資料表搭配使用。

詳情請參閱「Hive 連接器」。

在 beeline 用戶端中執行範例。

beeline -u jdbc:hive2://

建立 Spark Delta Lake 資料表。

必須先使用 Spark 建立 Delta Lake 資料表,Hive 外部資料表才能參照該資料表。

CREATE TABLE IF NOT EXISTS my_delta_table (
    id integer,
    name string)
USING delta
LOCATION 'gs://delta-gcs-demo/example-prefix/default/my_delta_table';

INSERT INTO my_delta_table VALUES ("1", "Alice"), ("2", "Bob");

建立 Hive 外部資料表。

SET hive.input.format=io.delta.hive.HiveInputFormat;
SET hive.tez.input.format=io.delta.hive.HiveInputFormat;

CREATE EXTERNAL TABLE deltaTable(id INT, name STRING)
STORED BY 'io.delta.hive.DeltaStorageHandler'
LOCATION 'gs://delta-gcs-demo/example-prefix/default/my_delta_table';

注意:

  • io.delta.hive.DeltaStorageHandler 類別會實作 Hive 資料來源 API。它可以載入 Delta 資料表並擷取中繼資料。如果 CREATE TABLE 陳述式中的資料表結構定義與基礎 Delta Lake 中繼資料不一致,系統會擲回錯誤。

在 Hive 中讀取 Delta Lake 資料表。

如要從 Delta 資料表讀取資料,請使用 SELECT 陳述式:

SELECT * FROM deltaTable;

捨棄 Delta Lake 資料表。

如要捨棄 Delta 資料表,請使用 DROP TABLE 陳述式:

DROP TABLE deltaTable;