You are browsing a read-only backup copy of Wikitech. The live site can be found at wikitech.wikimedia.org

Analytics/Systems/Airflow/Developer guide: Difference between revisions

From Wikitech-static
Jump to navigation Jump to search
imported>Ottomata
(→‎Artifacts: Added gitlab example)
imported>Xcollazo
(→‎SparkSQLOperator: Make link permalink.)
Line 438: Line 438:


=== SparkSQLOperator ===
=== SparkSQLOperator ===
https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/18182aa041ccae4290f8f35a8001eebcdb71fe44/wmf_airflow_common/operators/spark.py#L276
This operator executes a query in SparkSQL using the org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver (by default). It allows us to pass in query_parameters to correctly populate the application arguments.
This operator executes a query in SparkSQL using the org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver (by default). It allows us to pass in query_parameters to correctly populate the application arguments.


Line 450: Line 452:
  )
  )


More in-depth details on the SparkSQLOperator can be found in:
=== SparkSubmitOperator ===
https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/main/wmf_airflow_common/operators/spark.py#L176
https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/18182aa041ccae4290f8f35a8001eebcdb71fe44/wmf_airflow_common/operators/spark.py#L16


=== Spark Submit Operator ===
This custom operator uses the wmf_airflow_common SparkSubmitHook to extend the functionality of the Airflow SparkSubmitOperator, providing us with more features such as the ability to launch spark jobs via skein. For this operator, we must set the application parameter to load the artifact that contains the spark job to be run and also specify the java class.  
This custom operator uses the wmf_airflow_common SparkSubmitHook to extend the functionality of the Airflow SparkSubmitOperator, providing us with more features such as the ability to launch spark jobs via skein. For this operator, we must set the application parameter to load the artifact that contains the spark job to be run and also specify the java class.  


Line 465: Line 466:
     sla=timedelta(days=7)
     sla=timedelta(days=7)
  )
  )
More in-depth details on the SparkSubmitOperator can be found in:
https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/main/wmf_airflow_common/operators/spark.py#L9


=== URLTouch Operator ===
=== URLTouch Operator ===

Revision as of 14:53, 17 June 2022

Airflow overview

For documentation about Apache Airflow, please refer to https://airflow.apache.org/docs/. This section is not intended to summarize or replace Airflow docs, but rather bring out some basic concepts and common gotchas.

A little summary of the vocabulary

Term Definition
DAG Dynamic Acyclic Graph. In Airflow it’s used as a synonym of job.
DAG run (or dag_run) The execution of a DAG for a particular time interval.
Task Unit (node) of an Airflow DAG. It’s usually a Sensor, an Operator or a TaskGroup.
Sensor Task that waits for a particular asset to be present in the specified location. For instance, it waits for a Hive partition to exist, or an HDFS path to exist. Until a Sensor doesn’t confirm the presence of its target, the subsequent tasks in the DAG are not triggered.
Operator Task that applies some workflow action, like computing a dataset, transforming some data, uploading a dataset to a data store, or creating a success file.
TaskGroup Group of tasks that can be treated as a task itself. Also, you can see a TaskGroup as an encapsulated sub-DAG.
Hive partition A division of a Hive table. In our setup, Hive tables are usually partitioned by time, so that at each new time interval (i.e. each hour/day/month/…) we add a new partition to the Hive table, containing the data that corresponds to that time interval. Partitions allow to prune the data that a query/job needs to read, thus making it more efficient. Partitions are usually named after the time period they represent, like: `database.table_name/year=2022/month=3/day=5/hour=14`.
Artifact Any external asset needed for the Airflow job to run, usually code dependencies like Maven JARs, Python virtualenvs, HQL queries, etc.
SLA Service Level Agreement. In practice, you set an SLA time on a Task. If the task instance has not finished after that time has elapsed, Airflow triggers an email alert.

Dag interpretation time vs dag execution time

Although Airflow DAGs are written in a single Python file, they are run in two stages: Dag interpretation stage and Task execution stage.

Dag interpretation stage

The DAG files are read and interpreted by Airflow about every minute. So, if you deploy a code change to your production instance (or testing instance), Airflow should pick it up quickly without the need of a restart. However, this stage does not execute your tasks, it simply parses the DAG and updates Airflow's internal representation of it. This means your Sensors and Operators will not be triggered yet; also, callables and templates will not be executed until the next stage. The DAG interpretation stage happens in the Airflow machine, and happens every minute for each DAG. Thus, it should be very efficient. We should i.e. avoid connecting to a database or querying HDFS within DAG files.

Task execution stage

When Airflow determines that a DAG should be triggered for a particular date, it will launch a "dag_run" and kick off the task execution stage. In this stage the DAG is not re-interpreted any more, only the Sensors and Operators you defined in your DAG will be executed. Sensors run from the Airflow machine, but they are mostly lightweight. Now, Operators are triggered from the Airflow machine, but ultimately (if done properly) they are executed in the Hadoop cluster. Before Sensors or Operators run, templated values will be resolved.

What are sensors and why use them?

Sensors are Airflow's way of defining data dependencies. Sometimes, we just want to trigger a data processing job on a timely schedule, i.e. a job that deletes data from a given place. It can run every hour, sharp. In this case, we don't need Sensors. However, most data processing jobs need to wait for input data to be present in the data store prior to doing any computation. Airflow sensors define those dependencies, and wait until the specified data is present (or other conditions!). In our case, sensors usually wait for 1 or more Hive partitions, or for 1 or more files/directories in a file system.

Dag runs

In Airflow a “dag_run” is the execution of a DAG for a particular time interval. DAG runs have a start datetime and an end datetime. Those delimit the period for which data will be processed/computed. The Airflow macro execution_date is very confusing, because of its misleading name. It is not the date a particular DAG is executed, but rather the start datetime of a dag_run. Note that the start datetime of a dag_run that processes data for 2022-05-12 is going to be the same (2022-05-12), regardless of when you execute the DAG. Thus, DAGs should always process/compute data starting at execution_date and ending at execution_date + granularity, where granularity is the DAG’s schedule_interval, namely: @hourly, @daily, @monthly, etc.

e.g. when we talk about the daily job of 2022-04-01

  • it handles the data from 2022-04-01-00:00 to 2022-04-01-23:59.999
  • it creates a new partition in hive which may be named 2022-04-01
  • its execution_date is going to be 2022-04-01-00:00
  • the dag run could be created and enqueued within Airflow at 2022-04-02-00:12
  • then the task instance may run at 2022-04-02-00:15, and complete at 2022-04-02-00:32

Writing a DAG using our Airflow setup

Our Airflow setup (instances, repository and libraries) offers you a set of tools for writing your DAGs. Those are designed to make your DAG code shorter and easier to read and to test.

Importing code

For your DAG, you’ll need to import Airflow native libraries, but also you can import libraries that we have developed to be shared by all WMF Airflow users. Finally, you can also develop your own helper libraries that are specific to your project.

Airflow native libraries

The Conda environment where Airflow runs (both your production instance and the development instance) has airflow installed in it. So, to import from airflow native libraries you should just use regular imports, like:

from airflow import DAG
from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHivePartitionSensor

WMF Airflow common

WMF Airflow common (wmf_airflow_common) is a library that we have developed to provide tools that are tailored to WMF’s Airflow needs. It currently lives within the airflow-dags repository, but we are considering factoring it out in the future. In the meantime, because of the way the Airflow instances’ PYTHONPATH is configured, you can import libraries from it like so:

from wmf_airflow_common.operators.url import URLTouchOperator
from wmf_airflow_common.partitions_builder import partition_names_by_granularity

Instance level imports

Each Airflow instance has its own folder in the airflow-dags repository. For example, the Analytics Airflow instance, which runs in an-launcher1002.eqiad.wmnet, has an analytics folder at the root of the airflow-dags repository. All DAGs defined within that folder will be executed in (and only in) the Analytics Airflow instance. If you want to create a library that is specific to your instance, you can add your code to your instance folder within airflow-dags, for example:

Airflow-dags
    |-- your_instance_folder
    |       |-- dags
    |       |-- config
    |       \-- your_library_code

And then, because of the way the Airflow instances’ PYTHONPATH is configured, you can import libraries from it like:

from your_instance_folder.your_library_code import your_object
# Example for the analytics instance
from analytics.config import dag_config

Configuration and defaults

One of the goals of our Airflow setup is to reduce the amount of boilerplate code needed to write our jobs. Our Airflow setup offers a set of WMF defaults for the most used parameters (usually well suited for our Hadoop cluster).

WMF common DAG default args

Defaults are usually passed to your DAG via the default_args parameter (a Python dictionary). All its key-value pairs will be passed to all sensors and all operators of the corresponding DAG. For example, this will set all the WMF Airflow common defaults to your DAG:

from wmf_airflow_common.config import dag_default_args
with DAG(
    default_args=dag_default_args.get(),
    …
) as dag:

Instance level configuration

WMF Airflow common defaults are good in many cases, but there are values that need to be specified per instance. For example, the user that will run all jobs (in Airflow naming: the owner), or the email address that will receive all alerts. To do that, we advise to add instance level configuration to the WMF defaults, like so:

from wmf_airflow_common.config import dag_default_args
instance_default_args = {
    'owner': 'your_user',
    'email': ‘your_email’,
}
# This returns a new dictionary with instance defaults merged on top of wmf defaults.
default_args = dag_default_args.get(instance_default_args)

To prevent having to write this for every DAG, this is usually defined in a separate file that can be reused by all DAGs. By convention we use a file named dag_config.py within a config directory within the instance folder. Thus, when defining a DAG, we do:

from your_instance.config import dag_config
with DAG(
    default_args=dag_config.default_args,
    …
) as dag:

Feel free to add any other instance-level properties to the dag_config.py file, so that you can access them from all DAGs. For example: hadoop_name_node, hdfs_temp_directory, etc.

DAG level configuration

If you want to override or add any new configuration to any Sensor or Operator within your DAG, just pass the configuration directly to the Sensor or Operator in question. This will override the defaults passed to the DAG’s default_args parameter. For example:

from your_instance.config import dag_config
with DAG(
    default_args=dag_config.default_args,
    …
) as dag:
    sensor = SomeSensor(
        some_property=’this value overrides the defaults’,
        …
    )

Variable properties

The VariableProperties module allows you to override DAG configuration at run-time. This is useful when testing your DAG or when back-filling your production DAG. By properties we mean any configuration value. DAGs are ultimately a collection of job configuration properties. The VariableProperties code has some nice documentation that can help as well: https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/main/wmf_airflow_common/config/variable_properties.py

Which properties should be overridable?

Why override properties at run-time? A DAG specifies a list of properties that point to the production environment. When testing your DAG, you don’t want Airflow to use those production configs. Instead, you want to use temporary testing configurations. Here’s a list of common properties you’ll want to make overridable:

start_date You might want to test a DAG and start it at a different date than the one intended for production. You also might want to back-fill starting i.e. at an earlier date.
default_args You might want to override default_args values when executing a DAG, for instance, you want to pass your username as the owner when testing.
hql_path/artifact_path When testing, the query/artifact that you are using might not be deployed to the production cluster yet. So you might want to override the production configuration with a temporary run-time value.
destination_table/destination_path When testing, you don’t want to write on top of production data, rather write to a temp location, like your home folder.

In the end, you can make any property overridable, it’s up to you to decide.

How to make them overridable?

VariableProperties works with Airflow Variables (hence the name). It will look for property overrides in the Airflow Variable that you specify. If it finds an override, it will apply it; otherwise, it will apply the property production value. Usually you would specify an Airflow DAG configuration like this:

some_property = ‘some_production_value’
another_property = ‘another_production_value’

To make these properties overridable you must do:

var_props = VariableProperties(‘your_variable_name’')
some_property = var_props.get(‘some_property_key’, ‘some_production_value’)
another_property = var_props.get(‘another_property_key’, ‘another_production_value’)

How to override them at run-time?

To override properties at run-time you have to create an Airflow variable. Its name must be the same you specified when creating the VariableProperties object in your DAG code. Its value must be a JSON string containing a map, i.e. {“some_property_key”: “some_value”, “another_property_key”: “another_value”}. You can create an Airflow Variable in 2 ways: Using the Airflow UI or defining an environment variable in the command line. The latter is useful if you’re testing your DAG using the command line (airlfow dags test …).

Create an Airflow Variable using UI

Click on Admin > Variables. Add a new record. Key is the variable name (same as you specified in the DAG code), Val is the value (JSON string).

Create an Airflow Variable using environment variables

Run export AIRFLOW_VAR_<your_variable_name>=<JSON string> in your command line. The name of the environment variable must be the Variable name you chose in the DAG code, with the AIRFLOW_VAR_ prefix. The value must be the mentioned JSON string. e.g. `myjob_config={"start_date":"2022-04-20"}`

Handy methods

Most properties are strings or numbers. These are handled fine by Airflow Variables. However, some Airflow properties are expected to be datetimes, timedeltas or dictionaries. For those particular cases, you can use the following getters:

var_props.get_datetime(‘start_date’, datetime(2022, 4, 1))
var_props.get_timedelta(‘delay’, timedelta(days=2))
var_props.get_merged(‘default_args’, default_args)

These methods will ensure that the default value passed as a second parameter is of the expected type, and also that the JSON override specified in the Variable matches the expected type. For datetime and timedelta JSON values, use ISO8601 format i.e. 2022-04-01T00:30:15 for datetimes and i.e. P3M2D for timedeltas. On the other hand, get_merged will merge the dictionary parsed from the JSON string on top of the default value dictionary passed as a second parameter.

Artifacts

Our Airflow setup tries to make it easy to pull and use code dependencies from DAGs. It expects those dependencies to be packaged into maven JAR files or Python virtual envs. This section assumes you already have an artifact available in a public registry, like WMF’s Archiva or in a GitLab package registry. To use the artifact from a DAG, you need 2 things:

Define your artifact in artifacts.yaml

Add a snippet that identifies your artifact and tells Airflow where to find it to the artifacts.yaml file in your instance’s config folder:

artifacts:
  # No source declared, so this uses the default URL based source: id is a URL to a package in GitLab.
  example-job-project-0.15.0.conda.tgz:
    id: https://gitlab.wikimedia.org/repos/data-engineering/example-job-project/-/package_files/487/download

  # From wmf_archiva_releases, for which the id should be a maven coordinate
  refinery-job-0.1.23-shaded.jar:
    id: org.wikimedia.analytics.refinery.job:refinery-job:jar:shaded:0.1.23
    source: wmf_archiva_releases

The first key is the artifact name, you will use it to reference the artifact from your DAG. In the case of Maven JAR files the id is the Maven coordinate uniquely identifying the artifact. The source is the artifact registry where your artifact lives. Read airflow-dags/wmf_airflow_common/config/artifact_config.yaml to see all configured sources. For a more detailed documentation of the artifact configuration, see: https://gitlab.wikimedia.org/repos/data-engineering/workflow_utils/-/tree/main/#artifact-module

(Note that the artifact will be cached by its name, so you should probably make sure the artifact name has the appropriate file extension, e.g. .jar or .tgz.)

Reference your artifact from your DAG

To reference your artifact from a DAG you can use the wmf_airflow_common.artifact.ArtifactRegistry module. To initialize an ArtifactRegistry object you need to do:

artifact_registry = ArtifactRegistry.for_wmf_airflow_instance('your_instance_name')

The resulting artifact registry object provides a method that will return the location (url) of the artifact specified by the artifact_name, like so:

artifact_registry.artifact_url(refinery-job-0.1.23-shaded.jar)

So that you don’t have to create an artifact_registry object in each DAG, the dag_config.py file in your instance’s config folder already does it. It also prepares some syntactic sugar to shorten your code. Ultimately, in your DAG code, whenever you want to get the path to one of the artifacts you specified in the artifacts.yaml file, you should do:

from your_instance.config.dag_config import artifact
path_to_artifact = artifact(artifact_name)
refinery_job_jar_path = artifact(refinery-job-0.1.23-shaded.jar)

Deployment

Once your artifact is declared in your instance's artifact.yaml, you will need to deploy airflow-dags to your airflow instance. Scap will copy the artifact from it's source to configured cache locations, usually in HDFS. Once there, the artifact can be used from its cached location.

If you want to force sync, you can use the artifact-cache CLI includued in the airflow conda environment via workflow_utils. E.g.

/usr/lib/airflow/bin/artifact-cache status \
  /srv/deployment/airflow-dags/<your_instance_name>/wmf_airflow_common/config/artifact_config.yaml \
  /srv/deployment/airflow-dags/<your_instance_name>/<your_instance_name>/config/artifacts.yaml

How it works

Whenever Airflow is deployed to one of our instances, scap will launch a script that reads the artifacts.yaml file, pulls all artifacts from their sources and caches them by name in a predefined location (usually HDFS). When your DAG uses ArtifactRegistry (or artifact(‘...’)) to reference an artifact, Airflow will use the same logic scap deployment used to determine the cached location of the artifact and return that. This way the developer doesn’t need to handle artifact paths in HDFS.

Filters

Airflow allows you to use Jinja2 to templatize some fields when writing a DAG. It also provides a set of macros you can use to generate dynamic DAG parameters. The most used macro is probably execution_date. If your DAG has a @weekly schedule_interval, or it has several heterogeneous data sources, you might need to modify that execution_date to adapt to your needs. For that, you can use a set of filters provided by the wmf_airflow_common.templates module. You can use them like so:

‘{{ execution_date | to_ds_month }}’  # returns the execution_date formatted like YYYY-MM
‘{{ execution_date | start_of_current_week }}’  # snaps the execution date to the start of its week
‘{{ execution_date | end_of_next_day | add_hours(2) }}’  # snaps the execution date to the end of next day and adds 2 hours

See a full list of all available custom filters here: https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/main/wmf_airflow_common/templates/time_filters.py

Don’t forget

For the filters to work, you need to pass them explicitly to your DAG, like this:

from wmf_airflow_common.templates.time_filters import filters
with DAG(
    …
    user_defined_filters=filters,
) as dag:
    …

Conventions & good practices

The following paragraphs can be used as a good-practices guide to write DAGs. However, the advice here is very Data-Engineering centric. This is the way we D.E. have decided/agreed to write our DAGs. It is what we think is best now, but it might not be what you need for your use case. Also, we might change it in the future. See if it makes sense for you! :-)

Task ids

Task ids (in Operators and Sensors) should describe actions i.e. “process_certain_dataset” or “wait_for_dataset_partitions”, if possible starting with a verb and following with an object. It is a good idea to be explicit about which data is the object of the action. The task id should be unique within the DAG. Also, use snake_case (all lowercase with underscores as word separators).

DAG ids

Use snake_case for DAG ids, too (all lowercase with underscores as word separators). Also, DAG names should be unique within the Airflow instance. Naming DAGs is hard, and it’s difficult to find a formula that fits all cases. But here’s some ideas of what a DAG id could contain: The name of the resulting dataset, the granularity of the job (if there are several jobs with different granularities for the same dataset), and the tool that the DAG uses to process or store the resulting dataset (i.e. druid or anomaly_detection). Avoid very generic words like “data”, “table” or “dag”, which don’t give much information. Finally, it’s a good idea to match the DAG file name and DAG id, so if the DAG file is “blah_blah_dag.py” the id should be “blah_blah” (without the _dag.py suffix).

DAG file names

DAG file names should be the same as DAG ids, except they will have an _dag.py suffix. It’s a good idea to include any directory levels the DAG file might live in into the file name. For instance, if the DAG lives in dags/druid/monthly/ and we want to name it ‘blah_blah’, then a good file name would be druid_monthly_blah_blah_dag.py. Including the directory tree namespace in the DAG name (and in the DAG id) helps identifying the DAG in the UI, and when receiving email alerts.

Variable properties

The name of the variable used for a VariableProperties object should match the DAG id with an extra _config suffix. For example, if the DAG id is druid_monthly_navtiming, the variable for VariableProperties should be named druid_monthly_navtiming_config.

Tags

It’s very difficult to scope which are good tags to add to a DAG. But some ideas are: Don’t use concepts that are already displayed elsewhere as a tag i.e. granularity; Don’t use concepts that are going to be shared by most DAGs, like “spark”, “hive”, “hql” or “hdfs”. Don’t use concepts that are most likely only going to be used by 1 DAG. A good idea for a tag is the tool used to store the resulting dataset of the DAG, i.e. “druid” or “cassandra”, maybe also the factory used to generate the dag, like “anomaly_detection” or “refine”. Use snake_case for tags as well.

Schedule interval

Use the predefined schedule intervals whenever possible (use @daily instead of 0 0 * * *). For weekly DAGs, note the preset @weekly starts the DAGs on Sunday. If you want to start your DAG on Monday, use 0 0 * * 1. If possible, all your weekly DAGs should start on the same day (either Sunday or Monday).

SLAs

Specify SLAs at the task level (not for the whole DAG), i.e. by passing the parameter sla=timedelta(days=3) to the corresponding Operator. Choose the main operator of the DAG, usually the last one to run. Do not set SLAs for the Sensors. This way we make alerts more efficient. Note that (as opposed to with Oozie) in Airflow SLA deltas start counting when the dag_run is triggered, which is after the whole logical period has passed. So, an Oozie SLA of 30 hours for a daily job translates to an Airflow SLA of 6 hours (30 - 24).

Artifact versioning

In some cases, multiple versions of the same artifact are going to be specified in your artifacts.yaml file. To prevent having lots of different artifact versions (1 for each new DAG), let’s use this convention: Whenever we are modifying an existing DAG or adding a new one, let’s try to update their artifact versions to the latest one already available in the artifact.yaml config. If the latest available version of the artifact does not implement the necessary code, only then let’s add a new entry in the artifacts.yaml file.

DAG docs

Airflow DAGs accept a doc_md parameter. Whatever you pass it through that parameter, will show up in the Airflow UI in a convenient collapsible widget. The idea is to reuse the Python docstring (triple quote comment) at the top of the DAG file, and pass it to the DAG using:

with DAG(
    doc_md=__doc__,
    …
) as dag:
    …

You can use Markdown for a neat presentation in the Airflow UI. NOTE: Please, list all overridable properties (for VariableProperties) of your DAG in the top docstring.

Start date

The start_date should match the start of the schedule_interval. For instance, if the schedule_interval is @daily, the start_date should always point to the start of a day. If the schedule_interval is @monthly, the start_date should point to the start of a month, etc.

Sensor poke_interval

It is very important to specify the parameter poke_interval for all Sensors. The poke_interval is the frequency at which Airflow checks whether the Sensor’s target is available. A too small poke_interval on a large enough set of Sensors can easily saturate Airflow. The value you will set depends on the urgency of the job, and also on its granularity. If you don’t know which value to use, you can use the following rule of thumb:

Hourly poke_interval=timedelta(minutes=5).total_seconds()
Daily poke_interval=timedelta(minutes=20).total_seconds()
Weekly poke_interval=timedelta(hours=1).total_seconds()
Monthly poke_interval=timedelta(hours=3).total_seconds()

Sensor timeout

Sensors have another important parameter: timeout. It tells Airflow when it should stop trying to check for the Sensor’s target availability. When the schedule_interval of your DAG is significantly greater than the time the Sensors will normally wait, then we should indicate the Sensor’s timeout. For example, if our DAG is @monthly, and we expect our Sensors to wait for just 1 day or 2, we should set a timeout to prevent Airflow from checking the target during weeks. The value of the timeout should be between the DAG’s SLA and its schedule_interval. Use seconds to specify a Sensor’s timeout, i.e. timeout=timedelta(days=3).total_seconds().

Config and context

As opposed to Oozie (which completely separates configuration properties from DAG structure), Airflow’s proposal is to put both configuration properties and DAG structure in the same file. This reduces the boilerplate a lot and makes code shorter, provided common structures are factored out into custom Operators, Sensors and TaskGroups. Another advantage is that configuration properties can be side by side with their context. Consider avoiding this style:

property_1 = ‘value_1’
property_2 = ‘value_2’
property_3 = ‘value_3’
property_4 = ‘value_4’
…
with DAG(
    …
) as dag:
    op1 = Operator(
        property_1=property_1,
        property_2=property_2,
    )
    Op2 = Operator(
        property_3=property_3,
        property_4=property_4,
    )

Which separates the configuration from its context, and consider using this style:

with DAG(
    …
) as dag:
    op1 = Operator(
        property_1=’value_1’,
        property_2=’value_2’,
    )
    Op2 = Operator(
        property_3=’value_3’,
        property_4=’value_4’,
    )

An exception to this rule can be when a property is used in more than one place, then by defining it at the top, we make core more DRY.

TaskGroup factories vs. DAG factories

In some cases we might want to factor out a common sub-DAG into a factory that can be reused in multiple jobs. At first we tried implementing DAG factories that would accept a collection of DAG parameters and return a finished “closed-box” DAG. This approach is OK but has some disadvantages: Since the resulting DAG is closed, it can never fulfill all corner cases of various jobs and it can not be adapted to their special needs. Instead, we agreed that implementing TaskGroup factories was a better approach. TaskGroups are sub-DAGs that can be treated as tasks (you can define dependency relations between them and other tasks or TaskGroups). This way factories become much more flexible and powerful.


wmf_airflow_common Sensors and Operators

The WMF Airflow common library provides a set of custom Sensors and Operators tailored to our Hadoop setup. Here’s a quick overview. For more detail, see the custom Sensors and Operators code. IF WMF’s custom sensors do not implement what you need, it means there’s already a native Airflow sensor you can use, see them here. TODO: Move this Sensors and Operators documentation to airflow-dags repository, using some automatic documentation generator, like Sphinx.

Sensors

RangeHivePartitionSensor

https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/main/wmf_airflow_common/sensors/hive.py#L6

This Sensor dynamically generates a range of Hive partitions to be checked. Its main advantage is that it renders the list of partitions at run-time, after Airflow macros (like execution_date) have been resolved. This allows, for instance, to generate a set of hourly partitions for a given weekly interval. Be careful though, this Sensor does not implement the necessary interfaces to be considered a SmartSensor by Airflow, so it’s less efficient than regular Airflow Sensors. If you can, you should use Airflow’s NamedHivePartitionSensor instead.

sensor = RangeHivePartitionSensor(
    task_id='wait_for_some_dataset',
    table_name='some_db.some_table',
    from_timestamp='2022-02-10',
    to_timestamp='2022-02-13',
    granularity='@hourly',
)

The resulting list of partitions would be:

some_db.some_table/year=2022/month=2/day=10/hour=0
some_db.some_table/year=2022/month=2/day=10/hour=1
some_db.some_table/year=2022/month=2/day=10/hour=2
…
some_db.some_table/year=2022/month=2/day=10/hour=23
some_db.some_table/year=2022/month=2/day=11/hour=0
some_db.some_table/year=2022/month=2/day=11/hour=1
…
some_db.some_table/year=2022/month=2/day=12/hour=22
some_db.some_table/year=2022/month=2/day=12/hour=23

daily_partitions

https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/main/wmf_airflow_common/partitions_builder.py#L40

This is not a Sensor, it’s just a helper, but it will always be used in combination with a Sensor. It returns a list of Hive partitions for a day of data, the partitions can be @hourly or @daily.

sensor = NamedHivePartitionSensor(
    task_id='wait_for_some_dataset',
    partition_names=daily_partitions(
        table=’some_db.some_table’,
        granularity=’@hourly’,
    ),
)

The results of the call to daily_partitions would be:

some_db.some_table/year={{execution_date.year}}/month={{execution_date.month}}/day={{execution_date.day}}/hour=0
some_db.some_table/year={{execution_date.year}}/month={{execution_date.month}}/day={{execution_date.day}}/hour=1
some_db.some_table/year={{execution_date.year}}/month={{execution_date.month}}/day={{execution_date.day}}/hour=2
…
some_db.some_table/year={{execution_date.year}}/month={{execution_date.month}}/day={{execution_date.day}}/hour=22
some_db.some_table/year={{execution_date.year}}/month={{execution_date.month}}/day={{execution_date.day}}/hour=23

Note that the resulting partitions are templated and use Airflow’s execution_date macro. The generation of mentioned partitions happens at DAG interpretation time, which is more efficient than with RangeHivePartitionSensor.

URLSensor

https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/main/wmf_airflow_common/sensors/url.py#L25

This sensor waits for a given URL to exist. For instance, it waits for a file to exist in a file system.

hdfs_sensor = URLSensor(
    task_id='wait_for_some_dataset',
    url='/wmf/data/wmf/some/dataset/snapshot={{ ds }}/_SUCCESS',
)

If execution_date was 2022-01-01 This sensor would wait for the following URL to be present:

/wmf/data/wmf/some/dataset/snapshot=2022-01-01/_SUCCESS

Operators

SparkSQLOperator

https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/18182aa041ccae4290f8f35a8001eebcdb71fe44/wmf_airflow_common/operators/spark.py#L276

This operator executes a query in SparkSQL using the org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver (by default). It allows us to pass in query_parameters to correctly populate the application arguments.

task_name = SparkSqlOperator(
    task_id='task_name',
    sql='hql_directory/path_to_hql_file.hql',
    query_parameters={
        'some_parameter_key': ‘some_parameter_value’,
        'another_parameter_key': ‘another_parameter_value’
    },
    sla=timedelta(hours=6),
)

SparkSubmitOperator

https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/18182aa041ccae4290f8f35a8001eebcdb71fe44/wmf_airflow_common/operators/spark.py#L16

This custom operator uses the wmf_airflow_common SparkSubmitHook to extend the functionality of the Airflow SparkSubmitOperator, providing us with more features such as the ability to launch spark jobs via skein. For this operator, we must set the application parameter to load the artifact that contains the spark job to be run and also specify the java class.

task_name = SparkSubmitOperator(
    task_id='task_name',
    application=artifact('artifact_name'),
    java_class='custom_java_class_name',
    application_args={
        '--argument_name': ‘argument_value’
    },
    sla=timedelta(days=7)
)

URLTouch Operator

This operator is useful for writing SUCCESS flags or any other files. It is the bridge between jobs that are still on Oozie and jobs that have been migrated to Airflow. Note that this operator only works with URLs that are supported by fsspec on writable file systems.

success = URLTouchOperator(
    task_id='task_name',
    url=(dag_config.hadoop_name_node + ‘destination_path’ + 'partition’/_SUCCESS),
)



Testing

Unit tests

Setting up the environment

When in your DAG folder, Create your virtual environment with conda:

conda create --name airflow-dags python=3.7
conda activate airflow-dags

Or virtualenv:

virtualenv -p python3 venv
source venv/bin/activate

Then get your build dependencies with

pip install.

and your test dependencies with

pip install .[test]

These install dependencies from the setup.cfg file at the root of your DAG folder. For these steps to succeed you might have to install system dependencies before. If the process of install starts downloading multiple versions of some software, it probably means a previous step has failed and you should trouble why. On Debian I had to install dependencies for SASL (see https://pypi.org/project/sasl/0.1.3/) and kerberos (https://github.com/apple/ccs-pykerberos/issues/66#issuecomment-838285162).

Finally, once all dependencies have been installed, be sure to deactivate and reactivate your conda environement for all settings to be correctly applied:

conda deactivate airflow-dags

conda activate airflow-dags

And now tests can be run with:

pytest

Definition test

This is a test to check if the DAG definition is properly read by the Airflow scheduler. We can do it directly in python with python path/to/you/file_dag.py. Or by writing a unit test. See an example here: https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/main/tests/analytics/aqs/hourly_dag_test.py

Testing the DAG itself

As we try to keep the logic outside of our DAGs, the tests are mainly for sanity checks. 3 noteable use cases:

  • Check the interpolations of the dates generated with Jinja
  • Check the generation of the partitions lists / the sensors
  • Check the global number of tasks in the DAG

Testing the libraries

In our project, libraries live in wmf_airflow_common. This is where we define Custom operators and other helpers. Those files are important to unit test.

CI

Currently, we have tests running with pytest on python 3.7 and 3.9 + linting with flake8 and mypy. All is defined here:

Development instance

We may run a custom dev instance in the cluster (e.g., stat1004, stat1007) . Handy for testing your code in production.

Limits

  • Not easy to run Skein with it (Skein needs a keytab).
  • 1 process to do the job of the worker and the scheduler’s job.
  • Your user <> analytics user

Setup

Git clone your dag repo on a stats box. Then run ./run_dev_instance.sh. It’s going to create a virtual environment, a sqlite DB, and launch the Airflow processes. Open an ssh tunnel to access the UI as described in the scrip logs.

Input/ouput override procedure

Make sure to unpause your dag only when the output of the script has been overridden by the following procedure. Use custom variable properties to configure your DAG for test. E.g. var to override:

  • start_date
  • end_date
  • output_dir / output_table
  • tmp_dir

Analytics test Cluster

Limits:

  • The lack of data
  • The barrier to production (also, you can’t break production)
  • The small size of the cluster
  • Some datasets are subsets of the original with different partition names

How to deploy

As simple as a scap deploy from the deployment machine

You may deploy a custom branch:

cd /srv/deployment/airflow-dags/analytics_test
git fetch
scap deploy -r origin/my_branch

Use cases

  • Canary for production
  • Deploying custom branch