格式化收到的事件

管道會將匯流排連結至目標目的地,並將事件訊息轉送至該目的地。您可以設定管道,以特定格式接收事件資料,也可以在事件傳送至目的地之前,將事件資料轉換為另一個支援的格式。舉例來說,您可能需要將事件傳送至只接受 Avro 資料的端點。

支援的格式

系統支援下列格式轉換:

  • Avro 轉換為 JSON
  • Avro 轉換為 Protobuf
  • JSON 轉換為 Avro
  • JSON 轉 Protobuf
  • Protobuf 轉換為 Avro
  • Protobuf 轉為 JSON

注意事項:

  • 轉換事件格式時,只有事件酬載會轉換,整個事件訊息則不會。

  • 如果管道指定了傳入資料格式,所有事件都必須符合該格式。如果事件不符合預期格式,系統會視為持續性錯誤

  • 如果未指定管道的輸入資料格式,就無法設定輸出格式。

  • 在轉換特定目的地的事件格式前,系統會先套用所有已設定的資料轉換

  • 除非您指定訊息繫結,否則事件一律會採用二進位內容模式,透過 HTTP 要求以 CloudEvents 格式傳送

  • 系統會動態偵測 JSON 結構定義。如果是 Protobuf 結構定義,您只能定義一個頂層型別,且系統不支援參照其他型別的匯入陳述式。沒有 syntax 識別碼的結構定義預設為 proto2。請注意,結構定義大小有限制

設定管道來格式化事件

您可以在 Google Cloud 控制台中或使用 gcloud CLI,設定管道預期接收特定格式的事件資料,或將事件資料從一種格式轉換為另一種格式。

控制台

  1. 在 Google Cloud 控制台中,依序前往「Eventarc」>「Pipelines」(管道) 頁面。

    前往 Pipelines

  2. 您可以建立管道,或是點按管道名稱來更新管道。

  3. 在「Pipeline details」(管道詳細資料) 頁面中,按一下 「Edit」(編輯)

  4. 在「事件中介服務」窗格中,執行下列操作:

    1. 選取「套用轉換」核取方塊。
    2. 在「Inbound format」清單中,選取適用的格式。

      請注意,如果為管道指定了輸入資料格式,所有事件都必須符合該格式。任何格式不符預期的事件都會視為持續性錯誤

    3. 如果是 Avro 或 Protobuf 格式,您必須指定傳入結構定義。 (選用) 您可以上傳傳入的結構定義,不必直接指定。

    4. 在「CEL 運算式」欄位中,使用 CEL 編寫轉換運算式。

    5. 按一下「繼續」

  5. 在「目的地」窗格中,執行下列操作:

    1. 如適用,請在「Outbound format」(輸出格式) 清單中選取格式。

      請注意,如果管道指定輸入資料格式,就無法設定輸出格式。

    2. 選用:套用訊息繫結。詳情請參閱訊息繫結

  6. 按一下 [儲存]

    更新管道可能需要幾分鐘的時間。

gcloud

  1. 開啟終端機。

  2. 您可以建立管道,也可以使用 gcloud eventarc pipelines update 指令更新管道:

    gcloud eventarc pipelines update PIPELINE_NAME \
        --location=REGION \
        --INPUT_PAYLOAD_FLAG \
        --destinations=OUTPUT_PAYLOAD_KEY

    更改下列內容:

    • PIPELINE_NAME:管道的 ID 或完整名稱
    • REGION支援的 Eventarc Advanced 位置

      或者,您也可以設定 gcloud CLI 位置屬性:

      gcloud config set eventarc/location REGION
      
    • INPUT_PAYLOAD_FLAG:輸入資料格式標記,可以是下列其中一種:

      • --input-payload-format-avro-schema-definition
      • --input-payload-format-json
      • --input-payload-format-protobuf-schema-definition

      請注意,如果為管道指定輸入資料格式,所有事件都必須符合該格式。任何格式不符預期的事件都會視為持續性錯誤

    • OUTPUT_PAYLOAD_KEY:輸出資料格式鍵,可以是下列其中一項:

      • output_payload_format_avro_schema_definition
      • output_payload_format_json
      • output_payload_format_protobuf_schema_definition

      請注意,如果設定輸出資料格式鍵,也必須指定輸入資料格式標記。

    更新管道可能需要幾分鐘的時間。

    範例:

    以下範例使用 --input-payload-format-protobuf-schema-definition 旗標指定管道應以 Protobuf 資料格式接收事件,並採用特定結構定義:

    gcloud eventarc pipelines update my-pipeline \
        --input-payload-format-protobuf-schema-definition \
    '
      syntax = "proto3";
      message schema {
        string name = 1;
        string severity = 2;
      }
    '

    下例使用 output_payload_format_avro_schema_definition 金鑰和 --input-payload-format-avro-schema-definition 旗標建立管道,該管道會以 Avro 格式接收事件,並以相同格式輸出事件:

    gcloud eventarc pipelines create my-pipeline \
        --location=us-central1 \
        --destinations=http_endpoint_uri='https://example-endpoint.com',output_payload_format_avro_schema_definition='{"type": "record", "name": "my_record", "fields": [{"name": "my_field", "type": "string"}]}' \
        --input-payload-format-avro-schema-definition='{"type": "record", "name": "my_record", "fields": [{"name": "my_field", "type": "string"}]}'

    以下範例使用 output_payload_format_protobuf_schema_definition 鍵和 --input-payload-format-avro-schema-definition 旗標更新管道,並使用結構定義將管道的事件資料從 Avro 轉換為 Protobuf:

    gcloud eventarc pipelines update my-pipeline \
        --location=us-central1 \
        --destinations=output_payload_format_protobuf_schema_definition='message MessageProto {string prop1 = 1; string prop2 = 2;}' \
        --input-payload-format-avro-schema-definition= \
        '
        {
          "type": "record",
          "name": "MessageProto",
          "fields": [
            { "name" : "prop1", "type": "string" },
            { "name" : "prop2", "type": "string" },
          ]
        }
        '