{{Outdated}}
Logs can be generated from the production MediaWiki instances in [https://avro.apache.org/docs/current/ Avro] format and shipped through Kafka to Hadoop, ultimately landing in HDFS. This documentation is a work in progress; get hold of ottomata, nuria, dcausse, or ebernhardson to clear up any questions about it.
 
== Schema ==
Schemas are stored in the [https://gerrit.wikimedia.org/r/#/admin/projects/mediawiki/event-schemas mediawiki/event-schemas] repository. Schemas are labeled with a revision id made up of a version number (starting at 10) concatenated with the unix timestamp at which the schema was created. For example, the schema stored at <code>avro/mediawiki/CirrusSearchRequestSet/101446746400.avsc</code> within the event-schemas repository is version 10, the first version of CirrusSearchRequestSet, created at 1446746400.
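For illustration, the two parts of a revision id can be pulled apart from the shell (a minimal sketch; the two-digit version prefix matches the examples on this page):
<syntaxhighlight lang="bash">
# Decompose the example revision id 101446746400 (version prefix + creation timestamp).
REVID=101446746400
VERSION=${REVID:0:2}      # "10", the schema version number
CREATED=${REVID:2}        # "1446746400", unix timestamp when the schema was created
date -u -d "@${CREATED}"  # print the creation time in UTC
</syntaxhighlight>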
 
=== Schema conventions ===
* The schema should contain a <code>{ "name": "ts", "type": "int"}</code> field recording the UNIX epoch time that the record represents. This is used by the [https://phabricator.wikimedia.org/diffusion/OPUP/browse/production/modules/camus/templates/mediawiki.erb default camus job] that consumes messages from all <code>mediawiki_.*</code> kafka topics.
* Schemas should use camelCase naming conventions for field names when possible. This matches the default naming convention for EventLogging schemas.
 
== MediaWiki ==
 
Once a schema has been added it can be referenced from the [https://gerrit.wikimedia.org/r/#/admin/projects/operations/mediawiki-config operations/mediawiki-config] repository. You will need to generate a submodule bump for the <code>wmf-config/event-schemas</code> directory within operations/mediawiki-config so that it points to a version that includes the new schema (a typical bump is sketched below), and you will need to edit <code>wmf-config/InitializeSettings.php</code>. First, associate a monolog channel with the Avro schema via the <code>wmgMonologAvroSchemas</code> setting. The <code>schema</code> field needs to be a string containing a valid Avro schema, sourced from the event-schemas repository. The <code>revision</code> field is the exact schema revision id; it is encoded into the header of each log message so that readers can use the correct schema to decode the log line.<syntaxhighlight lang="php">
'wmgMonologAvroSchemas' => array(
    'default' => array(
        'CirrusSearchRequestSet' => array(
            'schema' => file_get_contents( __DIR__ . '/event-schemas/avro/mediawiki/CirrusSearchRequestSet/111448028943.avsc' ),
            'revision' => 111448028943,
        ),
    ),
),
</syntaxhighlight>
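The submodule bump itself is a routine git operation. A minimal sketch, assuming a checkout of operations/mediawiki-config (the commit message is only an example):
<syntaxhighlight lang="bash">
# From the root of an operations/mediawiki-config checkout:
cd wmf-config/event-schemas
git fetch origin
git checkout origin/master   # or the specific event-schemas commit containing the new schema
cd ../..
git add wmf-config/event-schemas
git commit -m "Bump event-schemas for the new Avro schema"
</syntaxhighlight>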
 
In addition to defining the schema, a Monolog channel needs to be configured for it. The channel name must match the schema name above. When using the Avro + Kafka pipeline the <code>buffer</code> flag should always be set to true so that latency communicating with Kafka does not hurt page load performance. For any high volume logging the logstash handler for this channel must be disabled; the udp2log handler can be enabled or disabled depending on your needs.<syntaxhighlight lang="php">
'wmgMonologChannels' => array(
    'default' => array(
        ...
        'CirrusSearchRequestSet' => array(
            'kafka' => 'debug',
            'udp2log' => false,
            'logstash' => false,
            'buffer' => true
        ),
        ...
    )
),
 
</syntaxhighlight>
 
== Kafka ==
With MediaWiki configured as above, binary Avro log messages will start appearing in the <code>mediawiki_CirrusSearchRequestSet</code> topic in Kafka. Topics are auto-created in Kafka as necessary, but only with a single partition. For logging channels with any real volume (which should be the only reason a MediaWiki -> Kafka -> Hadoop pipeline is necessary), create a Phabricator ticket for analytics engineering to create your topic with an appropriate number of partitions.
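Whether a topic already exists, and how many partitions it has, can be checked with kafkacat's metadata listing mode (a sketch, using the same broker as the examples below):
<syntaxhighlight lang="bash">
# Print metadata (including the partition count) for a single topic.
kafkacat -L -b kafka1012 -t mediawiki_CirrusSearchRequestSet
</syntaxhighlight>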
 
=== Test reading an Avro record from a Kafka topic ===
Individual messages can be read out of Kafka on stat1002.eqiad.wmnet, to verify the data is being encoded as expected. This command will read the most recent message from one partition:
 
<syntaxhighlight lang="bash">
$ kafkacat -b kafka1012 -t mediawiki_CirrusSearchRequestSet -c 1 > csrq.avro
</syntaxhighlight>
 
The extracted message is not quite valid Avro yet: it has a 9 byte header prepended to it (one magic byte followed by an 8-byte schema revision id). The header can be read with this small bit of Scala:<syntaxhighlight lang="scala">
object AvroTest {
  def main(args: Array[String]) {
    val file = new java.io.FileInputStream(args(0));
    val data = new java.io.DataInputStream(file);
    // First byte: magic, must always be 0.
    println("magic = %d".format(data.readByte()));
    // Next 8 bytes: the writer schema revision id, as a big-endian long.
    println("revid = %d".format(data.readLong()));
  }
}
</syntaxhighlight>
 
With that code in a file named <code>avrotest.scala</code>, the following shell command will read the file and report the header. Magic must always be 0, and the revid must match the writer schema revision id that MediaWiki used to write the message:
 
<syntaxhighlight lang="bash">
$ scala avrotest.scala csrq.avro
</syntaxhighlight>
 
Finally, the Avro payload itself can be decoded to check that it too is as expected. Pull down [https://archiva.wikimedia.org/repository/mirrored/org/apache/avro/avro-tools/1.7.7/avro-tools-1.7.7.jar avro-tools-1.7.7.jar] to your home directory and run the following command. It will throw a java.io.EOFException after printing out the decoded event; the EOFException can safely be ignored.
 
<syntaxhighlight lang="bash">
$ dd if=csrq.avro bs=1 skip=9 | \
      java -jar avro-tools-1.7.7.jar fragtojson --schema-file event-schemas/avro/mediawiki/CirrusSearchRequestSet/111448028943.avsc -
</syntaxhighlight>
 
Here's a script that can be used to perform all of the steps:
<syntaxhighlight lang="bash">
#!/usr/bin/env bash
# Validate MediaWiki Avro encoded message
 
CHANNEL=${1:?MediaWiki log channel expected (e.g. ApiAction)}
VERSION=${2:?Avro schema version expected (e.g. 101453221640)}
 
TOPIC="mediawiki_${CHANNEL}"
KAFKA_SERVER=kafka1012
REC="${CHANNEL}.avro"
SCALA_PROG=avrotest.scala
SCHEMAS=/srv/event-schemas/avro/mediawiki
 
# create a temp file for capturing command output
TEMPFILE=$(mktemp -t $(basename $0).XXXXXX)
trap '{ rm -f "$TEMPFILE"; }' EXIT
 
# Grab a record from kafka
echo "Reading 1 record from ${TOPIC} via ${KAFKA_SERVER}..."
echo "(this may take a while depending on event volume)"
kafkacat -b ${KAFKA_SERVER} -t ${TOPIC} -c 1 > ${REC}
 
# Validate the header
[[ -f $SCALA_PROG ]] ||
cat <<EOF >$SCALA_PROG
object AvroTest {
  def main(args: Array[String]) {
    val file = new java.io.FileInputStream(args(0));
    val data = new java.io.DataInputStream(file);
    println("magic = %d".format(data.readByte()));
    println("revid = %d".format(data.readLong()));
  }
}
EOF
 
echo "Checking binary packet header..."
scala $SCALA_PROG $REC > "$TEMPFILE"
grep 'magic = 0' "$TEMPFILE" || {
  echo >&2 "[ERROR] Expected 'magic = 0' in $REC"
  cat "$TEMPFILE"
  exit 1
}
grep "revid = ${VERSION}" "$TEMPFILE" || {
  echo >&2 "[ERROR] Expected 'revid = ${VERSION}' in $REC"
  cat "$TEMPFILE"
  exit 1
}
 
# Validate the payload
echo "Validating JSON..."
echo "(java.io.EOFException can be safely ignored)"
dd if=${REC} bs=1 skip=9 |
java -jar avro-tools-1.7.7.jar fragtojson \
    --schema-file ${SCHEMAS}/${CHANNEL}/${VERSION}.avsc -
</syntaxhighlight>
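Saved as, say, <code>check_avro.sh</code> (the file name is arbitrary), the script takes the channel name and schema revision as arguments:
<syntaxhighlight lang="bash">
$ bash check_avro.sh CirrusSearchRequestSet 111448028943
</syntaxhighlight>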
 
== Camus ==
Camus is the piece of software that reads messages out of Kafka and writes them to Hadoop. At 15 minutes past each hour, Camus reads the previous hour's worth of data out of Kafka topics named <code>mediawiki_<name></code> and writes it out to Hadoop in <code>/wmf/data/raw/mediawiki/<name>/<year>/<month>/<day>/<hour></code>. For this to work, Camus needs to know about the schemas in use. Camus reads schemas from the [https://gerrit.wikimedia.org/r/#/admin/projects/analytics/refinery/source analytics/refinery/source] repository, via a git submodule pointing at the event-schemas repository. After adding a schema to event-schemas, a submodule bump needs to be merged to analytics/refinery/source and a new version of the refinery-source jar needs to be deployed to the analytics cluster.
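Once a Camus run has completed you can check that the expected hourly directory has appeared. A minimal sketch, assuming the path layout described above (substitute the channel name and the hour you expect):
<syntaxhighlight lang="bash">
# List the raw data imported for one hour; the path follows
# /wmf/data/raw/mediawiki/<name>/<year>/<month>/<day>/<hour> as described above.
hdfs dfs -ls /wmf/data/raw/mediawiki/CirrusSearchRequestSet/2016/02/01/00
</syntaxhighlight>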
 
While Camus can read events from any schema version it knows about, it always writes events out to HDFS as the schema version configured in the [[phab:diffusion/OPUP/browse/production/modules/camus/templates/mediawiki.erb|mediawiki.erb]] template of the camus module in puppet. For each Kafka topic Camus reads there needs to be a line like the following:
<syntaxhighlight lang="properties">
org.wikimedia.analytics.schemas.CirrusSearchRequestSet.latestRev=111448028943
</syntaxhighlight>
 
In order to write files into their corresponding folders (partitions), Camus needs to read the event timestamp. This is also configured in [[phab:diffusion/OPUP/browse/production/modules/camus/templates/mediawiki.erb|mediawiki.erb]] with the following properties:
* <code>camus.message.timestamp.field</code>: the field name (defaults to <code>timestamp</code>)
* <code>camus.message.timestamp.format</code>: the format (defaults to <code>unix_milliseconds</code>), which can be one of:
** <code>unix_milliseconds</code>: unix timestamp in milliseconds (number)
** <code>unix_seconds</code>: unix timestamp in seconds (number)
** <code>ISO-8601</code>: [http://en.wikipedia.org/wiki/ISO_8601 ISO 8601] format (string)
** any other string is treated as a Java [https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html SimpleDateFormat] pattern
 
With an Avro schema field like:
<syntaxhighlight lang="json">
{
  "name": "ts",
  "doc": "The timestamp, in unix time, that the request was made",
  "type": "int",
  "default": 0
},
</syntaxhighlight>
 
the Camus properties file should contain:
<syntaxhighlight lang="properties">
camus.message.timestamp.field=ts
camus.message.timestamp.format=unix_seconds
</syntaxhighlight>
 
== Hive ==
Hive is the final destination for querying logs generated from MediaWiki and stored within Hadoop. The appropriate external table needs to be created in Hive in the <code>wmf_raw</code> database. See [[Analytics/Cluster/Hive/Avro]] for more information.
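A minimal sketch of such a table, run from a stat machine with the <code>hive</code> CLI. The column-less Avro table relies on <code>avro.schema.url</code>; the table name, location, and schema URL shown here are assumptions for illustration, and the page linked above is authoritative:
<syntaxhighlight lang="bash">
# Sketch only: table name, location, and schema URL are placeholders for illustration.
hive -e "
CREATE EXTERNAL TABLE IF NOT EXISTS wmf_raw.CirrusSearchRequestSet
PARTITIONED BY (year INT, month INT, day INT, hour INT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS
  INPUTFORMAT  'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '/wmf/data/raw/mediawiki/CirrusSearchRequestSet'
TBLPROPERTIES ('avro.schema.url'='hdfs:///path/to/event-schemas/avro/mediawiki/CirrusSearchRequestSet/111448028943.avsc')
"
</syntaxhighlight>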
 
== Oozie ==
Oozie is the Hadoop workflow scheduler. A job needs to be created within the [[:phab:diffusion/ANRE/|analytics/refinery]] repository to create partitions in the Hive table after Camus has written them out.
 
The [[:phab:diffusion/ANRE/browse/master/oozie/mediawiki/load/|oozie/mediawiki/load bundle]] defines a reusable workflow for populating a Hive table from an Avro dataset sent to Kafka via MediaWiki and Monolog. To add processing for a new schema, add a new coordinator declaration in <code>oozie/mediawiki/load/bundle.xml</code> and set <code>$channel</code>, <code>$raw_data_directory</code> and <code>$send_error_email_to</code> appropriately.
 
Example coordinator declaration for a schema named ''MyNewMediaWikiSchema'':
<syntaxhighlight lang="xml">
    <coordinator name="load_mediawiki_MyNewMediaWikiSchema-coord">
        <app-path>${coordinator_file}</app-path>
        <configuration>
            <property>
                <name>channel</name>
                <value>MyNewMediaWikiSchema</value>
            </property>
            <property>
                <name>raw_data_directory</name>
                <value>${raw_base_directory}/mediawiki_MyNewMediaWikiSchema</value>
            </property>
            <property>
                <name>send_error_email_to</name>
                <value>${common_error_to},developer@example.org</value>
            </property>
        </configuration>
    </coordinator>
</syntaxhighlight>
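Once the bundle changes are merged and deployed with refinery, the coordinators are started by submitting the bundle through the Oozie CLI. A minimal sketch (the deployment path and properties file name are assumptions; follow the conventions used in the refinery repository):
<syntaxhighlight lang="bash">
# Assumes OOZIE_URL is set in the environment; otherwise pass -oozie <url>.
cd /srv/deployment/analytics/refinery   # assumed deployment path of analytics/refinery
oozie job -config oozie/mediawiki/load/bundle.properties -run
</syntaxhighlight>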
 
== Initial Deployment Checklist ==
These steps are intended to be done in-order.
# Commit schema to mediawiki/event-schemas repository (e.g. {{gerrit|265164}})
# Commit submodule bump to analytics/refinery/source repository (e.g. {{gerrit|273556}})
# Commit Oozie job to create partitions to analytics/refinery repository (e.g. {{gerrit|273557}})
# Commit property changes for Camus to operations/puppet repository (e.g. {{gerrit|273558}})
# Wait for analytics to deploy new versions of refinery and refinery-source to analytics cluster
# Have a topic with the proper number of partitions created in Kafka
# Commit submodule bump along with proper configuration to operations/mediawiki-config repository (e.g. {{gerrit|273559}})
# Deploy initial mediawiki-config patch to production with a sampling rate of a few events per minute for testing
# Verify events in Kafka are as expected. Check mediawiki logs for errors.
# After enough time has passed (Camus runs once per hour) verify events are showing up in HDFS
# Create table in Hive pointing at the events in HDFS
# Submit coordinator to Oozie to auto-create partitions
# Adjust (or remove) sampling of events in operations/mediawiki-config repository
# Query your data! (a sample query is sketched below)
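For the final step, assuming the Hive table sketched in the Hive section above, a quick sanity check from a stat machine might look like:
<syntaxhighlight lang="bash">
# Count the events in one imported hour (the partition values are only an example).
hive -e "
SELECT COUNT(*)
FROM wmf_raw.CirrusSearchRequestSet
WHERE year = 2016 AND month = 2 AND day = 1 AND hour = 0
"
</syntaxhighlight>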
 
== Schema Upgrade Checklist ==
Schema upgrades need to be performed in a controlled manner to ensure the full pipeline continues processing events throughout the change. The only schema upgrade that has been tested so far is adding a new field with a default value.
# Commit new schema version to mediawiki/event-schemas repository
# Commit submodule bump to analytics/refinery/source repository
# Wait for analytics to deploy new versions of refinery and refinery-source to analytics cluster
# Alter the relevant Hive table to use the new schema version (a sketch follows this list)
# Commit property changes for Camus to operations/puppet repository to write out data files using the new schema
# Adjust and deploy mediawiki code to provide the new field. Note that the PHP Avro encoder will ignore new fields it doesn't know about, but will error out if a field it knows about (even with a default value) is missing.
# Commit submodule bump and schema update to operations/mediawiki-config
# After the next Camus run verify it is still writing out all of the events. If things are awry Camus may only partially write out the directory for that hour.
# Verify the Oozie job to create partitions completes. If Camus only wrote a partial directory this will fail.
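For step 4, assuming the table references its schema via <code>avro.schema.url</code> as in the sketch in the Hive section, the alteration might look like this (the path and revision id are placeholders):
<syntaxhighlight lang="bash">
# Sketch only: point the table at the new schema revision.
hive -e "
ALTER TABLE wmf_raw.CirrusSearchRequestSet
SET TBLPROPERTIES ('avro.schema.url'='hdfs:///path/to/event-schemas/avro/mediawiki/CirrusSearchRequestSet/<new revision id>.avsc')
"
</syntaxhighlight>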
