At Mozilla, we’ve switched to using Spark and Python to compute with our data. I used to stay away from the data sets we processed with Spark because the performance was unreliable and the available formats were all over the place. Moreover, searching for Spark/Python help depends entirely on how good Google is at its job.

Recently the data engineers created a flat data set, one row per submission per profile, called main_summary [1]. With far fewer columns, this was a data set I could wrap my head around. I wanted to try some Spark DataFrame queries, and for posterity I shall keep the code here.

[1] https://gist.github.com/mreid-moz/518f7515aac54cd246635c333683ecce

Compute the Distribution of the Number of Pings Per Profile

 from pyspark.sql import SQLContext
 from pyspark.sql.types import *
 from pyspark.sql.functions import *

 # Load the main_summary Parquet data set from S3.
 bucket = "telemetry-parquet"
 prefix = "main_summary/v2"
 mainpingspq = sqlContext.read.load("s3://{}/{}".format(bucket, prefix), "parquet")

 # Work on a 20% sample (without replacement) to keep things manageable.
 mpqs = mainpingspq.sample(False, 0.2)

 # Count pings per profile, then count how many profiles share each ping count.
 gr2 = mpqs.groupBy("client_id").agg({"client_id": 'count'}).select(col('count(client_id)').alias('pinglen'))
 gr3 = gr2.groupBy("pinglen").count().collect()

 # Convert the Rows to dicts and sort by descending number of profiles.
 gr3 = [x.asDict() for x in gr3]
 gr3 = sorted(gr3, key=lambda s: -s['count'])
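
Since gr3 ends up as a plain Python list of dicts, a quick peek at the head is enough to sanity-check it:

 # Show the ten most common pings-per-profile buckets.
 for row in gr3[:10]:
     print("{pinglen} pings: {count} profiles".format(**row))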

Reformat the data so that a record is an array of submissions per profile

 mpqs = mainpingspq  # the full data set this time, not the 20% sample

 # Pings per profile again, this time keeping client_id alongside the count.
 gr2 = mpqs.groupBy("client_id").agg({"client_id": 'count'}).select(col("client_id"), col('count(client_id)').alias('pinglen'))

 # Treat profiles with more than 15,000 pings as pathological and exclude them.
 clientexclusion = gr2.filter(gr2.pinglen > 15000)
 clientexclusion.write.save("s3://telemetry-test-bucket/sguhatmp/tmp/clientexclusion1.parquet")
 clxList = sqlContext.read.load("s3://telemetry-test-bucket/sguhatmp/tmp/clientexclusion1.parquet").collect()
 clxList = [x.client_id for x in clxList]
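
One aside before the aggregation step: testing membership against a plain Python list is linear per record, so the code below converts clxList to a set first. For a much larger exclusion list, a broadcast variable would also avoid shipping the set inside every task closure; a minimal sketch (the clxBroadcast name is mine):

 # Ship the exclusion set to each executor once, instead of per task.
 clxBroadcast = sc.broadcast(set(clxList))
 t1 = mpqs.rdd.filter(lambda s: s.client_id not in clxBroadcast.value)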
 
 # Sequence op for aggregateByKey: fold one Row into the per-profile buffer,
 # keeping only the 5000 most recent submissions by subsession_start_date.
 # (A [-5000:] slice of a shorter list just returns the whole list.)
 def combFunc(u, v):
     u.append(v)
     u = sorted(u, key=lambda g: g.subsession_start_date)
     return u[-5000:]

 # Combine op: merge two per-profile buffers under the same 5000-row cap.
 def redFunc(u, v):
     u = sorted(u + v, key=lambda g: g.subsession_start_date)
     return u[-5000:]
 
 ## 1. Exclude the massive clients, then group submissions by profile.
 x = mpqs  # .sample(False, 0.0001).cache() is handy for a quick test run
 clxSet = set(clxList)  # set membership is O(1) per record, unlike a list
 t1 = x.rdd.filter(lambda s: s.client_id not in clxSet)
 t2 = t1.map(lambda d: (d.client_id, d))

 # Aggregate each profile's submissions into one (client_id, [rows]) record.
 t3 = t2.aggregateByKey([], combFunc, redFunc)
 t3.saveAsSequenceFile("s3://telemetry-test-bucket/sguhatmp/tmp/newformdata.sq")
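
An aside on those two functions: they re-sort the entire buffer on every merge, which is a lot of wasted work when all we keep is the 5000 most recent rows. heapq.nlargest should do the same job with a bounded heap instead of a full sort; a sketch under the same assumptions (same cap, same sort key, keep_recent is my name):

 import heapq

 CAP = 5000  # at most this many of the most recent submissions per profile

 def keep_recent(rows):
     # The CAP rows with the latest subsession_start_date, ascending by date.
     top = heapq.nlargest(CAP, rows, key=lambda g: g.subsession_start_date)
     return list(reversed(top))

 def combFunc(u, v):
     u.append(v)
     return keep_recent(u)

 def redFunc(u, v):
     return keep_recent(u + v)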

This failed despite using 61 c3.4xlarge compute nodes.
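
My guess is that shuffling huge Python lists of Row objects, and then asking saveAsSequenceFile to serialize them, is where this falls over. A route I have not benchmarked, but which stays inside the DataFrame/Parquet machinery, is collect_list (available from Spark 1.6, though on 1.6 it needs a HiveContext). The column names below come from main_summary; everything else is my sketch, and it skips the 5000-most-recent cap:

 from pyspark.sql.functions import collect_list, struct

 # Collect each profile's submissions into one array-of-structs column.
 # A few columns listed explicitly; collecting full rows of a table as
 # wide as main_summary would be expensive.
 sub = struct("subsession_start_date", "subsession_length", "channel")
 grouped = mainpingspq.groupBy("client_id").agg(collect_list(sub).alias("submissions"))
 grouped.write.parquet("s3://telemetry-test-bucket/sguhatmp/tmp/newformdata.parquet")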