You are browsing a read-only backup copy of Wikitech. The live site can be found at https://wikitech.wikimedia.org/.

User:Joal/Clickstream historical

From Wikitech-static
Jump to navigation Jump to search

This page describes my experiments with the Clickstream dataset.

First, launch a Spark 2 Scala shell from any of the stat100[46] machines (with sensible limits for resource sharing):

spark2-shell --master yarn --conf spark.dynamicAllocation.maxExecutors=64 --driver-memory 4G --executor-memory 8G --executor-cores 2 --jars /srv/deployment/analytics/refinery/artifacts/refinery-job.jar

Then do some work:

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Wiki databases to analyze.
// NOTE(review): the original list was truncated in this backup copy —
// restore the full list from the live page.
val wiki = Seq("enwiki", "frwiki", "dewiki", "ruwiki")

// Monthly clickstream dumps to read (YYYY-MM).
// NOTE(review): the original list was truncated in this backup copy —
// restore the full list from the live page.
val dates = Seq("2017-11", "2017-12", "2018-01", "2018-02")

// Cross product of wikis and dates; .par so the per-(wiki, date) files
// are read in parallel later on.
val wiki_dates = wiki.flatMap(w => dates.map(d => (w, d))).par

// Schema of the raw clickstream TSV dumps:
// page_from, page_to, link_type, count (tab-separated, no header).
// The original snippet was missing the closing parenthesis of StructType(...).
val rawSchema = StructType(
  StructField("page_from", StringType, false) ::
  StructField("page_to", StringType, false) ::
  StructField("link_type", StringType, false) ::
  StructField("count", LongType, false) :: Nil)

// Schema after augmentation. The extra columns are added in readDf via
// lit(dt) / lit(wiki), which are string literals — so both columns must be
// StringType (the original LongType would make the union with readDf's
// output fail on mismatched types).
val augSchema = rawSchema.
  add(StructField("date", StringType, false)).
  add(StructField("wiki_db", StringType, false))

/**
 * Read one (wiki, month) clickstream dump and tag each row with its date
 * and wiki so the per-file frames can be unioned.
 *
 * NOTE(review): the read call was lost in this backup copy; the path below
 * follows the conventional dump layout — confirm against the live page.
 */
def readDf(wiki: String, dt: String): DataFrame = {
  spark.read.
    schema(rawSchema).
    option("delimiter", "\t").
    csv(s"/wmf/data/archive/clickstream/${dt}/clickstream-${wiki}-${dt}.tsv.gz").
    withColumn("date", lit(dt)).
    withColumn("wiki_db", lit(wiki))
}

// Union every per-(wiki, date) dataframe into one. Folding from an empty
// frame (with the augmented schema) keeps this total even if wiki_dates is
// empty. The original snippet had a dangling trailing '.' after fold(...).
val emptyDf = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], augSchema)
val clickstreams = wiki_dates.
  map(w_d => readDf(w_d._1, w_d._2)).
  fold(emptyDf)((d1, d2) => d1.union(d2))

// Row counts per (wiki_db, date). Slow: it forces a full read of the
// dataset, and gzip decompression is not parallelizable.
val countsPerWikiDate = clickstreams.groupBy("wiki_db", "date").count()
countsPerWikiDate.sort("wiki_db", "date").show(100, false)

  where("page_from IN ('Global_warming', 'Climate_change') AND page_to IN ('Global_warming', 'Climate_change')").
  sort("date", "page_from", "page_to").
  show(1000, false)

  where("page_from IN ('Smog', 'Air_pollution') AND page_to IN ('Smog', 'Air_pollution')").
  sort("date", "page_from", "page_to").
  show(1000, false)