Spark Accumulators

By: CriteoLabs / 18 Jun 2018

Prerequisite:

A basic understanding of the Spark big-data processing framework.

Please refer to the blog post Spark Custom Partitioner for a detailed explanation of the business problem.

Introduction

This blog post covers the counters-at-scale problem. We need to track 20 counters per partner for partner-centric enrichment metrics; for a 1,000-partner job, that is 20K counters. Accumulators are to Spark what Counters are to MapReduce: variables that are only ‘added’ to through an associative and commutative operation and can therefore be efficiently supported in parallel. Please refer to the Spark programming guide for more details on accumulators. The key relevant points are summarized below as a premise, followed by a short usage sketch.

  • Tasks can add to an accumulator but cannot read its value. Only the driver program can read the accumulator’s value, using its value method.
  • Spark provides built-in support for accumulators of basic types like Long. Programmers can create their own types by subclassing AccumulatorV2.
  • Each task/partition gets a copy of the accumulator, and the driver takes care of merging the counters.
  • Drawback: in transformations, each task’s update may be applied more than once if tasks or job stages are re-executed.
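A minimal sketch of the built-in LongAccumulator, assuming a local SparkSession (the accumulator name and data are illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: built-in LongAccumulator usage on a local SparkSession.
val spark = SparkSession.builder.appName("acc-demo").master("local[*]").getOrCreate()
val sc = spark.sparkContext

val errorCount = sc.longAccumulator("errorCount") // created and registered on the driver

sc.parallelize(Seq("ok", "err", "ok", "err")).foreach { s =>
  if (s == "err") errorCount.add(1L) // tasks can add, but cannot read the value
}

println(errorCount.value) // only the driver can read the value: prints 2
```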

Accumulator 1.0

At first, we used one LongAccumulator per counter. It scaled to 50 partners, but beyond that we hit memory resource limits: we crossed spark.driver.maxResultSize, the limit on the total size of serialized results of all partitions for each Spark action. Clearly, Spark was not happy with 20K Java objects replicated for each partition.
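To make the problem concrete, here is a sketch of this first approach; the partner and metric names are illustrative, and a real job would build ~1,000 partners × 20 metrics = 20K accumulators:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.LongAccumulator

val spark = SparkSession.builder.appName("accumulator-1-0").master("local[*]").getOrCreate()
val sc = spark.sparkContext

// One LongAccumulator per (partner, metric) pair, each registered with the driver.
val counters: Map[String, LongAccumulator] =
  (for {
    partner <- Seq("partnerA", "partnerB")
    metric  <- Seq("rowsEnriched", "rowsDropped")
  } yield {
    val name = s"$partner.$metric"
    name -> sc.longAccumulator(name)
  }).toMap

sc.parallelize(Seq("partnerA", "partnerB", "partnerA")).foreach { partner =>
  counters(s"$partner.rowsEnriched").add(1L) // every task carries a copy of every accumulator
}

counters.foreach { case (name, acc) => println(s"$name = ${acc.value}") } // driver-side read
```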

Accumulator 2.0

We extended the AccumulatorV2 class to create a MapAccumulator, the key being a hash of the counter name and the value being the long counter value. The default int hashCode of a Java object has a drawback: the probability of int hashCode collisions reaches 10% at 20K distinct counters. That’s why we used a MurmurHash long hashCode of the counter name instead. We also maintain a MultiMap<CounterGroupName, CounterName> to register the grouped counter names used during enrichment. On job completion we use this register to retrieve the counters for logging and metrics reporting.
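A minimal sketch of such a MapAccumulator, assuming Guava’s murmur3_128 for the 64-bit hash of the counter name (class and helper names are illustrative, not the actual Criteo implementation):

```scala
import java.nio.charset.StandardCharsets
import scala.collection.mutable
import com.google.common.hash.Hashing
import org.apache.spark.util.AccumulatorV2

// Hypothetical helper: 64-bit MurmurHash of a counter name, via Guava.
object CounterKey {
  def apply(name: String): Long =
    Hashing.murmur3_128().hashString(name, StandardCharsets.UTF_8).asLong()
}

// Illustrative MapAccumulator: key = long hash of the counter name, value = counter value.
class MapAccumulator extends AccumulatorV2[(Long, Long), Map[Long, Long]] {
  private val counters = mutable.HashMap.empty[Long, Long]

  override def isZero: Boolean = counters.isEmpty

  override def copy(): MapAccumulator = {
    val acc = new MapAccumulator
    acc.counters ++= counters
    acc
  }

  override def reset(): Unit = counters.clear()

  // Called on executors: add a (hash, delta) pair to this task's local copy.
  override def add(v: (Long, Long)): Unit = {
    val (key, delta) = v
    counters.update(key, counters.getOrElse(key, 0L) + delta)
  }

  // Called on the driver: merge the ~20 counters returned by each task.
  override def merge(other: AccumulatorV2[(Long, Long), Map[Long, Long]]): Unit =
    other.value.foreach { case (key, delta) =>
      counters.update(key, counters.getOrElse(key, 0L) + delta)
    }

  override def value: Map[Long, Long] = counters.toMap
}
```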

This approach scales because:

  • Each partition is segregated by partner, so each task returns only 20 counters. This eases the executor memory footprint, the network transfer, and the accumulation on the driver: on task completion, a task transfers only 20 × (8 + 8 bytes) ≈ 320 bytes over the network, and after accumulating on the driver, 20K counters translate to 20K × (8 + 8 bytes) ≈ 320 KB.
  • There is only one accumulator object to register with the SparkContext (see the usage sketch after this list).
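A minimal usage sketch, assuming the illustrative MapAccumulator and CounterKey helper from above:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("accumulator-2-0").master("local[*]").getOrCreate()
val sc = spark.sparkContext

val mapAcc = new MapAccumulator
sc.register(mapAcc, "enrichmentCounters") // the single object registered with SparkContext

// Stand-in for the MultiMap<CounterGroupName, CounterName> register.
val registeredNames = Seq("partnerA.rowsEnriched", "partnerB.rowsEnriched")

sc.parallelize(Seq("partnerA", "partnerA", "partnerB")).foreach { partner =>
  mapAcc.add(CounterKey(s"$partner.rowsEnriched") -> 1L)
}

// On job completion, resolve hashes back to names for logging and metrics reporting.
registeredNames.foreach { name =>
  println(s"$name = ${mapAcc.value.getOrElse(CounterKey(name), 0L)}")
}
```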

 


Post written by:

Pravin Boddu
Senior Software Engineer – Palo Alto

 
