有序消息

消息排序是 Pub/Sub 的一项功能,可让您在订阅方客户端中按发布方客户端发布消息的顺序接收消息。

例如,假设某个区域中的发布者客户端按顺序发布了消息 1、2 和 3。启用消息排序后,订阅者客户端会按相同的顺序接收已发布的消息。为了按顺序传送消息,发布者客户端必须在同一区域中发布消息。不过,订阅者可以连接到任何区域,并且系统仍会保证消息顺序。

消息排序是一项实用功能,适用于数据库更改捕获、用户会话跟踪和流式应用等需要保留事件时间顺序的场景。

本页面介绍了消息排序的概念,以及如何设置订阅者客户端以按顺序接收消息。如需配置发布者客户端以实现消息排序,请参阅使用排序键发布消息

消息排序概览

Pub/Sub 中的排序由以下因素决定:

  • 排序键:这是 Pub/Sub 消息元数据中使用的字符串,表示必须按顺序传送消息的实体。排序键的长度上限为 1 KB。如需接收某个区域中的一组有序消息,您必须在同一区域中发布具有相同排序键的所有消息。排序键的一些示例包括客户 ID 和数据库中行的主键。

    每个排序键的发布吞吐量限制为 1 MBps。主题上所有排序键的吞吐量受限于发布区域中的可用配额。此限制可提高到数 GBps。

    排序键不等同于基于分区的消息传递系统中的分区,因为排序键的基数预计会比分区高得多。

  • 启用消息排序:这是一项订阅设置。如果订阅已启用消息排序,则订阅者客户端会按服务接收消息的顺序接收在同一区域中发布且具有相同排序键的消息。您必须在订阅中启用此设置

    假设您有两个订阅 A 和 B,它们都附加到同一主题 T。订阅 A 配置为启用消息排序,订阅 B 配置为不启用消息排序。在此架构中,订阅 A 和订阅 B 都会从主题 T 接收同一组消息。如果您在同一区域发布带有排序键的消息,则订阅 A 会按消息的发布顺序接收消息。而订阅 B 接收消息时没有任何顺序要求。

一般来说,如果您的解决方案要求发布者客户端发送有序消息和无序消息,请创建单独的主题,一个用于有序消息,另一个用于无序消息。

使用有序消息传递时的注意事项

以下列表包含有关 Pub/Sub 中有序消息传递行为的重要信息:

  • 键内排序:使用相同排序键发布的消息应按顺序接收。假设您针对排序键 A 发布了消息 1、2 和 3。启用排序后,预计 1 会在 2 之前交付,2 会在 3 之前交付。

  • 跨键排序:使用不同排序键发布的消息预计不会按顺序接收。假设您有排序键 A 和 B。 对于排序键 A,消息 1 和 2 按顺序发布。对于排序键 B,消息 3 和 4 按顺序发布。不过,消息 1 可能会在消息 4 之前或之后到达。

  • 消息重新传送:Pub/Sub 会为每条消息至少传送一次,因此 Pub/Sub 服务可能会重新传送消息。消息的重新传送会触发相应键的所有后续消息(包括已确认的消息)的重新传送。假设订阅方客户端接收到具有特定排序键的消息 1、2 和 3。如果消息 2 被重新传送(因为确认时限已过或尽力而为的确认未在 Pub/Sub 中持久保留),则消息 3 也会被重新传送。如果在订阅上同时启用了消息排序和死信主题,则可能不是这样,因为 Pub/Sub 会尽最大努力将消息转发到死信主题。

  • 确认延迟和死信主题:对于给定的排序键,未确认的消息可能会延迟其他排序键的消息传送,尤其是在服务器重启或流量变化期间。为了确保此类活动有序进行,请务必及时确认所有消息。如果无法及时确认,请考虑使用死信主题,以防止无限期保留消息。请注意,当消息写入死信主题时,顺序可能无法保留。

  • 消息亲和性(streamingPull 客户端):具有相同键的消息通常会传送给同一 streamingPull 订阅方客户端。当消息对于特定订阅方客户端的排序键处于待处理状态时,系统会预期出现亲和性。如果没有未处理的消息,亲和性可能会因负载均衡或客户端断开连接而发生变化。

    为确保即使在可能出现亲和性变化的情况下也能顺利处理,至关重要的是,您需要以这样一种方式设计 streamingPull 应用:它能够处理任何客户端中具有给定排序键的消息。

  • 与 Dataflow 集成:使用 Pub/Sub 配置 Dataflow 时,请勿为订阅启用消息排序。Dataflow 具有自己的消息总排序机制,可确保在窗口化操作中按时间顺序处理所有消息。这种排序方法与 Pub/Sub 基于排序键的方法不同。将排序键与 Dataflow 搭配使用可能会降低流水线性能。

  • 自动伸缩:Pub/Sub 的有序传送功能可扩缩到数十亿个排序键。排序键的数量越多,向订阅者并行传送的消息就越多,因为排序适用于具有相同排序键的所有消息。

  • 性能权衡:有序交付确实需要做出一些权衡取舍。与无序传送相比,有序传送会降低发布可用性并增加端到端消息传送延迟时间。在按序传送的情况下,故障切换需要协调,以确保消息按正确的顺序写入和读取。

  • 热键:使用消息排序时,所有具有相同排序键的消息都会按服务接收它们的顺序发送到订阅者客户端。在完成前一条消息的回调之前,用户回调不会运行。在向订阅者传送消息时,共享同一排序键的消息的吞吐量上限不受 Pub/Sub 的限制,而是受订阅者客户端的处理速度限制。当单个排序键上的积压消息过多时,就会出现热门键,因为每秒生成的消息数超过了订阅者每秒可处理的消息数。为了缓解热门键问题,请尽可能使用最精细的键,并尽可能缩短每条消息的处理时间。您还可以监控 subscription/oldest_unacked_message_age 指标,如果该指标的值不断上升,可能表明存在热键。

如需详细了解如何使用消息排序,请参阅以下最佳实践主题:

订阅者客户端的消息排序行为

订阅者客户端按消息在特定区域中的发布顺序接收消息。Pub/Sub 支持多种接收消息的方式,例如连接到拉取订阅和推送订阅的订阅者客户端。客户端库使用 streamingPull(PHP 除外)。

如需详细了解这些订阅类型,请参阅选择订阅类型

以下部分讨论了按顺序接收消息对于每种类型的订阅者客户端意味着什么。

StreamingPull 订阅者客户端

将客户端库与 streamingPull 搭配使用时,您必须指定一个用户回调,该回调会在订阅者客户端收到消息时运行。借助客户端库,对于任何给定的排序键,系统都会按正确的顺序对消息运行回调,直至完成。如果消息在该回调中得到确认,则对消息的所有计算都会按顺序进行。不过,如果用户回调在消息上安排了其他异步工作,订阅者客户端必须确保异步工作按顺序完成。一种方法是将消息添加到按顺序处理的本地工作队列中。

拉取订阅者客户端

对于连接到拉取订阅的订阅方客户端,Pub/Sub 消息排序支持以下功能:

  • PullResponse 中某个排序键的所有消息在列表中都按正确的顺序排列。

  • 一个排序键一次只能有一个未完成的消息批次。

一次只能有一个批次的消息处于未完成状态,这是为了保持按顺序传送所必需的,因为 Pub/Sub 服务无法确保其针对订阅者的拉取请求发送的响应的成功或延迟。

推送订阅者客户端

推送的限制比拉取的限制更严格。 对于推送订阅,Pub/Sub 每次仅支持每个排序键的一条未处理消息。每条消息都会作为单独的请求发送到推送端点。因此,并行发送请求会遇到与以下情况相同的问题:同时向拉取订阅者传送具有相同排序键的多批消息。对于经常使用相同排序键发布消息的主题,或者对于延迟时间非常重要的主题,推送订阅可能不是一个好的选择。

导出订阅客户

导出订阅支持有序消息。对于 BigQuery 订阅,具有相同排序键的消息会按顺序写入其 BigQuery 表。对于 Cloud Storage 订阅,具有相同排序键的消息可能不会全部写入同一文件。如果消息位于同一文件中,则具有相同排序键的消息会按顺序排列。当消息分布在多个文件中时,排序键的后续消息可能会出现在文件名的时间戳早于包含较早消息的文件名的时间戳的文件中。

启用消息排序

要按顺序接收消息,请在从中接收消息的订阅上设置消息排序属性。按顺序接收消息可能会增加延迟时间。创建订阅后,您无法更改消息排序属性。

您可以在使用 Google Cloud 控制台、Google Cloud CLI 或 Pub/Sub API 创建订阅时设置消息排序属性。

控制台

要使用消息排序属性创建订阅,请执行以下操作:

  1. 在 Google Cloud 控制台中,前往订阅页面。

前往订阅页面

  1. 点击创建订阅

  2. 输入订阅 ID

  3. 选择要接收消息的主题。

  4. 消息排序部分,选择使用排序键对消息排序

  5. 点击创建

gcloud

要使用消息排序属性创建订阅,请使用 gcloud pubsub subscriptions create 命令和 --enable-message-ordering 标志:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --enable-message-ordering

SUBSCRIPTION_ID 替换为订阅的 ID。

如果请求成功,命令行会显示一条确认消息:

Created subscription [SUBSCRIPTION_ID].

REST

如需创建具有消息排序属性的订阅,请发送如下所示的 PUT 请求:

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth application-default print-access-token)

替换以下内容:

  • PROJECT_ID:包含主题的项目的 ID
  • SUBSCRIPTION_ID:订阅的 ID

在请求正文中,指定以下内容:

{
  "topic": TOPIC_ID,
  "enableMessageOrdering": true,
}

TOPIC_ID 替换为要附加到订阅的主题的 ID。

如果请求成功,则响应为 JSON 格式的订阅:

{
  "name": projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,
  "topic": projects/PROJECT_ID/topics/TOPIC_ID,
  "enableMessageOrdering": true,
}

C++

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.set_enable_message_ordering(true);
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档


using Google.Cloud.PubSub.V1;
using Grpc.Core;

public class CreateSubscriptionWithOrderingSample
{
    public Subscription CreateSubscriptionWithOrdering(string projectId, string topicId, string subscriptionId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        var topicName = TopicName.FromProjectTopic(projectId, topicId);
        var subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            EnableMessageOrdering = true
        };

        Subscription subscription = null;
        try
        {
            subscription = subscriber.CreateSubscription(subscriptionRequest);
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            // Already exists.  That's fine.
        }
        return subscription;
    }
}

Go

以下示例使用 Go Pub/Sub 客户端库的主要版本 (v2)。如果您仍在使用 v1 库,请参阅迁移到 v2 的指南。如需查看 v1 代码示例的列表,请参阅 已弃用的代码示例

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Go API 参考文档

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

func createWithOrdering(w io.Writer, projectID, topic, subscription string) error {
	// projectID := "my-project-id"
	// topic := "projects/my-project-id/topics/my-topic"
	// subscription := "projects/my-project/subscriptions/my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// Message ordering can only be set when creating a subscription.
	sub, err := client.SubscriptionAdminClient.CreateSubscription(ctx, &pubsubpb.Subscription{
		Name:                  subscription,
		Topic:                 topic,
		EnableMessageOrdering: true,
	})
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created subscription: %v\n", sub)
	return nil
}

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithOrdering {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";

    createSubscriptionWithOrderingExample(projectId, topicId, subscriptionId);
  }

  public static void createSubscriptionWithOrderingExample(
      String projectId, String topicId, String subscriptionId) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Set message ordering to true for ordered messages in the subscription.
                  .setEnableMessageOrdering(true)
                  .build());

      System.out.println("Created a subscription with ordering: " + subscription.getAllFields());
    }
  }
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithOrdering(
  topicNameOrId,
  subscriptionNameOrId,
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableMessageOrdering: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with ordering enabled.`,
  );
  console.log(
    'To process messages in order, remember to add an ordering key to your messages.',
  );
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithOrdering(
  topicNameOrId: string,
  subscriptionNameOrId: string,
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableMessageOrdering: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with ordering enabled.`,
  );
  console.log(
    'To process messages in order, remember to add an ordering key to your messages.',
  );
}

Python

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "enable_message_ordering": True,
        }
    )
    print(f"Created subscription with ordering: {subscription}")

Ruby

以下示例使用 Ruby Pub/Sub 客户端库 v3。如果您仍在使用 v2 库,请参阅 迁移到 v3 的指南。如需查看 Ruby v2 代码示例的列表,请参阅 已弃用的代码示例

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Ruby API 参考文档

# topic_id        = "your-topic-id"
# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::PubSub.new
subscription_admin = pubsub.subscription_admin

subscription = subscription_admin.create_subscription \
  name: pubsub.subscription_path(subscription_id),
  topic: pubsub.topic_path(topic_id),
  enable_message_ordering: true

puts "Pull subscription #{subscription_id} created with message ordering."

后续步骤