
Dumps/Airflow

As part of T352650 WE 5.4 KR - Hypothesis 5.4.4 - Q3 FY24/25 - Migrate current-generation dumps to run on kubernetes, we decided to migrate the dumps infrastructure to Kubernetes. Specifically, we wanted to replace:

  • the dumps scheduler and systemd timers with an Airflow instance
  • the dumps worker jobs running on the snapshot hosts with Airflow tasks running on Kubernetes
  • the dumps NFS storage layer with Ceph

The dumps code would still be used (although it might have to be adjusted a bit to run in Kubernetes, to account for e.g. some networking discrepancies), and the final dumps artifacts would still be hosted by the clouddumps hosts. In that regard, this project differs from the so-called "dumps v2" project: it is not about modernizing the dumps themselves, but merely their orchestration.

DAGs and Airflow instance

The DAGs are defined under https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/tree/main/test_k8s/dags/dumps and run on https://airflow-test-k8s.wikimedia.org.

Architecture

At its core, the Dumps on Airflow architecture is quite simple: Airflow executes the dumps DAGs in the dedicated airflow namespace, and the DAG tasks are responsible for managing the dump pods in the mediawiki-dumps-legacy namespace, which themselves run the dump command/script.

Airflow schedules DAG tasks in its own namespace, which are themselves in charge of running dump jobs in the mediawiki-dumps-legacy namespace

This allows a clear separation between the Airflow and dumps Kubernetes resources, and gives us clear visibility over the resources required by the dumps themselves.

The overall structure of any dump DAG is the following:

  • The first task fetches a Pod spec from a CronJob defined in the mediawiki-dumps-legacy namespace (a sketch of this step follows the list).
  • Each dump task is then scheduled as a KubernetesPodOperator, itself in charge of running a dump Pod using the spec fetched in the first task, with a dump command specific to the dump itself.
  • Each dump task is followed by a synchronization task that rsyncs the Ceph volume to the clouddumps hosts (exposed on the internet), to make the artifacts produced by the previous dump task available to the public.
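
For illustration, here is a minimal sketch of the spec-fetching step using the kubernetes Python client. The CronJob name and the surrounding plumbing are assumptions, not the actual code from the airflow-dags repository:

# Minimal sketch: fetch the Pod template from a CronJob in the
# mediawiki-dumps-legacy namespace.
from kubernetes import client, config

def fetch_dump_pod_spec() -> client.V1PodTemplateSpec:
    config.load_incluster_config()  # assumes we run inside the cluster
    batch = client.BatchV1Api()
    cron_job = batch.read_namespaced_cron_job(
        name="dumps-template",  # hypothetical CronJob name
        namespace="mediawiki-dumps-legacy",
    )
    # The CronJob's job template carries the Pod template that every
    # dump task reuses when creating its dump pod.
    return cron_job.spec.job_template.spec.template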

We have defined DAGs for the following dump types (see Dumps#Dumps types):

XML/SQL dumps DAGs

A MediaWiki wiki dump DAG is composed of a first step that fetches the dump pod spec, followed by parallel dump pipelines, one per wiki, each running multiple stages. Each stage runs multiple dump jobs.

This is, by far, the most complex dump DAG we maintain, as we have to differentiate between:

  • full dumps vs partial dumps
  • regular wikis vs large wikis
Regular vs Large wikis
Instead of discovering these wikis at runtime using dynamically mapped tasks, we hardcode them into https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/main/wmf_airflow_common/includes/regular_wikis.json and https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/main/wmf_airflow_common/includes/large_wikis.json. This allows the DAG serialization to happen fully at parsing time, which substantially alleviates load on the scheduler.
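
As an illustration, the pattern looks roughly like the following sketch (the DAG id, schedule, and file location are assumptions; the real DAGs are more involved):

# Sketch: the wiki list is read while the DAG file is parsed, so the task
# list is fixed at parsing time and no runtime task mapping is needed.
import json
from datetime import datetime
from pathlib import Path

from airflow import DAG
from airflow.operators.empty import EmptyOperator

INCLUDES = Path(__file__).parent / "includes"  # hypothetical location

with DAG(
    dag_id="xml_sql_dumps_regular",  # hypothetical DAG id
    start_date=datetime(2025, 1, 1),
    schedule="@monthly",
    catchup=False,
):
    regular_wikis = json.loads((INCLUDES / "regular_wikis.json").read_text())
    for wiki in regular_wikis:
        EmptyOperator(task_id=f"dump_{wiki}")  # stand-in for the real pipeline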
Full dumps
The tasks of an XML/SQL full dump for a regular-sized wiki
The tasks of an XML/SQL full dump for a large-sized wiki

Full dump DAGs are executed on the 1st day of each month, and run all dump stages for each wiki:

  • xmlstubsdump
  • xmlstubsdumprecombine (large wikis only)
  • tables
  • articlesdump
  • articlesdumprecombine (large wikis only)
  • articlesmultistreamdump
  • articlesmultistreamdumprecombine (large wikis only)
  • metacurrentdump
  • metacurrentdumprecombine (large wikis only)
  • remaining (all remaining jobs)
The recombine steps are executed for large wikis only: these wikis are large enough that the associated dump stage produces several files in parallel, and the recombine steps merge these files back into a single one.
Dependency graph between the dump and sync tasks

We run sync steps after certain dump steps, and we run them in parallel with the following dump step, to speed up the execution.
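
In Airflow terms, the wiring looks roughly like this sketch (task names are placeholders):

# Sketch: each sync task depends only on the dump stage it publishes, so
# it runs in parallel with the next dump stage rather than blocking it.
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="dump_sync_wiring",  # hypothetical DAG id
    start_date=datetime(2025, 1, 1),
    schedule=None,
    catchup=False,
):
    stubs = EmptyOperator(task_id="xmlstubsdump")
    articles = EmptyOperator(task_id="articlesdump")
    sync_stubs = EmptyOperator(task_id="sync_stubs")
    sync_articles = EmptyOperator(task_id="sync_articles")

    # articlesdump and sync_stubs both start when xmlstubsdump finishes,
    # so the rsync overlaps with the next dump stage.
    stubs >> [articles, sync_stubs]
    articles >> sync_articles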

Partial dumps

Partial dump DAGs are executed on the 20th day of each month, and skip the following dump jobs for each wiki:

  • metahistorybz2dump
  • metahistorybz2dumprecombine
  • metahistory7zdump
  • metahistory7zdumprecombine
We define multiple of these DAGs to keep the number of wikis managed by each DAG between 100 and 150, in order to keep the Airflow UI fast enough and its memory usage under control.
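
A minimal sketch of this sharding, with hypothetical names and paths:

# Sketch: shard the wiki list into chunks of ~125 wikis and generate one
# DAG per chunk, keeping each DAG between roughly 100 and 150 wikis.
import json
from datetime import datetime
from pathlib import Path

from airflow import DAG
from airflow.operators.empty import EmptyOperator

WIKIS = json.loads(Path("partial_wikis.json").read_text())  # hypothetical path
CHUNK_SIZE = 125

for shard, start in enumerate(range(0, len(WIKIS), CHUNK_SIZE)):
    dag = DAG(
        dag_id=f"partial_dumps_{shard}",  # hypothetical naming scheme
        start_date=datetime(2025, 1, 1),
        schedule="0 0 20 * *",  # the 20th day of each month
        catchup=False,
    )
    with dag:
        for wiki in WIKIS[start:start + CHUNK_SIZE]:
            EmptyOperator(task_id=f"dump_{wiki}")  # stand-in for the pipeline
    globals()[dag.dag_id] = dag  # make each generated DAG discoverable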
Alerting

All alert emails go to data-engineering-alerts@wikimedia.org and are managed by the Ops-week rotation.

Lifetime of a task pod

Once a dump DAG task pod is started by the Airflow KubernetesExecutor, it will itself start a dump pod in the mediawiki-dumps-legacy namespace via a WikimediaDumpOperator operator subclass (itself a subclass of KubernetesPodOperator). That operator class encapsulates the whole logic of running a sequence of dump jobs for a given wiki and a given dump type (SQL/XML, Cirrus, added/removed, etc.).

The task pod will create a dump pod using the Pod spec fetched in the first DAG task, and manage its lifetime. The "real work" happens in the dump pod itself.

This pod will execute the dump worker script with appropriate arguments.

The dump pod itself has multiple containers:

  • the dump container, in charge of running the dump jobs
  • the envoy service mesh proxy, in charge of proxying traffic to internal services
  • an rsyslog container (whose usefulness remains to be proven)

Once the dump container terminates, the pod will transition from the 3/3 Running state to 2/3 Unready, which means that the dump container has terminated but the sidecar containers keep running. That state will be picked up by the k8s-controller-sidecars operator, which will exec into both sidecar containers and kill their PID 1. At that point, the pod will enter the 0/3 Completed state, causing the dump operator (the task pod) to delete it altogether.
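
The following sketch illustrates the shutdown protocol; it is not the actual k8s-controller-sidecars implementation, and the pod and container names are assumptions:

# Sketch: once the dump container has terminated, exec into each
# still-running sidecar and kill its PID 1 so the pod can complete.
from kubernetes import client, config
from kubernetes.stream import stream

config.load_incluster_config()
v1 = client.CoreV1Api()

pod = v1.read_namespaced_pod(name="dump-pod", namespace="mediawiki-dumps-legacy")
statuses = {s.name: s for s in pod.status.container_statuses}

if statuses["dump"].state.terminated:  # "dump" container name is assumed
    for name, status in statuses.items():
        if name != "dump" and status.state.running:
            stream(
                v1.connect_get_namespaced_pod_exec,
                "dump-pod",
                "mediawiki-dumps-legacy",
                container=name,
                command=["kill", "1"],
                stderr=True, stdin=False, stdout=True, tty=False,
            )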

Pools and parallelism

Running the dumps on Airflow will put pressure on the following resources:

  • the load on the Kubernetes API servers: creating and managing the lifetime of many pods at the same time causes many requests to be sent to the Kubernetes API
  • the mediawiki-dumps-legacy namespace resource quotas (CPU/memory requests/limits): as pods get created in mediawiki-dumps-legacy, one of the resource quotas can be reached, causing further pod scheduling to fail
  • the dbstore MariaDB servers: queried for all wiki metadata, except the article contents themselves
  • the external storage MariaDB servers: queried for the article contents themselves
  • the dump pod memory itself: when running the xmlpagelogsdump dump job, the associated PHP process in the dump pod has been known to be OOMKilled. This is well tolerated by the worker process itself, as it will simply re-execute it.
  • the Airflow scheduler CPU: running many different DAG tasks without overwhelming the scheduler has proven to be a challenge. We have documented all the different optimizations (whether in configuration, DAG structure, resource allotment, etc.) in T390945
  • the Airflow webserver memory: observing a large dump DAG in the Airflow web UI has proven to be memory hungry as well. We have increased the default allotted memory of the test-k8s webserver.
  • the CPU usage of the k8s-controller-sidecars pod: if this pod gets overwhelmed, the terminated dump pods take time to get fully deleted, thus slowing down the whole dump DAG.
The main issue is managing the load on the external storage MariaDB servers. Everything else is internal, but the external storage servers are shared with production wiki traffic, which means that if we overwhelm them, we negatively impact production traffic, possibly up to causing an outage.

To that effect, we have provisioned a dedicated Airflow pool for the regular SQL/XML wiki dumps, and another one for the SQL/XML large wiki dumps. We can change the number of pool slots at runtime, effectively controlling the maximum dump parallelism we allow, if we ever get notified of high load on the external storage servers.
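
For instance, a pool can be resized from the Airflow CLI (the pool name and slot count here are hypothetical):

~  airflow pools set sql_xml_dumps_regular 4 "Regular wiki SQL/XML dump pool"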

All other dumps go through the default pool.

Making the dumps publicly available

The dump jobs write their private, temporary, and public files to Ceph, under /mnt/dumpsdata . However, Ceph itself is not exposed to the public internet; https://dumps.wikimedia.org is instead served from the clouddumps hosts:

~  host dumps.wikimedia.org
dumps.wikimedia.org is an alias for clouddumps1002.wikimedia.org.
clouddumps1002.wikimedia.org has address 208.80.154.71
clouddumps1002.wikimedia.org has IPv6 address 2620:0:861:3:208:80:154:71

In order to make the public dump data available, we interweave synchronization tasks with dump tasks. This way, every time a dump task has finished, its result is immediately synchronized to the clouddumps hosts.

We do this by running a sync-utils pod that synchronizes the public files for a given wiki and a given dump date to the clouddumps hosts, via parallel-rsync/ssh.
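
Conceptually, each sync boils down to something like the following single-host command (the source and destination subpaths are hypothetical; the real pod fans the transfer out across the clouddumps hosts with parallel-rsync):

~  rsync -a /mnt/dumpsdata/xmldatadumps/public/enwiki/20250101/ clouddumps1002.wikimedia.org:/srv/dumps/xmldatadumps/public/enwiki/20250101/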

Operations

See Dumps/Airflow/Operations