使用 Dataflow 匯入、匯出及修改資料

本頁面說明如何使用 Spanner 適用的 Dataflow 連接器,匯入、匯出及修改 Spanner GoogleSQL 方言資料庫和 PostgreSQL 方言資料庫中的資料。

Dataflow 是一種可轉換並充實資料的代管服務。Spanner 的 Dataflow 連接器可讓您透過 Dataflow 管道對 Spanner 讀取及寫入資料,並視需要轉換或修改資料。您也能建立在 Spanner 和其他Google Cloud 產品間轉移資料的管道。

如要有效率地將大量資料移入或移出 Spanner,建議您採用 Dataflow 連接器。此外,如果資料庫需要進行 Partitioned DML 不支援的大規模轉換作業 (例如資料表移動,以及需要 JOIN 的大量刪除作業),建議使用這個方法。使用個別資料庫時,您也可以透過其他方式匯入及匯出資料:

  • 使用 Google Cloud 主控台將個別資料庫以 Avro 格式從 Spanner 匯出至 Cloud Storage。
  • 使用 Google Cloud 主控台將資料庫從您匯出至 Cloud Storage 的檔案重新匯入 Spanner。
  • 使用 REST API 或 Google Cloud CLI,在 Spanner 和 Cloud Storage 之間來回執行匯出匯入工作 (同樣使用 Avro 格式)。

Spanner 的 Dataflow 連接器是 Apache Beam Java SDK 的一部分,可提供用於執行上述操作的 API。如要進一步瞭解本頁討論的部分概念,例如 PCollection 物件和轉換,請參閱 Apache Beam 程式設計指南

將連接器新增至 Maven 專案

如要將 Google Cloud Dataflow 連接器新增至 Maven 專案,請在您的 pom.xml 檔案中新增 beam-sdks-java-io-google-cloud-platform Maven 構件做為依附元件。

舉例來說,假設您的 pom.xml 檔案將 beam.version 設定成適當的版本號碼,便會新增下列依附元件:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
</dependency>

從 Spanner 讀取資料

如要從 Spanner 讀取資料,請套用 SpannerIO.read 轉換。並利用 SpannerIO.Read 類別中的方法設定讀取作業。套用上述轉換會傳回 PCollection<Struct>,其中集合內的每個元素都代表讀取作業傳回的個別資料列。您可以根據自己想要的輸出結果,選擇是否使用特定 SQL 查詢來讀取 Spanner。

套用 SpannerIO.read 轉換會執行同步讀取作業,藉此傳回一致的資料視圖。除非您另行指定,否則系統一律會在您開始讀取時建立讀取結果的快照。請參閱讀取,進一步瞭解 Spanner 可執行的各種不同讀取類型。

使用查詢讀取資料

如要從 Spanner 讀取特定一組資料,請利用 SpannerIO.Read.withQuery 方法設定轉換,藉此指定 SQL 查詢。例如:

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withQuery("SELECT * FROM " + options.getTable()));

不指定查詢而讀取資料

如要在不使用查詢的情況下讀取資料庫,您可以透過 SpannerIO.Read.withTable 方法指定資料表名稱,並透過 SpannerIO.Read.withColumns 方法指定要讀取的資料欄清單。例如:

GoogleSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("Singers")
        .withColumns("singerId", "firstName", "lastName"));

PostgreSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("singers")
        .withColumns("singer_id", "first_name", "last_name"));

如要限制讀取的資料列,可以使用 SpannerIO.Read.withKeySet 方法指定要讀取的一組主鍵。

您也可以使用指定的次要索引讀取資料表。與 readUsingIndex API 呼叫相同,索引必須包含查詢結果中顯示的所有資料。

如要這麼做,請指定上一個範例所示的資料表,並使用 SpannerIO.Read.withIndex 方法指定包含所需資料欄值的 index。索引必須儲存轉換作業需要讀取的所有資料欄。系統會隱含儲存基礎資料表的主鍵。舉例來說,如要使用索引 SongsBySongName 讀取 Songs 資料表,請使用下列程式碼:

GoogleSQL

// Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the Songs table,
            .withColumns("SingerId", "AlbumId", "TrackId", "SongName"));

PostgreSQL

// // Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the songs table,
            .withColumns("singer_id", "album_id", "track_id", "song_name"));

控管交易資料的過時程度

系統保證會在一致的資料快照上執行轉換。如要控管資料的過時程度,請使用 SpannerIO.Read.withTimestampBound 方法。詳情請參閱交易

在相同交易中讀取多份資料表

如要同時讀取多份資料表以確保資料一致性,請執行單一交易中的所有讀取作業,如要執行這項操作,請套用 createTransaction 轉換,建立 PCollectionView<Transaction> 物件,接著該物件就會建立交易。上述操作產生的視圖可透過 SpannerIO.Read.withTransaction 傳送至讀取作業。

GoogleSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerID, FirstName, LastName FROM Singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerId, AlbumId, AlbumTitle FROM Albums")
            .withTransaction(tx));

PostgreSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, first_name, last_name FROM singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, album_id, album_title FROM albums")
            .withTransaction(tx));

從所有可用的資料表讀取資料

您可以從 Spanner 資料庫中所有可用的資料表讀取資料。

GoogleSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    "SELECT t.table_name FROM information_schema.tables AS t WHERE t"
                        + ".table_catalog = '' AND t.table_schema = ''"))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create().withQuery("SELECT * FROM " + tableName);
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

PostgreSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    Statement.newBuilder(
                            "SELECT t.table_name FROM information_schema.tables AS t "
                                + "WHERE t.table_catalog = $1 AND t.table_schema = $2")
                        .bind("p1")
                        .to(spannerConfig.getDatabaseId().get())
                        .bind("p2")
                        .to("public")
                        .build()))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create()
                              .withQuery("SELECT * FROM \"" + tableName + "\"");
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

排解不支援的查詢

Dataflow 連接器僅支援 Spanner SQL 查詢,其中查詢執行計畫的第一個運算子為分散式聯集。如果您嘗試透過查詢從 Spanner 讀取資料,並發現有例外狀況說明查詢 does not have a DistributedUnion at the root,請按照「瞭解 Spanner 如何執行查詢」一節的步驟操作,利用 Google Cloud 控制台擷取查詢的執行計畫。

如果您是使用不支援的 SQL 查詢,請將該查詢簡化,讓其中的分散式聯集做為查詢執行計畫的第一個運算子。此外,請移除匯總函式、資料表聯結,以及 DISTINCTGROUP BYORDER 運算子,因為這些運算子很可能會導致查詢無法正常運作。

建立寫入變異

請使用 Mutation 類別的 newInsertOrUpdateBuilder 方法,而非 newInsertBuilder 方法,除非 Java 管道絕對需要使用該方法。如果是 Python 管道,請使用 SpannerInsertOrUpdate,而不是 SpannerInsert。Dataflow 提供至少執行一次的強制保證,這表示系統可能會多次寫入變異。因此,只有 INSERT 變異可能會產生 com.google.cloud.spanner.SpannerException: ALREADY_EXISTS 錯誤,導致管道執行失敗。為避免發生這項錯誤,請改用 INSERT_OR_UPDATE 變異,這個方法會新增資料列,或更新已存在的資料列的資料欄值。INSERT_OR_UPDATE 異動可以套用多次。

寫入 Spanner 及轉換資料

您可以使用 SpannerIO.write 轉換執行一組輸入資料列變異,藉此透過 Dataflow 連接器將資料寫入 Spanner。為了增進效率,Dataflow 連接器會將變異批次分組。

以下範例說明如何將寫入轉換套用至變異的 PCollection

GoogleSQL

albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId));

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withDialectView(dialectView));

如果轉換在完成前意外停止,已套用的變異將不會復原。

自動套用變異群組

您可以使用 MutationGroup 類別,確保變異群組能一起自動套用。系統保證會經由同一項交易來提交 MutationGroup 中的變異,但也可能重新嘗試進行該交易。

如有資料是一起儲存在金鑰體系中的相近位置,您可以將影響這類資料的變數集合成變數群組,藉此發揮最佳效能。由於 Spanner 會同時將父項和子項資料表的資料交錯列於父項資料表中,這類資料在金鑰體系中的位置一向相當接近。建議您將變數群組的結構調整為以下任一形式:其中一個變異是套用至父項資料表,而其他變異則套用至子項資料表;或者所有變異都會修改金鑰體系中儲存位置相近的資料。如要進一步瞭解 Spanner 如何儲存父項和子項資料表的資料,請參閱結構定義與資料模型。如果您並未依據建議的資料表階層來整理變異群組,或您存取的資料在金鑰體系中的儲存位置並不相近,Spanner 可能會需要執行兩階段修訂,進而導致效能減慢。詳情請參閱「位置的取捨」。

如要使用 MutationGroup,請建構 SpannerIO.write 轉換並呼叫 SpannerIO.Write.grouped 方法,該方法會傳回轉換,然後您就可以將轉換套用至 MutationGroup 物件的 PCollection

建立 MutationGroup 時,過程中列出的第一個變異會成為主要變異。如果變異群組會同時影響父項和子項資料表,主要變異應為父項資料表的變異。在其他情況下,您可以使用任何變異做為主要變異。Dataflow 連接器會利用主要變異來判定分區界線,以便有效率地將變異一同批次處理。

舉例來說,假設您的應用程式會監控行為,並標記出有問題的使用者行為供您檢查。發現帶有標記的行為時,您會希望透過更新 Users 資料表來禁止使用者存取應用程式,同時也需要將這類事件記錄在 PendingReviews 資料表。如要確保這兩份資料表皆可自動更新,請使用 MutationGroup

GoogleSQL

PCollection<MutationGroup> mutations =
    suspiciousUserIds.apply(
        MapElements.via(
            new SimpleFunction<>() {

              @Override
              public MutationGroup apply(String userId) {
                // Immediately block the user.
                Mutation userMutation =
                    Mutation.newUpdateBuilder("Users")
                        .set("id")
                        .to(userId)
                        .set("state")
                        .to("BLOCKED")
                        .build();
                long generatedId =
                    Hashing.sha1()
                        .newHasher()
                        .putString(userId, Charsets.UTF_8)
                        .putLong(timestamp.getSeconds())
                        .putLong(timestamp.getNanos())
                        .hash()
                        .asLong();

                // Add an entry to pending review requests.
                Mutation pendingReview =
                    Mutation.newInsertOrUpdateBuilder("PendingReviews")
                        .set("id")
                        .to(generatedId) // Must be deterministically generated.
                        .set("userId")
                        .to(userId)
                        .set("action")
                        .to("REVIEW ACCOUNT")
                        .set("note")
                        .to("Suspicious activity detected.")
                        .build();

                return MutationGroup.create(userMutation, pendingReview);
              }
            }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .grouped());

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
PCollection<MutationGroup> mutations = suspiciousUserIds
    .apply(MapElements.via(new SimpleFunction<String, MutationGroup>() {

      @Override
      public MutationGroup apply(String userId) {
        // Immediately block the user.
        Mutation userMutation = Mutation.newUpdateBuilder("Users")
            .set("id").to(userId)
            .set("state").to("BLOCKED")
            .build();
        long generatedId = Hashing.sha1().newHasher()
            .putString(userId, Charsets.UTF_8)
            .putLong(timestamp.getSeconds())
            .putLong(timestamp.getNanos())
            .hash()
            .asLong();

        // Add an entry to pending review requests.
        Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews")
            .set("id").to(generatedId)  // Must be deterministically generated.
            .set("userId").to(userId)
            .set("action").to("REVIEW ACCOUNT")
            .set("note").to("Suspicious activity detected.")
            .build();

        return MutationGroup.create(userMutation, pendingReview);
      }
    }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .withDialectView(dialectView)
    .grouped());

建立變異群組時,做為引數提供的第一個變異會成為主要變異。由於本範例中的兩份資料表互不相關,因此沒有明確的主要變異。userMutation之所以最先列出,是因為我們選取它做為主要變異。雖然分開套用兩個變異的做法比較便捷,但無法確保單元性。在此情況下,變異群組會是最理想的選擇。

後續步驟