訂單訊息

Pub/Sub 的訊息排序功能可讓訂閱端用戶端按照發布端用戶端發布訊息的順序接收訊息。

舉例來說,假設某個區域的發布端用戶端依序發布訊息 1、2 和 3。啟用訊息排序後,訂閱端用戶端會按照發布順序接收訊息。如要依序傳送訊息,發布端用戶端必須在相同區域發布訊息。不過,訂閱者可以連線至任何區域,且排序保證仍會維持。

訊息排序功能適用於資料庫變更擷取、使用者工作階段追蹤,以及保留事件時間順序的串流應用程式等情境。

本頁面說明訊息排序的概念,以及如何設定訂閱端,依序接收訊息。如要為訊息排序設定發布商用戶端,請參閱「運用排序鍵發布訊息」。

訊息排序總覽

Pub/Sub 中的排序方式取決於下列因素:

  • 排序鍵:這是 Pub/Sub 訊息中繼資料中使用的字串,代表訊息必須排序的實體。排序鍵的長度上限為 1 KB。如要在某個區域接收一組排序過的訊息,您必須在同一個區域中,發布所有排序鍵相同的訊息。排序鍵的範例包括客戶 ID 和資料庫中資料列的主鍵。

    每個排序鍵的發布處理量上限為 1 MBps。主題上所有排序鍵的輸送量,不得超過發布區域的配額。這項上限可提高至多個 Gbps 單位。

    排序鍵不等同於以分區為基礎的訊息系統中的分區,因為排序鍵的基數預期會比分區高出許多。

  • 啟用訊息排序功能:這是訂閱設定。如果訂閱項目已啟用訊息排序功能,訂閱端用戶端就會按照服務接收訊息的順序,接收在相同區域發布且排序鍵相同的訊息。您必須在訂閱項目中啟用這項設定

    假設您有兩個訂閱項目 A 和 B,都附加至相同的主題 T。 訂閱項目 A 啟用訊息排序功能,訂閱項目 B 則未啟用。在這個架構中,訂閱項目 A 和 B 都會收到主題 T 的同一組訊息。如果您在同一個區域發布含有排序鍵的訊息,訂閱項目 A 會按照訊息的發布順序接收訊息。而訂閱 B 則會收到訊息,但不會有任何預期順序。

一般來說,如果解決方案需要發布者用戶端傳送已排序和未排序的訊息,請建立個別主題,一個用於已排序的訊息,另一個用於未排序的訊息。

使用依序傳送訊息功能的注意事項

以下列出 Pub/Sub 排序訊息傳送行為的重要資訊:

  • 排序鍵內排序:系統應會依序接收以相同排序鍵發布的訊息。假設您為排序鍵 A 發布訊息 1、2 和 3。啟用排序功能後,預計會先傳送 1,再傳送 2,最後傳送 3。

  • 跨鍵排序:發布時使用不同排序鍵的訊息,預期不會依序接收。假設您有訂購鍵 A 和 B。 對於排序鍵 A,訊息 1 和 2 會依序發布。對於排序鍵 B,訊息 3 和 4 會依序發布。不過,訊息 1 可能會比訊息 4 早到或晚到。

  • 訊息重新傳送:Pub/Sub 至少會傳送每個訊息一次,因此 Pub/Sub 服務可能會重新傳送訊息。重新傳送訊息會觸發該金鑰的所有後續訊息重新傳送作業,即使是已確認的訊息也不例外。假設訂閱端用戶端收到特定排序鍵的訊息 1、2 和 3。如果訊息 2 重新傳送 (因為確認期限已過,或盡量確認的訊息未保留在 Pub/Sub 中),訊息 3 也會重新傳送。如果訂閱項目同時啟用訊息排序和無效信件主題,這個行為可能不成立,因為 Pub/Sub 會盡量將訊息轉送至無效信件主題。

  • 延遲確認和無效信件主題:如果特定排序鍵的訊息未經確認,可能會延遲其他排序鍵的訊息傳送,尤其是在伺服器重新啟動或流量變更期間。為確保這類活動井然有序,請務必及時確認所有訊息。如果無法及時確認訊息,建議使用死信主題,避免訊息無限期保留。請注意,訊息寫入無效信件主題時,系統可能不會保留順序。

  • 訊息相依性 (streamingPull 用戶端):相同鍵的訊息通常會傳送至同一個 streamingPull 訂閱端用戶端。如果特定訂閱端用戶端的排序鍵有未完成的訊息,系統就會預期出現親和性。如果沒有未處理的訊息,親和性可能會因負載平衡或用戶端中斷連線而轉移。

    為確保即使親和性可能變更,也能順利處理,請務必設計串流 Pull 應用程式,讓應用程式能處理特定排序鍵的任何用戶端訊息。

  • 與 Dataflow 整合:使用 Pub/Sub 設定 Dataflow 時,請勿為訂閱項目啟用訊息排序功能。Dataflow 有自己的訊息總排序機制,可確保所有訊息按時間順序排列,這是視窗作業的一部分。這種排序方式與 Pub/Sub 的排序鍵方法不同。在 Dataflow 中使用排序鍵可能會降低管道效能。

  • 自動調整資源配置:Pub/Sub 的依序傳送功能可擴充至數十億個排序鍵。排序鍵數量越多,可平行傳送至訂閱端的訊息就越多,因為排序鍵適用於所有具有相同排序鍵的訊息。

  • 效能取捨:依序傳送會帶來一些取捨。與無序傳送相比,有序傳送會降低發布可用性,並增加端對端訊息傳送延遲時間。在依序傳送的案例中,容錯移轉需要協調作業,確保訊息以正確順序寫入及讀取。

  • 熱鍵:使用訊息排序功能時,系統會將所有含有相同排序鍵的訊息,按照服務接收順序傳送至訂閱端。使用者回呼不會執行,直到前一則訊息的回呼完成為止。傳送給訂閱者時,如果訊息共用同一個排序鍵,則訊息的最高總處理量不會受到 Pub/Sub 限制,而是取決於訂閱者用戶端的處理速度。如果每秒產生的訊息數超過訂閱者每秒可處理的訊息數,個別排序鍵就會累積待處理作業,這時就會發生熱鍵。如要減輕熱鍵問題,請盡可能使用最細微的鍵,並盡量縮短每則訊息的處理時間。您也可以監控 subscription/oldest_unacked_message_age 指標的值是否上升,這可能表示有熱鍵。

如要進一步瞭解如何使用訊息排序功能,請參閱下列最佳做法主題:

訂閱者用戶端訊息排序行為

訂閱端用戶端會按照訊息在特定區域的發布順序接收訊息。Pub/Sub 支援多種接收訊息的方式,例如連線至提取和推送訂閱項目的訂閱端用戶端。用戶端程式庫會使用 streamingPull (PHP 除外)。

如要進一步瞭解這些訂閱類型,請參閱「選擇訂閱類型」。

以下各節將討論依序接收訊息對各類型訂閱端用戶端有何意義。

StreamingPull 訂閱者用戶端

使用用戶端程式庫搭配 streamingPull 時,您必須指定使用者回呼,在訂閱端收到訊息時執行。使用用戶端程式庫時,系統會針對任何指定排序鍵,以正確順序對訊息執行回呼,直到完成為止。如果在該回呼中確認訊息,系統會依序對訊息執行所有計算。不過,如果使用者回呼會在訊息中排定其他非同步工作,訂閱端必須確保非同步工作依序完成。其中一個做法是將訊息新增至本機工作佇列,並依序處理。

提取訂閱者用戶端

對於連線至提取訂閱項目的訂閱端用戶端,Pub/Sub 訊息排序支援下列項目:

  • PullResponse 中訂購鍵的所有訊息都會依正確順序排列在清單中。

  • 一次只能針對一個排序鍵保留一批訊息。

一次只能有一批待處理的訊息,這是為了維持訊息的傳送順序,因為 Pub/Sub 服務無法確保為訂閱者的提取要求傳送的回應是否成功或延遲。

推送訂閱者用戶端

推送的限制比提取更嚴格。如果是推送訂閱項目,Pub/Sub 一次只能為每個排序鍵保留一則未處理的訊息。每則訊息都會以個別要求的形式傳送至推送端點。因此,並行傳送要求會導致與下列情況相同的問題:同時將多批訊息傳送至同一排序鍵,以供訂閱者提取。如果主題經常發布含有相同排序鍵的訊息,或延遲時間非常重要,可能就不適合使用推送訂閱項目。

匯出訂閱用戶端

匯出訂閱項目支援已排序的訊息。如果是 BigQuery 訂閱項目,系統會將含有相同排序鍵的訊息依序寫入 BigQuery 資料表。如果是 Cloud Storage 訂閱項目,含有相同排序鍵的訊息可能不會全部寫入同一個檔案。在同一個檔案中,訊息會依排序鍵排序。如果訊息分散在多個檔案中,排序鍵的後續訊息可能會出現在檔案名稱的時間戳記較早的檔案中,而檔案名稱的時間戳記較晚的檔案中,則會出現較早的訊息。

啟用訊息排序功能

如要依序接收訊息,請在接收訊息的訂閱項目中設定訊息排序屬性。依序接收訊息可能會增加延遲時間。建立訂閱項目後,就無法變更訊息排序屬性。

使用 Google Cloud 控制台、Google Cloud CLI 或 Pub/Sub API 建立訂閱項目時,可以設定訊息排序屬性。

控制台

如要建立具有訊息排序屬性的訂閱項目,請按照下列步驟操作:

  1. 前往 Google Cloud 控制台的「Subscriptions」(訂閱項目) 頁面。

前往「訂閱項目」頁面

  1. 按一下「Create Subscription」 (建立訂閱項目)

  2. 輸入訂閱 ID

  3. 選擇要接收訊息的主題。

  4. 在「訊息排序」部分,選取「使用排序鍵為訊息排序」

  5. 點選「建立」

gcloud

如要建立具有訊息排序屬性的訂閱項目,請使用 gcloud pubsub subscriptions create 指令和 --enable-message-ordering 標記:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --enable-message-ordering

SUBSCRIPTION_ID 替換為訂閱 ID。

如果要求成功,指令列會顯示確認訊息:

Created subscription [SUBSCRIPTION_ID].

REST

如要使用訊息排序屬性建立訂閱項目,請傳送類似下列內容的 PUT 要求:

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth application-default print-access-token)

更改下列內容:

  • PROJECT_ID:含有主題的專案 ID
  • SUBSCRIPTION_ID:訂閱 ID

在要求主體中,指定下列項目:

{
  "topic": TOPIC_ID,
  "enableMessageOrdering": true,
}

TOPIC_ID 替換為要附加至訂閱項目的主題 ID。

如果要求成功,回應會是 JSON 格式的訂閱項目:

{
  "name": projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,
  "topic": projects/PROJECT_ID/topics/TOPIC_ID,
  "enableMessageOrdering": true,
}

C++

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 C++ 設定操作說明進行操作。詳情請參閱 Pub/Sub C++ API 參考說明文件

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.set_enable_message_ordering(true);
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 C# 設定操作說明進行操作。詳情請參閱 Pub/Sub C# API 參考說明文件


using Google.Cloud.PubSub.V1;
using Grpc.Core;

public class CreateSubscriptionWithOrderingSample
{
    public Subscription CreateSubscriptionWithOrdering(string projectId, string topicId, string subscriptionId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        var topicName = TopicName.FromProjectTopic(projectId, topicId);
        var subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            EnableMessageOrdering = true
        };

        Subscription subscription = null;
        try
        {
            subscription = subscriber.CreateSubscription(subscriptionRequest);
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            // Already exists.  That's fine.
        }
        return subscription;
    }
}

Go

以下範例使用 Go Pub/Sub 用戶端程式庫的主要版本 (v2)。如果您仍在使用第 1 版程式庫,請參閱第 2 版遷移指南。如要查看第 1 版程式碼範例清單,請參閱 已淘汰的程式碼範例

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Go 設定說明進行操作。詳情請參閱 Pub/Sub Go API 參考說明文件

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

func createWithOrdering(w io.Writer, projectID, topic, subscription string) error {
	// projectID := "my-project-id"
	// topic := "projects/my-project-id/topics/my-topic"
	// subscription := "projects/my-project/subscriptions/my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// Message ordering can only be set when creating a subscription.
	sub, err := client.SubscriptionAdminClient.CreateSubscription(ctx, &pubsubpb.Subscription{
		Name:                  subscription,
		Topic:                 topic,
		EnableMessageOrdering: true,
	})
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created subscription: %v\n", sub)
	return nil
}

Java

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Java 設定操作說明進行操作。詳情請參閱 Pub/Sub Java API 參考說明文件

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithOrdering {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";

    createSubscriptionWithOrderingExample(projectId, topicId, subscriptionId);
  }

  public static void createSubscriptionWithOrderingExample(
      String projectId, String topicId, String subscriptionId) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Set message ordering to true for ordered messages in the subscription.
                  .setEnableMessageOrdering(true)
                  .build());

      System.out.println("Created a subscription with ordering: " + subscription.getAllFields());
    }
  }
}

Node.js

在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Node.js 設定說明進行操作。詳情請參閱 Pub/Sub Node.js API 參考說明文件

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithOrdering(
  topicNameOrId,
  subscriptionNameOrId,
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableMessageOrdering: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with ordering enabled.`,
  );
  console.log(
    'To process messages in order, remember to add an ordering key to your messages.',
  );
}

Node.js

在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Node.js 設定說明進行操作。詳情請參閱 Pub/Sub Node.js API 參考說明文件

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithOrdering(
  topicNameOrId: string,
  subscriptionNameOrId: string,
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableMessageOrdering: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with ordering enabled.`,
  );
  console.log(
    'To process messages in order, remember to add an ordering key to your messages.',
  );
}

Python

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Python 設定操作說明來進行。詳情請參閱 Pub/Sub Python API 參考說明文件

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "enable_message_ordering": True,
        }
    )
    print(f"Created subscription with ordering: {subscription}")

Ruby

以下範例使用 Ruby Pub/Sub 用戶端程式庫 v3。如果您仍在使用第 2 版程式庫,請參閱 第 3 版遷移指南。如要查看 Ruby 第 2 版程式碼範例清單,請參閱 已淘汰的程式碼範例

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫的操作說明設定 Ruby 環境。詳情請參閱 Pub/Sub Ruby API 參考說明文件

# topic_id        = "your-topic-id"
# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::PubSub.new
subscription_admin = pubsub.subscription_admin

subscription = subscription_admin.create_subscription \
  name: pubsub.subscription_path(subscription_id),
  topic: pubsub.topic_path(topic_id),
  enable_message_ordering: true

puts "Pull subscription #{subscription_id} created with message ordering."

後續步驟