{{Template:Draft}}


The WDQS Streaming Updater is an [https://flink.apache.org/ Apache Flink] application whose purpose is to create a stream of diffs of RDF triples, meant to be fed into Blazegraph. It uses the available change streams to calculate the diffs and pushes them to a Kafka topic.
= Design =
[[File:Wikidata Query Service Streaming Updater Design.svg|right|thumb|300px]]
The application reads some of the topics populated by [[mw:Extension:EventBus]] and builds a diff of the RDF content as produced by [[mw:Wikibase/EntityData]], comparing the last seen revision of an entity with the new revision seen in the ''mediawiki.revision-create'' topic. It is meant to integrate as the ''Stream processor'' part of the [[Event_Platform#Platform_Architecture_Diagram|Modern Event Platform]].
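For illustration, the RDF content being compared can be fetched directly from [[mw:Wikibase/EntityData]]. A hypothetical spot-check (the entity ID, revision ID and flavor below are illustrative values, not anything the pipeline is configured with):
<syntaxhighlight lang="shell">
# Fetch the Turtle RDF of one entity revision; diffing two such documents
# (last seen vs. new revision) is conceptually what the updater computes.
$ curl -s 'https://www.wikidata.org/wiki/Special:EntityData/Q42.ttl?revision=123456&flavor=dump'
</syntaxhighlight>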


It relies on flink to provide the following functionality:
* event-time semantics to re-order the events coming out of multiple kafka topics
* state management consistent with the output of the stream
* scalability


The flink application (code name ''streaming-updater-producer'') is responsible for producing its data to a kafka topic; a client (named ''streaming-updater-consumer'') running on the same machines as the triple store (known as ''wdqs'' hosts) is responsible for reading this topic and performing the updates.
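To eyeball what the producer writes, the output topic can be tailed with a tool such as <code>kafkacat</code>. A minimal sketch; the broker and topic names are placeholders, not the actual production values:
<syntaxhighlight lang="shell">
# Print the last five messages of the producer's output topic, then exit.
$ kafkacat -C -b "$KAFKA_BROKER" -t "$OUTPUT_TOPIC" -o -5 -e
</syntaxhighlight>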


The dependencies of the flink application are:
* The MediaWiki application servers for [[mw:Wikibase/EntityData]]
* [[Kafka]] (main) for consuming MediaWiki changes and for producing its output
* Swift (thanos) for object storage, although the aim is to move to the future [[phab:T279621|MOS]]
* K8S services cluster to run flink as a [[Wikidata_Query_Service/Flink_On_Kubernetes|session cluster]]
* [https://schema.wikimedia.org schema.wikimedia.org] for validating the events it emits against the [[Event_Platform/Schemas|Event Platform schemas]]
* [https://meta.wikimedia.org meta.wikimedia.org] for fetching the [https://meta.wikimedia.org/w/api.php?action=help&modules=streamconfigs stream configurations]


== Deployment strategy ==


The flink application is active/active and runs in both '''eqiad''' and '''codfw''' through the [[Kubernetes]] cluster hosting services. The WDQS machines in '''eqiad''' consume the output of the flink application running in '''eqiad'''.


The benefits of this approach are:


* simple to put in place in our setup: no need for a failover strategy that brings flink up in the spare DC and resumes operations from where the failed one left off (which would require offset replication between the two kafka clusters)
* symmetry of the k8s-deployed services


Drawbacks:
* No guarantee that the output of both flink pipelines will be the same
* Double compute


See [https://docs.google.com/presentation/d/10H9m7iF1W5MYdwWh6itKfJJpdDX-r6isZWngNRr6JS4/edit?usp=sharing this presentation] for a quick overview of the two strategies evaluated.


= Operations =
== Kubernetes setup ==


Kubernetes only hosts the flink session cluster responsible for running the flink job. K8s manages just the flink session cluster itself, deployed using the [https://github.com/wikimedia/operations-deployment-charts/tree/master/charts/flink-session-cluster flink-session-cluster] chart with the [https://github.com/wikimedia/operations-deployment-charts/tree/master/helmfile.d/services/rdf-streaming-updater rdf-streaming-updater] values.


Deploying the chart to staging (on ''deployment.eqiad.wmnet''):
<syntaxhighlight lang="shell">
$ cd /srv/deployment-charts/helmfile.d/services/rdf-streaming-updater/
$ helmfile -e staging -i apply
</syntaxhighlight>
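Deploying to the production environments follows the same pattern; a sketch, assuming the helmfile defines ''eqiad'' and ''codfw'' environments alongside ''staging'':
<syntaxhighlight lang="shell">
$ cd /srv/deployment-charts/helmfile.d/services/rdf-streaming-updater/
# -i asks for confirmation before applying the computed diff.
$ helmfile -e eqiad -i apply
$ helmfile -e codfw -i apply
</syntaxhighlight>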


Looking at the jobmanager and then the taskmanager logs in staging:
<syntaxhighlight lang="shell">
$ kube_env rdf-streaming-updater staging
$ kubectl logs -l component=jobmanager -c flink-session-cluster-main -f
$ kubectl logs -l component=taskmanager -c flink-session-cluster-main-taskmanager -f
</syntaxhighlight>
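To quickly check that the session cluster pods are up before digging into logs, the same component labels can be reused (standard <code>kubectl</code> usage, shown here as a sketch):
<syntaxhighlight lang="shell">
$ kube_env rdf-streaming-updater staging
# List the jobmanager and taskmanager pods with their current status.
$ kubectl get pods -l component=jobmanager
$ kubectl get pods -l component=taskmanager
</syntaxhighlight>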


The flink jobmanager UI and REST endpoint is exposed via port '''4007'''.


This endpoint has no LVS setup and is only used for internal management (deploys of the main application):
* staging: <code>https://staging.svc.eqiad.wmnet:4007/jobs</code>
* eqiad: <code>https://kubernetes1003.eqiad.wmnet:4007</code> (be sure to disable TLS host verification here, e.g. using <code>curl -k</code>)
* codfw: <code>https://kubernetes2003.codfw.wmnet:4007</code> (ditto regarding TLS host verification)

Note that the k8s cluster cannot yet be accessed via IPv6, so IPv4 must be forced on your HTTP client (e.g. <code>curl -4</code>).
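Combining the two caveats above, a minimal request against the jobmanager REST endpoint looks like this (<code>/jobs</code> is part of the standard Flink REST API):
<syntaxhighlight lang="shell">
# List the jobs known to the staging session cluster.
# -k disables TLS host verification, -4 forces IPv4.
$ curl -k -4 https://staging.svc.eqiad.wmnet:4007/jobs
</syntaxhighlight>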
 
=== Logs ===
Flink logs are collected in logstash and can be filtered using: <code><nowiki>kubernetes.master_url:"https://kubemaster.svc.codfw.wmnet" AND kubernetes.namespace_name:"rdf-streaming-updater"</nowiki></code>. Append <code>kubernetes.labels.component:jobmanager</code> to filter jobmanager's logs or ''taskmanager'' for the taskmanagers' logs.
 
=== Managing the streaming-updater-producer ===
TODO
 
== Monitoring ==
 
The flink session cluster activity can be monitored using the [https://grafana-rw.wikimedia.org/d/gCFgfpG7k/flink-session-cluster?orgId=1 flink-session-cluster] and the [https://grafana-rw.wikimedia.org/d/fdU5Zx-Mk/wdqs-streaming-updater wdqs-streaming-updater] grafana dashboards.
 
Important metrics:
* flink job uptime in the ''flink-session-cluster'' dashboard (''flink_jobmanager_job_uptime'') indicates how long the job has been running (see the query sketch after this list)
** a constantly low uptime (below 10 minutes) might indicate that the job is restarting over and over; lag may start to rise
* ''Triples Divergences'' on the ''wdqs-streaming-updater'' dashboard gives an indication of the divergences detected when applying the diffs; a sudden surge might indicate the following problems:
** on a single machine: the blazegraph journal was corrupted or copied from another source, or there is a serious bug in the '''streaming-updater-consumer'''
** on all the machines in one or two DCs: might indicate a problem in the '''streaming-updater-producer'''
* ''Consumer Poll vs Store time'' on the ''wdqs-streaming-updater'' dashboard gives an indication of write saturation in the '''streaming-updater-consumer'''; poll time is how much time is spent polling/waiting on kafka, store time is how much is spent writing to blazegraph
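For ad-hoc checks outside grafana, the uptime metric can be queried from whichever Prometheus instance scrapes the session cluster; <code>$PROMETHEUS_URL</code> below is a placeholder, while <code>/api/v1/query</code> is the standard Prometheus HTTP API path:
<syntaxhighlight lang="shell">
# Query the current value of the flink job uptime metric.
$ curl -s "$PROMETHEUS_URL/api/v1/query" \
    --data-urlencode 'query=flink_jobmanager_job_uptime'
</syntaxhighlight>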
 
== Troubleshooting ==
TODO
