使用 SMT 建立訂閱

本文說明如何使用單一訊息轉換 (SMT) 建立 Pub/Sub 訂閱項目。

訂閱項目 SMT 可直接在 Pub/Sub 中輕量修改訊息資料和屬性。這項功能可在訊息傳送至訂閱端之前,進行資料清理、篩選或格式轉換。

如要建立含有 SMT 的訂閱項目,可以使用 Google Cloud 控制台、Google Cloud CLI、用戶端程式庫或 Pub/Sub API。

事前準備

必要角色和權限

如要取得使用 SMT 建立訂閱項目所需的權限,請要求管理員為您授予專案的 Pub/Sub 編輯者 (roles/pubsub.editor) 身分與存取權管理角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。

這個預先定義的角色具備使用 SMT 建立訂閱項目所需的權限。如要查看確切的必要權限,請展開「必要權限」部分:

所需權限

如要使用 SMT 建立訂閱項目,必須具備下列權限:

  • 授予專案的建立訂閱項目權限: pubsub.subscriptions.create

您或許還可透過自訂角色或其他預先定義的角色取得這些權限。

視訂閱類型而定,您可能需要額外權限。 如要查看確切的權限清單,請參閱討論建立特定訂閱項目的文件。舉例來說,如果您要使用 SMT 建立 BigQuery 訂閱項目,請參閱「建立 BigQuery 訂閱項目」頁面。

如果您在與主題不同的專案中建立訂閱項目,則必須在包含主題的專案中,將 roles/pubsub.subscriber 角色授予包含訂閱項目的專案主體。

您可以在專案層級和個別資源層級設定存取權控管。

建立含有 SMT 的訂閱項目

使用 SMT 建立訂閱項目前,請先參閱「訂閱項目的屬性」說明文件。

下列範例假設您要使用這個使用者定義函式 (UDF) SMT 建立訂閱項目。如要進一步瞭解 UDF,請參閱 UDF 總覽

function redactSSN(message, metadata) {
  const data = JSON.parse(message.data);
  delete data['ssn'];
  message.data = JSON.stringify(data);
  return message;
}

控制台

  1. 前往 Google Cloud 控制台的 Pub/Sub「Subscriptions」(訂閱項目) 頁面。

    前往「訂閱項目」頁面

  2. 按一下「Create Subscription」 (建立訂閱項目)

    「建立訂閱」頁面隨即開啟。

  3. 在「Subscription ID」(訂閱 ID) 欄位中,輸入訂閱 ID。 如要進一步瞭解如何命名訂閱項目,請參閱命名規範

  4. 在「轉換」下方,按一下「新增轉換」

  5. 輸入函式名稱。例如 redactSSN

  6. 如果不想立即將 SMT 搭配訂閱項目使用,請按一下「停用轉換」選項。這樣做仍會儲存 SMT,但訊息透過訂閱項目傳送時,系統不會執行 SMT。

  7. 輸入新的轉換。例如:

    function redactSSN(message, metadata) {
      const data = JSON.parse(message.data);
      delete data['ssn'];
      message.data = JSON.stringify(data);
      return message;
    }
  8. Pub/Sub 提供驗證函式,可讓您驗證 SMT。按一下「驗證」來驗證轉換。

  9. 如要新增其他轉換作業,請按一下「新增轉換作業」

  10. 如要依特定順序排列所有 SMT,可以使用向上鍵和向下鍵。如要移除 SMT,請按一下「刪除」按鈕。
  11. Pub/Sub 提供測試函式,可讓您檢查在範例訊息上執行 SMT 的結果。如要測試 SMT,請按一下「測試轉換」

  12. 在「Test transform」視窗中,選取要測試的函式。

  13. 在「輸入訊息」視窗中輸入範例訊息。

  14. 如要新增訊息屬性,請按一下「新增屬性」,然後輸入一或多個鍵/值組合。

  15. 按一下「測試」。系統會顯示將 SMT 套用至郵件的結果。

  16. 關閉視窗,停止測試範例郵件的 SMT。

  17. 按一下「建立」即可建立訂閱項目。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Pub/Sub 提供驗證函式,可讓您驗證 SMT。執行 gcloud pubsub message-transforms validate 指令:

    gcloud pubsub message-transforms validate --message-transform-file=TRANSFORM_FILE

    更改下列內容:

    • TRANSFORM_FILE:包含單一 SMT 的 YAML 或 JSON 檔案路徑。

      以下是 YAML 轉換檔案的範例:

      - javascriptUdf:
          code: >
              function redactSSN(message, metadata) {
                const data = JSON.parse(message.data);
                delete data['ssn'];
                message.data = JSON.stringify(data);
                return message;
              }
          functionName: redactSSN

  3. Pub/Sub 提供測試功能,可讓您檢查對範例訊息執行一或多個 SMT 的結果。執行 gcloud pubsub message-transforms test 指令:

    gcloud pubsub message-transforms test --message-transforms-file=TRANSFORMS_FILE --message=MESSAGE --attributes=ATTRIBUTES

    更改下列內容:

    • TRANSFORMS_FILE:包含一或多個 SMT 的 YAML 或 JSON 檔案路徑。

      以下是 YAML 轉換檔案範例:

      - javascriptUdf:
          code: >
              function redactSSN(message, metadata) {
                const data = JSON.parse(message.data);
                delete data['ssn'];
                message.data = JSON.stringify(data);
                return message;
              }
          functionName: redactSSN

    • MESSAGE:用於測試 SMT 的訊息內文。

    • ATTRIBUTES:用於測試 SMT 的訊息屬性。

  4. 如要建立訂閱項目,請執行 gcloud pubsub subscriptions create 指令:

    gcloud pubsub subscriptions create SUBSCRIPTION_ID \
        --topic=TOPIC_NAME \
        --message-transforms-file=TRANSFORMS_FILE

    取代下列項目:

    • SUBSCRIPTION_ID:要建立的訂閱項目 ID 或名稱。 如要瞭解如何命名訂閱項目,請參閱「資源名稱」。訂閱項目名稱無法變更。

    • TOPIC_NAME:要訂閱的主題名稱,格式為 projects/PROJECT_ID/topics/TOPIC_ID

    • TRANSFORMS_FILE:包含一或多個 SMT 的 YAML 或 JSON 檔案路徑。

      以下是 YAML 轉換檔案範例:

      - javascriptUdf:
          code: >
              function redactSSN(message, metadata) {
                const data = JSON.parse(message.data);
                delete data['ssn'];
                message.data = JSON.stringify(data);
                return message;
              }
          functionName: redactSSN

  5. Java

    在試用這個範例之前,請先按照JavaPub/Sub 快速入門導覽課程:使用用戶端程式庫」中的操作說明進行設定。 詳情請參閱 Pub/Sub Java API 參考說明文件

    如要驗證 Pub/Sub,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。

    import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
    import com.google.pubsub.v1.JavaScriptUDF;
    import com.google.pubsub.v1.MessageTransform;
    import com.google.pubsub.v1.ProjectSubscriptionName;
    import com.google.pubsub.v1.ProjectTopicName;
    import com.google.pubsub.v1.Subscription;
    import java.io.IOException;
    
    public class CreateSubscriptionWithSmtExample {
      public static void main(String... args) throws Exception {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String topicId = "your-topic-id";
        String subscriptionId = "your-subscription-id";
    
        createSubscriptionWithSmtExample(projectId, topicId, subscriptionId);
      }
    
      public static void createSubscriptionWithSmtExample(
          String projectId, String topicId, String subscriptionId) throws IOException {
    
        // UDF that removes the 'ssn' field, if present
        String code =
            "function redactSSN(message, metadata) {"
                + "  const data = JSON.parse(message.data);"
                + "  delete data['ssn'];"
                + "  message.data = JSON.stringify(data);"
                + "  return message;"
                + "}";
        String functionName = "redactSSN";
    
        JavaScriptUDF udf =
            JavaScriptUDF.newBuilder().setCode(code).setFunctionName(functionName).build();
        MessageTransform transform = MessageTransform.newBuilder().setJavascriptUdf(udf).build();
    
        try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
    
          ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
          ProjectSubscriptionName subscriptionName =
              ProjectSubscriptionName.of(projectId, subscriptionId);
    
          Subscription subscription =
              subscriptionAdminClient.createSubscription(
                  Subscription.newBuilder()
                      .setName(subscriptionName.toString())
                      .setTopic(topicName.toString())
                      // Add the UDF message transform
                      .addMessageTransforms(transform)
                      .build());
    
          System.out.println("Created subscription with SMT: " + subscription.getAllFields());
        }
      }
    }

    Python

    在試用這個範例之前,請先按照PythonPub/Sub 快速入門導覽課程:使用用戶端程式庫」中的操作說明進行設定。 詳情請參閱 Pub/Sub Python API 參考說明文件

    如要驗證 Pub/Sub,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。

    from google.cloud import pubsub_v1
    from google.pubsub_v1.types import JavaScriptUDF, MessageTransform
    
    # TODO(developer): Choose an existing topic.
    # project_id = "your-project-id"
    # topic_id = "your-topic-id"
    # subscription_id = "your-subscription-id"
    
    publisher = pubsub_v1.PublisherClient()
    subscriber = pubsub_v1.SubscriberClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    subscription_path = subscriber.subscription_path(project_id, subscription_id)
    
    code = """function redactSSN(message, metadata) {
                const data = JSON.parse(message.data);
                delete data['ssn'];
                message.data = JSON.stringify(data);
                return message;
                }"""
    udf = JavaScriptUDF(code=code, function_name="redactSSN")
    transforms = [MessageTransform(javascript_udf=udf)]
    
    with subscriber:
        subscription = subscriber.create_subscription(
            request={
                "name": subscription_path,
                "topic": topic_path,
                "message_transforms": transforms,
            }
        )
        print(f"Created subscription with SMT: {subscription}")

    Go

    在試用這個範例之前,請先按照GoPub/Sub 快速入門導覽課程:使用用戶端程式庫」中的操作說明進行設定。 詳情請參閱 Pub/Sub Go API 參考說明文件

    如要驗證 Pub/Sub,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。

    // Copyright 2025 Google LLC
    //
    // Licensed under the Apache License, Version 2.0 (the "License");
    // you may not use this file except in compliance with the License.
    // You may obtain a copy of the License at
    //
    //     https://www.apache.org/licenses/LICENSE-2.0
    //
    // Unless required by applicable law or agreed to in writing, software
    // distributed under the License is distributed on an "AS IS" BASIS,
    // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    // See the License for the specific language governing permissions and
    // limitations under the License.
    
    package subscriptions
    
    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/pubsub"
    )
    
    // createSubscriptionWithSMT creates a subscription with a single message transform function applied.
    func createSubscriptionWithSMT(w io.Writer, projectID, subID string, topic *pubsub.Topic) error {
    	// projectID := "my-project-id"
    	// subID := "my-sub"
    	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %w", err)
    	}
    	defer client.Close()
    
    	code := `function redactSSN(message, metadata) {
    			const data = JSON.parse(message.data);
    			delete data['ssn'];
    			message.data = JSON.stringify(data);
    			return message;
    		}`
    	transform := pubsub.MessageTransform{
    		Transform: pubsub.JavaScriptUDF{
    			FunctionName: "redactSSN",
    			Code:         code,
    		},
    	}
    	cfg := pubsub.SubscriptionConfig{
    		Topic:             topic,
    		MessageTransforms: []pubsub.MessageTransform{transform},
    	}
    	sub, err := client.CreateSubscription(ctx, subID, cfg)
    	if err != nil {
    		return fmt.Errorf("CreateSubscription: %w", err)
    	}
    	fmt.Fprintf(w, "Created subscription with message transform: %v\n", sub)
    	return nil
    }
    
    

SMT 如何與其他訂閱功能互動

如果訂閱項目同時使用 SMT 和 Pub/Sub 的內建篩選器,系統會先套用篩選器,再套用 SMT。這會造成下列影響:

  • 如果 SMT 變更了訊息屬性,Pub/Sub 篩選器就不會套用至新的屬性集。
  • 如果訊息遭到 Pub/Sub 篩選器篩除,系統就不會套用 SMT。

如果 SMT 篩除郵件,請注意這對監控訂閱項目待處理事項的影響。如果將訂閱項目饋送至 Dataflow 管道,請勿使用 SMT 篩除訊息,否則會中斷 Dataflow 的自動調度資源。

後續步驟