Wikidata Query Service/Streaming Updater
This page is currently a draft.
More information and discussion about changes to this draft on the talk page.
The WDQS Streaming Updater is an Apache Flink application whose purpose is to create a stream of diffs of RDF triples, meant to be fed into Blazegraph. It uses the available change streams to calculate the diffs and pushes them to a Kafka topic.
The application reads some of the topics populated by mw:Extension:EventBus and builds a diff of the RDF content as produced by mw:Wikibase/EntityData by comparing the last seen revision for this entity with the new revision seen from the mediawiki.revision-create topic. It is meant to integrate as a stream processor within the Modern Event Platform.
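As a rough illustration of the diffing step described above, the sketch below models the RDF content of each revision as a set of (subject, predicate, object) tuples and derives the triples to add and to delete. The names `diff_triples` and `TripleDiff` and the sample triples are hypothetical; the real application works on the mw:Wikibase/EntityData output and is not necessarily implemented this way.

```python
from typing import NamedTuple, Set, Tuple

Triple = Tuple[str, str, str]  # (subject, predicate, object)


class TripleDiff(NamedTuple):
    added: Set[Triple]
    deleted: Set[Triple]


def diff_triples(old_revision: Set[Triple], new_revision: Set[Triple]) -> TripleDiff:
    """Return the triples to add and to delete to go from old to new."""
    return TripleDiff(
        added=new_revision - old_revision,
        deleted=old_revision - new_revision,
    )


# Hypothetical content of two revisions of the same entity.
rev_1 = {
    ("wd:Q42", "rdfs:label", '"Douglas Adams"@en'),
    ("wd:Q42", "schema:version", '"1"'),
}
rev_2 = {
    ("wd:Q42", "rdfs:label", '"Douglas Adams"@en'),
    ("wd:Q42", "schema:version", '"2"'),
}

diff = diff_triples(rev_1, rev_2)
```

Triples present in both revisions (the label here) appear in neither set, so only the changed part of the entity needs to travel through the output stream.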
It relies on flink to provide the following functionalities:
- event-time semantics to re-order the events coming from multiple kafka topics
- state management consistent with the output of the stream
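To make the event-time re-ordering concrete, here is a minimal sketch of a watermark-based buffer that merges events from several topics by their event time. It is a simplification written for illustration and not how flink implements watermarks; the function name `reorder_by_event_time`, the tuple layout and the sample events are assumptions, and each topic is assumed to be ordered on its own.

```python
import heapq
from typing import Dict, Iterable, Iterator, List, Tuple

Event = Tuple[int, str, str]  # (event_time_ms, topic, payload)


def reorder_by_event_time(arrivals: Iterable[Event], topics: List[str]) -> Iterator[Event]:
    """Yield events in event-time order using a simple watermark.

    The watermark is the smallest "latest event time seen" across all topics:
    assuming each topic is ordered, nothing older is expected to arrive,
    so buffered events up to the watermark can be released.
    """
    latest_seen: Dict[str, int] = {}
    buffer: List[Event] = []
    for event in arrivals:
        event_time, topic, _payload = event
        heapq.heappush(buffer, event)
        latest_seen[topic] = max(latest_seen.get(topic, event_time), event_time)
        if len(latest_seen) < len(topics):
            continue  # no watermark until every topic has produced an event
        watermark = min(latest_seen.values())
        while buffer and buffer[0][0] <= watermark:
            yield heapq.heappop(buffer)
    # End of input: flush whatever is still buffered, oldest first.
    while buffer:
        yield heapq.heappop(buffer)


# Hypothetical arrival order: the two topics are read at different paces.
topics = ["mediawiki.revision-create", "mediawiki.page-delete"]
arrivals = [
    (1000, "mediawiki.revision-create", "rev A"),
    (3000, "mediawiki.revision-create", "rev B"),
    (2000, "mediawiki.page-delete", "delete C"),
    (4000, "mediawiki.page-delete", "delete D"),
]
ordered = list(reorder_by_event_time(arrivals, topics))
```

In this example the delete at 2000 arrives after the revision at 3000 but is emitted before it, which is the property the updater needs in order to apply changes to an entity in the order they happened.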
The flink application (code name streaming-updater-producer) is responsible for producing its data to a kafka topic. A client (named streaming-updater-consumer) running on the same machines as the triple store (known as wdqs hosts) is responsible for reading this topic and performing the updates.
The dependencies of the flink application are:
- The mediawiki application servers for mw:Wikibase/EntityData
- Kafka (main) for consuming Mediawiki changes and for producing its output
- Swift (thanos) for the object storage but the aim is to use future MOS
- K8S services cluster to run flink as a session cluster
- schema.wikimedia.org for verifying the validity of the events it emits against their Event_Platform/Schemas
- meta.wikimedia.org for fetching the stream configurations
The flink application is active/active and runs in both eqiad and codfw through the Kubernetes cluster hosting services. The WDQS machines in eqiad will consume the output of the flink application running in eqiad.
The benefits of this approach are:
- simple to put in place in our setup: no need for a failover strategy that brings up flink in the spare DC and resumes operations from where the failed one left off (which would require replicating offsets between the two kafka clusters)
- symmetry of the k8s deployed services
The drawbacks are:
- no guarantee that the output of both flink pipelines will be the same
- double compute
See this presentation for a quick overview of two strategies evaluated.
Kubernetes only hosts the flink session cluster responsible for running the flink job. K8s only manages this session cluster, which is deployed using the flink-session-cluster chart with the rdf-streaming-updater values.
Deploying the chart to staging (on deployment.eqiad.wmnet):
$ cd /srv/deployment-charts/helmfile.d/services/rdf-streaming-updater/
$ helmfile -e staging -i apply
Looking at the jobmanager and then the taskmanager logs in staging:
$ kube_env rdf-streaming-updater staging
$ kubectl logs -l component=jobmanager -c flink-session-cluster-main -f
$ kubectl logs -l component=taskmanager -c flink-session-cluster-main-taskmanager -f
The flink jobmanager UI and REST endpoint are exposed on port 4007.
This endpoint has no LVS endpoint set up and is only used for internal management (main application deploys):
- https://kubernetes1003.eqiad.wmnet:4007 (TLS host verification must be disabled in your client)
- https://kubernetes2003.codfw.wmnet:4007 (ditto regarding TLS host verification)
Note that the k8s cluster cannot yet be accessed via IPv6, so IPv4 must be forced on your HTTP client.
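The two client-side constraints above (no TLS host verification, IPv4 only) can be sketched with the Python standard library as follows. This is an illustration under assumptions, not the documented procedure: the helper names are hypothetical, the certificate authority is assumed to be trusted by the local trust store (only the hostname check is disabled), and `/jobs/overview` is the standard Flink REST path for listing jobs.

```python
import json
import socket
import ssl
from http.client import HTTPSConnection


def ipv4_address(host: str, port: int) -> str:
    """Resolve host to an IPv4 address only (AF_INET), skipping IPv6 records."""
    infos = socket.getaddrinfo(host, port, family=socket.AF_INET, type=socket.SOCK_STREAM)
    return infos[0][4][0]


def tls_context_without_host_check() -> ssl.SSLContext:
    """TLS context that still validates the certificate chain but not the hostname."""
    context = ssl.create_default_context()
    context.check_hostname = False
    return context


def flink_jobs_overview(host: str, port: int = 4007) -> dict:
    """Fetch the job list from the jobmanager REST endpoint over IPv4."""
    conn = HTTPSConnection(
        ipv4_address(host, port),
        port,
        context=tls_context_without_host_check(),
        timeout=10,
    )
    try:
        # Connect to the IPv4 address but keep the original name in the Host header.
        conn.request("GET", "/jobs/overview", headers={"Host": f"{host}:{port}"})
        return json.loads(conn.getresponse().read())
    finally:
        conn.close()
```

From a host with access to the cluster, `flink_jobs_overview("kubernetes1003.eqiad.wmnet")` would then return the parsed JSON response.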
Flink logs are collected in logstash and can be filtered using:
kubernetes.master_url:"https://kubemaster.svc.codfw.wmnet" AND kubernetes.namespace_name:"rdf-streaming-updater"
Append kubernetes.labels.component:jobmanager to filter the jobmanager's logs, or kubernetes.labels.component:taskmanager for the taskmanagers' logs.
Managing the streaming-updater-producer
Start the job
Deploy version upgrade
Take a savepoint
Stop the job
- flink job uptime in the flink-session-cluster dashboard (flink_jobmanager_job_uptime) indicates for how long the job has been running
  - a constantly low uptime (below 10 minutes) might indicate that the job is constantly restarting. Lag may start to rise.
- Triples Divergences on the wdqs-streaming-updater dashboard gives an indication of the divergences detected when applying the diffs. A sudden surge might indicate the following problems:
  - on a single machine: the blazegraph journal was corrupted or copied from another source, or there is a serious bug in the streaming-updater-consumer.
  - on all the machines in one or both DCs: a problem in the streaming-updater-producer.
- Consumer Poll vs Store time on the wdqs-streaming-updater dashboard gives an indication of the saturation of the writes of the streaming-updater-consumer. Poll time is how much time is spent polling/waiting on kafka; store time is how much time is spent writing to blazegraph.
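The uptime and poll/store indicators above can be read as two simple checks, sketched below. The function names are hypothetical and this is not how the dashboards or alerts are implemented; the uptime samples are assumed to be in milliseconds, and how the samples are collected is left out.

```python
from typing import Sequence

TEN_MINUTES_MS = 10 * 60 * 1000


def is_constantly_restarting(
    uptime_samples_ms: Sequence[int], threshold_ms: int = TEN_MINUTES_MS
) -> bool:
    """True when every uptime sample in the observed window is below the threshold."""
    return len(uptime_samples_ms) > 0 and all(u < threshold_ms for u in uptime_samples_ms)


def store_time_ratio(poll_time_ms: float, store_time_ms: float) -> float:
    """Fraction of the consumer's time spent writing to blazegraph.

    A value close to 1 means the consumer rarely waits on kafka, i.e. the
    writes are saturated; a value close to 0 means it mostly waits for data.
    """
    total = poll_time_ms + store_time_ms
    return store_time_ms / total if total > 0 else 0.0


# Hypothetical samples: the job never stays up longer than a few minutes.
restarting = is_constantly_restarting([120_000, 30_000, 240_000])
healthy = is_constantly_restarting([120_000, 3_600_000])
ratio = store_time_ratio(poll_time_ms=750, store_time_ms=250)
```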
First run (bootstrap)
This section involves a lot of manual steps that may be dangerous if the flink job is currently running. Please be sure to coordinate with the Search Platform team if you believe you need to run this.
The flink application must be given an initial state, which can be constructed from the RDF dumps using a flink job.
On stat1004.eqiad.wmnet, install flink (same version as the one running in k8s) under your home directory.
Configure kerberos for analytics-search (in conf/flink-conf.yaml):
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /etc/security/keytabs/analytics-search/analytics-search.keytab
security.kerberos.login.principal: analytics-search/stat1004.eqiad.wmnet@WIKIMEDIA
Start a flink session on YARN:
sudo -u analytics-search kerberos-run-command analytics-search sh -c 'HADOOP_CLASSPATH="`hadoop classpath`" ./bin/yarn-session.sh -tm 8g -jm 2600m -s 4 -nm "WDQS Streaming Updater"'
Start the bootstrap job
FLINK_JOB=/home/dcausse/streaming-updater-producer-0.3.76-jar-with-dependencies.jar
DUMP_DATE=20210718
REV_FILE=hdfs://analytics-hadoop/wmf/data/discovery/wdqs/entity_revision_map/$DUMP_DATE/rev_map.csv
# Use rdf-streaming-updater-eqiad or rdf-streaming-updater-codfw to create the savepoint for the eqiad or codfw flink job
SAVEPOINT_DIR=swift://rdf-streaming-updater-staging.thanos-swift/wikidata/savepoints/init_20210718
sudo -u analytics-search kerberos-run-command analytics-search sh -c "export HADOOP_CLASSPATH=`hadoop classpath`; ./bin/flink run -p 12 -c org.wikidata.query.rdf.updater.UpdaterBootstrapJob $FLINK_JOB --job_name bootstrap --revisions_file $REV_FILE --savepoint_dir $SAVEPOINT_DIR"
Position the kafka offsets for the flink consumers.
First obtain the timestamp of the oldest start date of the dump script using:
select object
from discovery.wikibase_rdf
where date = '20210718'
  and project = 'wikidata'
  and subject = '<http://wikiba.se/ontology#Dump>'
  and predicate = '<http://schema.org/dateModified>'
order by object asc
limit 1;
Position the offsets according to that date (can be done from stat1004 as well):
# This is the most dangerous command of this procedure as it may break
# an existing flink job by messing up its kafka consumer offsets.
# Be sure to create and activate a conda env with kafka-python

# Start obtained from the hql query above
START_DATE=2021-08-23T19:41:00
# Use kafka-main1001.eqiad.wmnet:9092 for eqiad and staging
KAFKA_BROKER=kafka-main2001.codfw.wmnet:9092
# Must match the consumer_group option of the flink job
# note: set_offsets.py is available from
CONSUMER_GROUP=wdqs_streaming_updater

for c in eqiad codfw; do
    for t in mediawiki.revision-create mediawiki.page-delete mediawiki.page-undelete mediawiki.page-suppress; do
        python set_offsets.py -t $c.$t -c $CONSUMER_GROUP -b $KAFKA_BROKER -s $START_DATE
    done
done
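The script set_offsets.py itself is not reproduced here, so the following is only a sketch of the two inputs it is given, not its implementation. Time-based offset lookups in Kafka work on epoch-millisecond timestamps (in kafka-python, `KafkaConsumer.offsets_for_times` takes such values), so the start date has to be converted first. The helper names are hypothetical, and START_DATE is assumed to be expressed in UTC.

```python
from datetime import datetime, timezone
from itertools import product

DATACENTERS = ["eqiad", "codfw"]
TOPICS = [
    "mediawiki.revision-create",
    "mediawiki.page-delete",
    "mediawiki.page-undelete",
    "mediawiki.page-suppress",
]


def to_epoch_millis(start_date: str) -> int:
    """Convert a date such as 2021-08-23T19:41:00 to epoch milliseconds, assuming UTC."""
    parsed = datetime.strptime(start_date, "%Y-%m-%dT%H:%M:%S")
    return int(parsed.replace(tzinfo=timezone.utc).timestamp() * 1000)


def prefixed_topics() -> list:
    """Topic names prefixed by datacenter, in the same order as the shell loop."""
    return [f"{dc}.{topic}" for dc, topic in product(DATACENTERS, TOPICS)]


start_ms = to_epoch_millis("2021-08-23T19:41:00")
all_topics = prefixed_topics()
```

Each of the eight prefixed topics has its offsets positioned separately, which is why the shell loop calls the script once per datacenter and topic pair.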
Then start the flink job on k8s using the savepoint.