{{Template:Draft}}


The WDQS Streaming Updater is an [https://flink.apache.org/ Apache Flink] application whose purpose is to create a stream of diffs of RDF triples, meant to be fed into Blazegraph. It uses the available mediawiki change streams to calculate the diffs and pushes them to a Kafka topic.
= Design =


[[File:Wikidata Query Service Streaming Updater Design.svg|right|thumb|300px]]
The application reads some of the topics populated by [[mw:Extension:EventBus]] and builds a diff of the RDF content as produced by [[mw:Wikibase/EntityData]] by comparing the last seen revision for this entity with the new revision seen from the ''mediawiki.revision-create'' topic. It is meant to integrate as a ''Stream processor'' part of the [[Event_Platform#Platform_Architecture_Diagram|Modern Event Platform]].


It relies on flink to provide:
* event time semantics to re-order the events coming out of multiple kafka topics
* state management consistent with the output of the stream
* scalability


The flink application (code name ''streaming-updater-producer'') is responsible for producing its data to a kafka topic; a client (named ''streaming-updater-consumer'') running on the same machines as the triple store (known as ''wdqs'' hosts) is responsible for reading this topic and performing the updates.
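
To get a concrete feel for this split, you can peek at the producer's output topic from any host with kafka access. The sketch below assumes <code>kafkacat</code> is installed and uses a placeholder topic name; the real topic is defined in the producer's options (see [[#Managing the streaming-updater-producer]]).

<syntaxhighlight lang="shell">
# Sketch only: OUTPUT_TOPIC is a placeholder, look up the real topic name
# in the producer's options file before running this.
OUTPUT_TOPIC='<producer-output-topic>'
# Dump the 5 most recent messages of the topic and exit.
kafkacat -C -b kafka-main1001.eqiad.wmnet:9092 -t "$OUTPUT_TOPIC" -o -5 -e
</syntaxhighlight>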


== Dependencies ==


The dependencies of the flink application are:
* The mediawiki application servers for [[mw:Wikibase/EntityData]]
* [[Kafka]] (main) for consuming Mediawiki changes and for producing its output
* Swift (thanos) for the object storage but the aim is to use future [[phab:T279621|MOS]]
* K8S services cluster to run flink as a [[Wikidata_Query_Service/Flink_On_Kubernetes|session cluster]]
* [https://schema.wikimedia.org schema.wikimedia.org] for verifying the validity of the events it emits against their [[Event_Platform/Schemas]]
* [https://meta.wikimedia.org meta.wikimedia.org] for fetching the [https://meta.wikimedia.org/w/api.php?action=help&modules=streamconfigs stream configurations]


== Deployment strategy ==


The flink application is active/active and runs in both '''eqiad''' and '''codfw''' through the [[Kubernetes]] cluster hosting services. The WDQS machines in '''eqiad''' consume the output of the flink application running in '''eqiad''', and similarly for codfw. In other words, if the flink application stops in eqiad, all wdqs machines in eqiad will stop being updated.


The benefits of this approach are:
* simple to put in place in our setup: no need to have a fail-over strategy
* Symmetry of the k8s deployed services


Drawbacks:
* No guarantee that the output of both flink pipelines will be the same
* Double compute


See [https://docs.google.com/presentation/d/10H9m7iF1W5MYdwWh6itKfJJpdDX-r6isZWngNRr6JS4/edit?usp=sharing this presentation] for a quick overview of the two strategies evaluated.


= Operations =


== Kubernetes setup ==


Kubernetes only hosts the flink session cluster responsible for running the flink job. K8s manages this flink session cluster using the [https://github.com/wikimedia/operations-deployment-charts/tree/master/charts/flink-session-cluster flink-session-cluster] chart with the [https://github.com/wikimedia/operations-deployment-charts/tree/master/helmfile.d/services/rdf-streaming-updater rdf-streaming-updater] values.


Deploying the chart to staging (on ''deployment.eqiad.wmnet''):
<syntaxhighlight lang="shell">
$ cd /srv/deployment-charts/helmfile.d/services/rdf-streaming-updater/
$ helmfile -e staging -i apply
</syntaxhighlight>
 
To inspect the jobmanager and the taskmanager logs in staging:
<syntaxhighlight lang="shell">
$ kube_env rdf-streaming-updater staging
$ kubectl logs -l component=jobmanager -c flink-session-cluster-main -f
$ kubectl logs -l component=taskmanager -c flink-session-cluster-main-taskmanager -f
</syntaxhighlight>
 
The flink jobmanager UI and REST endpoint are exposed on port '''4007'''.

This endpoint has no LVS endpoint set up and is only used for internal management (main application deploys):
* staging: <code>https://staging.svc.eqiad.wmnet:4007/jobs</code>
* eqiad: <code>https://kubernetes1003.eqiad.wmnet:4007</code> (beware to disable TLS host verification here, e.g. using <code>curl -k</code>)
* codfw: <code>https://kubernetes2003.codfw.wmnet:4007</code> (ditto regarding TLS host verification)
 
Note that the k8s cluster cannot yet be accessed via IPv6 thus IPv4 must be forced on your HTTP client (e.g. <code>curl -4</code>)
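
For example, to list the jobs known to the eqiad jobmanager through the flink REST API, combining the flags mentioned above:

<syntaxhighlight lang="shell">
# -4 forces IPv4, -k skips TLS host verification (see the notes above).
curl -4 -k https://kubernetes1003.eqiad.wmnet:4007/jobs
</syntaxhighlight>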
 
=== Logs ===
Flink logs are collected in logstash and can be filtered using: <code><nowiki>kubernetes.master_url:"https://kubemaster.svc.codfw.wmnet" AND kubernetes.namespace_name:"rdf-streaming-updater"</nowiki></code>. Append <code>kubernetes.labels.component:jobmanager</code> to filter jobmanager's logs or ''taskmanager'' for the taskmanagers' logs.
 
If for some reason the logs are not available in logstash, they can still be inspected from the deployment server, e.g. to inspect the jobmanager and the taskmanager logs in staging:
 
<syntaxhighlight lang="shell">
$ kube_env rdf-streaming-updater staging
$ kubectl logs -l component=jobmanager -c flink-session-cluster-main -f
$ kubectl logs -l component=taskmanager -c flink-session-cluster-main-taskmanager -f
</syntaxhighlight>
 
Add the <code>--previous</code> option to look at the logs of the previous run of the container if you want to debug why it crashed.
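
For example, a sketch reusing the selectors above to see why the previous jobmanager container exited:

<syntaxhighlight lang="shell">
$ kube_env rdf-streaming-updater staging
# Show the logs of the previous (crashed) container rather than the current one.
$ kubectl logs -l component=jobmanager -c flink-session-cluster-main --previous
</syntaxhighlight>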
 
=== Managing the streaming-updater-producer ===
The flink job is managed using the <code>flink/flink-job.py</code> python script available in <code>/srv/deployment/wdqs/wdqs/</code> on the [[Deployment_server#Service|deployment server]].
 
The script supports 3 environments:
{| class="wikitable"
|+ Important configuration per k8s environment
|-
!  !! staging (eqiad) !! eqiad !! codfw
|-
| '''swift container''' || rdf-streaming-updater-staging || rdf-streaming-updater-eqiad || rdf-streaming-updater-codfw
|-
| '''kafka cluster''' || kafka-main@eqiad || kafka-main@eqiad || kafka-main@codfw
|-
| '''consumer group''' || wdqs_streaming_updater_test || wdqs_streaming_updater || wdqs_streaming_updater
|-
| '''job name''' || WDQS Streaming Updater || WDQS Streaming Updater || WDQS Streaming Updater
|}
 
{{Note|All the <code>flink-job.py</code> commands rely on the job name being unique}}
 
Commands accepting a path to the swift container accept either an absolute path in the form <code>swift://rdf-streaming-updater-staging.thanos-swift/wikidata/savepoints/bootstrap-20210918</code> or a relative path such as <code>savepoints/bootstrap-20210918</code>. Prefer the relative path approach to avoid mixing checkpoint/savepoint data between the swift containers.
 
==== Start the job ====
 
Start the job with version ''0.3.77'' in ''eqiad'' from the savepoint ''savepoints/bootstrap-20210918'':
 
<syntaxhighlight lang="shell">
python3 flink/flink-job.py \
      --env eqiad \
      --job-name "WDQS Streaming Updater" \
      deploy \
      --jar lib/streaming-updater-producer-0.3.77-jar-with-dependencies.jar \
      --options-file flink/rdf-streaming-updater.yaml \
      --initial-state savepoints/bootstrap-20210918
</syntaxhighlight>
 
==== Deploy version upgrade ====
Run this when the code of the ''streaming-updater-producer'' has changed and needs to be deployed, after releasing the ''wikidata/query/rdf'' repo and updating the ''wikidata/query/deploy'' repo:
 
<syntaxhighlight lang="shell">
python3 flink/flink-job.py \
      --env eqiad \
      --job-name "WDQS Streaming Updater" \
      redeploy \
      --jar lib/streaming-updater-producer-0.3.77-jar-with-dependencies.jar \
      --options-file flink/rdf-streaming-updater.yaml \
      --savepoint savepoints
</syntaxhighlight>
 
The option ''--savepoint'' here just indicates where flink should store the savepoint from which the new code will be started. We do not rely on ''checkpoints'' for version upgrades.
 
==== Take a savepoint ====
Taking a savepoint might be needed for several reasons (inspecting the state, taking a backup) and can be done by running:
 
<syntaxhighlight lang="shell">
python3 flink/flink-job.py \
      --env eqiad \
      --job-name "WDQS Streaming Updater" \
      save \
      --savepoint savepoints
</syntaxhighlight>
 
The savepoint path is printed in the command output.
 
==== Stop the job ====
Stopping the job is generally not needed except when the flink image must be upgraded or when the helm chart requires incompatible changes to be deployed.
 
<syntaxhighlight lang="shell">
python3 flink/flink-job.py \
      --env eqiad \
      --job-name "WDQS Streaming Updater" \
      stop \
      --savepoint savepoints
</syntaxhighlight>
 
The savepoint path is printed in the command output.
 
=== Flink/helm chart upgrade ===
 
# stop the job and note the savepoint
# apply the chart and wait for flink to be up and running
# start the job from the savepoint taken
 
=== Recover from a checkpoint ===
 
If the job has failed and was not recovered automatically by flink's H/A capabilities, you may have to restart it manually from the last valid checkpoint.
To identify the checkpoint to use, first identify when the job stopped working properly (e.g. the [https://grafana-rw.wikimedia.org/d/fdU5Zx-Mk/wdqs-streaming-updater?orgId=1 Producer (Flink) Kafka Lag] graph should stop being updated).
 
Inspect the logs for lines with the logstash query: <code>kubernetes.master_url:"https://kubemaster.svc.codfw.wmnet" AND kubernetes.namespace_name:"rdf-streaming-updater" AND message:"Completed checkpoint" </code>.
 
Lines like <code>Completed checkpoint 55466 for job bd5a9619a6dd893243db926a456ef42c (146097536 bytes in 6194 ms)</code> should appear, with the last one around the time of the failure.
 
<code>bd5a9619a6dd893243db926a456ef42c</code> is the job id (beware to verify that it's the right one if this flink session cluster can run multiple jobs) and <code>55466</code> is the checkpoint number.
 
Verify in the corresponding swift container that this checkpoint is valid using the <code>swift</code> command line tool available on ''stat1004'':
 
<syntaxhighlight lang="shell">
swift -A https://thanos-swift.discovery.wmnet/auth/v1.0 -U wdqs:flink -K PASSWORD list rdf-streaming-updater-codfw -l -p wikidata/checkpoints/bd5a9619a6dd893243db926a456ef42c/chk-55466/
504991 2021-09-14 12:54:30 application/octet-stream wikidata/checkpoints/bd5a9619a6dd893243db926a456ef42c/chk-55466/_metadata
</syntaxhighlight>
 
If a <code>_metadata</code> object is present then it is the correct checkpoint. If the checkpoint number cannot be found via the logs, double check that you are inspecting the proper swift container (''rdf-streaming-updater-codfw'' vs ''rdf-streaming-updater-eqiad''). You can also list all the checkpoints under <code>wikidata/checkpoints/$JOB_ID</code> and verify that the one having a <code>_metadata</code> object corresponds to the date the pipeline stopped.
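
For example, to list every checkpoint of that job and keep only the entries that have a <code>_metadata</code> object (same credentials and container as above):

<syntaxhighlight lang="shell">
# List all checkpoints of the job and keep only those with a _metadata object.
swift -A https://thanos-swift.discovery.wmnet/auth/v1.0 -U wdqs:flink -K PASSWORD \
    list rdf-streaming-updater-codfw -l -p wikidata/checkpoints/bd5a9619a6dd893243db926a456ef42c/ \
    | grep _metadata
</syntaxhighlight>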
 
Once the checkpoint is identified (e.g. <code>checkpoints/bd5a9619a6dd893243db926a456ef42c/chk-55466</code>) it can be used as the <code>--initial-state</code> of the [[#Start the job]] procedure, for example:
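
For instance, reusing the [[#Start the job]] command for the ''codfw'' job with the checkpoint identified above (jar version as in the earlier examples):

<syntaxhighlight lang="shell">
python3 flink/flink-job.py \
      --env codfw \
      --job-name "WDQS Streaming Updater" \
      deploy \
      --jar lib/streaming-updater-producer-0.3.77-jar-with-dependencies.jar \
      --options-file flink/rdf-streaming-updater.yaml \
      --initial-state checkpoints/bd5a9619a6dd893243db926a456ef42c/chk-55466
</syntaxhighlight>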
 
== Monitoring ==
 
The flink session cluster activity can be monitored using the [https://grafana-rw.wikimedia.org/d/gCFgfpG7k/flink-session-cluster?orgId=1 flink-session-cluster] and the [https://grafana-rw.wikimedia.org/d/fdU5Zx-Mk/wdqs-streaming-updater wdqs-streaming-updater] grafana dashboards.
 
Important metrics:
* flink job uptime in the ''flink-session-cluster'' dashboard (''flink_jobmanager_job_uptime'') indicates for how long the job has been running
** a constant low uptime (below 10 minutes) might indicate that the job is constantly restarting; lag may start to rise
* ''Triples Divergences'' on the ''wdqs-streaming-updater'' dashboard gives an indication of the divergences detected when applying the diffs; a sudden surge might indicate one of the following problems:
** on a single machine: the blazegraph journal was corrupted, was copied from another source, or there is a serious bug in the '''streaming-updater-consumer'''
** on all the machines in one or two DCs: this might indicate a problem in the '''streaming-updater-producer'''
* ''Consumer Poll vs Store time'' on the ''wdqs-streaming-updater'' dashboard gives an indication of the saturation of the writes of the '''streaming-updater-consumer'''. Poll time is how much time is spent polling/waiting on kafka; store time is how much time is spent writing to blazegraph.
 
== Runbooks ==
 
=== The job is not starting ===
''WdqsStreamingUpdaterFlinkJobUnstable''
 
If the job uptime remains under 5 minutes, it probably means that the job is constantly restarting.
The cause of the failed restarts must be identified by inspecting the [[#Logs]]; it might be that some of the [[#Dependencies]] are having issues.
Containers constantly being killed may lead to this problem as well (use <code>kubectl get pod -o yaml</code> to inspect <code>containerStatuses</code>, see the sketch below).
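
A minimal sketch of that check, here for the ''eqiad'' environment:

<syntaxhighlight lang="shell">
$ kube_env rdf-streaming-updater eqiad
# Look for pods stuck in CrashLoopBackOff or recently OOMKilled.
$ kubectl get pod
# Inspect the detailed container statuses (restart counts, last termination reason).
$ kubectl get pod -o yaml | grep -B 2 -A 10 'containerStatuses:'
</syntaxhighlight>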
 
=== The job is not running ===
''WdqsStreamingUpdaterFlinkJobNotRunning''
 
The job is not running; there are several possible reasons for this:
* someone is doing a maintenance operation and the alert was not down-timed
* flink is not running or crashing
* the job had crashed without being restarted
 
For the last two points, try to identify the cause of the crash by looking at the [[#Logs]]; it could be that the k8s cluster does not have enough resources to instantiate the required pods.
Once the cause is known, the flink session cluster must be brought back up if it was not running. The job should recover by itself after the flink session cluster starts; if it does not, you might have to [[#Recover from a checkpoint|recover from a checkpoint]].
 
=== The job processing latency is high ===
''WdqsStreamingUpdaterFlinkProcessingLatencyIsHigh''
 
The job processing time is higher than usual; it might be due to increased latencies of one or several of the job dependencies:
* the thanos swift cluster ([https://grafana.wikimedia.org/d/gCFgfpG7k/flink-session-cluster?orgId=1 check checkpoint times])
* the kafka cluster
* [https://grafana.wikimedia.org/d/000000550/mediawiki-application-servers?orgId=1 mediawiki application servers]
 
The cause should be identified and the impact on the pipeline monitored:
* the backlog for the consumer group should not grow (see the sketch after this list for a way to check it)
* checkpoint times should not increase
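
One way to check the consumer group backlog directly is sketched below; it assumes the stock Kafka tooling is available (e.g. on a kafka-main broker) and uses the broker and consumer group names listed elsewhere on this page:

<syntaxhighlight lang="shell">
# Shows the per-partition lag of the flink consumers; the LAG column should not keep growing.
kafka-consumer-groups.sh --bootstrap-server kafka-main1001.eqiad.wmnet:9092 \
    --describe --group wdqs_streaming_updater
</syntaxhighlight>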
 
== First run (bootstrap) ==
{{Template:Note|This section implies a lot of manual steps that may be dangerous if the flink job is currently running. Please be sure to coordinate with the Search Platform team if you believe you need to run this.}}
 
The flink application must be given an initial state; this initial state can be constructed from the RDF dumps using a flink job.
 
From <code>stat1004.eqiad.wmnet</code> install flink (same version as the one running in k8s) under your home directory.
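
A minimal sketch of that step; the version below is only an example and must be adjusted to match the flink version deployed on k8s:

<syntaxhighlight lang="shell">
# Example only: pick the same version/scala variant as the k8s deployment.
FLINK_VERSION=1.12.1
wget "https://archive.apache.org/dist/flink/flink-${FLINK_VERSION}/flink-${FLINK_VERSION}-bin-scala_2.11.tgz"
tar xzf "flink-${FLINK_VERSION}-bin-scala_2.11.tgz"
cd "flink-${FLINK_VERSION}"
</syntaxhighlight>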
 
Configure kerberos for ''analytics-search'' (in <code>conf/flink-conf.yaml</code>):
 
<syntaxhighlight lang=yaml>
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /etc/security/keytabs/analytics-search/analytics-search.keytab
security.kerberos.login.principal: analytics-search/stat1004.eqiad.wmnet@WIKIMEDIA
</syntaxhighlight>
 
Start flink:
 
<syntaxhighlight lang="shell">
sudo -u analytics-search kerberos-run-command analytics-search sh -c 'HADOOP_CLASSPATH="`hadoop classpath`" ./bin/yarn-session.sh -tm 8g -jm 2600m -s 4 -nm "WDQS Streaming Updater"'
</syntaxhighlight>
 
Start the bootstrap job:
<syntaxhighlight lang="shell">
FLINK_JOB=/home/dcausse/streaming-updater-producer-0.3.76-jar-with-dependencies.jar
DUMP_DATE=20210718
REV_FILE=hdfs://analytics-hadoop/wmf/data/discovery/wdqs/entity_revision_map/$DUMP_DATE/rev_map.csv
# Use rdf-streaming-updater-eqiad or rdf-streaming-updater-codfw to create the savepoint for the eqiad or codfw flink job
SAVEPOINT_DIR=swift://rdf-streaming-updater-staging.thanos-swift/wikidata/savepoints/init_20210718
 
sudo -u analytics-search kerberos-run-command analytics-search sh -c "export HADOOP_CLASSPATH=`hadoop classpath`; ./bin/flink run -p 12 -c org.wikidata.query.rdf.updater.UpdaterBootstrapJob $FLINK_JOB --job_name bootstrap --revisions_file $REV_FILE --savepoint_dir $SAVEPOINT_DIR"
</syntaxhighlight>
 
Position the kafka offsets for the flink consumers.
 
First obtain the timestamp of the oldest start date of the dump script using <code>hive</code>.
 
<syntaxhighlight lang="sql">
select object
    from discovery.wikibase_rdf
    where `date` = '20210718' and wiki='wikidata' and
          subject = '<http://wikiba.se/ontology#Dump>' and
          predicate = '<http://schema.org/dateModified>'
    order by object asc
    limit 1;
</syntaxhighlight>
 
Position the offsets according to that date (can be done from stat1004 as well):
 
<syntaxhighlight lang="shell">
# This is the most dangerous command of this procedure as it may break
# an existing flink job by messing up their kafka consumer offsets.
# Be sure to create and activate a conda env with kafka-python
# Start obtained from the hql query above
START_DATE=2021-08-23T19:41:00
# Use kafka-main1001.eqiad.wmnet:9092 for eqiad and staging
KAFKA_BROKER=kafka-main2001.codfw.wmnet:9092
# Must match the options consumer_group of the flink_job
# note: set_initial_offsets.py is available at https://gerrit.wikimedia.org/r/plugins/gitiles/wikidata/query/rdf/+/refs/heads/master/streaming-updater-producer/scripts/set_initial_offsets.py
CONSUMER_GROUP=wdqs_streaming_updater
 
for c in eqiad codfw; do
    for t in mediawiki.revision-create mediawiki.page-delete mediawiki.page-undelete mediawiki.page-suppress; do
        python set_initial_offsets.py  -t $c.$t -c $CONSUMER_GROUP -b $KAFKA_BROKER -s $START_DATE;
    done;
done
</syntaxhighlight>
 
Then start the flink job on k8s using the savepoint.
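
For example, for the staging environment bootstrapped above (jar version as in the [[#Start the job]] example, savepoint path relative to the staging swift container):

<syntaxhighlight lang="shell">
python3 flink/flink-job.py \
      --env staging \
      --job-name "WDQS Streaming Updater" \
      deploy \
      --jar lib/streaming-updater-producer-0.3.77-jar-with-dependencies.jar \
      --options-file flink/rdf-streaming-updater.yaml \
      --initial-state savepoints/init_20210718
</syntaxhighlight>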
