Data Platform/Systems/Refine


Refine (Airflow-based, current)

Refine is the production Spark job that evolves Hive schemas and refines "event" streams (JSON files in hourly Hive partitions) into Parquet-backed Hive tables in the event database. Most jobs have been migrated from systemd timers to "Airflow" and now run with a revised, simpler Scala implementation.

Scope

  • Refines "events only" (not arbitrary datasets) sourced from hourly JSON Hive partitions in /wmf/data/raw/event/ and /wmf/data/raw/eventlogging_legacy/.
  • Evolves/creates the destination Hive table and writes Parquet to /wmf/data/event/ (database: event).
  • Uses the "latest schema version" from the schema registry to evolve the Hive table and select columns from the JSON.
  • Everything is in place to support "Apache Iceberg" tables as output (rollout is prepared/compatible).

Scala entry points

There are two main entry points, plus a convenience bundle:

  • org.wikimedia.analytics.refinery.job.refine.cli.EvolveHiveTable
  • org.wikimedia.analytics.refinery.job.refine.cli.RefineHiveDataset
  • org.wikimedia.analytics.refinery.job.refine.cli.EvolveAndRefineToHiveTable (bundle used in Airflow to reduce the number of individual jobs: one Airflow task ⇒ one k8s pod ⇒ one Skein app ⇒ one Spark app)

Exception: the "netflow" job still uses the "legacy" Refine entry point org.wikimedia.analytics.refinery.job.refine.Refine (see Legacy Refine).

Scheduling

  • Refine workloads are orchestrated by "Airflow" (DAG: refine_to_hive_hourly).
  • The DAG maps over streams; each mapped task launches a k8s pod running a Skein app that starts Spark.

Data readiness / lateness handling

  • Refine waits for raw data to be present for 2 consecutive hours before refining. This gives Gobblin time to ingest late events (e.g., hour H events arriving during hour H+1).
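
Before expecting a refinement of hour H, you can verify that raw data for hours H and H+1 has landed in HDFS. A minimal sketch, assuming the partition layout from the manual-invocation example below (the stream directory name is a placeholder):

# Hour H raw partition (placeholder stream directory):
hdfs dfs -ls /wmf/data/raw/event/<stream_dir>/year=2025/month=08/day=21/hour=00
# Hour H+1 must also be present before hour H is refined:
hdfs dfs -ls /wmf/data/raw/event/<stream_dir>/year=2025/month=08/day=21/hour=01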

Stream requirements

To be refined, a stream must have:

  • A schema declared in schemas/event/primary or schemas/event/secondary
  • A declaration in the "Event Stream Configurator (ESC)":
    • enable analytics_hive_ingestion
    • enable analytics_hadoop_ingestion
    • "activate canary events" for the stream

Refine fetches the list of streams at the start of the DAG run:

  • it refines streams where consumer analytics_hive_ingestion is declared and enabled,
  • it skips streams without canary events,
  • and it respects Airflow variables streams_allow_list and streams_disallow_list.
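
For example, the current allow/disallow lists can be inspected with the standard Airflow CLI on the analytics Airflow instance (variable names from this page; the value format is whatever the DAG expects):

# Show the current contents of the allow/disallow variables:
airflow variables get streams_allow_list
airflow variables get streams_disallow_list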

Deployment

  1. Deploy the new "refinery-source" version.
  2. Update the "airflow-dags" artifact reference to that version.
  3. Deploy "airflow-dags".

Logs

  • Spark "driver logs are collected in Airflow" (verbosity reduced for readability) and usually contain what you need.
  • Full logs are still available via YARN:
 sudo -u hdfs yarn logs -applicationId <application_id>
  • If you need very verbose driver logs, adjust/skip the variable spark_driver_log4j_properties_file.

Refine job sizing

To match resources to stream volume we use T-shirt sizing:

  • small — local Spark job within the Skein container/k8s pod (default for new streams)
  • medium — regular distributed Spark job via Skein
  • large — distributed Spark job with more resources

The size is set in EventStreamConfig under consumers.analytics_hive_ingestion.spark_job_ingestion_scale.

Tips:

  • Start with "small", then scale up if the job takes too long or crashes with out-of-memory errors.
  • Estimate from the raw data volume in HDFS and from the Kafka Grafana dashboards, by comparison with existing streams (see the sketch after this list):
    • List the streams in EventStreamConfig and find some examples configured as medium or large.
    • On the Grafana dashboard, compare the size of your topic with the sizes of those example topics.
    • Pick the T-shirt size of the stream whose topic size is nearest to yours; if your stream has high hourly variability, compare against its peak topic size.
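
The lookup step can be scripted against the EventStreamConfig API. A minimal sketch, assuming the streamconfigs API parameters and response shape (verify both against the ESC documentation) and that jq is available:

# Dump all stream settings and extract each stream's configured ingestion scale
# (endpoint parameters and field paths are assumptions):
curl -s 'https://meta.wikimedia.org/w/api.php?action=streamconfigs&format=json&all_settings=true' \
  | jq '.streams | to_entries[]
        | {stream: .key, scale: .value.consumers.analytics_hive_ingestion.spark_job_ingestion_scale}
        | select(.scale != null)'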

Markers / flags

  • The legacy _REFINE_FAILED and ignore_failure_flag mechanisms are "not used" anymore.
  • We add a _PROCESSED marker in the "raw" folders "before" Refine starts. Gobblin uses this to estimate the number of late-event files—hence the marker is set pre-refinement.

Data locations

  • Destination DB: event
  • Destination HDFS: /wmf/data/event/
  • Sources (raw JSON):
    • /wmf/data/raw/event/
    • /wmf/data/raw/eventlogging_legacy/ (single-DC legacy streams; names are CamelCase at source, downcased in HDFS/Hive)
  • Current DCs: eqiad and codfw

Failures

Sensor failures

Airflow sensors fail when the expected raw partition is missing. Common causes:

  • Gobblin delays/failures
  • Missing canary events or delays/failures in the canary DAG

Resolution:

  • Fix the upstream issue and "rerun the sensor".
  • In the unlikely case the absence is expected, you may mark the precise sensor tasks as "success".

Refine job failures

  • Schema evolution incompatibilities (a new JSON schema version cannot be applied directly by EvolveHiveTable)
  • Infrastructure issues (network/HDFS outages, etc.)

Mitigations:

  • Diagnose in Airflow driver logs or YARN full logs.
  • If a schema is marked "inactive", add dependent streams to streams_disallow_list and then disable analytics_hive_ingestion in ESC.

Manually disabling streams

  • Add a stream to Airflow variable streams_disallow_list to temporarily disable refinement.
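
For example (hedged: check the variable's existing value first, since the expected list format is defined by the DAG):

# Inspect the current value, then write it back with the extra stream added:
airflow variables get streams_disallow_list
airflow variables set streams_disallow_list '<previous value plus the stream to disable>'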

Reruns / clearing in Airflow

General advice: when rerunning the large Refine DAG, be precise about which "task instances" you clear; avoid clearing the whole DAG (~600 tasks).

CLI example ("be careful"):

airflow tasks clear \
  --dag-regex refine_to_hive_hourly \
  --task-regex evolve_and_refine_to_hive_hourly \
  --start-date "2023-08-01T05:00:00" \
  --end-date   "2023-08-01T08:00:00" \
  --only-failed

This clears "all mapped tasks for all streams" in that time window. If you need per-stream precision, first identify the stream’s map_index, then use a targeted script (see https://gitlab.wikimedia.org/-/snippets/245 ).

Manual invocation (new Refine)

Example: run it on a stat box for a single hour (adjust the artifact version and paths as needed):

spark3-submit --master yarn \
  --conf write.spark.accept-any-schema=true \
  --conf 'spark.driver.extraJavaOptions= -Dlog4j.configuration=file:quieter_spark_log4j.properties' \
  --name refine_test \
  --class org.wikimedia.analytics.refinery.job.refine.cli.EvolveAndRefineToHiveTable \
  --deploy-mode client \
  hdfs:///wmf/cache/artifacts/airflow/analytics/refinery-job-0.2.69-shaded.jar \
  --table my_db.centralnoticebannerhistory \
  --schema_uri /analytics/legacy/centralnoticebannerhistory/latest \
  --location hdfs://analytics-hadoop/wmf/data/event/centralnoticebannerhistory \
  --partition_columns year:LONG,month:LONG,day:LONG,hour:LONG \
  --transform_functions org.wikimedia.analytics.refinery.job.refine.filter_allowed_domains,org.wikimedia.analytics.refinery.job.refine.remove_canary_events,org.wikimedia.analytics.refinery.job.refine.deduplicate,org.wikimedia.analytics.refinery.job.refine.geocode_ip,org.wikimedia.analytics.refinery.job.refine.parse_user_agent,org.wikimedia.analytics.refinery.job.refine.add_is_wmf_domain,org.wikimedia.analytics.refinery.job.refine.add_normalized_host,org.wikimedia.analytics.refinery.job.refine.normalizeFieldNamesAndWidenTypes \
  --input_paths /wmf/data/raw/eventlogging_legacy/eventlogging_CentralNoticeBannerHistory/year=2025/month=08/day=21/hour=00 \
  --partition_paths year=2025/month=8/day=21/hour=0 \
  --spark_job_scale medium


See data that might have failed

kafkacat -b kafka-jumbo1002.eqiad.wmnet:9092 -t codfw.mediawiki.revision-score
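
If the topic is large, you can limit consumption; for example, read only the last few messages and exit (standard kafkacat consumer flags):

# Consume the last 10 messages from the topic, then exit at end of partition:
kafkacat -C -b kafka-jumbo1002.eqiad.wmnet:9092 -t codfw.mediawiki.revision-score -o -10 -e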

Troubleshooting (quick tips)

  • Airflow task logs include stream name and the partition window; use them to scope the issue.
  • The destination partitions can be checked from Hive or HDFS (hdfs dfs -ls); see the example after this list.
  • Source data can also be invalid; validate via Hive or HDFS listings.
  • If you can’t locate the failing app in the UI, re-run the minimal set of tasks to reproduce and capture the new applicationId, then fetch YARN logs.
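
For instance (table name and date are illustrative):

# Verify that the expected destination partitions exist under the event database:
hdfs dfs -ls /wmf/data/event/<table>/year=2025/month=8/day=21
# Once you have the new applicationId, fetch the full YARN logs:
sudo -u hdfs yarn logs -applicationId <application_id>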

New stream declaration

When a new stream is declared in EventStreamConfig, two intertwined processes may begin (if enabled):

  1. Canary event generation
  2. Stream refinement

Typically, to trigger a refinement job for a given hour, raw/input data must be available for both that hour and the following hour. This ensures that the streaming pipeline is healthy before refining any data.

For a newly declared stream, however, there is no guarantee that canary jobs will populate data in time. As a result, the first few hours may contain no data. To avoid blocking Refine for new streams:

  • For the first three hourly partitions, we refine whatever data is available (the job does not crash if there is none).
  • This refinement occurs 20 minutes after the end of the hour, regardless of data completeness.

Legacy Refine (systemd-based)

This section documents the previous Refine job and workflows. It remains relevant for streams that still use the legacy pipeline (e.g., "netflow").

Overview

The legacy Spark job (org.wikimedia.analytics.refinery.job.refine.Refine) “refined” arbitrary datasets into Parquet-backed Hive tables. It inferred table/partition names by regex-matching input paths.

Administration

  • Various legacy Refine jobs were scheduled via systemd/cron on the analytics coordinator (e.g., an-launcher1003.eqiad.wmnet) as user analytics, logging to /var/log/refinery.
  • Jobs run on YARN; local log files are minimal. Fetch logs via:
sudo -u hdfs yarn logs -applicationId <application_id>

Failed jobs

Causes included schema registry inaccessibility, conflicting types, and other unexpected errors. A failure wrote a _REFINE_FAILED flag file and sent an email alert. Limited automatic casting could occur (e.g., int→float), otherwise conflicting rows could be nullified.

Check if the schema is inactive

If the schema owners mark it "inactive", disable refinement. Add the schema to table_exclude_regex in Puppet (example patch linked in history).

Rerunning jobs

A failing hour wrote _REFINE_FAILED in the destination partition, e.g.:

  • Source: /wmf/data/raw/eventlogging/eventlogging_MobileWikiAppiOSFeed/hourly/2019/12/04/20
  • Dest: /wmf/data/event/MobileWikiAppiOSFeed/year=2019/month=12/day=4/hour=20

To rerun:

  • Remove the flag file "or"
  • Run with --ignore_failure_flag=true to re-attempt failed partitions within --since.

To go beyond the default --since, provide both --since and --until (ISO-8601).

"To rerun a job that previously "succeeded", remove the _REFINED flag (e.g., hdfs -rm -r)."

Wrapper scripts (e.g., /usr/local/bin/refine_eventlogging_legacy) load properties from /etc/refinery/refine; CLI flags override any property.

Examples:

sudo -u analytics kerberos-run-command analytics /usr/local/bin/refine_eventlogging_legacy --ignore_failure_flag=true --table_include_regex=MobileWikiAppShareAFact --since=96 --verbose


sudo -u analytics kerberos-run-command analytics \
  /usr/local/bin/refine_eventlogging_legacy \
  --ignore_failure_flag=true \
  --since=2019-06-26T21:00:00 \
  --until=2019-06-26T22:00:00 \
  --table_include_regex="FeaturePolicyViolation|ResourceTiming|UploadWizardExceptionFlowEvent|MobileWikiAppOnThisDay|MultimediaViewerAttribution|MobileWikiAppSavedPages|WikidataCompletionSearchClicks|ClickTiming|QuickSurveysResponses|EchoMail|MobileWikiAppFeed|GuidedTourGuiderImpression" \
  --verbose


If the main job is currently running, these commands will no-op (they should warn and exit). Check status and retry later:

/usr/bin/yarn application -status <applicationId>

Malformed / corrupt records

Legacy Refine could be run with a permissive JSON mode to drop unparseable records:

sudo -u analytics kerberos-run-command analytics \
  /usr/local/bin/refine_eventlogging_legacy \
  --table_include_regex='SearchSatisfaction' \
  --since='2020-05-29T21:00:00' \
  --until=2 \
  --ignore_failure_flag=true \
  --dataframereader_options='mode:DROPMALFORMED'

See Spark DataFrameReader#json "mode" docs for details.

Running locally / yarn client

Because Spark SQL required direct Hive DDL connections, legacy Refine in local or yarn-client mode needed explicit Hive client jars and proxy options. Example:

/usr/bin/spark2-submit \
  --name otto_test_refine_eventlogging_0 \
  --class org.wikimedia.analytics.refinery.job.refine.Refine \
  --master yarn \
  --deploy-mode client \
  --conf spark.driver.extraClassPath=/usr/lib/hadoop-mapreduce/hadoop-mapreduce-client-common.jar:/srv/deployment/analytics/refinery/artifacts/hive-jdbc-1.1.0-cdh5.10.0.jar:/srv/deployment/analytics/refinery/artifacts/hive-service-1.1.0-cdh5.10.0.jar \
  --driver-java-options='-Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080 -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080' \
  /srv/deployment/analytics/refinery/artifacts/refinery-job.jar \
  --output_database=otto_json_refine_test \
  --hive_server_url=an-coord1001.eqiad.wmnet:10000 \
  --input_path=/wmf/data/raw/eventlogging \
  --input_path_regex='eventlogging_(.+)/hourly/(\d+)/(\d+)/(\d+)/(\d+)' \
  --input_path_regex_capture_groups='table,year,month,day,hour' \
  --output_path=/user/otto/external/eventlogging13 \
  --table_include_regex='^ChangesListHighlights$' \
  --transform_functions=org.wikimedia.analytics.refinery.job.refine.deduplicate_eventlogging,org.wikimedia.analytics.refinery.job.refine.geocode_ip \
  --schema_base_uri=eventlogging \
  --since=12 --until=10


Legacy job catalog

Existing legacy jobs were documented in Puppet (e.g., refine.pp).

Legacy troubleshooting

  • Alert emails included the job name (some wrappers, like EventLoggingSanitization) and failing partitions.
  • Refine periodically re-refined and could fix failed partitions automatically.
  • Find source/destination HDFS paths and Hive tables in refine.pp or data_purge.pp.
  • Check presence of destination partitions (Hive or hdfs dfs -ls).
  • Validate source data similarly.
  • To find YARN logs: locate the applicationId in yarn.wikimedia.org (by job name and datetime) and fetch logs as above.
  • If you can’t find the applicationId, reproduce by re-running the job manually. On an-coord1001.eqiad.wmnet use:
systemctl list-timers → find the job → systemctl status <service> → extract the run script/command; add --since/--until as needed.
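
A sketch of that flow (the unit name is hypothetical; substitute the one you find in list-timers):

# Find the relevant Refine timer and its service unit:
systemctl list-timers | grep -i refine
# Show the unit, including the ExecStart command it wraps (hypothetical unit name):
systemctl status refine_eventlogging_legacy.service
# Copy the command from ExecStart and re-run it manually, adding --since/--until.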