Testare, sincronizzare ed eseguire il deployment dei DAG da GitHub

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Questa guida spiega come creare una pipeline CI/CD per testare, sincronizzare ed eseguire il deployment dei DAG nell'ambiente Cloud Composer dal repository GitHub.

Se vuoi sincronizzare solo i dati di altri servizi, consulta Trasferire dati da altri servizi.

Panoramica della pipeline CI/CD

Diagramma dell'architettura che mostra i passaggi del flusso. La revisione pre-invio e delle richieste pull si trova nella sezione GitHub, mentre la sincronizzazione DAG e la verifica manuale DAG si trovano nella sezione Google Cloud .
Figura 1. Diagramma dell'architettura che mostra i passaggi del flusso (fai clic per ingrandire)

La pipeline CI/CD per testare, sincronizzare ed eseguire il deployment dei DAG prevede i seguenti passaggi:

  1. Apporti una modifica a un DAG ed esegui il push di questa modifica a un ramo di sviluppo nel tuo repository.

  2. Apri una richiesta di pull rispetto al ramo principale del repository.

  3. Cloud Build esegue test delle unità per verificare che il DAG sia valido.

  4. La tua richiesta di pull è stata approvata e unita al ramo principale del tuo repository.

  5. Cloud Build sincronizza l'ambiente Cloud Composer di sviluppo con queste nuove modifiche.

  6. Verifica che il DAG si comporti come previsto nell'ambiente di sviluppo.

  7. Se il DAG funziona come previsto, caricalo nell'ambiente Cloud Composer di produzione.

Obiettivi

Prima di iniziare

  • Questa guida presuppone che tu stia lavorando con due ambienti Cloud Composer identici: un ambiente di sviluppo e un ambiente di produzione.

    Ai fini di questa guida, stai configurando una pipeline CI/CD solo per il tuo ambiente di sviluppo. Assicurati che l'ambiente che utilizzi non sia un ambiente di produzione.

  • Questa guida presuppone che tu abbia archiviato i tuoi DAG e i relativi test in un repository GitHub.

    La pipeline CI/CD di esempio mostra i contenuti di un repository di esempio. I DAG e i test vengono archiviati nella directory dags/, con i file dei requisiti, il file dei vincoli e i file di configurazione di Cloud Build archiviati al primo livello. L'utilità di sincronizzazione DAG e i relativi requisiti si trovano nella directory utils.

Crea un job di controllo pre-invio e test unitari

Il primo job Cloud Build esegue un controllo pre-invio, che esegue test unitari per i tuoi DAG.

Aggiungere test delle unità

Se non l'hai ancora fatto, crea test unitari per i tuoi DAG. Salva questi test insieme ai DAG nel repository, ciascuno con il suffisso _test. Ad esempio, il file di test per il DAG in example_dag.py è example_dag_test.py. Questi sono i test eseguiti come controllo pre-invio nel repository.

Crea la configurazione YAML di Cloud Build per il controllo pre-invio

Nel repository, crea un file YAML denominato test-dags.cloudbuild.yaml che configura il job Cloud Build per i controlli pre-invio. che prevede tre passaggi:

  1. Installa le dipendenze necessarie per i tuoi DAG.
  2. Installa le dipendenze necessarie per i test delle unità.
  3. Esegui i test DAG.

steps:
  # install dependencies
  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements.txt", "-c", "constraints.txt", "--user"]

  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements-test.txt", "--user"]

  # run in python 3.8 which is latest version in Cloud Composer
  - name: python:3.8-slim
    entrypoint: python3.8
    args: ["-m", "pytest", "-s", "dags/"]

Crea il trigger di Cloud Build per il controllo pre-invio

Segui la guida Creazione di repository da GitHub per creare un trigger basato su un'app GitHub con le seguenti configurazioni:

  • Nome: test-dags

  • Evento: richiesta di pull

  • Origine - Repository: scegli il repository

  • Origine - Branch di base: ^main$ (modifica main con il nome del branch di base del repository, se necessario)

  • Fonte - Controllo dei commenti: non richiesto

  • Configurazione build - File di configurazione Cloud Build: /test-dags.cloudbuild.yaml (il percorso del file di build)

Crea un job di sincronizzazione DAG e aggiungi lo script di utilità DAG

Successivamente, configura un job Cloud Build che esegue uno script di utilità DAG. Lo script di utilità in questo job sincronizza i DAG con l'ambiente Cloud Composer dopo che sono stati uniti al ramo principale nel repository.

Aggiungere lo script di utilità DAGs

Aggiungi lo script dell'utilità DAG al tuo repository. Questo script di utilità copia tutti i file DAG nella directory dags/ del tuo repository in una directory temporanea, ignorando tutti i file Python non DAG. Lo script utilizza quindi la libreria client di Cloud Storage per caricare tutti i file dalla directory temporanea alla directory dags/ nel bucket dell'ambiente Cloud Composer.

from __future__ import annotations

import argparse
import glob
import os
from shutil import copytree, ignore_patterns
import tempfile

# Imports the Google Cloud client library
from google.cloud import storage


def _create_dags_list(dags_directory: str) -> tuple[str, list[str]]:
    temp_dir = tempfile.mkdtemp()

    # ignore non-DAG Python files
    files_to_ignore = ignore_patterns("__init__.py", "*_test.py")

    # Copy everything but the ignored files to a temp directory
    copytree(dags_directory, f"{temp_dir}/", ignore=files_to_ignore, dirs_exist_ok=True)

    # The only Python files left in our temp directory are DAG files
    # so we can exclude all non Python files
    dags = glob.glob(f"{temp_dir}/*.py")
    return (temp_dir, dags)


def upload_dags_to_composer(
    dags_directory: str, bucket_name: str, name_replacement: str = "dags/"
) -> None:
    """
    Given a directory, this function moves all DAG files from that directory
    to a temporary directory, then uploads all contents of the temporary directory
    to a given cloud storage bucket
    Args:
        dags_directory (str): a fully qualified path to a directory that contains a "dags/" subdirectory
        bucket_name (str): the GCS bucket of the Cloud Composer environment to upload DAGs to
        name_replacement (str, optional): the name of the "dags/" subdirectory that will be used when constructing the temporary directory path name Defaults to "dags/".
    """
    temp_dir, dags = _create_dags_list(dags_directory)

    if len(dags) > 0:
        # Note - the GCS client library does not currently support batch requests on uploads
        # if you have a large number of files, consider using
        # the Python subprocess module to run gsutil -m cp -r on your dags
        # See https://cloud.google.com/storage/docs/gsutil/commands/cp for more info
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)

        for dag in dags:
            # Remove path to temp dir
            dag = dag.replace(f"{temp_dir}/", name_replacement)

            try:
                # Upload to your bucket
                blob = bucket.blob(dag)
                blob.upload_from_filename(dag)
                print(f"File {dag} uploaded to {bucket_name}/{dag}.")
            except FileNotFoundError:
                current_directory = os.listdir()
                print(
                    f"{name_replacement} directory not found in {current_directory}, you may need to override the default value of name_replacement to point to a relative directory"
                )
                raise

    else:
        print("No DAGs to upload.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "--dags_directory",
        help="Relative path to the source directory containing your DAGs",
    )
    parser.add_argument(
        "--dags_bucket",
        help="Name of the DAGs bucket of your Composer environment without the gs:// prefix",
    )

    args = parser.parse_args()

    upload_dags_to_composer(args.dags_directory, args.dags_bucket)

Crea la configurazione YAML di Cloud Build per sincronizzare i DAG

Nel repository, crea un file YAML denominato add-dags-to-composer.cloudbuild.yaml che configuri il job Cloud Build per la sincronizzazione dei DAG. che prevede due passaggi:

  1. Installa le dipendenze necessarie per lo script di utilità DAG.

  2. Esegui lo script di utilità per sincronizzare i DAG nel repository con l'ambiente Cloud Composer.

steps:
  # install dependencies
  - name: python
    entrypoint: pip
    args: ["install", "-r", "utils/requirements.txt", "--user"]

  # run
  - name: python
    entrypoint: python
    args: ["utils/add_dags_to_composer.py", "--dags_directory=${_DAGS_DIRECTORY}", "--dags_bucket=${_DAGS_BUCKET}"]

Crea il trigger di Cloud Build

Segui la guida Creazione di repository da GitHub per creare un trigger basato su un'app GitHub con le seguenti configurazioni:

  • Nome: add-dags-to-composer

  • Evento: push al ramo

  • Origine - Repository: scegli il repository

  • Origine - Branch di base: ^main$ (modifica main con il nome del branch di base del repository, se necessario)

  • Origine - Filtro dei file inclusi (glob): dags/**

  • Configurazione build - File di configurazione Cloud Build: /add-dags-to-composer.cloudbuild.yaml (il percorso del file di build)

Nella configurazione avanzata, aggiungi due variabili di sostituzione:

  • _DAGS_DIRECTORY: la directory in cui si trovano i DAG nel repository. Se utilizzi il repository di esempio di questa guida, è dags/.

  • _DAGS_BUCKET: il bucket Cloud Storage che contiene la directory dags/ nell'ambiente Cloud Composer di sviluppo. Ometti il prefisso gs://. Ad esempio: us-central1-example-env-1234ab56-bucket.

Testa la pipeline CI/CD

In questa sezione, segui un flusso di sviluppo DAG che utilizza i trigger Cloud Build appena creati.

Esegui un job di pre-invio

Crea una richiesta di pull al ramo principale per testare la build. Individua il controllo pre-invio nella pagina. Fai clic su Dettagli e scegli Visualizza ulteriori dettagli su Google Cloud Build per visualizzare i log di build nella consoleGoogle Cloud .

Screenshot di un controllo GitHub chiamato test-dags con una freccia rossa che punta al nome del progetto tra parentesi
Figura 2. Screenshot dello stato del controllo pre-invio di Cloud Build su GitHub (fai clic per ingrandire)

Se il controllo pre-invio non è andato a buon fine, consulta Risoluzione degli errori di build.

Verifica che il DAG funzioni nell'ambiente Cloud Composer di sviluppo

Una volta approvata la richiesta di pull, uniscila al ramo principale. Utilizza la consoleGoogle Cloud per visualizzare i risultati della build. Se hai molti trigger Cloud Build, puoi filtrare le build in base al nome del trigger add-dags-to-composer.

Una volta completato il job di sincronizzazione di Cloud Build, il DAG sincronizzato viene visualizzato nell'ambiente Cloud Composer di sviluppo. Qui puoi verificare che il DAG funzioni come previsto.

Aggiungere il DAG all'ambiente di produzione

Dopo che il DAG funziona come previsto, aggiungilo manualmente all'ambiente di produzione. Per farlo, carica il file DAG nella directory dags/ del bucket dell'ambiente Cloud Composer di produzione.

Se il job di sincronizzazione DAG non è riuscito o se il DAG non si comporta come previsto nell'ambiente Cloud Composer di sviluppo, consulta la sezione Risoluzione degli errori di build.

Risoluzione degli errori di build

Questa sezione spiega come risolvere scenari comuni di errori di build.

Cosa succede se il controllo pre-invio non è andato a buon fine?

Dalla richiesta di pull, fai clic su Dettagli e scegli Visualizza altri dettagli su Google Cloud Build per visualizzare i log di build nella consoleGoogle Cloud . Utilizza questi log per eseguire il debug del problema relativo al tuo DAG. Una volta risolti i problemi, esegui il commit della correzione e il push nel tuo branch. Il controllo pre-invio viene eseguito di nuovo e puoi continuare a eseguire iterazioni utilizzando i log come strumento di debug.

Cosa succede se il mio job di sincronizzazione del DAG non è andato a buon fine?

Utilizza la console Google Cloud per visualizzare i risultati della build. Se hai molti trigger Cloud Build, puoi filtrare le build in base al nome del trigger add-dags-to-composer. Esamina i log del job di build e risolvi gli errori. Se hai bisogno di ulteriore assistenza per risolvere gli errori, utilizza i canali di assistenza.

Cosa succede se il mio DAG non funziona correttamente nel mio ambiente Cloud Composer?

Se il DAG non funziona come previsto nell'ambiente Cloud Composer di sviluppo, non promuoverlo manualmente nell'ambiente Cloud Composer di produzione. Procedi invece in uno dei seguenti modi:

  • Ripristina la richiesta di pull con le modifiche che hanno interrotto il DAG per ripristinarlo allo stato immediatamente precedente alle modifiche (questa operazione ripristina anche tutti gli altri file nella richiesta di pull).
  • Crea una nuova richiesta di pull per ripristinare manualmente le modifiche al DAG danneggiato.
  • Crea una nuova richiesta di pull per correggere gli errori nel DAG.

L'esecuzione di uno di questi passaggi attiva un nuovo controllo pre-invio e, al momento dell'unione, il job di sincronizzazione DAG.

Passaggi successivi