Analytics/Cluster/Streaming

This page will eventually become a really great document on how to use Hadoop Streaming with Wikimedia data in Hadoop.
 
But, alas, for now it is a bunch of notes.
 
= What is Hadoop Streaming =

Hadoop Streaming is a utility included with Hadoop that lets you run MapReduce jobs with any executable as the mapper and/or reducer.  The framework feeds input records to your executable on stdin, one line at a time, and reads <tt>key\tvalue</tt> lines back from its stdout, so you can write MapReduce logic in bash, Python, or any other language without writing Java.
 
= Sample Code =
 
 
== Count number of lines in SequenceFiles ==
<source lang="bash">
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D           mapreduce.output.fileoutputformat.compress=false \
-input       /wmf/data/raw/webrequest/webrequest_mobile/hourly/2014/10/01/00 \
-output      /user/otto/tmp/streaming_test/out/12 \
-mapper      'wc -l' \
-reducer     "awk '{s+=\$1} END {print s}'" \
-inputformat SequenceFileAsTextInputFormat
</source>
 
== Word Count with Python ==
 
Mapper code.  It needs to output <tt>key\tval</tt> lines so that Hadoop can sort them by key and hand them to the reducer.
 
<tt>mapper.py</tt>
<source lang="python">
#!/usr/bin/env python3
import sys
import re

# Read lines from stdin.
for line in sys.stdin:
    # Split the line by whitespace (spaces or tabs).
    words = re.split(r'\s+', line.strip())
    # Print 'word\t1'.  That is, the word, then a tab, then the number 1.
    print('\n'.join(['{0}\t1'.format(word) for word in words if word]))
</source>
 
Reducer code.  Mapper output is automatically sorted by key (during Hadoop's shuffle and sort phase) before it is handed to the reducer, so all occurrences of a given word arrive on consecutive lines and all you have to do is sum up the 1s.
 
<tt>reducer.py</tt>
<source lang="python">
#!/usr/bin/env python3
import sys
 
current_word  = None
current_count = 0
 
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    if not line:
        continue
 
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    count = int(count)
 
    if current_word == word:
        current_count += count
    # else we've moved on to the next word,
    # output the count we summed for this word.
    else:
        if current_word:
            print('{0}\t{1}'.format(current_word, current_count))
        # Move on to the next word.
        current_word  = word
        current_count = count
 
# Output the last word, if there was any input at all.
if current_word is not None:
    print('{0}\t{1}'.format(current_word, current_count))
</source>
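
For the curious, the same reducer logic can be written with <tt>itertools.groupby</tt>, which makes the "all lines for a key arrive consecutively" assumption explicit.  This is just an alternative sketch; the <tt>reducer.py</tt> above works fine as is:

<source lang="python">
#!/usr/bin/env python3
# Alternative reducer sketch: relies on Hadoop's shuffle/sort guarantee that
# all lines for a given key arrive on consecutive lines of stdin.
import sys
from itertools import groupby

def parse(lines):
    """Yield (word, count) tuples parsed from tab-separated lines."""
    for line in lines:
        line = line.strip()
        if line:
            word, count = line.split('\t', 1)
            yield word, int(count)

for word, pairs in groupby(parse(sys.stdin), key=lambda kv: kv[0]):
    print('{0}\t{1}'.format(word, sum(count for _, count in pairs)))
</source>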
 
Test it out on the shell.  Make sure both scripts are executable first (<tt>chmod +x mapper.py reducer.py</tt>):
 
<source lang="bash">
# Note that we need to sort the output of the mapper when testing in the shell.  In Hadoop,
# this sorting happens automatically during the shuffle and sort phase between map and reduce.
echo 'the is a word that appears many times in english like this maybe the the the the the' | ./mapper.py | sort | ./reducer.py
a 1
appears 1
english 1
in 1
is 1
like 1
many 1
maybe 1
that 1
the 6
this 1
times 1
word 1
</source>
 
 
Now try it in Hadoop Streaming.  You'll need to tell <tt>hadoop-streaming.jar</tt> to ship both <tt>mapper.py</tt> and <tt>reducer.py</tt> along with the MapReduce job it creates, using the <tt>-files</tt> option.
 
Put a file in HDFS to test this on.  Here the <tt>-input</tt> path is a directory, so every file inside it is used as input.
<pre>
$ cat defenestration.txt
Defenestration is the act of throwing someone or something out of a window.[1] The term was coined around the time of an incident in Prague Castle in the year 1618. The word comes from the Latin de- (down or away from) and fenestra (window or opening).[2] Likewise, it can also refer to the condition of being thrown out of a window, as in "The Defenestration of Ermintrude Inch".[3]
While the act of defenestration connotes the forcible or peremptory removal of an adversary, and the term is sometimes used in just that sense,[4] it also suggests breaking the windows in the process (de- also means removal). Although defenestrations can be fatal due to the height of the window through which a person is thrown or throws oneself or due to lacerations from broken glass, the act of defenestration need not carry the intent or result of death.
 
$ hdfs dfs -mkdir /tmp/defenestration
$ hdfs dfs -put defenestration.txt /tmp/defenestration/
</pre>
 
Launch a Hadoop Streaming job:
 
<source lang="bash">
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D      mapreduce.output.fileoutputformat.compress=false \
-files  mapper.py,reducer.py \
-input  /tmp/defenestration \
-output  /tmp/defenestration.wordcount \
-mapper  mapper.py \
-reducer reducer.py
</source>
 
The output should be in <tt>/tmp/defenestration.wordcount</tt>:
<pre>
$ hdfs dfs -cat /tmp/defenestration.wordcount/part*
"The 1
(de- 1
(down 1
(window 1
1618. 1
Although 1
Castle 1
Defenestration 2
Ermintrude 1
Inch".[3] 1
Latin 1
Likewise, 1
Prague 1
The 2
While 1
a 3
act 3
adversary, 1
also 3
an 2
and 2
around 1
as 1
away 1
be 1
being 1
breaking 1
broken 1
can 2
carry 1
coined 1
comes 1
condition 1
connotes 1
de- 1
death. 1
defenestration 2
defenestrations 1
due 2
fatal 1
fenestra 1
forcible 1
from 2
from) 1
glass, 1
height 1
in 5
incident 1
intent 1
is 3
it 2
just 1
lacerations 1
means 1
need 1
not 1
of 11
oneself 1
opening).[2] 1
or 7
out 2
peremptory 1
person 1
process 1
refer 1
removal 1
removal). 1
result 1
sense,[4] 1
someone 1
something 1
sometimes 1
suggests 1
term 2
that 1
the 14
through 1
throwing 1
thrown 2
throws 1
time 1
to 3
used 1
was 1
which 1
window 1
window, 1
window.[1] 1
windows 1
word 1
year 1
</pre>
 
== Shipping Python Modules  ==
The above examples only used shell commands, Python scripts, and the Python standard library.  Often, you will want to run Python code in Hadoop that depends on custom modules which are not installed on the Hadoop worker nodes.  You can take advantage of Python's automatic [https://docs.python.org/3/library/zipimport.html zipimport] support: zip up your module and ship it to the workers via the <tt>-files</tt> option of your streaming job.
 
This example will use halfak's [https://github.com/halfak/Mediawiki-Utilities Mediawiki-Utilities Python module].
 
<source lang="bash">
# Clone the module from GitHub:
git clone https://github.com/halfak/Mediawiki-Utilities.git mediawiki_utilities
# Compress the mw package directory into a zip file.
cd mediawiki_utilities && zip -r ../mediawiki_utilities.zip ./mw/ && cd ..
</source>
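
Before submitting a job, you can check locally that the zip is importable (a minimal sketch, assuming <tt>mediawiki_utilities.zip</tt> is in your current directory):

<source lang="python">
#!/usr/bin/env python3
# Quick local check that the zipped module can be loaded via zipimport.
import sys

# Adding the zip file to sys.path lets Python's import machinery load
# modules from inside it, exactly as the mappers below do on the workers.
sys.path.append('mediawiki_utilities.zip')

from mw.xml_dump.iteration import Iterator
print(Iterator)  # should print the class without raising ImportError
</source>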
 
We'll use the <tt>xml_dump</tt> submodule to parse an XML dump of revision content and compute revision counts per contributor username.
 
Our mapper needs to output the username as the key and a 1 as the value for each page's revision.
 
<tt>revision_usernames_mapper.py</tt>
<source lang="python">
#!/usr/bin/env python3
 
import sys
# If there is a .zip file on the Python sys.path,
# the python importer will automatically use
# zipimporter to allow you to import from this module.
sys.path.append('mediawiki_utilities.zip')
from mw.xml_dump.iteration import Iterator
 
def revision_usernames(dump):
    """Yields each page's revision's username in the dump."""
    for page in dump:
        for revision in page:
            yield(revision.contributor.user_text)
 
if __name__ == '__main__':
    dump = Iterator.from_file(sys.stdin)
    for username in revision_usernames(dump):
        # Emit username \t 1
        print('{0}\t1'.format(username))
 
</source>
 
Since all we are doing is counting keys, we can use the same <tt>reducer.py</tt> from the Python wordcount example above.
 
Unfortunately, this example only works with a single mapper, because a plain XML dump cannot be split into chunks that each parse correctly on their own.  The WikiHadoop example below shows one way around this.
 
<source lang="bash">
# /user/otto/dump_small is a 10000 line test xml dump.
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D          mapreduce.output.fileoutputformat.compress=false \
-D          mapred.map.tasks=1 \
-files      revision_usernames_mapper.py,reducer.py,mediawiki_utilities.zip \
-input      /user/otto/dump_small \
-output    /user/otto/tmp/dumps_username_count \
-mapper    revision_usernames_mapper.py \
-reducer    reducer.py
</source>
 
== Using WikiHadoop to parse XML dumps ==
[https://github.com/whym/wikihadoop WikiHadoop] is a custom InputFormat for MediaWiki XML dumps.  Build it (or use the prebuilt jar from the refinery artifacts, as in the example below), then pass the jar via <tt>-libjars</tt> and specify the <tt>-inputformat</tt> when you submit your Hadoop Streaming job.
 
In this example, we will still use Aaron's Mediawiki-Utilities to parse the data from WikiHadoop.  However, since WikiHadoop emits individual page records that are not wrapped in a full XML document, we'll need to wrap the page XML handed to each mapper with the expected XML header and footer.
 
<tt>revision_usernames_wikihadoop_mapper.py</tt>
<source lang="python">
#!/usr/bin/env python3
import sys
import io
 
# If there is a .zip file on the Python sys.path,
# the python importer will automatically use
# zipimporter to allow you to import from this module.
sys.path.append('mediawiki_utilities.zip')
from mw.xml_dump.iteration import Iterator
metaXML = """
<mediawiki xmlns="http://www.mediawiki.org/xml/export-0.5/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mediawiki.org/xml/export-0.5/ http://www.mediawiki.org/xml/export-0.5.xsd" version="0.5" xml:lang="en">
<siteinfo>
<sitename>Wikipedia</sitename>
<base>http://en.wikipedia.org/wiki/Main_Page</base>
<generator>MediaWiki 1.17wmf1</generator>
<case>first-letter</case>
<namespaces>
<namespace key="-2" case="first-letter">Media</namespace>
<namespace key="-1" case="first-letter">Special</namespace>
<namespace key="0" case="first-letter" />
<namespace key="1" case="first-letter">Talk</namespace>
<namespace key="2" case="first-letter">User</namespace>
<namespace key="3" case="first-letter">User talk</namespace>
<namespace key="4" case="first-letter">Wikipedia</namespace>
<namespace key="5" case="first-letter">Wikipedia talk</namespace>
<namespace key="6" case="first-letter">File</namespace>
<namespace key="7" case="first-letter">File talk</namespace>
<namespace key="8" case="first-letter">MediaWiki</namespace>
<namespace key="9" case="first-letter">MediaWiki talk</namespace>
<namespace key="10" case="first-letter">Template</namespace>
<namespace key="11" case="first-letter">Template talk</namespace>
<namespace key="12" case="first-letter">Help</namespace>
<namespace key="13" case="first-letter">Help talk</namespace>
<namespace key="14" case="first-letter">Category</namespace>
<namespace key="15" case="first-letter">Category talk</namespace>
<namespace key="100" case="first-letter">Portal</namespace>
<namespace key="101" case="first-letter">Portal talk</namespace>
<namespace key="108" case="first-letter">Book</namespace>
<namespace key="109" case="first-letter">Book talk</namespace>
</namespaces>
</siteinfo>
"""
 
 
def revision_usernames(dump):
    """Yields each page's revision's username in the dump."""
    for page in dump:
        for revision in page:
            if (revision.contributor):
                yield(revision.contributor.user_text)
            else:
                yield('_Unknown_')
 
if __name__ == '__main__':
    xml_file = io.StringIO()
    xml_file.write(metaXML + sys.stdin.read() + '</mediawiki>')
    xml_file.seek(0)
    dump = Iterator.from_file(xml_file)
    for username in revision_usernames(dump):
        # Emit username \t 1
        print('{0}\t1'.format(username))
 
</source>
 
Submit the job:
<source lang="bash">
# /user/otto/dump_small is a 10000 line test xml dump.
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-libjars    /srv/deployment/analytics/refinery/artifacts/wikihadoop.jar \
-D          mapreduce.output.fileoutputformat.compress=false \
-D          mapreduce.input.fileinputformat.split.minsize=300000000 \
-D          mapreduce.task.timeout=6000000 \
-files      revision_usernames_wikihadoop_mapper.py,reducer.py,mediawiki_utilities.zip \
-inputformat org.wikimedia.wikihadoop.StreamWikiDumpInputFormat \
-input      /user/otto/dump_small/enwiki_dump.10000.xml.bz2 \
-output      /user/otto/tmp/dumps_username_count.$(date +%s) \
-mapper      revision_usernames_wikihadoop_mapper.py \
-reducer    reducer.py
</source>
