Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
En esta página, se proporcionan información y pasos para solucionar problemas comunes en el flujo de trabajo.
Algunos problemas de ejecución de DAG pueden deberse a que el programador de Airflow no funciona de forma correcta u óptima. Sigue las instrucciones para solucionar problemas del Programador y resolver estos inconvenientes.
Flujo de trabajo para la solución de problemas
Para comenzar con la solución de problemas, siga estos pasos:
Consulta los registros de Airflow.
Puedes aumentar el nivel de registro de Airflow anulando la siguiente opción de configuración de Airflow.
Sección Clave Valor logging
(core
en Airflow 1)logging_level
El valor predeterminado es INFO
. Establece el valor enDEBUG
para obtener más verbosidad en los mensajes de registro.Consulta el Panel de supervisión.
Revisa Cloud Monitoring.
En la consola de Google Cloud , busca errores en las páginas de los componentes de tu entorno.
En la interfaz web de Airflow, consulta la Vista de gráfico del DAG para ver las instancias de tareas fallidas.
Sección Clave Valor webserver
dag_orientation
LR
,TB
,RL
, oBT
Depura fallas del operador
Para depurar una falla del operador, sigue estos pasos:
- Verifica si hay errores específicos de la tarea.
- Consulta los registros de Airflow.
- Revisa Cloud Monitoring.
- Verifica los registros específicos del operador.
- Corrige los errores.
- Sube el DAG a la carpeta
/dags
. - En la interfaz web de Airflow, borra los estados anteriores del DAG.
- Reanuda o ejecuta el DAG.
Soluciona problemas de ejecución de tareas
Airflow es un sistema distribuido con muchas entidades, como el programador, el ejecutor y los trabajadores, que se comunican entre sí a través de una lista de tareas en cola y la base de datos de Airflow, y envían señales (como SIGTERM). En el siguiente diagrama, se muestra una descripción general de las interconexiones entre los componentes de Airflow.
En un sistema distribuido como Airflow, puede haber algunos problemas de conectividad de red o la infraestructura subyacente puede experimentar problemas intermitentes. Esto puede generar situaciones en las que las tareas pueden fallar y reprogramarse para su ejecución, o las tareas pueden no completarse correctamente (por ejemplo, tareas Zombie o tareas que se atascaron en la ejecución). Airflow tiene mecanismos para lidiar con estas situaciones y reanudar automáticamente el funcionamiento normal. En las siguientes secciones, se explican los problemas comunes que ocurren durante la ejecución de tareas de Airflow: tareas zombi, instancias de finalización y señales SIGTERM.
Solución de problemas relacionados con los procesos zombi
Airflow detecta dos tipos de discrepancias entre una tarea y un proceso que ejecuta la tarea:
Las tareas zombi son tareas que deberían estar en ejecución, pero no lo están. Esto puede ocurrir si se finalizó el proceso de la tarea o no responde, si el trabajador de Airflow no informó el estado de la tarea a tiempo porque está sobrecargado o si se apagó la VM en la que se ejecuta la tarea. Airflow encuentra estas tareas de forma periódica y las reintenta o las marca como fallidas, según la configuración de la tarea.
Descubre tareas zombi
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("airflow-scheduler") textPayload:"Detected zombie job"
Las tareas zombie son tareas que no deberían estar en ejecución. Airflow encuentra estas tareas periódicamente y las finaliza.
En las siguientes secciones, se describen los motivos y las soluciones más comunes para las tareas Zombie.
El trabajador de Airflow se quedó sin memoria
Cada trabajador de Airflow puede ejecutar hasta [celery]worker_concurrency
instancias de tareas de forma simultánea. Si el consumo de memoria acumulativo de esas instancias de tareas supera el límite de memoria de un trabajador de Airflow, se finaliza un proceso aleatorio en él para liberar recursos.
Descubre eventos de memoria insuficiente del trabajador de Airflow
resource.type="k8s_node" resource.labels.cluster_name="GKE_CLUSTER_NAME" log_id("events") jsonPayload.message:"Killed process" jsonPayload.message:("airflow task" OR "celeryd")
A veces, la falta de memoria en un trabajador de Airflow puede provocar que se envíen paquetes con formato incorrecto durante una sesión de SQL Alchemy a la base de datos, a un servidor DNS o a cualquier otro servicio al que llame un DAG. En este caso, el otro extremo de la conexión podría rechazar o descartar las conexiones del trabajador de Airflow. Por ejemplo:
"UNKNOWN:Error received from peer
{created_time:"2024-11-31T10:09:52.217738071+00:00", grpc_status:14,
grpc_message:"failed to connect to all addresses; last error: UNKNOWN:
ipv4:<ip address>:443: handshaker shutdown"}"
Soluciones:
Optimiza las tareas para que usen menos memoria, por ejemplo, evitando el código de nivel superior.
Disminuir
[celery]worker_concurrency
.En Cloud Composer 1, actualiza a un tipo de máquina más grande.
Se expulsó el trabajador de Airflow
Las expulsiones de Pods son una parte normal de la ejecución de cargas de trabajo en Kubernetes. GKE expulsa los Pods si se quedaron sin almacenamiento o para liberar recursos para cargas de trabajo con mayor prioridad.
Descubre las expulsiones de trabajadores de Airflow
resource.type="k8s_pod" resource.labels.cluster_name="GKE_CLUSTER_NAME" resource.labels.pod_name:"airflow-worker" log_id("events") jsonPayload.reason="Evicted"
Soluciones:
- Si la expulsión se debe a la falta de almacenamiento, puedes reducir el uso del almacenamiento o quitar los archivos temporales en cuanto ya no los necesites.
Como alternativa, puedes aumentar el almacenamiento disponible o ejecutar cargas de trabajo en un pod dedicado con
KubernetesPodOperator
.
Se cerró el trabajador de Airflow
Es posible que los trabajadores de Airflow se quiten de forma externa. Si las tareas en ejecución no finalizan durante un período de finalización ordenado, se interrumpen y es posible que se detecten como zombis.
Descubre las finalizaciones de los pods de trabajadores de Airflow
resource.type="k8s_cluster" resource.labels.cluster_name="GKE_CLUSTER_NAME" protoPayload.methodName:"pods.delete" protoPayload.response.metadata.name:"airflow-worker"
Situaciones y soluciones posibles:
Los trabajadores de Airflow se reinician durante las modificaciones del entorno, como las actualizaciones o la instalación de paquetes:
Descubre las modificaciones del entorno de Composer
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("cloudaudit.googleapis.com%2Factivity")
Puedes realizar estas operaciones cuando no se estén ejecutando tareas críticas o habilitar los reintentos de tareas.
Es posible que varios componentes no estén disponibles temporalmente durante las operaciones de mantenimiento.
Descubre las operaciones de mantenimiento de GKE
resource.type="gke_nodepool" resource.labels.cluster_name="GKE_CLUSTER_NAME" protoPayload.metadata.operationType="UPGRADE_NODES"
Puedes especificar períodos de mantenimiento para minimizar
se superpone con la ejecución de las tareas críticas.
El trabajador de Airflow estaba bajo una carga pesada
La cantidad de recursos de CPU y memoria disponibles para un trabajador de Airflow está limitada por la configuración del entorno. Si el uso de recursos se acerca a los límites, es posible que se produzca una contención de recursos y demoras innecesarias durante la ejecución de la tarea. En situaciones extremas, cuando faltan recursos durante períodos más largos, esto puede causar tareas zombie.
Soluciones:
- Supervisa el uso de CPU y memoria de los trabajadores y ajústalo para evitar superar el 80%.
La base de datos de Airflow estaba bajo una carga pesada
Varios componentes de Airflow utilizan una base de datos para comunicarse entre sí y, en particular, para almacenar los latidos de las instancias de tareas. La escasez de recursos en la base de datos genera tiempos de consulta más largos y puede afectar la ejecución de tareas.
A veces, los siguientes errores están presentes en los registros de un trabajador de Airflow:
(psycopg2.OperationalError) connection to server at <IP address>,
port 3306 failed: server closed the connection unexpectedly
This probably means the server terminated abnormally before or while
processing the request.
Soluciones:
- Evita usar muchas instrucciones
Variables.get
en el código del DAG de nivel superior. En su lugar, usa plantillas de Jinja para recuperar los valores de las variables de Airflow. - Optimiza (reduce) el uso de las instrucciones xcom_push y xcom_pull en las plantillas de Jinja en el código de DAG de nivel superior.
- Considera actualizar a un tamaño de entorno más grande (mediano o grande).
- Disminuye la cantidad de programadores
- Reduce la frecuencia del análisis del DAG.
- Supervisa el uso de CPU y memoria de la base de datos.
La base de datos de Airflow no estuvo disponible temporalmente
Un trabajador de Airflow puede tardar en detectar y controlar correctamente los errores intermitentes, como los problemas de conectividad temporales. Es posible que supere el umbral predeterminado de detección de zombies.
Descubre los tiempos de espera de la señal de monitoreo de Airflow
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("airflow-worker") textPayload:"Heartbeat time limit exceeded"
Soluciones:
Aumenta el tiempo de espera para las tareas zombi y anula el valor de la opción de configuración de Airflow
[scheduler]scheduler_zombie_task_threshold
:Sección Clave Valor Notas scheduler
scheduler_zombie_task_threshold
Nuevo tiempo de espera (en segundos) El valor predeterminado es 300
.
Soluciona problemas relacionados con la finalización de instancias
Airflow usa el mecanismo de instancia de finalización para cerrar las tareas de Airflow. Este mecanismo se usa en las siguientes situaciones:
- Cuando un programador finaliza una tarea que no se completó a tiempo.
- Cuando una tarea agota el tiempo de espera o se ejecuta durante demasiado tiempo.
Cuando Airflow finaliza las instancias de tareas, puedes ver las siguientes entradas de registro en los registros de un trabajador de Airflow que ejecutó la tarea:
INFO - Subtask ... WARNING - State of this instance has been externally set
to success. Terminating instance.
INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.
Soluciones posibles:
Verifica el código de la tarea para detectar errores que puedan hacer que se ejecute durante demasiado tiempo.
Aumenta el valor de la opción de configuración de Airflow
[celery_broker_transport_options]visibility_timeout
.Como resultado, el programador espera más tiempo a que finalice una tarea antes de considerarla una tarea zombie. Esta opción es especialmente útil para las tareas que llevan mucho tiempo y duran varias horas. Si el valor es demasiado bajo (por ejemplo, 3 horas), el programador considera que las tareas que se ejecutan durante 5 o 6 horas están "colgadas" (tareas zombie).
Aumenta el valor de la opción de configuración de Airflow
[core]killed_task_cleanup_time
.Un valor más largo proporciona más tiempo a los trabajadores de Airflow para finalizar sus tareas correctamente. Si el valor es demasiado bajo, las tareas de Airflow podrían interrumpirse de forma abrupta, sin tiempo suficiente para finalizar su trabajo correctamente.
Solución de problemas relacionados con los indicadores SIGTERM
Linux, Kubernetes, el programador de Airflow y Celery usan señales SIGTERM para finalizar los procesos responsables de ejecutar los trabajadores o las tareas de Airflow.
Puede haber varios motivos por los que se envían señales SIGTERM en un entorno:
Una tarea se convirtió en tarea zombi y debe detenerse.
El programador descubrió un duplicado de una tarea y envía señales de SIGTERM y de finalización de la instancia a la tarea para detenerla.
En el ajuste de escala automático horizontal de Pods, el plano de control de GKE envía señales SIGTERM para quitar los Pods que ya no son necesarios.
El programador puede enviar indicadores SIGTERM al proceso DagFileProcessorManager. El programador usa estas señales SIGTERM para administrar el ciclo de vida del proceso DagFileProcessorManager y se pueden ignorar de forma segura.
Ejemplo:
Launched DagFileProcessorManager with pid: 353002 Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: [] Sending the signal Signals.SIGTERM to group 353002 Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
Condición de carrera entre la devolución de llamada de latido y las devoluciones de llamada de salida en el local_task_job, que supervisa la ejecución de la tarea. Si el latido detecta que una tarea se marcó como exitosa, no puede distinguir si la tarea en sí se completó correctamente o si se le indicó a Airflow que la considere exitosa. Sin embargo, finalizará un ejecutor de tareas sin esperar a que salga.
Estos indicadores SIGTERM se pueden ignorar sin inconvenientes. La tarea ya está en el estado correcto y la ejecución de la DAG en su totalidad no se verá afectada.
La entrada de registro
Received SIGTERM.
es la única diferencia entre la salida normal y la finalización de la tarea en el estado correcto.Figura 2. Condición de carrera entre las devoluciones de llamada de latido y salida (haz clic para ampliar) Un componente de Airflow usa más recursos (CPU, memoria) de los que permite el nodo del clúster.
El servicio de GKE realiza operaciones de mantenimiento y envía señales SIGTERM a los Pods que se ejecutan en un nodo que está a punto de actualizarse.
Cuando una instancia de tarea se finaliza con SIGTERM, puedes ver las siguientes entradas de registro en los registros de un trabajador de Airflow que ejecutó la tarea:
{local_task_job.py:211} WARNING - State of this instance has been externally set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed with exception
Soluciones posibles:
Este problema ocurre cuando una VM que ejecuta la tarea se queda sin memoria. Esto no se relaciona con la configuración de Airflow, sino con la cantidad de memoria disponible para la VM.
En Cloud Composer 1, puedes volver a crear tu entorno con un tipo de máquina que tenga más rendimiento.
Puedes reducir el valor de la opción de configuración de Airflow de
[celery]worker_concurrency
concurrencia. Esta opción determina cuántas tareas ejecuta de forma simultánea un trabajador de Airflow determinado.
Consultas de Cloud Logging para descubrir los motivos de los reinicios o las expulsiones de Pods
Los entornos de Cloud Composer usan clústeres de GKE como capa de infraestructura de procesamiento. En esta sección, puedes encontrar consultas útiles que te ayudarán a encontrar los motivos de los reinicios o las expulsiones del trabajador o el programador de Airflow.
Las consultas que se presentan más adelante se pueden ajustar de la siguiente manera:
Puedes especificar la línea de tiempo requerida en Cloud Logging. Por ejemplo, las últimas 6 horas, los últimos 3 días o puedes definir tu intervalo de tiempo personalizado.
Debes especificar el nombre del clúster de tu entorno en CLUSTER_NAME.
Puedes limitar la búsqueda a un Pod específico agregando POD_NAME.
Descubre los contenedores reiniciados
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"will be restarted" resource.labels.cluster_name="CLUSTER_NAME"
Consulta alternativa para limitar los resultados a un pod específico:
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"will be restarted" resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
Descubre los contenedores que se cerraron como resultado de un evento de memoria insuficiente
resource.type="k8s_node" log_id("events") (jsonPayload.reason:("OOMKilling" OR "SystemOOM") OR jsonPayload.message:("OOM encountered" OR "out of memory")) severity=WARNING resource.labels.cluster_name="CLUSTER_NAME"
Consulta alternativa para limitar los resultados a un pod específico:
resource.type="k8s_node" log_id("events") (jsonPayload.reason:("OOMKilling" OR "SystemOOM") OR jsonPayload.message:("OOM encountered" OR "out of memory")) severity=WARNING resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
Descubre los contenedores que dejaron de ejecutarse
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"ContainerDied" severity=DEFAULT resource.labels.cluster_name="CLUSTER_NAME"
Consulta alternativa para limitar los resultados a un pod específico:
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"ContainerDied" severity=DEFAULT resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
Impacto de las operaciones de actualización en las ejecuciones de tareas de Airflow
Las operaciones de actualización interrumpen las tareas de Airflow que se están ejecutando, a menos que una tarea se ejecute en el modo diferible.
Te recomendamos que realices estas operaciones cuando esperes un impacto mínimo en las ejecuciones de tareas de Airflow y que configures los mecanismos de reintento adecuados en tus DAGs y tareas.
Problemas comunes
En las siguientes secciones, se describen los síntomas y las posibles soluciones para algunos problemas comunes del DAG.
Negsignal.SIGKILL
interrumpió la tarea de Airflow
A veces, es posible que tu tarea use más memoria de la que se asigna al trabajador de Airflow.
En tal situación, Negsignal.SIGKILL
podría interrumpirla. El sistema envía este indicador para evitar un mayor consumo de memoria que podría afectar la ejecución de otras tareas de Airflow. En el registro del trabajador de Airflow, es posible que veas la siguiente entrada de registro:
{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL
Negsignal.SIGKILL
también puede aparecer como código -9
.
Soluciones posibles:
worker_concurrency
más bajo de los trabajadores de AirflowActualiza a un tipo de máquina más grande que se use en el clúster de Cloud Composer.
Optimiza tus tareas para que usen menos memoria.
La tarea falla sin emitir registros debido a errores de análisis del DAG
A veces, puede haber errores sutiles en el DAG que provoquen una situación en la que el programador de Airflow pueda programar tareas para su ejecución, el procesador de DAG pueda analizar el archivo DAG, pero, luego, el trabajador de Airflow no pueda ejecutar tareas desde el DAG porque hay errores de programación en el archivo DAG. Esto podría generar una situación en la que una tarea de Airflow se marque como Failed
y no haya ningún registro de su ejecución.
Soluciones:
Verifica en los registros del trabajador de Airflow que no haya errores generados por el trabajador de Airflow relacionados con un DAG faltante o errores de análisis del DAG.
Aumenta los parámetros relacionados con el análisis del DAG:
Aumenta [dagbag-import-timeout][ext-airflow-dagrun-import-timeout] a, al menos, 120 segundos (o más, si es necesario).
Aumenta dag-file-processor-timeout a al menos 180 segundos (o más, si es necesario). Este valor debe ser superior a
dagbag-import-timeout
.
Consulta también Solución de problemas del procesador de DAG.
La tarea falla sin emitir registros debido a la presión de recursos
Síntoma: Durante la ejecución de una tarea, se interrumpe abruptamente el subproceso del trabajador de Airflow responsable de la ejecución de la tarea de Airflow. El error que se ve en el registro del trabajador de Airflow puede ser similar al siguiente:
...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task R = retval = fun(*args, **kwargs) File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__ return self.run(*args, **kwargs) File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command _execute_in_fork(command_to_exec) File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...
Solución:
En Cloud Composer 1, crea un nuevo entorno con un tipo de máquina más grande que el tipo de máquina actual. Considera agregar más nodos a tu entorno y reducir el valor de
[celery]worker_concurrency
para tus trabajadores.Si tu entorno también genera tareas zombie, consulta Solución de problemas relacionados con las tareas zombie.
Para ver un instructivo sobre cómo depurar problemas de memoria insuficiente, consulta Cómo depurar problemas de DAG por falta de memoria y almacenamiento.
La tarea falla sin emitir registros debido a la expulsión del Pod
Los Pods de Google Kubernetes Engine están sujetos al ciclo de vida de los Pods de Kubernetes y a la expulsión de Pods. Los aumentos repentinos de tareas y la programación conjunta de los trabajadores son dos de las causas más comunes de expulsión de Pods en Cloud Composer.
La expulsión de Pods puede ocurrir cuando un Pod en particular usa recursos de un nodo, en relación con las expectativas de consumo de recursos configuradas para el nodo. Por ejemplo, la expulsión puede ocurrir cuando varias tareas con alto contenido de memoria se ejecutan en un Pod y su carga combinada hace que el nodo en el que se ejecuta este Pod supere el límite de consumo de memoria.
Si se expulsa un Pod de trabajador de Airflow, todas las instancias de tareas que se ejecutan en ese Pod se interrumpen y, luego, Airflow las marca como con errores.
Los registros están almacenados en búfer. Si se expulsa un Pod de trabajador antes de que se vacíe el búfer, no se emiten registros. La falla de la tarea sin registros indica que los trabajadores de Airflow se reiniciaron debido a la falta de memoria (OOM). Algunos registros pueden estar presentes en Cloud Logging, aunque los registros de Airflow no se hayan emitido.
Para ver los registros, haz lo siguiente:
En la consola de Google Cloud , ve a la página Entornos.
En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña Registros.
Consulta los registros de los trabajadores individuales de Airflow en Todos los registros > Registros de Airflow > Trabajadores.
Síntoma:
En la consola de Google Cloud , ve a la página Cargas de trabajo.
Si hay pods
airflow-worker
que muestranEvicted
, haz clic en cada pod expulsado y busca el mensajeThe node was low on resource: memory
en la parte superior de la ventana.
Solución:
Crea un nuevo entorno de Cloud Composer 1 con un tipo de máquina más grande que el tipo de máquina actual.
Verifica los registros de los pods de
airflow-worker
para encontrar posibles causas de expulsión. Para obtener más información sobre cómo recuperar registros de Pods individuales, consulta Soluciona problemas con cargas de trabajo implementadas.Asegúrate de que las tareas del DAG sean idempotentes y se puedan reintentar.
Evita descargar archivos innecesarios en el sistema de archivos local de los trabajadores de Airflow.
Los trabajadores de Airflow tienen una capacidad limitada del sistema de archivos local. Cuando se agota el espacio de almacenamiento, el plano de control de GKE expulsa el Pod del trabajador de Airflow. Esto hace que fallen todas las tareas que ejecutaba el trabajador expulsado.
Ejemplos de operaciones problemáticas:
- Descarga archivos u objetos y los almacena de forma local en un trabajador de Airflow. En cambio, almacena estos objetos directamente en un servicio adecuado, como un bucket de Cloud Storage.
- Acceder a objetos grandes en la carpeta
/data
desde un trabajador de Airflow El trabajador de Airflow descarga el objeto en su sistema de archivos local. En su lugar, implementa tus DAG de modo que los archivos grandes se procesen fuera del Pod de trabajador de Airflow.
La ejecución del DAG no finaliza dentro del tiempo esperado
Síntoma:
A veces, una ejecución de DAG no finaliza porque las tareas de Airflow se bloquean y la ejecución de DAG dura más de lo esperado. En condiciones normales, las tareas de Airflow no permanecen indefinidamente en el estado de en cola o en ejecución, ya que Airflow tiene procedimientos de tiempo de espera y limpieza que ayudan a evitar esta situación.
Solución:
Usa el parámetro
dagrun_timeout
para los DAG. Por ejemplo:dagrun_timeout=timedelta(minutes=120)
. Como resultado, cada ejecución del DAG debe finalizar dentro del tiempo de espera de la ejecución del DAG. Para obtener más información sobre los estados de las tareas de Airflow, consulta la documentación de Apache Airflow.Usa el parámetro tiempo de espera de ejecución de la tarea para definir un tiempo de espera predeterminado para las tareas que se ejecutan según los operadores de Apache Airflow.
Mayor tráfico de red desde y hacia la base de datos de Airflow
La cantidad de tráfico de red entre el clúster de GKE de tu entorno y la base de datos de Airflow depende de la cantidad de DAG, de tareas en DAG, y de cómo los DAG acceden a los datos en la base de datos de Airflow. Los siguientes factores pueden influir en el uso de la red:
Consultas a la base de datos de Airflow Si tus DAG realizan muchas consultas, generan grandes cantidades de tráfico. Por ejemplo, verificar el estado de las tareas antes de continuar con otras tareas, consultar la tabla XCom y volcar el contenido de la base de datos de Airflow.
Gran cantidad de tareas. Cuantas más tareas haya para programar, más tráfico de red se generará. Esta consideración se aplica a la cantidad total de tareas en tus DAG y a la frecuencia de programación. Cuando el programador de Airflow programa las ejecuciones de DAG, realiza consultas a la base de datos de Airflow y genera tráfico.
La interfaz web de Airflow genera tráfico de red, ya que realiza consultas a la base de datos de Airflow. El uso intensivo de páginas con grafos, tareas y diagramas puede generar grandes volúmenes de tráfico de red.
No programes los DAG generados de forma programática al mismo tiempo
Generar objetos DAG de forma programática a partir de un archivo DAG es un método eficiente para crear muchos DAG similares que solo tienen pequeñas diferencias.
Es importante no programar todos esos DAG para que se ejecuten de inmediato. Es muy probable que los trabajadores de Airflow no tengan suficientes recursos de CPU y memoria para ejecutar todas las tareas programadas al mismo tiempo.
Para evitar problemas con la programación de DAGs programáticos, haz lo siguiente:
- Aumenta la simultaneidad de los trabajadores y amplía tu entorno para que pueda ejecutar más tareas de forma simultánea.
- Genera DAGs de manera que distribuyan sus programaciones de forma uniforme a lo largo del tiempo para evitar programar cientos de tareas al mismo tiempo, de modo que los trabajadores de Airflow tengan tiempo para ejecutar todas las tareas programadas.
Error 504 al acceder al servidor web de Airflow
Consulta Error 504 cuando se accede a la IU de Airflow.
Se pierde la conexión con el servidor de Postgres o MySQL durante la excepción de la consulta durante la ejecución de la tarea o justo después de esta
Las excepciones de Lost connection to Postgres / MySQL server during query
suelen ocurrir cuando se cumplen las siguientes condiciones:
- El DAG usa
PythonOperator
o un operador personalizado. - El DAG realiza consultas a la base de datos de Airflow.
Si se realizan varias consultas desde una función que admite llamadas, los objetos tracebacks pueden apuntar de forma incorrecta a la línea self.refresh_from_db(lock_for_update=True)
en el código de Airflow. Es la primera consulta de la base de datos después de la ejecución de la tarea. La causa real de la excepción ocurre antes de esto, cuando una sesión de SQLAlchemy no se cierra de forma correcta.
Las sesiones de SQLAlchemy se limitan a un subproceso y se crean en una sesión de función que admite llamadas que pueden continuar más adelante dentro del código de Airflow. Si hay retrasos significativos entre las consultas dentro de una sesión, es posible que el servidor de Postgres o MySQL ya cierre la conexión. El tiempo de espera de conexión en los entornos de Cloud Composer se establece en alrededor de 10 minutos.
Solución:
- Usa el decorador
airflow.utils.db.provide_session
. Este decorador proporciona una sesión válida a la base de datos de Airflow en el parámetrosession
y cierra la sesión de forma correcta al final de la función. - No uses una sola función de larga duración. En cambio, mueve todas las consultas de base de datos a funciones separadas, de modo que haya varias funciones con el decorador
airflow.utils.db.provide_session
. En este caso, las sesiones se cierran de forma automática después de recuperar los resultados de la consulta.
Control del tiempo de ejecución de los DAG, las tareas y las ejecuciones paralelas del mismo DAG
Si deseas controlar cuánto dura una sola ejecución de un DAG en particular, puedes usar el parámetro dagrun_timeout
del DAG. Por ejemplo, si esperas que una sola ejecución del DAG (independientemente de si la ejecución finaliza con éxito o con un error) no dure más de 1 hora, establece este parámetro en 3, 600 segundos.
También puedes controlar cuánto tiempo permites que dure una sola tarea de Airflow. Para hacerlo, puedes usar execution_timeout
.
Si deseas controlar cuántas ejecuciones activas del DAG quieres tener para un DAG en particular, puedes usar la opción de configuración de Airflow [core]max-active-runs-per-dag
para hacerlo.
Si deseas tener solo una instancia de una ejecución de DAG en un momento determinado, establece el parámetro max-active-runs-per-dag
en 1
.
Interrupciones transitorias al conectarse a la base de datos de metadatos de Airflow
Cloud Composer se ejecuta sobre una infraestructura distribuida. Esto significa que, de vez en cuando, pueden aparecer algunos problemas transitorios que podrían interrumpir la ejecución de tus tareas de Airflow.
En esas situaciones, es posible que veas los siguientes mensajes de error en los registros de los trabajadores de Airflow:
"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"
o
"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"
Estos problemas intermitentes también pueden deberse a las operaciones de mantenimiento que se realizan en tus entornos de Cloud Composer.
Por lo general, estos errores son intermitentes y, si tus tareas de Airflow son idempotentes y tienes configurados reintentos, no te afectarán. También puedes definir períodos de mantenimiento.
Otro motivo de estos errores podría ser la falta de recursos en el clúster de tu entorno. En esos casos, puedes aumentar la escala o optimizar tu entorno como se describe en las instrucciones para aumentar la escala de los entornos o optimizar tu entorno.
Se marca una ejecución de DAG como correcta, pero no tiene tareas ejecutadas
Si una ejecución del DAG execution_date
es anterior a la start_date
del DAG, es posible que veas ejecuciones del DAG que no tienen ninguna ejecución de tareas, pero que aún están marcadas como exitosas.

Causa
Esta situación puede ocurrir en uno de los siguientes casos:
La discrepancia se debe a la diferencia de zona horaria entre
execution_date
ystart_date
del DAG. Esto puede ocurrir, por ejemplo, cuando se usapendulum.parse(...)
para establecerstart_date
.El
start_date
del DAG se establece en un valor dinámico, por ejemplo,airflow.utils.dates.days_ago(1)
.
Solución
Asegúrate de que
execution_date
ystart_date
usen la misma zona horaria.Especifica un
start_date
estático y combínalo concatchup=False
para evitar ejecutar DAGs con fechas de inicio anteriores.
Síntomas de que la base de datos de Airflow está bajo una carga pesada
Para obtener más información, consulta Síntomas de que la base de datos de Airflow está bajo presión de carga.
¿Qué sigue?
- Solución de problemas de instalación de paquetes de PyPI
- Solución de problemas de actualizaciones y mejoras de entornos