Memproses lebih banyak pesan dengan kontrol konkurensi

Kontrol serentak adalah fitur yang tersedia di library klien tingkat tinggi Pub/Sub. Anda juga dapat menerapkan kontrol serentak Anda sendiri saat menggunakan library tingkat rendah.

Dukungan untuk kontrol serentak bergantung pada bahasa pemrograman library klien. Untuk penerapan bahasa yang mendukung thread paralel, seperti C++, Go, dan Java, library klien membuat pilihan default untuk jumlah thread.

Pilihan ini mungkin tidak optimal untuk aplikasi Anda. Misalnya, jika aplikasi pelanggan Anda tidak dapat mengimbangi volume pesan masuk dan tidak terikat CPU, Anda harus meningkatkan jumlah thread. Untuk operasi pemrosesan pesan yang intensif CPU, mengurangi jumlah thread mungkin tepat.

Halaman ini menjelaskan konsep kontrol serentak dan cara menyiapkan fitur untuk klien pelanggan Anda. Untuk mengonfigurasi klien penayang Anda untuk kontrol serentak, lihat Kontrol serentak.

Konfigurasi kontrol konkurensi

Nilai default untuk variabel kontrol serentak dan nama variabel mungkin berbeda di seluruh library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi Referensi API. Misalnya, di library klien Java, metode untuk mengonfigurasi kontrol serentak adalah setParallelPullCount(), setExecutorProvider(), setSystemExecutorProvider(), dan setChannelProvider().

  • setParallelPullCount() memungkinkan Anda memutuskan jumlah streaming yang akan dibuka. Anda dapat membuka lebih banyak aliran jika klien pelanggan Anda dapat menangani lebih banyak data daripada yang dikirim dalam satu aliran, yaitu 10 MBps.

  • setExecutorProvider() memungkinkan Anda menyesuaikan penyedia eksekutor yang digunakan untuk memproses pesan. Misalnya, Anda dapat mengubah penyedia eksekutor menjadi penyedia yang menampilkan eksekutor bersama tunggal dengan jumlah thread terbatas di beberapa klien subscriber. Konfigurasi ini membantu membatasi jumlah thread yang dibuat. Jumlah total thread yang digunakan untuk kontrol serentak bergantung pada penyedia eksekutor yang diteruskan di library klien dan jumlah penarikan paralel.

  • setSystemExecutorProvider() memungkinkan Anda menyesuaikan penyedia eksekutor yang digunakan untuk pengelolaan sewa. Biasanya, Anda tidak mengonfigurasi nilai ini kecuali jika Anda ingin menggunakan penyedia eksekutor yang sama di setExecutorProvider dan setSystemExecutorProvider. Misalnya, Anda dapat menggunakan penyedia eksekutor yang sama jika memiliki sejumlah langganan dengan throughput rendah. Menggunakan nilai yang sama akan membatasi jumlah thread di klien.

  • setChannelProvider() memungkinkan Anda menyesuaikan penyedia saluran yang digunakan untuk membuka koneksi ke Pub/Sub. Biasanya, Anda tidak mengonfigurasi nilai ini kecuali jika Anda ingin menggunakan channel yang sama di beberapa klien pelanggan. Menggunakan kembali channel di terlalu banyak klien dapat menyebabkan error GOAWAY atau ENHANCE_YOUR_CALM. Jika Anda melihat error ini di log aplikasi atau Cloud Logs, buat lebih banyak channel.

Contoh kode untuk kontrol serentak

C++

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan C++ di Panduan memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub C++ API.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcBackgroundThreadPoolSizeOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
auto sample = [](std::string project_id, std::string subscription_id) {
  // Create a subscriber with 16 threads handling I/O work, by default the
  // library creates `std::thread::hardware_concurrency()` threads.
  auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
      pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
      Options{}
          .set<pubsub::MaxConcurrencyOption>(8)
          .set<GrpcBackgroundThreadPoolSizeOption>(16)));

  // Create a subscription where up to 8 messages are handled concurrently. By
  // default the library uses `std::thread::hardware_concurrency()` as the
  // maximum number of concurrent callbacks.
  auto session = subscriber.Subscribe(
      [](pubsub::Message const& m, pubsub::AckHandler h) {
        // This handler executes in the I/O threads, applications could use,
        // std::async(), a thread-pool, or any other mechanism to transfer the
        // execution to other threads.
        std::cout << "Received message " << m << "\n";
        std::move(h).ack();
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
  return std::make_pair(subscriber, std::move(session));
};

Go

Contoh berikut menggunakan library klien Go Pub/Sub versi utama (v2). Jika Anda masih menggunakan library v1, lihat panduan migrasi ke v2. Untuk melihat daftar contoh kode v1, lihat contoh kode yang tidak digunakan lagi.

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Panduan memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"
	"sync/atomic"
	"time"

	"cloud.google.com/go/pubsub/v2"
)

func pullMsgsConcurrencyControl(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Subscriber can be passed a subscription ID (e.g. "my-sub") or
	// a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub").
	// If a subscription ID is provided, the project ID from the client is used.
	sub := client.Subscriber(subID)
	// NumGoroutines determines the number of streams sub.Receive will spawn to pull
	// messages. It is recommended to set this to 1, unless your throughput
	// is greater than 10 MB/s, as even having 1 stream can still result in
	// messages being handled asynchronously.
	sub.ReceiveSettings.NumGoroutines = 1
	// MaxOutstandingMessages limits the number of concurrent handlers of messages.
	// In this case, up to 8 unacked messages can be handled concurrently.
	sub.ReceiveSettings.MaxOutstandingMessages = 8

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var received int32

	// Receive blocks until the context is cancelled or an error occurs.
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		atomic.AddInt32(&received, 1)
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive returned error: %w", err)
	}
	fmt.Fprintf(w, "Received %d messages\n", received)

	return nil
}

Java

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Panduan memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub Java API.


import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithConcurrencyControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithConcurrencyControlExample(projectId, subscriptionId);
  }

  public static void subscribeWithConcurrencyControlExample(
      String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      // Provides an executor service for processing messages. The default `executorProvider` used
      // by the subscriber has a default thread count of 5.
      ExecutorProvider executorProvider =
          InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

      // `setParallelPullCount` determines how many StreamingPull streams the subscriber will open
      // to receive message. It defaults to 1. `setExecutorProvider` configures an executor for the
      // subscriber to process messages. Here, the subscriber is configured to open 2 streams for
      // receiving messages, each stream creates a new executor with 4 threads to help process the
      // message callbacks. In total 2x4=8 threads are used for message processing.
      subscriber =
          Subscriber.newBuilder(subscriptionName, receiver)
              .setParallelPullCount(2)
              .setExecutorProvider(executorProvider)
              .build();

      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Ruby

Contoh berikut menggunakan library klien Pub/Sub Ruby v3. Jika Anda masih menggunakan library v2, lihat panduan migrasi ke v3. Untuk melihat daftar contoh kode Ruby v2, lihat contoh kode yang tidak digunakan lagi.

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Ruby di Panduan memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Ruby Pub/Sub.

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::PubSub.new
subscriber = pubsub.subscriber subscription_id

# Use 2 threads for streaming, 4 threads for executing callbacks and 2 threads
# for sending acknowledgements and/or delays
listener = subscriber.listen streams: 2, threads: {
  callback: 4,
  push:     2
} do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

listener.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
listener.stop.wait!

Langkah berikutnya

Baca opsi penayangan lainnya yang dapat Anda konfigurasi untuk langganan: