
Wikidata Query Service/Streaming Updater

Revision as of 20:58, 13 April 2021 by Nintendofan885 (moved page Wikidata query service/Streaming Updater to Wikidata Query Service/Streaming Updater to match the MediaWiki.org page)

status: draft

Caution: All addresses here will change after restarting the cluster. If you happen to do that, please update this document.

Synopsis

WDQS Streaming Updater is an Apache Flink application whose purpose is to create a stream of diffs of RDF triples, meant to be fed into Blazegraph. It uses the available change streams to calculate the diffs and pushes them to a Kafka topic.

How to run

Job Manager

Right now the Streaming Updater is in a testing phase and we use an Apache Flink cluster installed on the Analytics Hadoop YARN cluster. To deploy the app, you need to be able to access the cluster itself.

Currently, Apache Flink's job manager runs in YARN; check yarn.wikimedia.org to find where it is running (look for the "Flink session cluster" run by analytics-search).

Apache Flink Dashboard

The Apache Flink Dashboard is available, and there are two ways of accessing it:

through HADOOP ApplicationMaster

The HADOOP ApplicationMaster proxy allows limited visibility into the Apache Flink cluster. It's currently available via the YARN web UI.

through SSH Tunnel

The ApplicationMaster proxy has limited functionality - e.g. submitting jobs is forbidden. To be able to submit jobs via the Dashboard, you need to connect to the full-fledged Dashboard via an SSH tunnel.

To do that, open a tunnel from an instance with analytics network access to:

http://analytics1045.eqiad.wmnet:43543
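A minimal tunnel sketch (the local port 8081 and the intermediate host are assumptions; the analytics1045 address will change whenever the cluster restarts, so substitute the current job manager address):

```shell
# Forward local port 8081 to the Flink Dashboard inside the analytics network.
# -N: do not run a remote command, only forward the port.
ssh -N -L 8081:analytics1045.eqiad.wmnet:43543 stat1007.eqiad.wmnet
```

Then browse to http://localhost:8081 to reach the full Dashboard.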

Running the cluster

The cluster is run by the analytics-search user, from /home/dcausse/flink-1.11.1, with the command:

sudo -u analytics-search sh -c 'HADOOP_CLASSPATH=`hadoop classpath` ./bin/yarn-session.sh -tm 8192 -s 4'

After we finish initial testing, we'll puppetize the service.

Deploying Streaming Updater

There are two ways of deploying the updater:

via Dashboard

You need to use the SSH tunnel to access the Dashboard. Once you do, select "Submit New Job" from the menu. Here you can upload the jar and set its parameters, parallelism and Savepoint Path.

via command line

Log in to a host with analytics network access and with a Flink installation downloaded and extracted. From there, execute:

./bin/flink run <jar-path> <job parameters>

Bootstrap job for a parallelism of 10:

sudo -u analytics-search sh -c 'export HADOOP_CLASSPATH=`hadoop classpath`; ./bin/flink run -p 12 -c org.wikidata.query.rdf.updater.UpdaterBootstrapJob ../streaming-updater-0.3.36-SNAPSHOT-jar-with-dependencies.jar --savepoint_dir hdfs://analytics-hadoop/wmf/discovery/streaming_updater/savepoint/20200601 --revisions_file hdfs://analytics-hadoop/wmf/data/discovery/wdqs/entity_revision_map/20200720/rev_map.csv --checkpoint_dir hdfs://analytics-hadoop/wmf/discovery/streaming_updater/checkpoint/init_run --parallelism 10'

Get the timestamp of the dump (in hive):

select object from discovery.wikidata_rdf
where date = '20200720' and subject = '<http://wikiba.se/ontology#Dump>' and predicate = '<http://schema.org/dateModified>'
order by object asc
limit 1;

Set the consumer group offset before starting (replace 2020-05-30T20:26:47 with the timestamp returned by the query above):

python3.7 set_offsets.py -t eqiad.mediawiki.revision-create -c test_wdqs_streaming_updater -b kafka-jumbo1001.eqiad.wmnet:9092 -s 2020-05-30T20:26:47
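set_offsets.py takes the ISO timestamp directly; Kafka itself indexes records by timestamps in epoch milliseconds, so if you ever need the converted form of the dump timestamp, GNU date can produce it (a sketch, assuming the timestamp is UTC):

```shell
# Convert the dump timestamp (assumed UTC) to the epoch-millisecond form
# that Kafka uses for record timestamps.
TS='2020-05-30T20:26:47'
EPOCH_MS=$(( $(date -u -d "$TS" +%s) * 1000 ))
echo "$EPOCH_MS"
```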

The parameters used for the job at the moment (no matter the deployment option) are:

sudo -u analytics-search sh -c 'export HADOOP_CLASSPATH=`hadoop classpath`; ./bin/flink run \
  -s hdfs://analytics-hadoop/wmf/discovery/streaming_updater/savepoint/20200601 \
  -p 10 \
  -c org.wikidata.query.rdf.updater.UpdaterJob /home/dcausse/streaming-updater-0.3.36-SNAPSHOT-jar-with-dependencies.jar \
  --hostname www.wikidata.org \
  --checkpoint_dir hdfs://analytics-hadoop/wmf/discovery/streaming_updater/checkpoints \
  --spurious_events_dir hdfs://analytics-hadoop/wmf/discovery/streaming_updater/spurious_events \
  --late_events_dir hdfs://analytics-hadoop/wmf/discovery/streaming_updater/late_events \
  --failed_ops_dir hdfs://analytics-hadoop/wmf/discovery/streaming_updater/failed_events \
  --brokers kafka-jumbo1001.eqiad.wmnet:9092 \
  --rev_create_topic eqiad.mediawiki.revision-create \
  --output_topic test_wdqs_streaming_updater \
  --output_topic_partition 0 \
  --consumer_group test_wdqs_streaming_updater'
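Once the job is running, a quick way to check that diffs are actually being produced is to read a message off the output topic (a sketch; it assumes kafka-console-consumer is available on a host that can reach the jumbo brokers):

```shell
# Consume a single message from the output topic to confirm the pipeline
# is emitting diffs.
kafka-console-consumer --bootstrap-server kafka-jumbo1001.eqiad.wmnet:9092 \
  --topic test_wdqs_streaming_updater \
  --max-messages 1
```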

Things to watch out for

  • If you get an Internal Server Error when uploading the jar, check the job manager's log (available from the Job Manager menu). On the other hand, if the job fails after starting, search the specific Task Manager's logs.

They can be accessed via the Jobs menu (either running or completed), after clicking a specific task and going to the Task Managers tab.

  • A (very) simple dashboard for the streaming updater is available here: WDQS Pipeline.
    • First graph (Pipeline latencies):
      • event handling latency - the time between the triggering event (e.g. the actual change on wikidata.org) and the time the event was handled (diffs were produced)
      • event processing latency - the time between the event entering the pipeline (after mapping from the Kafka message) and the time the event was handled (diffs were produced)
    • Second graph (Records per second (m1_rate)):
      • the name is pretty self-explanatory - what is actually measured is the rate of events reaching the stage of the pipeline after the main processing (its purpose is basically to measure the process itself)