Wikidata Query Service/Streaming Updater
Caution: All addresses here will change after restarting the cluster. If you happen to do that, please update this document.
WDQS Streaming Updater is an Apache Flink application whose purpose is to create a stream of diffs of RDF triples, meant to be fed into Blazegraph. It uses the available change streams to calculate the diffs and pushes them to a Kafka topic.
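The "diff" between two revisions of an entity is the pair (triples to delete, triples to insert). A minimal, illustrative Python sketch of that idea (not the actual Flink job logic):

```python
# Illustrative sketch only -- the real job is an Apache Flink pipeline.
# Given two revisions of an entity as sets of RDF triples, the diff is
# the pair (triples to delete, triples to insert).

def diff_triples(old_triples, new_triples):
    """Return (to_delete, to_insert) between two revisions."""
    to_delete = old_triples - new_triples
    to_insert = new_triples - old_triples
    return to_delete, to_insert

# Hypothetical triples for illustration:
old = {("Q42", "rdfs:label", '"Douglas Adams"@en'),
       ("Q42", "wdt:P31", "wd:Q5")}
new = {("Q42", "rdfs:label", '"Douglas Adams"@en'),
       ("Q42", "wdt:P31", "wd:Q5"),
       ("Q42", "wdt:P569", '"1952-03-11"^^xsd:date')}

deleted, inserted = diff_triples(old, new)
print(deleted)   # set()
print(inserted)  # one tuple: the new date-of-birth triple
```

The real pipeline emits these delete/insert operations as Kafka messages for Blazegraph to apply.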
How to run
Right now the Streaming Updater is in the testing phase and we use an Apache Flink cluster installed on the Analytics Hadoop YARN cluster. To deploy the app, you need to be able to access the cluster itself.
Currently, Apache Flink's job manager runs in YARN; check yarn.wikimedia.org to find where it is running (look for the "Flink session cluster" run by analytics-search).
Apache Flink Dashboard is available and there are two ways of accessing it:
through the Hadoop ApplicationMaster
The Hadoop ApplicationMaster proxy provides limited visibility into the Apache Flink cluster. It's currently available via the YARN web UI.
through SSH Tunnel
The ApplicationMaster proxy has limited functionality - e.g. submitting jobs is forbidden. To do that via the Dashboard, you need to connect to the full-fledged Dashboard via an SSH tunnel.
To do that, open a tunnel from an instance with analytics network access to:
Running the cluster
The cluster is run by the analytics-search user, from /home/dcausse/flink-1.11.1, with the command:
sudo -u analytics-search sh -c 'HADOOP_CLASSPATH=`hadoop classpath` ./bin/yarn-session.sh -tm 8192 -s 4'
After we finish initial testing, we'll puppetize the service.
Deploying Streaming Updater
There are two ways of deploying the updater:
via Dashboard
You need to use the SSH tunnel to access the Dashboard. Once you do, select "Submit New Job" from the menu. There you can upload the jar and input its parameters, parallelism and Savepoint Path.
via command line
Log in to the host with analytics network access and the Flink installation downloaded and extracted. From there, execute:
./bin/flink run <jar-path> <job parameters>
Bootstrap job for a parallelism of 10:
sudo -u analytics-search sh -c 'export HADOOP_CLASSPATH=`hadoop classpath`; ./bin/flink run \
  -p 12 \
  -c org.wikidata.query.rdf.updater.UpdaterBootstrapJob \
  ../streaming-updater-0.3.36-SNAPSHOT-jar-with-dependencies.jar \
  --savepoint_dir hdfs://analytics-hadoop/wmf/discovery/streaming_updater/savepoint/20200601 \
  --revisions_file hdfs://analytics-hadoop/wmf/data/discovery/wdqs/entity_revision_map/20200720/rev_map.csv \
  --checkpoint_dir hdfs://analytics-hadoop/wmf/discovery/streaming_updater/checkpoint/init_run \
  --parallelism 10'
Get the timestamp of the dump:
select object from discovery.wikidata_rdf where date = '20200720' and subject = '<http://wikiba.se/ontology#Dump>' and predicate = '<http://schema.org/dateModified>' order by object asc limit 1;
Set the consumer group offset before starting (replace 2020-05-30T20:26:47 with the timestamp returned by the Spark query):
python3.7 set_offsets.py -t eqiad.mediawiki.revision-create -c test_wdqs_streaming_updater -b kafka-jumbo1001.eqiad.wmnet:9092 -s 2020-05-30T20:26:47
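The set_offsets.py script is not shown here, but seeking a consumer group to a timestamp is typically done via Kafka's offsets-for-times lookup, which takes epoch milliseconds. A minimal sketch of that conversion, assuming the timestamp from the Spark query is UTC:

```python
from datetime import datetime, timezone

def to_epoch_millis(ts: str) -> int:
    """Convert a timestamp like '2020-05-30T20:26:47' (assumed UTC) to
    epoch milliseconds, the unit Kafka's offsets-for-times API expects."""
    dt = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S").replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1000)

millis = to_epoch_millis("2020-05-30T20:26:47")
# Round-trip back to an ISO timestamp to sanity-check the conversion:
print(datetime.fromtimestamp(millis / 1000, tz=timezone.utc)
      .strftime("%Y-%m-%dT%H:%M:%S"))  # → 2020-05-30T20:26:47
```

If the dump timestamp were in another timezone, the offset seek would be off by that timezone's UTC offset, so the UTC assumption matters.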
Parameters used for the job at the moment (regardless of the deployment option) are:
sudo -u analytics-search sh -c 'export HADOOP_CLASSPATH=`hadoop classpath`; ./bin/flink run \
  -s hdfs://analytics-hadoop/wmf/discovery/streaming_updater/savepoint/20200601 \
  -p 10 \
  -c org.wikidata.query.rdf.updater.UpdaterJob \
  /home/dcausse/streaming-updater-0.3.36-SNAPSHOT-jar-with-dependencies.jar \
  --hostname www.wikidata.org \
  --checkpoint_dir hdfs://analytics-hadoop/wmf/discovery/streaming_updater/checkpoints \
  --spurious_events_dir hdfs://analytics-hadoop/wmf/discovery/streaming_updater/spurious_events \
  --late_events_dir hdfs://analytics-hadoop/wmf/discovery/streaming_updater/late_events \
  --failed_ops_dir hdfs://analytics-hadoop/wmf/discovery/streaming_updater/failed_events \
  --brokers kafka-jumbo1001.eqiad.wmnet:9092 \
  --rev_create_topic eqiad.mediawiki.revision-create \
  --output_topic test_wdqs_streaming_updater \
  --output_topic_partition 0 \
  --consumer_group test_wdqs_streaming_updater'
Things to watch out for
- If you get an Internal Server Error when uploading the jar, check the Job Manager's log (available from the Job Manager menu). If the job fails after starting, check the logs of the specific Task Managers instead.
They can be accessed via the Jobs menu (either running or completed), after clicking a specific task and going to the Task Managers tab.
- A (very) simple dashboard for the streaming updater is available here: WDQS Pipeline.
- First graph (Pipeline latencies):
- event handling latency - time from the triggering event (e.g. the actual change on wikidata.org) to the time the event has been handled (diffs were produced)
- event processing latency - time from the event entering the pipeline (after mapping from the Kafka message) to the time the event has been handled (diffs were produced)
- Second graph (Records per second (m1_rate)):
- the name is pretty self-explanatory - what is actually measured is events reaching the stage of the pipeline after the main processing (the purpose of which is basically to measure the process itself)
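The two latency metrics above differ only in their starting point. A sketch, assuming each event carries its trigger time (the edit on wikidata.org) and its pipeline-entry time (hypothetical field names):

```python
# Illustrative sketch of the two latency metrics (not the job's metric code).
# trigger_ts:  when the change happened on wikidata.org
# ingest_ts:   when the event entered the pipeline (after Kafka mapping)
# handled_ts:  when the diffs were produced

def pipeline_latencies(trigger_ts, ingest_ts, handled_ts):
    """Return (event handling latency, event processing latency) in seconds."""
    event_handling_latency = handled_ts - trigger_ts
    event_processing_latency = handled_ts - ingest_ts
    return event_handling_latency, event_processing_latency

handling, processing = pipeline_latencies(1000.0, 1004.5, 1005.0)
print(handling)    # 5.0 -- trigger to handled (includes Kafka delivery delay)
print(processing)  # 0.5 -- pipeline entry to handled
```

The gap between the two numbers is the time an event spends outside the pipeline (e.g. waiting in Kafka), which is why both are graphed.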