Event Platform/Producing events

From Wikitech

This is a guide for producing event streams using WMF's Event Platform. It walks through the schema and stream config, the available producer clients, local development, deployment, and how to consume the events once they're flowing.

For most analytics instrumentation, you should use Test Kitchen.

Overview

To produce events using Event Platform, you need:

  1. An event JSONSchema in one of the WMF schema repositories.
  2. A stream declared in EventStreamConfig.
  3. Valid events produced via one of the methods listed under Producing events below.

Once events are in Kafka, they are (by default) automatically ingested into the Data Lake as a Hive table. They are also consumable from Kafka, and may also be available in the EventStreams HTTP API.

Event Platform requirements

Event schemas

WMF event schemas are written as YAML JSONSchemas, kept in Git, and identified by a versioned URI. They live in one of the WMF event schema repositories (e.g. schemas-event-primary for operational production schemas, schemas-event-secondary for analytics).

A schema's title matches its path in the repository, and its $id is the path with a semver version on the end. For example, jsonschema/my_namespace/thing_happened/1.0.0.yaml might have:

title: my_namespace/thing_happened
$id: /my_namespace/thing_happened/1.0.0
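
The naming convention above can be sketched as a small function. This is only an illustration: schemaIdentity is a hypothetical helper (it is not part of jsonschema-tools), and it assumes the <root>/<title>/<version>.yaml layout described above.

```javascript
// Derive a schema's title and $id from its file path in the repository.
// Assumption: files live at <root>/<title>/<semver>.yaml, as described above.
function schemaIdentity(repoPath, root = 'jsonschema') {
    const match = repoPath.match(/^(.+)\/(\d+\.\d+\.\d+)\.yaml$/);
    if (!match) {
        throw new Error(`Not a versioned schema file: ${repoPath}`);
    }
    const [, dir, version] = match;
    const prefix = `${root}/`;
    // The title is the directory path relative to the schema root.
    const title = dir.startsWith(prefix) ? dir.slice(prefix.length) : dir;
    // The $id is the title with a leading slash and the version appended.
    return { title, $id: `/${title}/${version}` };
}

console.log(schemaIdentity('jsonschema/my_namespace/thing_happened/1.0.0.yaml'));
```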

A minimal unmaterialized current.yaml schema file might look like this:

title: my_namespace/thing_happened
description: An event about when thing happened
$id: /my_namespace/thing_happened/1.0.0
$schema: https://json-schema.org/draft-07/schema#
type: object
allOf:
  - $ref: /fragment/common/2.0.0#
  - properties:
      some_field:
        description: The value of the thing that happened
        type: string

The $ref pulls in the required and common fields (see below) so you don't have to define them yourself.

For the full schema authoring workflow (including how jsonschema-tools materializes versioned files), see Event Platform/Schemas and Event Platform/Schemas/Guidelines. The README in each schema repository also documents its own conventions. Read those before creating or modifying a schema.

Once a schema change is merged, it should be automatically deployed to https://schema.wikimedia.org within 30 minutes.

Required event fields

Every event schema must have these fields:

  • $schema: the versioned schema URI. Must match the schema's $id. EventGate uses this to look up which schema to validate against.
  • meta.stream: the name of the stream this event belongs to.
  • dt: the event timestamp, as an ISO-8601 UTC date-time.
  • meta.dt: the system receive timestamp, as an ISO-8601 UTC date-time.

Generally, you should not set meta.dt in your events; EventGate will set it for you.

See also Event Platform/Schemas/Guidelines#Required fields.
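
As a rough illustration of the required fields, a producer could run a pre-flight check like the one below before sending. This is a sketch only: missingRequiredFields is a hypothetical helper, the timestamp pattern accepts just the "Z" form of UTC, and none of it replaces the full JSONSchema validation that EventGate performs. meta.dt is deliberately not checked, since EventGate sets it.

```javascript
// Return the names of required fields that are missing or malformed.
// Hypothetical helper; EventGate's real validation is schema-based.
function missingRequiredFields(event) {
    const missing = [];
    if (typeof event.$schema !== 'string') {
        missing.push('$schema');
    }
    if (typeof event.meta?.stream !== 'string') {
        missing.push('meta.stream');
    }
    // Simplification: only accept ISO-8601 date-times with a trailing "Z".
    const isoUtc = /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?Z$/;
    if (typeof event.dt !== 'string' || !isoUtc.test(event.dt)) {
        missing.push('dt');
    }
    return missing;
}

console.log(missingRequiredFields({
    $schema: '/my_namespace/thing_happened/1.0.0',
    meta: { stream: 'my_namespace.thing_happened' },
    dt: new Date().toISOString(),
    some_field: 'value'
}));
```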

Stream configuration

Declare your stream by adding an entry to wgEventStreams in mediawiki-config/wmf-config/ext-EventStreamConfig.php. A minimal entry is:

'wgEventStreams' => [
    'default' => [
        // ...
        'my_namespace.thing_happened' => [
            'schema_title' => 'my_namespace/thing_happened',
            'destination_event_service' => 'eventgate-analytics-external',
        ],
    ],
],

schema_title must match the schema's title field exactly. This is used to ensure that only events of that schema are allowed in the stream.

The stream name does not have to match the schema title. Multiple streams can reuse the same schema.
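
The relationship between an event's $schema URI and a stream's schema_title can be sketched as follows. The helper names are hypothetical, and the check assumes the $id convention from the Event schemas section (title plus a trailing version); EventGate's actual implementation may differ.

```javascript
// '/my_namespace/thing_happened/1.0.0' -> 'my_namespace/thing_happened'
function titleFromSchemaUri(schemaUri) {
    const parts = schemaUri.replace(/^\//, '').split('/');
    parts.pop(); // drop the trailing version segment
    return parts.join('/');
}

// True if the event's schema is the one the stream config allows.
function isAllowedInStream(event, streamConfig) {
    return titleFromSchemaUri(event.$schema) === streamConfig.schema_title;
}

const streamConfig = { schema_title: 'my_namespace/thing_happened' };
console.log(isAllowedInStream({ $schema: '/my_namespace/thing_happened/1.0.0' }, streamConfig));
```

Because the version is stripped before comparing, events of any version of the schema are accepted into the stream, and several streams can point at the same schema_title.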

destination_event_service names the EventGate cluster the event stream is allowed to be produced to.

See also other common stream settings.

Deploying stream config

  1. Edit ext-EventStreamConfig.php and get the patch reviewed.
  2. Schedule a Backport window to deploy mediawiki-config (or deploy it on your own).
  3. Verify your stream is in the API: curl 'https://meta.wikimedia.org/w/api.php?action=streamconfigs&format=json&streams=my_namespace.thing_happened' | jq .
  4. If your stream targets an EventGate cluster that only requests stream configs at startup (check the docs), ask for that cluster to be restarted.

To override settings for beta only, edit InitialiseSettings-labs.php instead. See Per-wiki and beta stream config overrides below.

Producing events

There are several ways to produce events to Kafka:

  • EventBus's PHP API for non-analytics events.
  • HTTP POST to an EventGate /v1/events endpoint.
  • EventLogging's PHP API: a simpler API for analytics events (this uses EventBus).
  • MediaWiki EventLogging JS API: POSTs to an externally exposed EventGate, usually for analytics.
  • Directly to Kafka: not recommended unless using a supported client library.

Producing with EventBus

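use MediaWiki\Deferred\DeferredUpdates;
use MediaWiki\Extension\EventBus\EventBus;

// Run the send as a deferred update, after the main request work is done.
DeferredUpdates::addCallableUpdate( static function () {
    $event = [
        // Must match the $id of the schema version the event conforms to.
        '$schema' => '/my_namespace/thing_happened/1.0.0',
        'meta' => [
            'stream' => 'my_namespace.thing_happened',
        ],
        'dt' => wfTimestamp( TS_ISO_8601 ),
        // Event-specific fields; each one must be declared in the schema.
        'page_id' => 12345,
        'action' => 'edited',
    ];

    // send() takes a list of events, so wrap the single event in an array.
    EventBus::getInstanceForStream( 'my_namespace.thing_happened' )
        ->send( [ $event ] );
} );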

EventBus::getInstanceForStream uses your stream's destination_event_service to choose the EventGate cluster.

Producing to EventGate

The POST body can be a single event or an array of events.

echo '
{
  "$schema": "/my_namespace/thing_happened/1.0.0",
  "meta": { "stream": "my_namespace.thing_happened" },
  "dt": "2026-05-29T12:34:56Z",
  "some_field": "value"
}
' | curl -H 'Content-Type: application/json' -d @- \
  https://intake-analytics.wikimedia.org/v1/events

EventGate has two producer modes: 'guaranteed' (the default) and 'hasty'. See EventGate producer modes for the difference. The endpoint URL depends on which EventGate cluster you are targeting and where you are producing from.
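
The same request can be sketched in JavaScript. buildEventGateRequest below is a hypothetical helper that only assembles arguments for the standard fetch() API; the base URL is the one from the curl example above and will differ for other EventGate clusters.

```javascript
// Assemble a fetch()-compatible request for EventGate's /v1/events endpoint.
// Hypothetical helper: it builds the request but does not send it.
function buildEventGateRequest(baseUrl, events) {
    // The body may be a single event or an array; always send an array here.
    const batch = Array.isArray(events) ? events : [events];
    return {
        url: new URL('/v1/events', baseUrl).toString(),
        init: {
            method: 'POST',
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify(batch)
        }
    };
}

const request = buildEventGateRequest('https://intake-analytics.wikimedia.org', {
    $schema: '/my_namespace/thing_happened/1.0.0',
    meta: { stream: 'my_namespace.thing_happened' },
    dt: new Date().toISOString(),
    some_field: 'value'
});
console.log(request.url);
// To actually send it: fetch(request.url, request.init)
```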

Producing to Kafka

You can produce directly to Kafka, but you should do everything EventGate would do: schema lookup, validation, setting meta fields, picking the right Kafka topic, etc. Event Platform/Producer Requirements explains the producer contract.

wikimedia-event-utilities has a Java library for producing Event Platform streams to Kafka. It can be used from PyFlink through eventutilities-python.

Otherwise, you should only produce to Kafka directly if you know what you are doing.

MediaWiki local development

You'll need a local EventGate devserver. The MediaWiki-Docker EventGate recipe adds an eventgate service to your docker-compose.override.yml.

Point EventBus at it by adding this to LocalSettings.php:

wfLoadExtension( 'EventBus' );

$wgEventServices = [
    'default' => [
        'url' => 'http://eventgate:8192/v1/events',
    ],
];
$wgEventServiceDefault = 'default';
$wgEnableEventBus = 'TYPE_EVENT';

This EventGate devserver fetches schemas from https://schema.wikimedia.org by default, and accepts events for any stream (no EventStreamConfig required). If you also want to validate against a schema you are currently developing, mount a local checkout of the schema repository into the container and point EventGate at it via the schema_base_uris setting in its config file. (The EventLogging recipe linked below shows that pattern.)

If you don't have MediaWiki Docker running yet, see DEVELOPERS.md in mediawiki/core.

Producing analytics events

For most analytics instrumentation, you should use Test Kitchen.

From a technical perspective, analytics events are not special. However, because they have the potential to collect sensitive data, they require some special care.

Consult the Data Collection Guidelines before starting instrumentation to determine which risk tier your planned data collection falls under. If it is Low Risk, you do not need to submit a request for approval. If it is Medium or High Risk, you need to submit a request through L3SC. Affiliates who cannot access L3SC (such as WMDE working on features and instruments that are deployed on Foundation infrastructure) should submit a request through the Data Platform Engineering intake process so that someone from DPE can submit the L3SC request on their behalf.

Do this before you start instrumenting: the review may rule out some of the data you planned to collect, and you will save time by not writing code that you later have to remove. For more information about what your request should contain, refer to this draft guide on measurement plans and instrumentation specifications.

Analytics schemas live in schemas-event-secondary under the analytics namespace. The schema authoring workflow is the same as for any other Event Platform schema. See Event Platform/Schemas.

If not using Test Kitchen, you can produce events through the EventLogging PHP or JS APIs.

Registering the stream with EventLogging

EventLogging needs to know that it is allowed to produce the stream. Add the stream name to wgEventLoggingStreamNames in ext-EventLogging.php:

'wgEventLoggingStreamNames' => [
    'default' => [
        // ...
        'analytics.thing_happened',
    ],
],

EventLogging JavaScript

mw.eventLog.submit( 'analytics.thing_happened', {
    // $schema must match the $id of the schema version your event conforms to.
    $schema: '/analytics/thing_happened/1.0.0',
    dt: new Date().toISOString(),
    page_id: mw.config.get( 'wgArticleId' ),
    action: 'clicked',
    target: 'example-widget'
} );

The first argument is the stream name. It must match what's declared in wgEventStreams in production. EventLogging sets meta.stream for you from this value.

EventLogging PHP

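use MediaWiki\Extension\EventLogging\EventLogging;

$event = [
    // $schema must match the $id of the schema version your event conforms to.
    '$schema' => '/analytics/thing_happened/1.0.0',
    'dt' => wfTimestamp( TS_ISO_8601 ),
    'field_a' => 'value_a',
    // ...
];

// The first argument is the stream name; EventLogging sets meta.stream from it.
EventLogging::submit( 'analytics.thing_happened', $event );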

MediaWiki local development

The MediaWiki-Docker EventLogging recipe sets up a local EventGate together with EventLogging. It adds an eventlogging service to docker-compose.override.yml running the EventLogging devserver (which bundles EventGate), and gives you a LocalSettings.php snippet that points EventBus and EventLogging at it.

The recipe has two flavors. The minimal one fetches schemas from https://schema.wikimedia.org, so it only works with schemas that are already published. The one with local schema repositories mounts clones of schemas-event-primary and schemas-event-secondary into the container, allowing you to develop schema changes locally.

Events end up in cache/events.json. Run tail -f cache/events.json to view events as they flow.

You can also follow the eventlogging service logs to see validation and other errors:

docker compose logs -f eventlogging

You can quickly test that everything is working from the browser console using the test.event stream:

mw.eventLog.submit( 'test.event', {
    $schema: '/test/event/1.0.0',
    test: 'Hello from JavaScript!'
} );

Viewing produced events

Once events are being produced, you can read them from EventStreams, from the Data Lake, or directly from Kafka.

EventStreams

There are three EventStreams instances:

Data Lake

Almost all streams are ingested into the Data Lake within a few hours. The Hive table name is a normalized version of the stream name, in the event database. Our example my_namespace.thing_happened stream lands in event.my_namespace_thing_happened. From there you can query with Hive, Spark or Presto, and dashboard via Superset.
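
The name normalization can be approximated as below. The exact rules are an assumption inferred from the two examples on this page (my_namespace.thing_happened, and the eventgate-analytics-external.error.validation stream from the Schema validation errors section): every character that is not a letter, digit or underscore becomes an underscore, and the result is lowercased. hiveTableName is a hypothetical helper, not the ingestion pipeline's code.

```javascript
// Approximate the Hive table name for a stream.
// Assumption: non-alphanumeric characters map to "_" and the name is lowercased.
function hiveTableName(streamName, database = 'event') {
    const table = streamName.replace(/[^A-Za-z0-9_]/g, '_').toLowerCase();
    return `${database}.${table}`;
}

console.log(hiveTableName('my_namespace.thing_happened'));
console.log(hiveTableName('eventgate-analytics-external.error.validation'));
```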

Event data is retained for 90 days by default. See Event Sanitization to extend that.

Directly from Kafka

Streams are produced into datacenter-prefixed Kafka topics. The my_namespace.thing_happened stream produces to eqiad.my_namespace.thing_happened and codfw.my_namespace.thing_happened. To get the full stream, consume both.
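
A consumer that needs the full stream can derive the topic list from the stream name. This sketch hard-codes the two datacenter prefixes named above; topicsForStream is a hypothetical helper, and legacy EventLogging streams do not follow this pattern.

```javascript
// Datacenter prefixes used in the topic names above.
const DATACENTERS = ['eqiad', 'codfw'];

// List every datacenter-prefixed topic that carries part of the stream.
function topicsForStream(streamName, datacenters = DATACENTERS) {
    return datacenters.map((dc) => `${dc}.${streamName}`);
}

console.log(topicsForStream('my_namespace.thing_happened'));
```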

To find out which topics and clusters your stream is on, ask the stream config API:

curl 'https://meta.wikimedia.org/w/api.php?action=streamconfigs&format=json&streams=my_namespace.thing_happened' | jq .

Most streams are mirrored into jumbo-eqiad, which is the easiest place to consume from on a stat host:

# -C:     consume mode
# -u:     unbuffered
# -b:     broker (any broker in the cluster; it'll discover the rest)
# -t:     topic
# -o end: start at the end of the topic (only new events)
kafkacat -C -u -b kafka-jumbo1010.eqiad.wmnet:9092 \
  -t eqiad.my_namespace.thing_happened -o end | jq .

To pretty-print the last 5 messages instead:

kafkacat -C -b kafka-jumbo1010.eqiad.wmnet:9092 \
  -t eqiad.my_namespace.thing_happened -o -5 -e -q | jq .

For Test Kitchen-based instrumentation, events end up in eqiad.product_metrics.web_base (or a similar Test Kitchen topic), and you filter by instrument_name or experiment.enrolled:

kafkacat -C -b kafka-jumbo1010.eqiad.wmnet:9092 \
  -t eqiad.product_metrics.web_base -o -5 -e -q | \
  jq 'select(.instrument_name == "YOUR_INSTRUMENT_NAME")'

Kafka topic names are case sensitive. Capitalization is whatever you used in wgEventStreams. Note also that legacy EventLogging streams are named differently and are not split by datacenter, e.g. eventlogging_InukaPageView.
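
If you would rather post-process consumed messages in a script than in jq, the instrument_name filter above might look like this. It assumes one JSON event per line, as kafkacat prints them; filterByInstrument and the sample events are made up for illustration.

```javascript
// Keep only the events emitted by one instrument.
// Input: newline-delimited JSON, one event per line.
function filterByInstrument(ndjson, instrumentName) {
    return ndjson
        .split('\n')
        .filter((line) => line.trim() !== '')
        .map((line) => JSON.parse(line))
        .filter((event) => event.instrument_name === instrumentName);
}

// Made-up sample input, standing in for kafkacat output.
const sample = [
    '{"instrument_name": "YOUR_INSTRUMENT_NAME", "action": "click"}',
    '{"instrument_name": "some_other_instrument", "action": "click"}'
].join('\n');

console.log(filterByInstrument(sample, 'YOUR_INSTRUMENT_NAME'));
```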

Evolving your schema

You can only make backwards-compatible schema changes, which in practice means adding new optional fields. To add a field, edit current.yaml, bump the version in $id (a minor version bump for an added field), and materialize:

$ npm run build-modified
$ git add jsonschema/analytics/thing_happened/*
$ git commit -m 'analytics/thing_happened - add link_text field, bump to 1.1.0'

Then update the producer code to set the new field and the new $schema version URI. jsonschema-tools checks compatibility in CI, but you can run it locally too:

$ npm test
...
  Schema Compatibility in Repository ./jsonschema/
    analytics/thing_happened
      Major Version 1
        ✓ 1.1.0 must be compatible with 1.0.0

Once the new schema is merged and your producer code is deployed, events with the new field will be produced. Old events with the old version keep validating against the old schema.
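
To make "backwards-compatible" concrete, here is a deliberately simplified check over flat schemas. It is not how jsonschema-tools works (the real check handles nested objects, $ref and more); incompatibilities is a hypothetical helper that only flags removed fields, changed types and newly required fields.

```javascript
// List reasons why newSchema is not a backwards-compatible successor of oldSchema.
// Simplified sketch: top-level properties only, no $ref or nested objects.
function incompatibilities(oldSchema, newSchema) {
    const problems = [];
    const oldProps = oldSchema.properties ?? {};
    const newProps = newSchema.properties ?? {};
    for (const [name, definition] of Object.entries(oldProps)) {
        if (!(name in newProps)) {
            problems.push(`removed field: ${name}`);
        } else if (newProps[name].type !== definition.type) {
            problems.push(`changed type of field: ${name}`);
        }
    }
    // New fields are fine, but they must stay optional.
    const oldRequired = new Set(oldSchema.required ?? []);
    for (const name of newSchema.required ?? []) {
        if (!oldRequired.has(name)) {
            problems.push(`new required field: ${name}`);
        }
    }
    return problems;
}

const v100 = { properties: { some_field: { type: 'string' } } };
const v110 = { properties: { some_field: { type: 'string' }, link_text: { type: 'string' } } };
console.log(incompatibilities(v100, v110));
```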

Backwards-incompatible changes

In general, backwards-incompatible changes are not allowed, because there is no way to do them without coordination with all consumers. If you really need one, file a Phabricator ticket tagged with #Data-Engineering. The process will be manual and vary depending on the change.

Per-wiki and beta stream config overrides

Stream config is just MediaWiki config, so you can override it per wiki or per wiki group with the standard +wikiname merge syntax. This works for any MediaWiki-based usage of stream config settings, e.g. EventLogging, EventBus, etc.

It does not work for EventGate or Data Lake ingestion settings, because those are not wiki-aware: they always read stream config from metawiki. Anything that affects validation or production (schema_title, destination_event_service) has to live in default or in a +metawiki override.

For beta, the same syntax applies in InitialiseSettings-labs.php, with one catch: the default section in InitialiseSettings-labs.php doesn't merge with default from InitialiseSettings.php. Only per-wiki overrides merge. So if your stream isn't yet declared in production, declare it under +metawiki in InitialiseSettings-labs.php so EventGate (which always reads from metawiki) can see it.

Schema validation errors

Events that fail validation are not produced. Instead, EventGate produces a validation error event into a corresponding *.error.validation stream. Events sent through EventLogging end up in eventgate-analytics-external.error.validation, which is ingested into Hive as event.eventgate_analytics_external_error_validation.

Validation errors are also routed into Logstash. Useful starting points:

The *.error.validation streams are streams like any other, so you can also subscribe to them from EventStreams.

Exposing events publicly

Streams produced through the Kafka main clusters can be exposed on the public EventStreams service at https://stream.wikimedia.org. Before you expose anything, make sure the stream contains no PII and has been cleared against the Data Collection Guidelines. Once exposed, the stream is consumable by anyone on the internet.

To expose a stream, add it to allowed_streams in deployment-charts/helmfile.d/services/eventstreams/values.yaml:

config:
  # ...
  allowed_streams:
    # ...
    - my_namespace.thing_happened

Once that patch is reviewed and deployed, the stream shows up in the public EventStreams API docs and can be consumed at https://stream.wikimedia.org.

Decommissioning

When you set up a stream, plan for how it ends. Generally, schemas should not be deleted, because there may still be older events referencing them in the Data Lake, but you can remove the stream-related code and config at any time to stop producing.

To decommission:

  1. Stop producing events from your code.
  2. Remove the stream's entry from wgEventStreams (and wgEventLoggingStreamNames if applicable).
  3. Mark the stream as decommissioned in its DataHub entry.
  4. If you no longer need the schema, update its description in the schema repository (in a new materialized version) and note the deprecation in its README/CHANGELOG.
