Revision as of 20:14, 3 March 2021 by imported>Ottomata (→‎Testing PySpark)

Upgrading Spark

Since we don't maintain active Spark master processes (these are launched upon request per application by YARN), upgrading Spark is mostly just installing the new .deb package.

However, there are 3 gotchas to be careful about:

  • Spark Assembly (spark.yarn.archive)
  • Spark YARN Shuffle Service
  • Spark Oozie Sharelib

These gotchas make a rolling upgrade in YARN difficult.

Read on for more information. Here is a general upgrade plan:

  1. Update spark.version in any pom.xml files, then rebuild and deploy all Spark job jars.
  2. Stop all Spark jobs (Refine, Oozie, etc.)
  3. Upgrade the Spark .deb package everywhere
  4. Run Puppet everywhere
  5. Restart all hadoop-yarn-nodemanagers
  6. Update all Spark job jar versions to the ones built with the new spark.version
  7. Update oozie_spark_lib in all Oozie job properties in refinery (and elsewhere?)
  8. Restart all Spark jobs (Refine, Oozie, etc.)
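Steps 1 and 6 both depend on knowing which spark.version each job jar was built against. A minimal sketch for reading it with Python's standard library, assuming spark.version is declared as a Maven property under <properties> in pom.xml; read_spark_version is a hypothetical helper, not part of refinery:

```python
import xml.etree.ElementTree as ET
from typing import Optional

# Maven POMs normally declare this default namespace on the <project> element.
POM_NS = {"m": "http://maven.apache.org/POM/4.0.0"}


def read_spark_version(pom_xml: str) -> Optional[str]:
    """Return the <spark.version> property from a pom.xml string, or None if absent."""
    root = ET.fromstring(pom_xml)
    # Try the namespaced form first, then fall back to a POM without a namespace.
    node = root.find("m:properties/m:spark.version", POM_NS)
    if node is None:
        node = root.find("properties/spark.version")
    if node is None or not node.text:
        return None
    return node.text.strip()
```

Running it over each job repository's pom.xml before step 1 and again before step 6 makes it easier to spot a jar that was not rebuilt.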

Spark Shuffle Service

YARN runs the Spark YarnShuffleService as an auxiliary service. Spark uses this to shuffle data around between tasks. The spark-X.Y.Z-yarn-shuffle.jar is symlinked by Puppet into Hadoop's classpath, and the service is then enabled in yarn-site.xml.
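The upstream Spark "Running on YARN" documentation enables the shuffle service with two yarn-site.xml properties: yarn.nodemanager.aux-services must include spark_shuffle, and yarn.nodemanager.aux-services.spark_shuffle.class must be org.apache.spark.network.yarn.YarnShuffleService. Our Puppet-managed yarn-site.xml may list other aux-services too. A minimal sketch for checking a NodeManager's config for those two properties; the helper names are hypothetical:

```python
import xml.etree.ElementTree as ET

# Property names and class as documented upstream for Spark's YARN shuffle service.
AUX_SERVICES = "yarn.nodemanager.aux-services"
SPARK_SHUFFLE_CLASS = "yarn.nodemanager.aux-services.spark_shuffle.class"
EXPECTED_CLASS = "org.apache.spark.network.yarn.YarnShuffleService"


def hadoop_conf_properties(xml_text: str) -> dict:
    """Parse a Hadoop *-site.xml string into a {name: value} dict."""
    root = ET.fromstring(xml_text)
    props = {}
    for prop in root.findall("property"):
        name = prop.findtext("name", default="").strip()
        value = prop.findtext("value", default="").strip()
        if name:
            props[name] = value
    return props


def spark_shuffle_enabled(xml_text: str) -> bool:
    """True if yarn-site.xml registers spark_shuffle with the YarnShuffleService class."""
    props = hadoop_conf_properties(xml_text)
    services = [s.strip() for s in props.get(AUX_SERVICES, "").split(",")]
    return "spark_shuffle" in services and props.get(SPARK_SHUFFLE_CLASS) == EXPECTED_CLASS
```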


The Spark YarnShuffleService runs inside the NodeManagers. After the Spark .deb package is upgraded on NodeManager hosts, a Puppet run will cause the new spark-X.Y.Z-yarn-shuffle.jar to be symlinked into Hadoop's classpath. Once this is done, all NodeManagers need to be restarted. This is unlikely to work as a rolling upgrade: Spark tasks need to talk to a Spark YarnShuffleService of the same Spark version that they were launched with.

This makes it difficult to test Spark upgrades in YARN, but we don't have a better alternative.
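Because tasks must match the shuffle service version, one quick sanity check after restarting NodeManagers is to compare the version in the symlinked jar's file name with the Spark version a job was launched with. A minimal sketch, assuming the jar keeps the spark-X.Y.Z-yarn-shuffle.jar naming (if the symlink name itself is unversioned, pass its resolved target, e.g. from os.path.realpath); both helper names are hypothetical:

```python
import re
from pathlib import PurePath
from typing import Optional

# Matches the spark-X.Y.Z-yarn-shuffle.jar naming described above.
SHUFFLE_JAR_RE = re.compile(r"^spark-(\d+\.\d+\.\d+)-yarn-shuffle\.jar$")


def shuffle_jar_version(path: str) -> Optional[str]:
    """Extract X.Y.Z from a spark-X.Y.Z-yarn-shuffle.jar path, or None if it does not match."""
    match = SHUFFLE_JAR_RE.match(PurePath(path).name)
    return match.group(1) if match else None


def shuffle_service_matches(jar_path: str, job_spark_version: str) -> bool:
    """True if a job launched with job_spark_version matches this NodeManager's shuffle jar."""
    return shuffle_jar_version(jar_path) == job_spark_version
```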

Spark Assembly

Spark is configured with an 'assembly' archive file via the spark.yarn.archive config. The Spark assembly is a zip file of all of the jars Spark uses to run. If spark.yarn.archive is configured, a Spark job running in Hadoop will use these .jars to launch Spark processes. This allows for a faster startup time when running in YARN; the local Spark dependencies don't have to be shipped to Hadoop every time you launch Spark.
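Upstream Spark documents that the spark.yarn.archive archive should contain the jar files in its root directory. A minimal sketch of building such a flat zip with Python's standard library; build_spark_assembly is a hypothetical helper, and our Puppet-managed upload may build the file differently:

```python
import zipfile
from pathlib import Path
from typing import List


def build_spark_assembly(jars_dir: str, out_zip: str) -> List[str]:
    """Zip every *.jar file in jars_dir into out_zip, placing each jar at the archive root.

    Returns the sorted list of jar names that were added.
    """
    jars = sorted(p for p in Path(jars_dir).iterdir() if p.suffix == ".jar" and p.is_file())
    # Jars are already compressed, so ZIP_STORED skips pointless recompression.
    with zipfile.ZipFile(out_zip, "w", compression=zipfile.ZIP_STORED) as zf:
        for jar in jars:
            # arcname=jar.name drops the directory prefix, keeping jars at the root.
            zf.write(str(jar), arcname=jar.name)
    return [jar.name for jar in jars]
```

Storing rather than deflating is a deliberate choice here: recompressing jar files mostly costs time without saving much space.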

After upgrading the Spark .deb package on an analytics 'coordinator' node, Puppet should automatically upload the new /usr/lib/spark2/ file to HDFS in /user/spark/share/lib.

The value of spark.yarn.archive is set by Puppet in spark-defaults.conf to the currently installed version of Spark. This means that a Spark job will use the Spark Assembly that matches the Spark version installed on the host the job was launched from.
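To confirm which assembly a given host will hand to YARN, the spark.yarn.archive value can be read from that host's spark-defaults.conf. A minimal sketch, assuming whitespace-separated key/value lines with '#' comment lines (Spark also accepts Java properties syntax such as key=value, which this sketch does not handle), and assuming the assembly file name embeds the Spark version; both helper names are hypothetical:

```python
from typing import Dict


def parse_spark_defaults(text: str) -> Dict[str, str]:
    """Parse spark-defaults.conf text into a dict of 'key value' pairs."""
    conf = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comment lines
        # Key and value are separated by the first run of whitespace.
        parts = line.split(None, 1)
        conf[parts[0]] = parts[1].strip() if len(parts) > 1 else ""
    return conf


def archive_matches_version(conf: Dict[str, str], spark_version: str) -> bool:
    """ASSUMPTION: the assembly file name contains the Spark version string."""
    return spark_version in conf.get("spark.yarn.archive", "")
```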

Oozie Sharelib

Oozie jobs are configured to use pre-uploaded 'sharelibs' in HDFS (the same concept as the assembly, but unzipped). Configured sharelibs are added to the Oozie workflow classpaths. Our Spark Oozie jobs are configured to use this via the oozie_spark_lib property, which ends up setting the Oozie oozie.action.sharelib.for.spark property.

After upgrading the Spark .deb package on an analytics 'coordinator' node, Puppet should automatically upload a new versioned oozie sharelib. You can confirm this by running oozie admin -shareliblist.
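A small helper for scanning that output, as a sketch only: it assumes oozie admin -shareliblist prints one sharelib name per line under a bracketed header line, and it filters by name prefix because the exact naming of our versioned Spark sharelibs is not spelled out here. spark_sharelibs is hypothetical:

```python
from typing import List


def spark_sharelibs(shareliblist_output: str, prefix: str = "spark") -> List[str]:
    """Return sharelib names from `oozie admin -shareliblist` output that start with prefix.

    ASSUMPTION: one sharelib name per line; bracketed header lines are skipped.
    """
    names = []
    for raw in shareliblist_output.splitlines():
        line = raw.strip()
        if not line or line.startswith("["):
            continue
        if line.startswith(prefix):
            names.append(line)
    return names
```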

The value of oozie_spark_lib should be updated for every Oozie job once Spark is upgraded and the shuffle service jar has been loaded in NodeManagers.
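Step 7 of the upgrade plan can be scripted across many job property files. A minimal sketch, assuming each Oozie job.properties file sets oozie_spark_lib on a single "key = value" (or "key: value") line; set_oozie_spark_lib is a hypothetical helper, and the new value should be whichever sharelib name oozie admin -shareliblist reports after the upgrade:

```python
import re

# Matches an uncommented 'oozie_spark_lib = value' line ('=' or ':' separator).
# [ \t]* is used instead of \s* so a match can never run onto the next line.
OOZIE_SPARK_LIB_RE = re.compile(r"^([ \t]*oozie_spark_lib[ \t]*[=:][ \t]*).*$", re.MULTILINE)


def set_oozie_spark_lib(properties_text: str, new_value: str) -> str:
    """Return properties_text with every oozie_spark_lib assignment set to new_value."""
    # A function replacement keeps re from interpreting backslashes in new_value.
    return OOZIE_SPARK_LIB_RE.sub(lambda m: m.group(1) + new_value, properties_text)
```

Commented-out assignments (lines starting with '#') are left alone, so old values kept for reference survive the rewrite.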

Testing PySpark

# Print the Python executable and module search path (sys.path) used on YARN PySpark workers:
def python_interpreter():
    import sys, platform
    return "{}: {} {}".format(platform.node(), sys.executable, sys.path)
rdd = spark.sparkContext.parallelize(range(2), 2).mapPartitions(lambda p: [python_interpreter()])
# collect() triggers the job and returns one line per partition:
print(rdd.collect())

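# test numpy on remote workers (they must be able to unpickle the numpy arrays):
import numpy as np
sc = spark.sparkContext
rdd = sc.parallelize([np.array([1, 2, 3]), np.array([1, 2, 3])], numSlices=2)
# Dot product of [1, 2, 3] with itself; should print 14.
print(rdd.reduce(lambda x, y: np.dot(x, y)))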

# test pyarrow on remote workers:
import pyspark.sql.functions as F
df = spark.range(0, 1000).withColumn('id', (F.col('id') / 100).cast('integer')).withColumn('v', F.rand())

@F.pandas_udf(df.schema, F.PandasUDFType.GROUPED_MAP)
def pandas_subtract_mean(pdf):
    return pdf.assign(v=pdf.v - pdf.v.mean())

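df2 = df.groupby('id').apply(pandas_subtract_mean)
# show() forces execution, so the pandas UDF (and therefore pyarrow) actually runs on the workers:
df2.show()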
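When the python_interpreter() test runs over many partitions, it helps to summarize which Python executables the workers actually used. A minimal sketch that parses the strings python_interpreter() builds ("<host>: <executable> <sys.path>"); parse_report and distinct_executables are hypothetical helpers:

```python
from typing import Iterable, Set, Tuple


def parse_report(report: str) -> Tuple[str, str]:
    """Split one python_interpreter() string into (host, executable)."""
    host, rest = report.split(": ", 1)
    # The executable is everything before the " [" that starts the sys.path list.
    executable = rest.split(" [", 1)[0]
    return host, executable


def distinct_executables(reports: Iterable[str]) -> Set[str]:
    """Return the set of Python executables seen across all worker reports."""
    return {parse_report(r)[1] for r in reports}
```

For example, distinct_executables(spark.sparkContext.parallelize(range(10), 10).mapPartitions(lambda p: [python_interpreter()]).collect()) should return a single path if every worker uses the same interpreter.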