Revision as of 16:31, 4 September 2018 by imported>Ottomata (→‎Rerunning jobs)

Refine is a Spark job that 'refines' arbitrary datasets into Parquet backed Hive tables. It passes over the incoming dataset, creates or alters a Hive table to match the dataset, and then inserts the data into the Hive table.

Refine expects that table names and partitions can be regex matched out of incoming dataset paths.
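As a rough illustration of that regex matching, here is a minimal Python sketch. It is not Refine's own code (Refine is a Spark job). The pattern and capture names are copied from the --input-regex and --input-capture values in the example command later on this page; the function name and the sample path are hypothetical.

```python
import re

# Pattern and capture names copied from the example command on this page.
INPUT_REGEX = re.compile(r"eventlogging_(.+)/hourly/(\d+)/(\d+)/(\d+)/(\d+)")
INPUT_CAPTURE = "table,year,month,day,hour".split(",")


def parse_dataset_path(path):
    """Return a dict of capture name -> matched text, or None if the path does not match."""
    match = INPUT_REGEX.search(path)
    if match is None:
        return None
    # Pair each capture group, in order, with its configured name.
    return dict(zip(INPUT_CAPTURE, match.groups()))


# Hypothetical dataset path below the input base path used on this page.
example_path = (
    "/wmf/data/raw/eventlogging/"
    "eventlogging_MobileWikiAppShareAFact/hourly/2018/09/04/16"
)
parsed = parse_dataset_path(example_path)
print(parsed)
```

The first capture group yields the table name and the remaining groups yield the partition values. A path that does not match the pattern yields None, which a caller would treat as "not a dataset this job handles".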

Refined data types

Refine infers field data types from the incoming data itself. It does not use any externally defined JSON Schemas, which allows it to be agnostic about the system the data comes from. JSON data does not carry its own schema, so in order to map a JSON dataset to a Hive table, the entire dataset must first be read. Every field encountered in each record is added to a larger union schema representing every field in the dataset.

Example: Let's say you have a dataset with 3 records in it: { "id": 1 }, { "id": 2, "name": "Donkey" } and { "status": "sleeping" }. Refine will scan these records and notice that there are a total of 3 distinct fields: "id": int, "name": string, and "status": string. The resulting Hive table schema columns will look like

`id`: bigint,
`name`: string,
`status`: string

Schema inference is done recursively. Nested objects become Hive struct types.
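The union-schema idea can be sketched in a few lines of Python. This is only an illustration of the technique, not Refine's implementation: the function names and the JSON-to-Hive type mapping are assumptions, and arrays and nulls are left out for brevity. Structs are modelled as nested dicts.

```python
import json


def infer_type(value):
    """Map a parsed JSON value to a Hive-like type name (illustrative mapping)."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "boolean"
    if isinstance(value, int):
        return "bigint"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    if isinstance(value, dict):
        # Nested objects are inferred recursively and stand in for structs.
        return {key: infer_type(val) for key, val in value.items()}
    raise TypeError(f"unsupported value in this sketch: {value!r}")


def merge_types(left, right):
    """Merge two inferred types, widening bigint to double where needed."""
    if left == right:
        return left
    if isinstance(left, dict) and isinstance(right, dict):
        return merge_schemas(left, right)
    if isinstance(left, str) and isinstance(right, str) and {left, right} == {"bigint", "double"}:
        return "double"
    raise ValueError(f"conflicting types: {left!r} vs {right!r}")


def merge_schemas(left, right):
    """Union two schemas field by field."""
    merged = dict(left)
    for field, field_type in right.items():
        merged[field] = merge_types(merged[field], field_type) if field in merged else field_type
    return merged


def infer_schema(json_lines):
    """Read every record and fold its fields into one union schema."""
    schema = {}
    for line in json_lines:
        schema = merge_schemas(schema, infer_type(json.loads(line)))
    return schema


records = ['{ "id": 1 }', '{ "id": 2, "name": "Donkey" }', '{ "status": "sleeping" }']
schema = infer_schema(records)
print(schema)
```

For the three example records this produces the same three columns listed above. Because every record contributes to the result, the whole dataset has to be read before the schema is known.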

This gets especially tricky with JSON data and numbers. JSON does not distinguish between decimal and integer numbers. Let's say you have records like { "measure": 1 } and { "measure": 1.1 }. In this case, Refine will be able to cast the type of `measure` to the wider data type of double.

However, now let's say that this is not the first time the dataset has been refined, and that a Hive table already exists. The first time this dataset was refined, Refine only encountered `measure` fields with integers. Refine does not handle type changes, so the existing Hive table's field type becomes the canonical one. When refining new datasets, if there is a decimal value of `measure`, that value will be cast to integer, and the decimal part will be lost. This should work in the opposite direction as well: if an existing Hive table has a double column for which there is new data that looks like integers, those integers will be cast to doubles. There are also other conditions where this casting might or might not work. Refine tries to do its best, but it can't always succeed when casting.

The main lesson here is: BE EXPLICIT. If you are emitting JSON events and you want a field to be a decimal, you should ALWAYS include a decimal part. E.g. never emit { "measure": 1 } when what you really want is { "measure": 1.0 }.
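To make the loss of precision concrete, here is a small Python sketch of what casting to an existing column type does to a value. It only mimics the behaviour described above. The helper name is hypothetical and the real casting happens inside Spark.

```python
import json


def cast_to_column_type(value, column_type):
    """Cast a parsed JSON number to the type of an already existing column (sketch)."""
    if column_type == "bigint":
        return int(value)  # drops the decimal part
    if column_type == "double":
        return float(value)
    raise ValueError(f"unsupported column type in this sketch: {column_type}")


# A JSON parser sees 1 as an integer and 1.0 as a decimal number.
ambiguous = json.loads('{ "measure": 1 }')["measure"]
explicit = json.loads('{ "measure": 1.0 }')["measure"]
print(type(ambiguous).__name__, type(explicit).__name__)

# If the table was first created from integer-only data, later decimals lose precision.
truncated = cast_to_column_type(1.1, "bigint")
# In the opposite direction, integers are widened without loss.
widened = cast_to_column_type(1, "double")
print(truncated, widened)
```

Emitting 1.0 from the start means the very first refinement infers a decimal type, so the truncating cast never comes into play.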


There are various Refine jobs scheduled to run in the Analytics Cluster. These cron jobs run on the analytics coordinator host (currently analytics1003) as the hdfs user and log their output to files in /var/log/refinery.


The jobs themselves run in YARN, so the local log files will not have much useful information. Once one of these jobs has finished, you should be able to see the logs using the yarn CLI: sudo -u hdfs yarn logs -applicationId <application_id>

Failed jobs

If data for a partition contains fields with types that conflict with an existing Hive table, the Refine job for that partition will likely fail. This will cause the _REFINE_FAILED flag to be written and an email alert to be sent, and the entire partition cannot be refined. However, in some simple cases Refine will be able to cast between types. The only known example of this is casting an integer to a float field. In other cases, Refine will be able to choose the source table's field type for a field. When that happens, it is likely that all of the fields of any record containing the conflicting field will be nullified.

Rerunning jobs

If a Refine job for a particular hour fails, it will write a _REFINE_FAILED flag file into the destination partition directory. To schedule a job to rerun, you may either remove this flag file, or run the Refine job with the --ignore-failure-flag option. The latter will cause the job to re-attempt refinement of any partition in its --since range that has a _REFINE_FAILED flag.
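The effect of the flag file on a rerun can be sketched as follows. This is a simplified model that uses local directories instead of HDFS. The function name, its ignore_failure_flag parameter, and the partition directory names are hypothetical, and the sketch only looks at the failure flag described here.

```python
import tempfile
from pathlib import Path

FAILURE_FLAG = "_REFINE_FAILED"


def partitions_to_attempt(partition_dirs, ignore_failure_flag=False):
    """Skip partitions carrying the failure flag unless told to ignore it."""
    return [
        partition
        for partition in partition_dirs
        if ignore_failure_flag or not (partition / FAILURE_FLAG).exists()
    ]


with tempfile.TemporaryDirectory() as tmp:
    pending = Path(tmp) / "hour=15"  # hypothetical partition with no flag yet
    failed = Path(tmp) / "hour=16"   # hypothetical partition whose last run failed
    pending.mkdir()
    failed.mkdir()
    (failed / FAILURE_FLAG).touch()

    default_run = [p.name for p in partitions_to_attempt([pending, failed])]
    forced_run = [p.name for p in partitions_to_attempt([pending, failed], ignore_failure_flag=True)]

print(default_run)
print(forced_run)
```

Removing the flag file by hand and passing the ignore option lead to the same outcome for that partition: it becomes eligible for another attempt.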

If you need to run a job that is farther in the past than the configured --since flag, you may use both the --since and --until flags with ISO 8601 date formats to specify a date range.
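As a rough sketch of what an explicit date range covers, the following Python enumerates the hourly partitions between two ISO 8601 timestamps. The function name is hypothetical, and treating the upper bound as exclusive is an assumption; check the job's own help output for how it interprets --until.

```python
from datetime import datetime, timedelta


def hourly_partitions(since, until):
    """List year/month/day/hour partition suffixes from since up to, but not including, until."""
    current = datetime.fromisoformat(since).replace(minute=0, second=0, microsecond=0)
    end = datetime.fromisoformat(until)
    hours = []
    while current < end:
        hours.append(current.strftime("%Y/%m/%d/%H"))
        current += timedelta(hours=1)
    return hours


window = hourly_partitions("2018-09-01T22:00:00", "2018-09-02T01:00:00")
print(window)
```

Each entry corresponds to the year, month, day and hour components that the input regex captures from a dataset path.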

Wrapper scripts for each Refine job are installed on analytics1003 in /usr/local/bin. To find the full command to re-run, cat the contents of the relevant Refine job wrapper, and edit any parameters you need to change. For example, if you had to re-run any failed partition refinements in the last 96 hours for the EventLogging analytics table MobileWikiAppShareAFact, you would cat the contents of /usr/local/bin/refine_eventlogging_analytics, replace the --table-blacklist argument with --table-whitelist MobileWikiAppShareAFact, and then add the --ignore-failure-flag option at the end. E.g.:

sudo -u hdfs /usr/bin/spark2-submit --master yarn --deploy-mode cluster --queue production --driver-memory 8G \
--conf spark.driver.extraClassPath=/usr/lib/hive/lib/hive-jdbc.jar:/usr/lib/hadoop-mapreduce/hadoop-mapreduce-client-common.jar:/usr/lib/hive/lib/hive-service.jar \
--conf spark.dynamicAllocation.maxExecutors=64 --files /etc/hive/conf/hive-site.xml \
--class \
--name refine_eventlogging_analytics /srv/deployment/analytics/refinery/artifacts/org/wikimedia/analytics/refinery/refinery-job-0.0.62.jar \
--parallelism 12 --since 96 \
--table-whitelist '^MobileWikiAppShareAFact$' \
--send-email-report --to-emails \
--input-base-path /wmf/data/raw/eventlogging \
--input-regex 'eventlogging_(.+)/hourly/(\d+)/(\d+)/(\d+)/(\d+)' \
--input-capture 'table,year,month,day,hour' \
--output-base-path /wmf/data/event --database event \
--transform-functions, \

See data that might have failed

kafkacat -b kafka1012.eqiad.wmnet:9092 -t codfw.mediawiki.revision-score