Data Platform/Systems/Airflow/Kubernetes
This page covers the specifics of our Airflow instances deployed to Kubernetes. We assume that each Airflow instance is deployed alongside a dedicated CloudnativePG cluster, running in the same namespace.
Note: replace airflow-test-k8s with other instance names where appropriate. See the list of airflow instances.
Architecture
Components
Airflow is deployed via the airflow chart. The release is composed of multiple Deployment resources:
- the webserver
- the scheduler
- the kerberos ticket renewer
- an envoy proxy, allowing task pods to connect to internal services via the service mesh
- the hadoop shell, a container whose sole purpose is to be exec-ed into to run ad-hoc yarn/hdfs commands
Executor
All Kubernetes instances use the KubernetesExecutor, which means that DAG tasks are executed as Kubernetes Pods. Some instances use both the Kubernetes and Local executors.
DAGs deployment
For the moment, any Airflow instance running on Kubernetes syncs up the main branch of airflow-dags every 5 minutes using https://github.com/kubernetes/git-sync, meaning that any merged MR should be reflected in Airflow in about 5 minutes.
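For illustration, such a git-sync sidecar could be configured roughly as follows. This is a sketch only, using git-sync v4 flag names; the repository URL and all values are assumptions, not the actual chart configuration.

```yaml
# Hypothetical git-sync sidecar: pulls the main branch of airflow-dags
# every 5 minutes into a volume shared with the scheduler.
- name: git-sync
  image: registry.k8s.io/git-sync/git-sync:v4.2.1
  args:
    - --repo=https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags.git
    - --ref=main
    - --period=300s   # 5 minutes between syncs
    - --root=/git
    - --link=dags     # stable symlink to the current checkout
```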
In the near future, the model will switch from pull to push: when an MR is merged, the Gitlab CI will send a POST request to blunderbuss, which will trigger a sync of the main branch into the volume mounted by the airflow schedulers.
Even when that is in place, we'll still be able to use git-sync to synchronize feature branches in development instances.
Artifacts deployment
Artifacts used by DAGs are defined in instance-specific YAML configuration files located in <instance_folder>/config/artifacts.yaml. Each artifact has an original source (a URL, a Maven repository coordinate, etc.), and by default each instance configures its own artifact cache location in our main HDFS file system. When referenced from within a DAG, artifact files are pulled from these cache locations.
Artifacts deployment is the process of synchronizing the state of artifacts.yaml files with their defined HDFS cache locations. The synchronization process is cache-additive: adding an artifact to artifacts.yaml will cause that artifact to be added to the cache, but removing an artifact from the YAML will not cause it to be removed from the cache.
Artifacts deployment is initiated manually from a GitLab CI/CD pipeline that becomes accessible after a successful MR merge. By default, the deployment process only uploads the new artifact files that are not already present in the cache. If you need to make sure all artifacts are refreshed in the cache, you can force a cache warmup by providing the GitLab CI/CD job artifact-cache-warmup with an environment variable named CI_FORCE_CACHE_REASON, set to a string explaining the reason for the forced warmup.
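The cache-additive behavior described above can be sketched as follows. This is illustrative Python only; the function and names are hypothetical, not the actual deployment code.

```python
def plan_sync(declared: set[str], cached: set[str], force: bool = False) -> dict:
    """Compute which artifacts to upload to the HDFS cache.

    Cache-additive: artifacts removed from artifacts.yaml are never
    deleted from the cache; a forced warmup re-uploads everything.
    """
    to_upload = set(declared) if force else declared - cached
    return {
        "upload": sorted(to_upload),
        "delete": [],  # never delete anything: the sync is cache-additive
    }
```

For example, `plan_sync({"a.jar", "b.jar"}, {"a.jar"})` only plans an upload of `b.jar`, while passing `force=True` plans a re-upload of both artifacts.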
Logging
The logs of the airflow components themselves are sent to our observability pipeline and are accessible through logstash. The DAG task logs, however, are uploaded to S3 after completion. Streaming the logs of an ongoing DAG task can be done from the web UI, and relies on the Kubernetes Logs API.
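This setup relies on Airflow's standard remote-logging settings; a sketch of the kind of configuration involved is shown below (the bucket path and connection id are hypothetical, not our actual values):

```ini
; Sketch of Airflow remote task logging (values are illustrative only)
[logging]
remote_logging = True
remote_base_log_folder = s3://logs.airflow-test-k8s.dse-k8s-eqiad/logs
remote_log_conn_id = aws_default
```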
Security
Webserver authentication and authorization
Access to the webserver is OIDC authenticated, and the user role is derived from the user's LDAP groups. For example, SREs (members of the ops LDAP group) are automatically given the Admin role. The mapping can be customized per instance, so that we can define LDAP groups for per-instance admins and per-instance members.
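Conceptually, the group-to-role mapping works like this (illustrative Python; only the ops → Admin mapping comes from this page, the other group names are hypothetical examples of per-instance customization):

```python
# Illustrative LDAP-group-to-Airflow-role mapping. Only "ops" -> "Admin"
# is documented here; the other entries are hypothetical.
GROUP_ROLE_MAP = {
    "ops": "Admin",
    "airflow-test-k8s-admins": "Admin",
    "airflow-test-k8s-users": "User",
}

def roles_for(ldap_groups: list[str]) -> set[str]:
    """Derive Airflow roles from a user's LDAP groups."""
    return {GROUP_ROLE_MAP[g] for g in ldap_groups if g in GROUP_ROLE_MAP}
```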
API access
Access to the Airflow API will be Kerberos authenticated, meaning that:
- services will be able to access the API by authenticating to Kerberos via their own keytab
- users will be able to access the API by authenticating to Kerberos via their password and kinit
Kerberos
We generate a keytab for each instance. It is stored as a base64-encoded secret, and only mounted on the airflow-kerberos pod, which is in charge of obtaining (as well as regularly renewing) a TGT. That TGT is itself mounted into every single pod that needs to communicate with Kerberised systems (i.e. the worker pods).
Note: the keytab is also mounted in the webserver pod, but is only used to set up the Kerberised API access at init time.
Kubernetes RBAC
When using the KubernetesExecutor, the scheduler needs to be able to perform CRUD operations on Pods, and the webserver needs to be able to tail Pod logs. As the user used to deploy charts does not have permissions to create Role and RoleBinding resources, we deploy the chart with a specific user/role that can, called deploy-airflow.
UNIX user impersonation
Each airflow instance has a dedicated keytab, whose first principal is of the form <user>/airflow-<instance-name>.discovery.wmnet@WIKIMEDIA. This ensures that any interaction with HDFS, Spark, etc., will impersonate the <user> user.
For example, the first principal of the airflow-test-k8s instance is analytics/airflow-test-k8s.discovery.wmnet@WIKIMEDIA, which enables impersonation of the analytics user in Hadoop.
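The impersonated user can be read straight off the principal; a minimal parsing sketch (the function name is made up for illustration):

```python
def user_from_principal(principal: str) -> str:
    """Extract the UNIX user from a Kerberos principal of the form
    <user>/<host>@<REALM>."""
    primary = principal.split("@", 1)[0]  # drop the realm
    return primary.split("/", 1)[0]       # keep the part before the host

# user_from_principal("analytics/airflow-test-k8s.discovery.wmnet@WIKIMEDIA")
# returns "analytics"
```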
Database access
The airflow chart was designed to run alongside a CloudnativePG cluster running in the same namespace. However, it can be configured to use an "external" PG database, such as an-db1001.eqiad.wmnet, for transitioning purposes. The ultimate goal is to have each instance run alongside its own PG cluster.
When configured to use a CloudnativePG cluster, access to the DB goes through PGBouncer, instead of hitting PG directly. As described in the airflow documentation, this mitigates the fact that:
Airflow is known - especially in high-performance setup - to open many connections to metadata database. This might cause problems for Postgres resource usage, because in Postgres, each connection creates a new process and it makes Postgres resource-hungry when a lot of connections are opened. Therefore we recommend to use PGBouncer as database proxy for all Postgres production installations. PGBouncer can handle connection pooling from multiple components, but also in case you have remote database with potentially unstable connectivity, it will make your DB connectivity much more resilient to temporary network problems.
Connections
Connections are managed via helm values, under .Values.config.airflow.connections. As such, they are managed by a LocalFilesystemBackend secret manager, and will not be visible in the web UI.
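As an illustration, such helm values could look like the following. The connection names and URIs here are entirely hypothetical; the real values and their exact shape live in each instance's release values.

```yaml
# Hypothetical values fragment; connection ids and URIs are examples only.
config:
  airflow:
    connections:
      my_postgres_db: postgresql://user:pass@db.example.wmnet:5432/mydb
      my_hive_server: hiveserver2://coordinator.example.wmnet:10000/default
```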
Management DAGs
The Kubernetes Airflow instances come with built-in maintenance DAGs, performing actions such as:
- removing task logs from S3 after they reach a certain age
- purging DB tables of data that has reached a certain age
- removing obsolete Airflow DAG/Task lineage data from DataHub
- ...
These DAGs are tagged with airflow_maintenance.
You can set the following Airflow variables in your release values, under config.airflow.variables, to configure the Airflow maintenance DAGs:
- s3_log_retention_days (default value: 30): number of days of task logs to keep in S3
- db_cleanup_tables: a comma-separated list of tables that will be regularly expunged of old data, to keep the database as lean as possible
- db_cleanup_retention_days: if specified along with db_cleanup_tables, the number of days after which data will be cleaned from these tables.
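For example, a release could tune the maintenance DAGs like this (all values are illustrative, not recommendations):

```yaml
# Hypothetical values fragment tuning the maintenance DAGs.
config:
  airflow:
    variables:
      s3_log_retention_days: 15
      db_cleanup_tables: "xcom,log,job"
      db_cleanup_retention_days: 90
```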
Administration
Moved to Data Platform/Systems/Airflow/Kubernetes/Administration
Operations
Moved to Data Platform/Systems/Airflow/Kubernetes/Operations
I'm getting paged
Pods are not running
If you're getting an alert or getting paged because the app isn't running, investigate whether something in the application logs (see the checklist section) could explain the crash. In case of a recurring crash, the pod would be in CrashLoopBackOff state in Kubernetes. To check whether this is the case, ssh to the deployment server and run the following commands:
$ ssh deployment.eqiad.wmnet
brouberol@deploy1003:~$ kube_env airflow-<instance>-deploy dse-k8s-eqiad
brouberol@deploy1003:~$ kubectl get pod
Then you can tail the logs as needed. Feel free to refer to the log dashboard listed in the checklist.
If no pod at all is displayed, re-deploy the app by following the Kubernetes deployment instructions .
An instance DAG Bag is empty
This has happened in T394459 and we think it was caused by the scheduler having lost access to either the database or the Ceph filesystem. Your first reflex should be to restart the scheduler.
$ ssh deployment.eqiad.wmnet
brouberol@deploy1003:~$ kube_env airflow-<instance>-deploy dse-k8s-eqiad
brouberol@deploy1003:~$ kubectl delete pod -l app=airflow,component=scheduler
If that does not fix the issue, then have a look at the Airflow instance dashboard, especially the Dag Processing section. If you see that DAG processing time has shot up, try to identify any particular file whose processing time recently increased. There might have been a recent code change that introduced extra complexity to the DAG itself. Reach out to the author to revert.
A namespace is nearing its resources quota
The airflow namespaces are assigned pretty large resource quotas (requests.cpu, requests.memory, limits.cpu, limits.memory). It can happen that these quotas are reached, thus preventing new pods from being scheduled. You can inspect the evolution of resource usage over time on this dashboard.
If a namespace is nearing / has reached its quota, you can do two (non-exclusive) things:
- increase the quotas to unblock the situation (see example)
- reach out to the team owning the DAGs to investigate which DAGs are taking the majority of resources, and assign them a smaller resource profile if we can afford it. See example.
The scheduler is not heartbeating
If the scheduler is not heartbeating, it means that it is so busy with scheduling tasks that it's not doing anything else. If you know that this is a temporary thing (the scheduler is currently enqueuing 1000s of tasks that will get dequeued over the next couple of hours/days), consider redeploying the airflow instance with increased scheduler resources. If this alert fires because a change landed that makes this scheduler activity the new normal, increase the scheduler resources and reach out to the DPE SRE team to review the changes.
How to
Use the airflow CLI
ssh into the deployment host (ssh deployment.eqiad.wmnet) and run the following commands, replacing test-k8s with the appropriate airflow instance name.
brouberol@deploy2002:~$ kube_env airflow-test-k8s-deploy dse-k8s-eqiad
brouberol@deploy2002:~$ kubectl exec -it $(kubectl get pod -l app=airflow,component=webserver --no-headers -o custom-columns=":metadata.name") -c airflow-production -- bash
% airflow --help
Usage: airflow [-h] GROUP_OR_COMMAND ...
Positional Arguments:
GROUP_OR_COMMAND
Groups
config View configuration
connections Manage connections
dags Manage DAGs
db Database operations
jobs Manage jobs
kubernetes Tools to help run the KubernetesExecutor
pools Manage pools
providers Display providers
roles Manage roles
tasks Manage tasks
users Manage users
variables Manage variables
Commands:
cheat-sheet Display cheat sheet
dag-processor Start a standalone Dag Processor instance
info Show information about current Airflow and environment
kerberos Start a kerberos ticket renewer
plugins Dump information about loaded plugins
rotate-fernet-key
Rotate encrypted connection credentials and variables
scheduler Start a scheduler instance
standalone Run an all-in-one copy of Airflow
sync-perm Update permissions for existing roles and optionally DAGs
triggerer Start a triggerer instance
version Show the version
webserver Start a Airflow webserver instance
Options:
-h, --help show this help message and exit
airflow command error: the following arguments are required: GROUP_OR_COMMAND, see help above.
command terminated with exit code 2
Use the yarn CLI
ssh into the deployment host (ssh deployment.eqiad.wmnet) and run the following commands, replacing test-k8s with the appropriate airflow instance name.
brouberol@deploy2002:~$ kube_env airflow-test-k8s-deploy dse-k8s-eqiad
brouberol@deploy2002:~$ kubectl exec -it $(kubectl get pod -l app=airflow,component=hadoop-shell --no-headers -o custom-columns=":metadata.name") -- bash
% yarn --help
Usage: yarn [--config confdir] [COMMAND | CLASSNAME]
CLASSNAME run the class named CLASSNAME
or
where COMMAND is one of:
resourcemanager run the ResourceManager
Use -format-state-store for deleting the RMStateStore.
Use -remove-application-from-state-store <appId> for
removing application from RMStateStore.
nodemanager run a nodemanager on each slave
timelinereader run the timeline reader server
timelineserver run the timeline server
rmadmin admin tools
router run the Router daemon
sharedcachemanager run the SharedCacheManager daemon
scmadmin SharedCacheManager admin tools
version print the version
jar <jar> run a jar file
application prints application(s)
report/kill application
applicationattempt prints applicationattempt(s)
report
container prints container(s) report
node prints node report(s)
queue prints queue information
logs dump container logs
schedulerconf updates scheduler configuration
classpath prints the class path needed to
get the Hadoop jar and the
required libraries
cluster prints cluster information
daemonlog get/set the log level for each
daemon
top run cluster usage tool
Most commands print help when invoked w/o parameters.
command terminated with exit code 1
Use the hdfs CLI
ssh into the deployment host (ssh deployment.eqiad.wmnet) and run the following commands, replacing test-k8s with the appropriate airflow instance name.
brouberol@deploy2002:~$ kube_env airflow-test-k8s-deploy dse-k8s-eqiad
brouberol@deploy2002:~$ kubectl exec -it $(kubectl get pod -l app=airflow,component=hadoop-shell --no-headers -o custom-columns=":metadata.name") -- bash
% hdfs --help
Usage: hdfs [--config confdir] [--loglevel loglevel] COMMAND
where COMMAND is one of:
dfs run a filesystem command on the file systems supported in Hadoop.
classpath prints the classpath
namenode -format format the DFS filesystem
secondarynamenode run the DFS secondary namenode
namenode run the DFS namenode
journalnode run the DFS journalnode
zkfc run the ZK Failover Controller daemon
datanode run a DFS datanode
debug run a Debug Admin to execute HDFS debug commands
dfsadmin run a DFS admin client
dfsrouter run the DFS router
dfsrouteradmin manage Router-based federation
haadmin run a DFS HA admin client
fsck run a DFS filesystem checking utility
balancer run a cluster balancing utility
jmxget get JMX exported values from NameNode or DataNode.
mover run a utility to move block replicas across
storage types
oiv apply the offline fsimage viewer to an fsimage
oiv_legacy apply the offline fsimage viewer to an legacy fsimage
oev apply the offline edits viewer to an edits file
fetchdt fetch a delegation token from the NameNode
getconf get config values from configuration
groups get the groups which users belong to
snapshotDiff diff two snapshots of a directory or diff the
current directory contents with a snapshot
lsSnapshottableDir list all snapshottable dirs owned by the current user
Use -help to see options
portmap run a portmap service
nfs3 run an NFS version 3 gateway
cacheadmin configure the HDFS cache
crypto configure HDFS encryption zones
storagepolicies list/get/set block storage policies
version print the version
Most commands print help when invoked w/o parameters.
Test a task's network policies
We run a toolbox pod with the same network policies as the ones applied to the task pods themselves. This way, you can exec into it to dynamically test whether a given host/port is reachable from the tasks themselves.
brouberol@deploy1003:~$ kube_env airflow-test-k8s-deploy dse-k8s-eqiad
brouberol@deploy1003:~$ kubectl exec -it $(kubectl get pod -l app=airflow,component=task-pod,role=toolbox --no-headers -o custom-columns=":metadata.name") -- bash
airflow@airflow-task-shell-79946fcb75-bmszm:/opt/airflow$ /opt/airflow/usr/bin/is_port_open schema.discovery.wmnet 443
schema.discovery.wmnet:443 is open
airflow@airflow-task-shell-79946fcb75-bmszm:/opt/airflow$ /opt/airflow/usr/bin/is_port_open apus.discovery.wmnet 443
apus.discovery.wmnet:443 is closed
Test a WIP DAG in a development environment
See Data Platform/Systems/Airflow/Developer guide#On Kubernetes
Fetch the content of XCOM stored in S3
We've configured Airflow to store large (>256KB) XCOM values in S3, to reduce database traffic. This means that these XCOMs will be visible in the Airflow UI as e.g. s3://s3_dpe@logs.airflow-main.dse-k8s-eqiad/xcoms/refine_to_hive_hourly/scheduled__2026-02-05T19:00:00+00:00/fetch_stream_configurations/a625c919-945d-4bcf-a0fe-df8710a0cedb. To fetch the actual value, you can rely on a CLI available in the airflow scheduler pod.
brouberol@deploy2002:~$ kube_env airflow-test-k8s-deploy dse-k8s-eqiad
brouberol@deploy2002:~$ kubectl exec -it $(kubectl get pod -l app=airflow,component=scheduler --no-headers -o custom-columns=":metadata.name") -- bash
runuser@airflow-scheduler-665697dd9-mcx67:/opt/airflow$ ./usr/bin/get-xcom-from-s3 s3://s3_dpe@logs.airflow-main.dse-k8s-eqiad/xcoms/refine_to_hive_hourly/scheduled__2026-02-05T19:00:00+00:00/fetch_stream_configurations/a625c919-945d-4bcf-a0fe-df8710a0cedb
[{"table_format": "hive", "hive_table": "event.centralnoticebannerhistory", "hive_partition_columns": "year:LONG,month:LONG,day:LONG,hour:LONG", "hive_partition_paths": "year=2026/month=2/day=5/hour=19", "hdfs_input_processed_flags": "hdfs:///wmf/data/raw/eventlogging_legacy/eventlogging_CentralNoticeBannerHistory/year=2026/month=02/day=05/hour=19/_PROCESSED", ...