From Wikitech-static
Revision as of 18:27, 1 February 2017 by imported>Ottomata


EventStreams is a service that exposes continuous streams of structured event data. It does so over HTTP, using chunked transfer encoding in the Server-Sent Events (SSE) format. EventStreams can be consumed directly via HTTP, but it is more commonly used via an EventSource client library.

EventStreams provides access to arbitrary streams of data, including MediaWiki RecentChanges. It replaces RCStream. EventStreams is backed by Kafka.

Note: 'SSE' and 'EventSource' are often used interchangeably. This document uses SSE to refer to the server-side protocol and library, and EventSource to refer to the client-side library.

Client Examples With RecentChanges Feed


NodeJS (with eventsource)

var EventSource = require('eventsource');
// The stream endpoint; this uses the same local endpoint as the
// Python example below.
var url = 'http://localhost:8092/v2/stream/recentchange';

console.log(`Connecting to EventStreams at ${url}`);
var eventSource = new EventSource(url);

eventSource.onopen = function(event) {
    console.log('--- Opened connection.');
};

eventSource.onerror = function(event) {
    console.error('--- Encountered error', event);
};

eventSource.onmessage = function(event) {
    // event.data will be a JSON string containing the message event.
    console.log(JSON.parse(event.data));
};

Python (with sseclient)

from sseclient import SSEClient as EventSource
import json

url = 'http://localhost:8092/v2/stream/recentchange'
for event in EventSource(url):
    if event.event == 'message' and event.data:
        # event.data is a JSON string containing the message event.
        print(json.dumps(json.loads(event.data), indent=4, ensure_ascii=False))
    elif event.event == 'error':
        print('--- Encountered error', event.data)

CLI with curl and jq

Grep for the data part of the event, strip out the data: prefix, and prettify the event with jq.

curl -s http://localhost:8092/v2/stream/recentchange |
  grep data |
  sed 's/^data: //g' |
  jq .


The list of available streams will change over time, so it is not documented here. To see the active list of available streams, request the service's swagger spec. The available streams' URI paths all begin with /v2/stream, e.g.:

"/v2/stream/recentchange": {
    "get": {
      "produces": [
        "text/event-stream; charset=utf-8"
      ],
      "description": "Mediawiki RecentChanges feed. Schema:"
    }
}

The examples here all use the RecentChanges stream. This section describes the format an EventStreams stream endpoint returns in the response body.

Requesting /v2/stream/recentchange will get you an SSE/EventSource-formatted stream of data. This format is most easily interpreted using an EventSource client library. If you choose not to use one, you will still get a human-readable stream of data in the following format:

event: message
id: [{"topic":"eqiad.mediawiki.recentchange","partition":0,"offset":142461965},{"topic":"codfw.mediawiki.recentchange","partition":0,"offset":-1}]
data: {"event": "data", "is": "here"}

Each event will be separated by two newlines (\n\n) and will have event, id, and data fields.

The event field will be message for data events and error for error events. id is a JSON array of Kafka topic, partition, and offset metadata. The id field can be used to tell EventStreams to start consuming from an earlier position in the stream. This enables clients to automatically resume from where they left off if they are disconnected; EventSource implementations handle this transparently. Note that the partitions and offsets for all topics and partitions that make up this stream are included in every message's id field. This allows EventSource to be precise about where it left off, even if the consumed stream is composed of multiple Kafka topic-partitions.
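As a rough illustration of this format (a sketch, not part of the service itself), a single raw event block can be split into its event, id, and data fields with a few lines of Python. Note that a real SSE parser must also handle multi-line data fields, comments, and retry fields, which this sketch ignores:

```python
import json

def parse_sse_event(raw):
    """Parse one raw SSE event block (the text between two blank lines)
    into a dict of its fields. id and data are JSON-decoded."""
    fields = {}
    for line in raw.strip().split('\n'):
        name, _, value = line.partition(': ')
        fields[name] = value
    fields['id'] = json.loads(fields['id'])
    fields['data'] = json.loads(fields['data'])
    return fields

raw = (
    'event: message\n'
    'id: [{"topic":"eqiad.mediawiki.recentchange","partition":0,"offset":142461965}]\n'
    'data: {"event": "data", "is": "here"}\n'
)
parsed = parse_sse_event(raw)
```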

You may request that EventStreams begin streaming to you from different offsets by setting an array of topic, partition, offset objects in the Last-Event-ID HTTP header.
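For example, a client reconnecting manually (rather than relying on an EventSource library to do it) might build that header as follows. This is a sketch; the topic names and offsets are taken from the example id field above:

```python
import json

# The last id field received before disconnecting: one object per
# topic-partition that makes up the stream.
last_id = [
    {'topic': 'eqiad.mediawiki.recentchange', 'partition': 0, 'offset': 142461965},
    {'topic': 'codfw.mediawiki.recentchange', 'partition': 0, 'offset': -1},
]

# Send the id array back verbatim as the Last-Event-ID header on
# reconnect, using any HTTP client that allows custom headers.
headers = {'Last-Event-ID': json.dumps(last_id)}
```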


Unlike RCStream, EventStreams does not (yet) have $wgServerName (or any other) server-side filtering capabilities. We would like to add this, but exactly how it would fit into the API, and which filtering features to support, is still under debate.

Until server-side filtering exists, you'll need to do your filtering client side, e.g.:

/**
 * Calls cb(event) for every recentchange event where event.server_name == server_name.
 */
function filterWiki(event, server_name, cb) {
    if (event.server_name == server_name) {
        cb(event);
    }
}

eventSource.onmessage = function(event) {
    // Print only events from Wikimedia Commons.
    filterWiki(JSON.parse(event.data), 'commons.wikimedia.org', console.log);
};


SSE vs. websockets

RCStream was built on websockets, so why not continue to use them for its replacement?

Websockets don't use plain HTTP, which makes them different from most of the other services that Wikimedia runs. They are especially powerful when clients and servers need a bi-directional pipe to communicate with each other asynchronously. EventStreams only needs to send events from the server to clients, and it is 100% HTTP. As such, it can be consumed using any HTTP client out there, without the need to program several RPC-like initialization steps.

We originally built a Kafka → socket.io library (Kasocki), but after doing so we decided that SSE was a better fit, and built KafkaSSE.


KafkaSSE is a library that glues a Kafka consumer to a connected HTTP SSE client. The Kafka consumer is assigned topics, partitions, and offsets, and events are then streamed from the consumer to the HTTP client using chunked transfer encoding. EventStreams maps stream routes (e.g. /v2/stream/recentchange) to specific topics in Kafka.
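The mapping and framing can be sketched roughly as follows. This is an illustrative sketch, not KafkaSSE's actual code; the route and topic names mirror the RecentChanges example, and render_sse shows how a consumed Kafka message could be serialized into the SSE format described above:

```python
import json

# Hypothetical route -> Kafka topics mapping (one stream may be composed
# of multiple topics, e.g. one per datacenter).
STREAM_ROUTES = {
    '/v2/stream/recentchange': [
        'eqiad.mediawiki.recentchange',
        'codfw.mediawiki.recentchange',
    ],
}

def render_sse(message, offsets):
    """Serialize one consumed Kafka message as an SSE event block.

    offsets is the current position in every topic-partition that makes
    up the stream, so that the client can resume via Last-Event-ID.
    """
    return (
        'event: message\n'
        'id: ' + json.dumps(offsets) + '\n'
        'data: ' + json.dumps(message) + '\n\n'
    )

frame = render_sse(
    {'event': 'data', 'is': 'here'},
    [{'topic': 'eqiad.mediawiki.recentchange', 'partition': 0, 'offset': 142461965}],
)
```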


WMF maintains several internal Kafka clusters that handle hundreds of thousands of messages per second. Kafka has proven to be highly scalable and feature-rich, and it supports multiple producers and consumers. Our internal events are already produced through Kafka, so using it as the EventStreams backend was a natural choice.

Kafka allows us to begin consuming from any message offset (that is still present on the backend Kafka cluster). This feature is what allows connected EventStreams clients to auto-resume (via EventSource) when they disconnect. In the future, we may implement timestamp-based consumption, so that a client could begin consuming an event stream from a timestamp in the past.

WMF Administration


See also

Source code: EventStreams
Source code: kafka-sse