Membuat pipeline Dataflow menggunakan Go

Halaman ini menunjukkan cara menggunakan Apache Beam SDK untuk Go guna membuat program yang menentukan pipeline. Kemudian, Anda menjalankan pipeline secara lokal dan di layanan Dataflow. Untuk pengantar tentang pipeline WordCount, lihat video Cara menggunakan WordCount di Apache Beam.

Sebelum memulai

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. Jika Anda menggunakan penyedia identitas (IdP) eksternal, Anda harus login ke gcloud CLI dengan identitas gabungan Anda terlebih dahulu.

  4. Untuk melakukan inisialisasi gcloud CLI, jalankan perintah berikut:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com
  8. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.
  10. Install the Google Cloud CLI.

  11. Jika Anda menggunakan penyedia identitas (IdP) eksternal, Anda harus login ke gcloud CLI dengan identitas gabungan Anda terlebih dahulu.

  12. Untuk melakukan inisialisasi gcloud CLI, jalankan perintah berikut:

    gcloud init
  13. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  14. Verify that billing is enabled for your Google Cloud project.

  15. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com
  16. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  17. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.
  18. Beri peran ke akun layanan default Compute Engine Anda. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • Ganti PROJECT_ID dengan project ID Anda.
    • Ganti PROJECT_NUMBER dengan nomor project Anda. Untuk menemukan nomor project Anda, lihat Mengidentifikasi project atau gunakan perintah gcloud projects describe.
    • Ganti SERVICE_ACCOUNT_ROLE dengan setiap peran individual.
  19. Create a Cloud Storage bucket and configure it as follows:
    • Set the storage class to S (Standar).
    • Tetapkan lokasi penyimpanan sebagai berikut: US (Amerika Serikat).
    • Ganti BUCKET_NAME dengan nama bucket yang unik. Jangan sertakan informasi sensitif pada nama bucket karena namespace bucket bersifat global dan dapat dilihat publik.
    • gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
    • Salin ID project Google Cloud dan nama bucket Cloud Storage. Anda memerlukan nilai ini nanti dalam panduan memulai ini.
    • Menyiapkan lingkungan pengembangan

      Apache Beam SDK adalah model pemrograman open source untuk pipeline data. Anda menentukan pipeline dengan program Apache Beam, lalu memilih runner, seperti Dataflow, untuk menjalankan pipeline Anda.

      Sebaiknya gunakan Go versi terbaru saat bekerja dengan Apache Beam SDK untuk Go. Jika Anda belum menginstal Go versi terbaru, gunakan Panduan download dan penginstalan Go untuk mendownload dan menginstal Go untuk sistem operasi Anda.

      Untuk memverifikasi versi Go yang telah Anda instal, jalankan perintah berikut di terminal lokal Anda:

      go version

      Menjalankan contoh wordcount Beam

      Apache Beam SDK untuk Go mencakup contoh pipeline wordcount. Contoh wordcount melakukan hal berikut:

      1. Membaca file teks sebagai input. Secara default, fungsi ini membaca file teks yang berada di bucket Cloud Storage dengan nama resource gs://dataflow-samples/shakespeare/kinglear.txt.
      2. Mengurai setiap baris menjadi kata-kata.
      3. Melakukan penghitungan frekuensi pada kata yang di-tokenisasi.

      Untuk menjalankan contoh Beam wordcount versi terbaru di komputer lokal Anda, lakukan langkah-langkah berikut:

      1. Gunakan perintah git clone untuk meng-clone repositori GitHub apache/beam:

        git clone https://github.com/apache/beam.git
      2. Beralih ke direktori beam/sdks/go:

        cd beam/sdks/go
      3. Gunakan perintah berikut untuk menjalankan pipeline:

        go run examples/wordcount/wordcount.go \
          --input gs://dataflow-samples/shakespeare/kinglear.txt \
          --output outputs

        Flag input menentukan file yang akan dibaca, dan flag output menentukan nama file untuk output jumlah frekuensi.

      Setelah pipeline selesai, lihat hasil output:

      more outputs*

      Untuk keluar, tekan q.

      Ubah kode pipeline

      Pipeline Beam wordcount membedakan antara kata-kata huruf besar dan huruf kecil. Langkah-langkah berikut menunjukkan cara membuat modul Go Anda sendiri, mengubah pipeline wordcount sehingga pipeline tidak peka huruf besar/kecil, dan menjalankannya di Dataflow.

      Membuat modul Go

      Untuk melakukan perubahan pada kode pipeline, ikuti langkah-langkah berikut.

      1. Buat direktori untuk modul Go Anda di lokasi pilihan Anda:

        mkdir wordcount
        cd wordcount
      2. Buat modul Go. Untuk contoh ini, gunakan example/dataflow sebagai jalur modul.

        go mod init example/dataflow
      3. Download salinan terbaru kode wordcount dari repositori GitHub Apache Beam. Masukkan file ini ke dalam direktori wordcount yang Anda buat.

      4. Jika Anda menggunakan sistem operasi non-Linux, Anda harus mendapatkan paket unix Go. Paket ini diperlukan untuk menjalankan pipeline di layanan Dataflow.

        go get -u golang.org/x/sys/unix
      5. Pastikan file go.mod cocok dengan kode sumber modul:

        go mod tidy

      Menjalankan pipeline yang tidak diubah

      Verifikasi bahwa pipeline wordcount yang tidak dimodifikasi berjalan secara lokal.

      1. Dari terminal, buat dan jalankan pipeline secara lokal:

         go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
             --output outputs
      2. Lihat hasil output:

         more outputs*
      3. Untuk keluar, tekan q.

      Mengubah kode pipeline

      Untuk mengubah pipeline agar tidak peka huruf besar/kecil, ubah kode untuk menerapkan fungsi strings.ToLower ke semua kata.

      1. Di editor pilihan Anda, buka file wordcount.go.

      2. Periksa blok init (komentar telah dihapus agar lebih jelas):

         func init() {
           register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
           register.Function2x1(formatFn)
           register.Emitter1[string]()
         }
        
      3. Tambahkan baris baru untuk mendaftarkan fungsi strings.ToLower:

         func init() {
           register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
           register.Function2x1(formatFn)
           register.Emitter1[string]()
           register.Function1x1(strings.ToLower)
         }
        
      4. Periksa fungsi CountWords:

         func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
           s = s.Scope("CountWords")
        
           // Convert lines of text into individual words.
           col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)
        
           // Count the number of times each word occurs.
           return stats.Count(s, col)
         }
        
      5. Untuk mengubah huruf kecil kata, tambahkan ParDo yang menerapkan strings.ToLower ke setiap kata:

         func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
           s = s.Scope("CountWords")
        
           // Convert lines of text into individual words.
           col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)
        
           // Map all letters to lowercase.
           lowercaseWords := beam.ParDo(s, strings.ToLower, col)
        
           // Count the number of times each word occurs.
           return stats.Count(s, lowercaseWords)
         }
        
      6. Simpan file.

      Jalankan pipeline yang diperbarui secara lokal

      Jalankan pipeline wordcount yang telah diupdate secara lokal dan verifikasi bahwa output telah berubah.

      1. Bangun dan jalankan pipeline wordcount yang telah diubah:

         go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
             --output outputs
      2. Lihat hasil output dari pipeline yang diubah. Semua kata harus berupa huruf kecil.

         more outputs*
      3. Untuk keluar, tekan q.

      Menjalankan pipeline di layanan Dataflow

      Untuk menjalankan contoh wordcount yang telah diupdate di layanan Dataflow, gunakan perintah berikut:

      go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
          --output gs://BUCKET_NAME/results/outputs \
          --runner dataflow \
          --project PROJECT_ID \
          --region DATAFLOW_REGION \
          --staging_location gs://BUCKET_NAME/binaries/

      Ganti kode berikut:

      • BUCKET_NAME: nama bucket Cloud Storage.

      • PROJECT_ID: Google Cloud project ID.

      • DATAFLOW_REGION: region tempat Anda ingin men-deploy tugas Dataflow. Misalnya, europe-west1. Untuk mengetahui daftar lokasi yang tersedia, lihat Lokasi Dataflow. Flag --region menggantikan region default yang ditetapkan di server metadata, klien lokal, atau variabel lingkungan.

      Melihat hasil Anda

      Anda dapat melihat daftar tugas Dataflow di konsolGoogle Cloud . Di konsol Google Cloud , buka halaman Jobs Dataflow.

      Buka Tugas

      Halaman Jobs menampilkan detail tugas wordcount Anda, termasuk status Running pada awalnya, lalu Succeeded.

      Saat Anda menjalankan pipeline menggunakan Dataflow, hasilnya akan disimpan di bucket Cloud Storage. Lihat hasil output menggunakan Google Cloud console atau terminal lokal.

      Konsol

      Untuk melihat hasil di konsol Google Cloud , buka halaman Cloud Storage Buckets.

      Buka Buckets

      Dari daftar bucket di project Anda, klik bucket penyimpanan yang Anda buat sebelumnya. File output yang dibuat oleh tugas Anda ditampilkan di direktori results.

      Terminal

      Lihat hasil dari terminal Anda atau menggunakan Cloud Shell.

      1. Untuk mencantumkan file output, gunakan perintah gcloud storage ls:

        gcloud storage ls gs://BUCKET_NAME/results/outputs* --long

        Ganti BUCKET_NAME dengan nama bucket Cloud Storage output yang ditentukan.

      2. Untuk melihat hasil dalam file output, gunakan perintah gcloud storage cat:

        gcloud storage cat gs://BUCKET_NAME/results/outputs*

      Pembersihan

      Agar tidak menimbulkan biaya pada akun Google Cloud Anda untuk resource yang digunakan di halaman ini, hapus project Google Cloud yang berisi resource tersebut.

      1. Hapus bucket:
        gcloud storage buckets delete BUCKET_NAME
      2. Jika Anda mempertahankan project, batalkan peran yang Anda berikan ke akun layanan default Compute Engine. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut:

        • roles/dataflow.admin
        • roles/dataflow.worker
        • roles/storage.objectAdmin
        gcloud projects remove-iam-policy-binding PROJECT_ID \
            --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
            --role=SERVICE_ACCOUNT_ROLE
      3. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

        gcloud auth application-default revoke
      4. Optional: Revoke credentials from the gcloud CLI.

        gcloud auth revoke

      Langkah berikutnya