Solucionar problemas de DAG

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

En esta página, se proporcionan información y pasos para solucionar problemas comunes en el flujo de trabajo.

Algunos problemas de ejecución de DAG pueden deberse a que el programador de Airflow no funciona de forma correcta u óptima. Sigue las instrucciones para solucionar problemas del Programador y resolver estos inconvenientes.

Flujo de trabajo para la solución de problemas

Para comenzar con la solución de problemas, siga estos pasos:

  1. Consulta los registros de Airflow.

    Puedes aumentar el nivel de registro de Airflow anulando la siguiente opción de configuración de Airflow.

    Sección Clave Valor
    logging (core en Airflow 1) logging_level El valor predeterminado es INFO. Establece el valor en DEBUG para obtener más verbosidad en los mensajes de registro.
  2. Consulta el Panel de supervisión.

  3. Revisa Cloud Monitoring.

  4. En la consola de Google Cloud , busca errores en las páginas de los componentes de tu entorno.

  5. En la interfaz web de Airflow, consulta la Vista de gráfico del DAG para ver las instancias de tareas fallidas.

    Sección Clave Valor
    webserver dag_orientation LR, TB, RL, oBT

Depura fallas del operador

Para depurar una falla del operador, sigue estos pasos:

  1. Verifica si hay errores específicos de la tarea.
  2. Consulta los registros de Airflow.
  3. Revisa Cloud Monitoring.
  4. Verifica los registros específicos del operador.
  5. Corrige los errores.
  6. Sube el DAG a la carpeta /dags.
  7. En la interfaz web de Airflow, borra los estados anteriores del DAG.
  8. Reanuda o ejecuta el DAG.

Soluciona problemas de ejecución de tareas

Airflow es un sistema distribuido con muchas entidades, como el programador, el ejecutor y los trabajadores, que se comunican entre sí a través de una lista de tareas en cola y la base de datos de Airflow, y envían señales (como SIGTERM). En el siguiente diagrama, se muestra una descripción general de las interconexiones entre los componentes de Airflow.

Interacción entre los componentes de Airflow
Figura 1. Interacción entre los componentes de Airflow (haz clic para ampliar)

En un sistema distribuido como Airflow, puede haber algunos problemas de conectividad de red o la infraestructura subyacente puede experimentar problemas intermitentes. Esto puede generar situaciones en las que las tareas pueden fallar y reprogramarse para su ejecución, o las tareas pueden no completarse correctamente (por ejemplo, tareas Zombie o tareas que se atascaron en la ejecución). Airflow tiene mecanismos para lidiar con estas situaciones y reanudar automáticamente el funcionamiento normal. En las siguientes secciones, se explican los problemas comunes que ocurren durante la ejecución de tareas de Airflow: tareas zombi, instancias de finalización y señales SIGTERM.

Solución de problemas relacionados con los procesos zombi

Airflow detecta dos tipos de discrepancias entre una tarea y un proceso que ejecuta la tarea:

  • Las tareas zombi son tareas que deberían estar en ejecución, pero no lo están. Esto puede ocurrir si se finalizó el proceso de la tarea o no responde, si el trabajador de Airflow no informó el estado de la tarea a tiempo porque está sobrecargado o si se apagó la VM en la que se ejecuta la tarea. Airflow encuentra estas tareas de forma periódica y las reintenta o las marca como fallidas, según la configuración de la tarea.

    Descubre tareas zombi

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("airflow-scheduler")
    textPayload:"Detected zombie job"
  • Las tareas zombie son tareas que no deberían estar en ejecución. Airflow encuentra estas tareas periódicamente y las finaliza.

En las siguientes secciones, se describen los motivos y las soluciones más comunes para las tareas Zombie.

El trabajador de Airflow se quedó sin memoria

Cada trabajador de Airflow puede ejecutar hasta [celery]worker_concurrency instancias de tareas de forma simultánea. Si el consumo de memoria acumulativo de esas instancias de tareas supera el límite de memoria de un trabajador de Airflow, se finaliza un proceso aleatorio en él para liberar recursos.

Descubre eventos de memoria insuficiente del trabajador de Airflow

resource.type="k8s_node"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
log_id("events")
jsonPayload.message:"Killed process"
jsonPayload.message:("airflow task" OR "celeryd")

A veces, la falta de memoria en un trabajador de Airflow puede provocar que se envíen paquetes con formato incorrecto durante una sesión de SQL Alchemy a la base de datos, a un servidor DNS o a cualquier otro servicio al que llame un DAG. En este caso, el otro extremo de la conexión podría rechazar o descartar las conexiones del trabajador de Airflow. Por ejemplo:

"UNKNOWN:Error received from peer
{created_time:"2024-11-31T10:09:52.217738071+00:00", grpc_status:14,
grpc_message:"failed to connect to all addresses; last error: UNKNOWN:
ipv4:<ip address>:443: handshaker shutdown"}"

Soluciones:

Se expulsó el trabajador de Airflow

Las expulsiones de Pods son una parte normal de la ejecución de cargas de trabajo en Kubernetes. GKE expulsa los Pods si se quedaron sin almacenamiento o para liberar recursos para cargas de trabajo con mayor prioridad.

Descubre las expulsiones de trabajadores de Airflow

resource.type="k8s_pod"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
resource.labels.pod_name:"airflow-worker"
log_id("events")
jsonPayload.reason="Evicted"

Soluciones:

Se cerró el trabajador de Airflow

Es posible que los trabajadores de Airflow se quiten de forma externa. Si las tareas en ejecución no finalizan durante un período de finalización ordenado, se interrumpen y es posible que se detecten como zombis.

Descubre las finalizaciones de los pods de trabajadores de Airflow

resource.type="k8s_cluster"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
protoPayload.methodName:"pods.delete"
protoPayload.response.metadata.name:"airflow-worker"

Situaciones y soluciones posibles:

  • Los trabajadores de Airflow se reinician durante las modificaciones del entorno, como las actualizaciones o la instalación de paquetes:

    Descubre las modificaciones del entorno de Composer

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("cloudaudit.googleapis.com%2Factivity")

    Puedes realizar estas operaciones cuando no se estén ejecutando tareas críticas o habilitar los reintentos de tareas.

  • Es posible que varios componentes no estén disponibles temporalmente durante las operaciones de mantenimiento.

    Descubre las operaciones de mantenimiento de GKE

    resource.type="gke_nodepool"
    resource.labels.cluster_name="GKE_CLUSTER_NAME"
    protoPayload.metadata.operationType="UPGRADE_NODES"

    Puedes especificar períodos de mantenimiento para minimizar

    se superpone con la ejecución de las tareas críticas.

El trabajador de Airflow estaba bajo una carga pesada

La cantidad de recursos de CPU y memoria disponibles para un trabajador de Airflow está limitada por la configuración del entorno. Si el uso de recursos se acerca a los límites, es posible que se produzca una contención de recursos y demoras innecesarias durante la ejecución de la tarea. En situaciones extremas, cuando faltan recursos durante períodos más largos, esto puede causar tareas zombie.

Soluciones:

La base de datos de Airflow estaba bajo una carga pesada

Varios componentes de Airflow utilizan una base de datos para comunicarse entre sí y, en particular, para almacenar los latidos de las instancias de tareas. La escasez de recursos en la base de datos genera tiempos de consulta más largos y puede afectar la ejecución de tareas.

A veces, los siguientes errores están presentes en los registros de un trabajador de Airflow:

(psycopg2.OperationalError) connection to server at <IP address>,
port 3306 failed: server closed the connection unexpectedly

This probably means the server terminated abnormally before or while
processing the request.

Soluciones:

La base de datos de Airflow no estuvo disponible temporalmente

Un trabajador de Airflow puede tardar en detectar y controlar correctamente los errores intermitentes, como los problemas de conectividad temporales. Es posible que supere el umbral predeterminado de detección de zombies.

Descubre los tiempos de espera de la señal de monitoreo de Airflow

resource.type="cloud_composer_environment"
resource.labels.environment_name="ENVIRONMENT_NAME"
log_id("airflow-worker")
textPayload:"Heartbeat time limit exceeded"

Soluciones:

  • Aumenta el tiempo de espera para las tareas zombi y anula el valor de la opción de configuración de Airflow [scheduler]scheduler_zombie_task_threshold:

    Sección Clave Valor Notas
    scheduler scheduler_zombie_task_threshold Nuevo tiempo de espera (en segundos) El valor predeterminado es 300.

Soluciona problemas relacionados con la finalización de instancias

Airflow usa el mecanismo de instancia de finalización para cerrar las tareas de Airflow. Este mecanismo se usa en las siguientes situaciones:

  • Cuando un programador finaliza una tarea que no se completó a tiempo.
  • Cuando una tarea agota el tiempo de espera o se ejecuta durante demasiado tiempo.

Cuando Airflow finaliza las instancias de tareas, puedes ver las siguientes entradas de registro en los registros de un trabajador de Airflow que ejecutó la tarea:

  INFO - Subtask ... WARNING - State of this instance has been externally set
  to success. Terminating instance.
  INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
  INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.

Soluciones posibles:

  • Verifica el código de la tarea para detectar errores que puedan hacer que se ejecute durante demasiado tiempo.

  • Aumenta el valor de la opción de configuración de Airflow [celery_broker_transport_options]visibility_timeout.

    Como resultado, el programador espera más tiempo a que finalice una tarea antes de considerarla una tarea zombie. Esta opción es especialmente útil para las tareas que llevan mucho tiempo y duran varias horas. Si el valor es demasiado bajo (por ejemplo, 3 horas), el programador considera que las tareas que se ejecutan durante 5 o 6 horas están "colgadas" (tareas zombie).

  • Aumenta el valor de la opción de configuración de Airflow [core]killed_task_cleanup_time.

    Un valor más largo proporciona más tiempo a los trabajadores de Airflow para finalizar sus tareas correctamente. Si el valor es demasiado bajo, las tareas de Airflow podrían interrumpirse de forma abrupta, sin tiempo suficiente para finalizar su trabajo correctamente.

Solución de problemas relacionados con los indicadores SIGTERM

Linux, Kubernetes, el programador de Airflow y Celery usan señales SIGTERM para finalizar los procesos responsables de ejecutar los trabajadores o las tareas de Airflow.

Puede haber varios motivos por los que se envían señales SIGTERM en un entorno:

  • Una tarea se convirtió en tarea zombi y debe detenerse.

  • El programador descubrió un duplicado de una tarea y envía señales de SIGTERM y de finalización de la instancia a la tarea para detenerla.

  • En el ajuste de escala automático horizontal de Pods, el plano de control de GKE envía señales SIGTERM para quitar los Pods que ya no son necesarios.

  • El programador puede enviar indicadores SIGTERM al proceso DagFileProcessorManager. El programador usa estas señales SIGTERM para administrar el ciclo de vida del proceso DagFileProcessorManager y se pueden ignorar de forma segura.

    Ejemplo:

    Launched DagFileProcessorManager with pid: 353002
    Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: []
    Sending the signal Signals.SIGTERM to group 353002
    Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
    
  • Condición de carrera entre la devolución de llamada de latido y las devoluciones de llamada de salida en el local_task_job, que supervisa la ejecución de la tarea. Si el latido detecta que una tarea se marcó como exitosa, no puede distinguir si la tarea en sí se completó correctamente o si se le indicó a Airflow que la considere exitosa. Sin embargo, finalizará un ejecutor de tareas sin esperar a que salga.

    Estos indicadores SIGTERM se pueden ignorar sin inconvenientes. La tarea ya está en el estado correcto y la ejecución de la DAG en su totalidad no se verá afectada.

    La entrada de registro Received SIGTERM. es la única diferencia entre la salida normal y la finalización de la tarea en el estado correcto.

    Condición de carrera entre las devoluciones de llamada de latido y salida
    Figura 2. Condición de carrera entre las devoluciones de llamada de latido y salida (haz clic para ampliar)
  • Un componente de Airflow usa más recursos (CPU, memoria) de los que permite el nodo del clúster.

  • El servicio de GKE realiza operaciones de mantenimiento y envía señales SIGTERM a los Pods que se ejecutan en un nodo que está a punto de actualizarse.

    Cuando una instancia de tarea se finaliza con SIGTERM, puedes ver las siguientes entradas de registro en los registros de un trabajador de Airflow que ejecutó la tarea:

    {local_task_job.py:211} WARNING - State of this instance has been externally
    set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received
    SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed
    with exception
    

Soluciones posibles:

Este problema ocurre cuando una VM que ejecuta la tarea se queda sin memoria. Esto no se relaciona con la configuración de Airflow, sino con la cantidad de memoria disponible para la VM.

  • En Cloud Composer 1, puedes volver a crear tu entorno con un tipo de máquina que tenga más rendimiento.

  • Puedes reducir el valor de la opción de configuración de Airflow de [celery]worker_concurrency concurrencia. Esta opción determina cuántas tareas ejecuta de forma simultánea un trabajador de Airflow determinado.

Consultas de Cloud Logging para descubrir los motivos de los reinicios o las expulsiones de Pods

Los entornos de Cloud Composer usan clústeres de GKE como capa de infraestructura de procesamiento. En esta sección, puedes encontrar consultas útiles que te ayudarán a encontrar los motivos de los reinicios o las expulsiones del trabajador o el programador de Airflow.

Las consultas que se presentan más adelante se pueden ajustar de la siguiente manera:

  • Puedes especificar la línea de tiempo requerida en Cloud Logging. Por ejemplo, las últimas 6 horas, los últimos 3 días o puedes definir tu intervalo de tiempo personalizado.

  • Debes especificar el nombre del clúster de tu entorno en CLUSTER_NAME.

  • Puedes limitar la búsqueda a un Pod específico agregando POD_NAME.

Descubre los contenedores reiniciados

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
  

Consulta alternativa para limitar los resultados a un pod específico:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
  

Descubre los contenedores que se cerraron como resultado de un evento de memoria insuficiente

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    

Consulta alternativa para limitar los resultados a un pod específico:

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

Descubre los contenedores que dejaron de ejecutarse

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    

Consulta alternativa para limitar los resultados a un pod específico:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

Impacto de las operaciones de actualización en las ejecuciones de tareas de Airflow

Las operaciones de actualización interrumpen las tareas de Airflow que se están ejecutando, a menos que una tarea se ejecute en el modo diferible.

Te recomendamos que realices estas operaciones cuando esperes un impacto mínimo en las ejecuciones de tareas de Airflow y que configures los mecanismos de reintento adecuados en tus DAGs y tareas.

Problemas comunes

En las siguientes secciones, se describen los síntomas y las posibles soluciones para algunos problemas comunes del DAG.

Negsignal.SIGKILL interrumpió la tarea de Airflow

A veces, es posible que tu tarea use más memoria de la que se asigna al trabajador de Airflow. En tal situación, Negsignal.SIGKILL podría interrumpirla. El sistema envía este indicador para evitar un mayor consumo de memoria que podría afectar la ejecución de otras tareas de Airflow. En el registro del trabajador de Airflow, es posible que veas la siguiente entrada de registro:

{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL

Negsignal.SIGKILL también puede aparecer como código -9.

Soluciones posibles:

  • worker_concurrency más bajo de los trabajadores de Airflow

  • Actualiza a un tipo de máquina más grande que se use en el clúster de Cloud Composer.

  • Optimiza tus tareas para que usen menos memoria.

La tarea falla sin emitir registros debido a errores de análisis del DAG

A veces, puede haber errores sutiles en el DAG que provoquen una situación en la que el programador de Airflow pueda programar tareas para su ejecución, el procesador de DAG pueda analizar el archivo DAG, pero, luego, el trabajador de Airflow no pueda ejecutar tareas desde el DAG porque hay errores de programación en el archivo DAG. Esto podría generar una situación en la que una tarea de Airflow se marque como Failedy no haya ningún registro de su ejecución.

Soluciones:

  • Verifica en los registros del trabajador de Airflow que no haya errores generados por el trabajador de Airflow relacionados con un DAG faltante o errores de análisis del DAG.

  • Aumenta los parámetros relacionados con el análisis del DAG:

    • Aumenta [dagbag-import-timeout][ext-airflow-dagrun-import-timeout] a, al menos, 120 segundos (o más, si es necesario).

    • Aumenta dag-file-processor-timeout a al menos 180 segundos (o más, si es necesario). Este valor debe ser superior a dagbag-import-timeout.

  • Consulta también Solución de problemas del procesador de DAG.

La tarea falla sin emitir registros debido a la presión de recursos

Síntoma: Durante la ejecución de una tarea, se interrumpe abruptamente el subproceso del trabajador de Airflow responsable de la ejecución de la tarea de Airflow. El error que se ve en el registro del trabajador de Airflow puede ser similar al siguiente:

...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task    R = retval = fun(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__    return self.run(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command    _execute_in_fork(command_to_exec)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...

Solución:

La tarea falla sin emitir registros debido a la expulsión del Pod

Los Pods de Google Kubernetes Engine están sujetos al ciclo de vida de los Pods de Kubernetes y a la expulsión de Pods. Los aumentos repentinos de tareas y la programación conjunta de los trabajadores son dos de las causas más comunes de expulsión de Pods en Cloud Composer.

La expulsión de Pods puede ocurrir cuando un Pod en particular usa recursos de un nodo, en relación con las expectativas de consumo de recursos configuradas para el nodo. Por ejemplo, la expulsión puede ocurrir cuando varias tareas con alto contenido de memoria se ejecutan en un Pod y su carga combinada hace que el nodo en el que se ejecuta este Pod supere el límite de consumo de memoria.

Si se expulsa un Pod de trabajador de Airflow, todas las instancias de tareas que se ejecutan en ese Pod se interrumpen y, luego, Airflow las marca como con errores.

Los registros están almacenados en búfer. Si se expulsa un Pod de trabajador antes de que se vacíe el búfer, no se emiten registros. La falla de la tarea sin registros indica que los trabajadores de Airflow se reiniciaron debido a la falta de memoria (OOM). Algunos registros pueden estar presentes en Cloud Logging, aunque los registros de Airflow no se hayan emitido.

Para ver los registros, haz lo siguiente:

  1. En la consola de Google Cloud , ve a la página Entornos.

    Ir a Entornos

  2. En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.

  3. Ve a la pestaña Registros.

  4. Consulta los registros de los trabajadores individuales de Airflow en Todos los registros > Registros de Airflow > Trabajadores.

Síntoma:

  1. En la consola de Google Cloud , ve a la página Cargas de trabajo.

    Ir a Cargas de trabajo

  2. Si hay pods airflow-worker que muestran Evicted, haz clic en cada pod expulsado y busca el mensaje The node was low on resource: memory en la parte superior de la ventana.

Solución:

  • Crea un nuevo entorno de Cloud Composer 1 con un tipo de máquina más grande que el tipo de máquina actual.

  • Verifica los registros de los pods de airflow-worker para encontrar posibles causas de expulsión. Para obtener más información sobre cómo recuperar registros de Pods individuales, consulta Soluciona problemas con cargas de trabajo implementadas.

  • Asegúrate de que las tareas del DAG sean idempotentes y se puedan reintentar.

  • Evita descargar archivos innecesarios en el sistema de archivos local de los trabajadores de Airflow.

    Los trabajadores de Airflow tienen una capacidad limitada del sistema de archivos local. Cuando se agota el espacio de almacenamiento, el plano de control de GKE expulsa el Pod del trabajador de Airflow. Esto hace que fallen todas las tareas que ejecutaba el trabajador expulsado.

    Ejemplos de operaciones problemáticas:

    • Descarga archivos u objetos y los almacena de forma local en un trabajador de Airflow. En cambio, almacena estos objetos directamente en un servicio adecuado, como un bucket de Cloud Storage.
    • Acceder a objetos grandes en la carpeta /data desde un trabajador de Airflow El trabajador de Airflow descarga el objeto en su sistema de archivos local. En su lugar, implementa tus DAG de modo que los archivos grandes se procesen fuera del Pod de trabajador de Airflow.

La ejecución del DAG no finaliza dentro del tiempo esperado

Síntoma:

A veces, una ejecución de DAG no finaliza porque las tareas de Airflow se bloquean y la ejecución de DAG dura más de lo esperado. En condiciones normales, las tareas de Airflow no permanecen indefinidamente en el estado de en cola o en ejecución, ya que Airflow tiene procedimientos de tiempo de espera y limpieza que ayudan a evitar esta situación.

Solución:

  • Usa el parámetro dagrun_timeout para los DAG. Por ejemplo: dagrun_timeout=timedelta(minutes=120). Como resultado, cada ejecución del DAG debe finalizar dentro del tiempo de espera de la ejecución del DAG. Para obtener más información sobre los estados de las tareas de Airflow, consulta la documentación de Apache Airflow.

  • Usa el parámetro tiempo de espera de ejecución de la tarea para definir un tiempo de espera predeterminado para las tareas que se ejecutan según los operadores de Apache Airflow.

Mayor tráfico de red desde y hacia la base de datos de Airflow

La cantidad de tráfico de red entre el clúster de GKE de tu entorno y la base de datos de Airflow depende de la cantidad de DAG, de tareas en DAG, y de cómo los DAG acceden a los datos en la base de datos de Airflow. Los siguientes factores pueden influir en el uso de la red:

  • Consultas a la base de datos de Airflow Si tus DAG realizan muchas consultas, generan grandes cantidades de tráfico. Por ejemplo, verificar el estado de las tareas antes de continuar con otras tareas, consultar la tabla XCom y volcar el contenido de la base de datos de Airflow.

  • Gran cantidad de tareas. Cuantas más tareas haya para programar, más tráfico de red se generará. Esta consideración se aplica a la cantidad total de tareas en tus DAG y a la frecuencia de programación. Cuando el programador de Airflow programa las ejecuciones de DAG, realiza consultas a la base de datos de Airflow y genera tráfico.

  • La interfaz web de Airflow genera tráfico de red, ya que realiza consultas a la base de datos de Airflow. El uso intensivo de páginas con grafos, tareas y diagramas puede generar grandes volúmenes de tráfico de red.

No programes los DAG generados de forma programática al mismo tiempo

Generar objetos DAG de forma programática a partir de un archivo DAG es un método eficiente para crear muchos DAG similares que solo tienen pequeñas diferencias.

Es importante no programar todos esos DAG para que se ejecuten de inmediato. Es muy probable que los trabajadores de Airflow no tengan suficientes recursos de CPU y memoria para ejecutar todas las tareas programadas al mismo tiempo.

Para evitar problemas con la programación de DAGs programáticos, haz lo siguiente:

  • Aumenta la simultaneidad de los trabajadores y amplía tu entorno para que pueda ejecutar más tareas de forma simultánea.
  • Genera DAGs de manera que distribuyan sus programaciones de forma uniforme a lo largo del tiempo para evitar programar cientos de tareas al mismo tiempo, de modo que los trabajadores de Airflow tengan tiempo para ejecutar todas las tareas programadas.

Error 504 al acceder al servidor web de Airflow

Consulta Error 504 cuando se accede a la IU de Airflow.

Se pierde la conexión con el servidor de Postgres o MySQL durante la excepción de la consulta durante la ejecución de la tarea o justo después de esta

Las excepciones de Lost connection to Postgres / MySQL server during query suelen ocurrir cuando se cumplen las siguientes condiciones:

  • El DAG usa PythonOperator o un operador personalizado.
  • El DAG realiza consultas a la base de datos de Airflow.

Si se realizan varias consultas desde una función que admite llamadas, los objetos tracebacks pueden apuntar de forma incorrecta a la línea self.refresh_from_db(lock_for_update=True) en el código de Airflow. Es la primera consulta de la base de datos después de la ejecución de la tarea. La causa real de la excepción ocurre antes de esto, cuando una sesión de SQLAlchemy no se cierra de forma correcta.

Las sesiones de SQLAlchemy se limitan a un subproceso y se crean en una sesión de función que admite llamadas que pueden continuar más adelante dentro del código de Airflow. Si hay retrasos significativos entre las consultas dentro de una sesión, es posible que el servidor de Postgres o MySQL ya cierre la conexión. El tiempo de espera de conexión en los entornos de Cloud Composer se establece en alrededor de 10 minutos.

Solución:

  • Usa el decorador airflow.utils.db.provide_session. Este decorador proporciona una sesión válida a la base de datos de Airflow en el parámetro session y cierra la sesión de forma correcta al final de la función.
  • No uses una sola función de larga duración. En cambio, mueve todas las consultas de base de datos a funciones separadas, de modo que haya varias funciones con el decorador airflow.utils.db.provide_session. En este caso, las sesiones se cierran de forma automática después de recuperar los resultados de la consulta.

Control del tiempo de ejecución de los DAG, las tareas y las ejecuciones paralelas del mismo DAG

Si deseas controlar cuánto dura una sola ejecución de un DAG en particular, puedes usar el parámetro dagrun_timeout del DAG. Por ejemplo, si esperas que una sola ejecución del DAG (independientemente de si la ejecución finaliza con éxito o con un error) no dure más de 1 hora, establece este parámetro en 3, 600 segundos.

También puedes controlar cuánto tiempo permites que dure una sola tarea de Airflow. Para hacerlo, puedes usar execution_timeout.

Si deseas controlar cuántas ejecuciones activas del DAG quieres tener para un DAG en particular, puedes usar la opción de configuración de Airflow [core]max-active-runs-per-dag para hacerlo.

Si deseas tener solo una instancia de una ejecución de DAG en un momento determinado, establece el parámetro max-active-runs-per-dag en 1.

Interrupciones transitorias al conectarse a la base de datos de metadatos de Airflow

Cloud Composer se ejecuta sobre una infraestructura distribuida. Esto significa que, de vez en cuando, pueden aparecer algunos problemas transitorios que podrían interrumpir la ejecución de tus tareas de Airflow.

En esas situaciones, es posible que veas los siguientes mensajes de error en los registros de los trabajadores de Airflow:

"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"

o

"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"

Estos problemas intermitentes también pueden deberse a las operaciones de mantenimiento que se realizan en tus entornos de Cloud Composer.

Por lo general, estos errores son intermitentes y, si tus tareas de Airflow son idempotentes y tienes configurados reintentos, no te afectarán. También puedes definir períodos de mantenimiento.

Otro motivo de estos errores podría ser la falta de recursos en el clúster de tu entorno. En esos casos, puedes aumentar la escala o optimizar tu entorno como se describe en las instrucciones para aumentar la escala de los entornos o optimizar tu entorno.

Se marca una ejecución de DAG como correcta, pero no tiene tareas ejecutadas

Si una ejecución del DAG execution_date es anterior a la start_date del DAG, es posible que veas ejecuciones del DAG que no tienen ninguna ejecución de tareas, pero que aún están marcadas como exitosas.

Una ejecución de DAG exitosa sin tareas ejecutadas
Figura 3. Una ejecución de DAG exitosa sin tareas ejecutadas (haz clic para ampliar)

Causa

Esta situación puede ocurrir en uno de los siguientes casos:

  • La discrepancia se debe a la diferencia de zona horaria entre execution_date y start_date del DAG. Esto puede ocurrir, por ejemplo, cuando se usa pendulum.parse(...) para establecer start_date.

  • El start_date del DAG se establece en un valor dinámico, por ejemplo, airflow.utils.dates.days_ago(1).

Solución

  • Asegúrate de que execution_date y start_date usen la misma zona horaria.

  • Especifica un start_date estático y combínalo con catchup=False para evitar ejecutar DAGs con fechas de inicio anteriores.

Síntomas de que la base de datos de Airflow está bajo una carga pesada

Para obtener más información, consulta Síntomas de que la base de datos de Airflow está bajo presión de carga.

¿Qué sigue?