
Analytics/Systems/Jupyter/Tips


Custom PySpark Notebook Kernels

If you are using PySpark, you should not need to create a new custom Jupyter Notebook Kernel for Spark.

Jupyter Notebook kernels are configured in JSON files. The default ones are installed in /usr/local/share/jupyter/kernels, but you can create your own custom kernel spec files in your home directory in ~/.local/share/jupyter/kernels (--user mode) or in your Conda environment (--sys-prefix mode).

There are a few reasons why you might want to run a custom kernel. For example, Jupyter does not know how to read Avro files by default; to do so, Spark needs a set of extra dependency jars passed to it when it starts, and the way to do that is to set up a kernel spec that passes them.
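For illustration, a minimal custom kernel spec could look like the following. This is only a sketch of the standard Jupyter kernel.json format: the directory name, display name, interpreter path, and the Avro package coordinates are placeholders, not values verified for this cluster. It would be saved as, for example, ~/.local/share/jupyter/kernels/pyspark-avro/kernel.json:

{
  "display_name": "PySpark (with Avro jars)",
  "language": "python",
  "argv": [
    "/usr/bin/python3",
    "-m",
    "ipykernel_launcher",
    "-f",
    "{connection_file}"
  ],
  "env": {
    "PYSPARK_SUBMIT_ARGS": "--packages <avro-package-coordinates> pyspark-shell"
  }
}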

There are two ways to launch Spark with custom kernel settings. You can either create a new kernel spec that will launch Spark with your custom settings, OR you can run a plain old python notebook and instantiate a SparkSession directly.

Launching a Custom PySpark SparkSession (without wmfdata)

For this method, you will run the following in a regular Python notebook (not a Spark-specific notebook kernel). This allows you to specify the Spark session configuration directly.

Note: When used in Yarn mode, this example starts a big spark application in the cluster (~30% of resources).

import findspark
findspark.init('/usr/lib/spark2')
from pyspark.sql import SparkSession
import os

master = 'yarn'
app_name = 'my_app_name'
builder = (
    SparkSession.builder
    .appName(app_name)
    .config(
        'spark.driver.extraJavaOptions',
        ' '.join('-D{}={}'.format(k, v) for k, v in {
            'http.proxyHost': 'webproxy.eqiad.wmnet',
            'http.proxyPort': '8080',
            'https.proxyHost': 'webproxy.eqiad.wmnet',
            'https.proxyPort': '8080',
        }.items()))
    .config('spark.jars.packages', 'graphframes:graphframes:0.6.0-spark2.3-s_2.11')
)
if master == 'yarn':
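    # spark_venv.zip is a packaged virtualenv; see the "PySpark in YARN with python dependencies" section below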
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--archives spark_venv.zip#venv pyspark-shell'
    os.environ['PYSPARK_PYTHON'] = 'venv/bin/python'
    builder = (
        builder
        .master('yarn')
        .config('spark.sql.shuffle.partitions', 512)
        .config('spark.dynamicAllocation.maxExecutors', 128)
        .config('spark.executor.memory', '8g')
        .config('spark.executor.cores', 4)
        .config('spark.driver.memory', '4g')
    )
elif master == 'local':
    builder = (
        builder
        .master('local[12]')
        .config('spark.driver.memory', '8g')
    )
else:
    raise Exception('master must be either "yarn" or "local"')

spark = builder.getOrCreate()
spark.sparkContext.setCheckpointDir('/user/ebernhardson/spark-checkpoints')

Setting the foundations

Breaking this into a few pieces: first, the findspark package must be installed on your Jupyter server.

!pip install findspark

findspark can then be called to bring the system Spark 2.x libraries into your environment:

import findspark
findspark.init('/usr/lib/spark2')
from pyspark.sql import SparkSession
import os

The SparkSession builder is then used to give the spark app a name and specify some useful defaults. In this example we provide appropriate proxies to the spark driver to download the graphframes package. Other spark dependencies can be installed in a similar fashion. For python library dependencies, continue reading.

Note: In this example the started spark-yarn application uses a medium amount of the cluster resources (~15%)

builder = (
    SparkSession.builder
    .appName('example spark application')
    .config(
        'spark.driver.extraJavaOptions',
        ' '.join('-D{}={}'.format(k, v) for k, v in {
            'http.proxyHost': 'webproxy.eqiad.wmnet',
            'http.proxyPort': '8080',
            'https.proxyHost': 'webproxy.eqiad.wmnet',
            'https.proxyPort': '8080',
        }.items()))
    .config('spark.jars.packages', 'graphframes:graphframes:0.6.0-spark2.3-s_2.11')
    .config('spark.sql.shuffle.partitions', 256)
    .config('spark.dynamicAllocation.maxExecutors', 64)
    .config('spark.executor.memory', '8g')
    .config('spark.executor.cores', 4)
    .config('spark.driver.memory', '2g')
)

Application Master

Spark can be run either in local mode, where all execution happens on the machine the notebook is started from, or in Yarn mode, which is fully distributed.

Local mode is especially convenient for initial notebook development. It is typically more responsive than Yarn mode, and it allows prototyping code against small samples of the data before scaling up to full-month or multi-month datasets. Dependencies are also easy to handle in local mode: nothing special has to be done, since anything installed with !pip install will be available in all executor processes. In local mode the number of Java threads is specified in brackets, such as local[12]. It is recommended to provide more memory than the default when running in local mode; otherwise a dozen executor threads might fight over a couple of GB of memory.

Note: Using Spark in local mode implies some knowledge of the resources of the local machine it runs on (number of CPU cores and amount of memory). Please don't use the full capacity of the local machine! Other users might be working on it as well :)

(
    builder
    .master('local[12]')
    .config('spark.driver.memory', '8g')
)


Local mode has its limits. When more compute is necessary, set the master to yarn and specify limits on the amount of compute to use. Spark executes one task per core, so 64 executors with 4 cores each provide 256 parallel tasks, with 64 * 8G = 512G of total memory to work on the notebook (this represents ~15% of the whole cluster). Finally, the Spark driver needs a bit more memory: 2g instead of the default 1g. The driver is responsible for collecting computed data back into the notebook (if requested) as well as managing all of the application's tasks, so the extra memory helps prevent it from crashing too quickly.

(
    builder
    .master('yarn')
    .config('spark.dynamicAllocation.maxExecutors', 64)
    .config('spark.executor.memory', '8g')
    .config('spark.executor.cores', 4)
    .config('spark.driver.memory', '2g')
)


PySpark in YARN with python dependencies

Our anaconda-wmf package is installed on all Hadoop workers. Many dependencies are already included in Anaconda; if yours are, you should be able to use them on the Hadoop workers straight from /usr/lib/anaconda-wmf as a conda environment.
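For example, a minimal sketch (the assumption here is that the anaconda-wmf interpreter is the Python you want the executors to run) is to point PySpark at that interpreter before building the session:

import os

# Assumption: run executor Python from the anaconda-wmf install present on the workers,
# instead of shipping a virtualenv.
os.environ['PYSPARK_PYTHON'] = '/usr/lib/anaconda-wmf/bin/python'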

If you need extra dependencies not yet installed by anaconda-wmf, you can create a virtualenv containing all the dependencies and ship it to each executor. There are two options for packaging up the virtualenv. The simplest is to use a notebook bang command to zip up the current virtualenv. For more advanced usage see venv-pack (a sketch follows the zip command below).

!cd venv; zip -qur ~/spark_venv.zip .
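Alternatively, a sketch using venv-pack (assuming the virtualenv lives at ./venv, as in the zip example above, and that venv-pack installs cleanly into it); the resulting tarball would then be referenced as spark_venv.tar.gz#venv in the --archives parameter below instead of the zip file:

# Assumes the virtualenv is at ./venv
!pip install venv-pack
!venv-pack -p venv -o ~/spark_venv.tar.gz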

Two environment variables work together to tell spark to ship the virtualenv to the executors and run workers from the included python executable. First is PYSPARK_SUBMIT_ARGS which must be provided an --archives parameter. This parameter is a comma separated list of file paths. Each path can be suffixed with #name to decompress the file into the working directory of the executor with the specified name. The final segment of PYSPARK_SUBMIT_ARGS must always invoke pyspark-shell.

os.environ['PYSPARK_SUBMIT_ARGS'] = '--archives spark_venv.zip#venv pyspark-shell'

In PYSPARK_SUBMIT_ARGS we instructed spark to decompress a virtualenv into the executor working directory. In the next environment variable, PYSPARK_PYTHON, we instruct spark to start executors using python provided in that virtualenv.

os.environ['PYSPARK_PYTHON'] = 'venv/bin/python'

Ready to go

After the builder is fully configured spark can be started and utilized as normal.

spark = builder.getOrCreate()
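For example, a quick sanity check that does not depend on any particular dataset:

# Small distributed job to confirm the session (and, in yarn mode, the executors) works
spark.range(1000).count()  # should return 1000

# Hive tables are available through spark.sql(), e.g.:
# spark.sql('SHOW DATABASES').show()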

Other working examples

An example notebook has been put together demonstrating loading the ELMo package from tensorflow hub and running it across many executors in the yarn cluster.


Sharing Notebooks

There is currently no direct functionality to view (Phab:T156980) or share (Phab:T156934) other users' notebooks in real time, but it is possible to copy notebooks and files directly on the server by clicking 'New' -> 'Terminal' (in the root folder in the browser window) and using the cp command.
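For example, from that terminal (both paths here are hypothetical):

cp /home/someuser/notebooks/analysis.ipynb ~/analysis-copy.ipynb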

GitHub

It's also possible to track your notebooks in Git and push them to GitHub, which will display them fully rendered on its website. If you want to do this, you should connect to GitHub using HTTPS. SRE recommends not using SSH because of the risk that other users could access your SSH keys (which, if combined with a production SSH key reused for GitHub, could result in a serious security breach).

With HTTPS, by default you'll have to type in your GitHub username and password every time you push. You can avoid this by adding the following (from this Superuser answer) to ~/.gitconfig:

[url "https://YOURUSERNAME@github.com"]
    insteadOf = https://github.com
[credential]
    helper = cache --timeout=28800

This will automatically apply your GitHub user name to any HTTPS access, and then cache the password you enter for 8 hours (28 800 seconds).

You can also set up a personal access token to use for authentication over HTTPS. This will allow you to avoid entering your GitHub password every time. These tokens can also be limited in what access they have; for example, they can be restricted to only modifying repositories.

HTML files

You can also export your notebook as an HTML file, using File > Download as... in the JupyterLab interface or the jupyter nbconvert --to html command.
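For example, from a terminal (the notebook filename here is hypothetical):

jupyter nbconvert --to html my_analysis.ipynb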

If you want to make the HTML file public on the web, you can use the web publication workflow.

HTTP requests

To allow HTTP requests (for example, so that your notebook can clone a repository from the internet), add export statements for the HTTP proxy to your ~/.bash_profile using the Jupyter Terminal.
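For example, a sketch of the lines to add to ~/.bash_profile, using the same proxy host and port as in the Spark examples above (which environment variables a given tool honors may vary):

export http_proxy=http://webproxy.eqiad.wmnet:8080
export https_proxy=http://webproxy.eqiad.wmnet:8080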

Sending emails from within a Notebook

To send out an email from a Python notebook (e.g. as a notification that a long-running query or calculation has completed), one can use the following code:

# cf. https://phabricator.wikimedia.org/T168103#4635031
notebookservername = !hostname
notebookserverdomain = notebookservername[0] + '.eqiad.wmnet'
username = !whoami

import smtplib

def send_email(subject, body, to_email=username[0] + '@wikimedia.org', from_email=username[0] + '@' + notebookserverdomain):
    smtp = smtplib.SMTP("localhost")
    # Build a plain-text message; the header lines must not be indented.
    message = """From: <{}>
To: <{}>
Subject: {}

{}
""".format(from_email, to_email, subject, body)
    smtp.sendmail(from_email, [to_email], message)

# example uses:
# send_email('Jupyter notebook ready (n/t)', '')
# send_email('Jupyter email test', 'test body', 'yourname@wikimedia.org', 'yourname@wikimedia.org')

(Invoking the standard mail client via the shell, i.e. !mailx or !heirloom-mailx, fails for some reason, see phab:T168103.)

Using R in Python through rpy2

rpy2 enables access to R from Python, and is particularly useful in Jupyter notebooks in that you can, for instance, use ggplot2 for your visualizations. As of March 2020, installing rpy2 on the Jupyter servers is not necessarily straightforward, as the latest version is incompatible with the environment on these servers. Using a version lower than 3.1 appears to work, e.g.

  pip install --upgrade "rpy2 < 3.1"
  pip install tzlocal

The second call is needed because this version of rpy2 fails to install the required "tzlocal" package. Once installed, you should be able to load the extension with the following code in a notebook:

   %load_ext rpy2.ipython

Once that is done, use the %R (line) and %%R (cell) magics to execute R code in the notebook.
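For example, a minimal cell (assuming the ggplot2 R package is available on the server):

%%R
library(ggplot2)
df <- data.frame(x = 1:10, y = (1:10)^2)
ggplot(df, aes(x, y)) + geom_line()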