The product catalog is the most fundamental component at Criteo. Processing and exploring the information inside products is an interesting topic, and making catalog processing easy and scalable is a challenging one.
Some facts: the status and the numbers
In universal catalogs, we are currently conducting two major missions: catalog export and catalog enrichment. The first is to process various catalogs and export them to different publishers such as Facebook or Google. The second is to enrich the information inside all catalogs to improve prospecting and prediction performance.
At Criteo, we have 15,000+ customers. Most of them update their catalogs daily, but some upload up to ~10 times per day. Catalog sizes also vary widely, from ~10 products to hundreds of millions of products. Taking the current catalog export for Facebook DPA alone as an example, every day we process 10,000+ catalogs, amounting to 5-6 billion products or terabytes of data on HDFS, and upload ~700-900 GB of data to Facebook. Once the processing for other use cases is added, the total volume is considerably larger still.
Therefore, we want to make the catalog processing easy and scalable.
How to make the processing easy?
Currently, all catalogs are processed with Hadoop M/R jobs. We provide a general framework that makes it easy to develop catalog processing, even for developers without any Hadoop experience.
In this framework, we decouple data flow handling from processing logic. The data flow handling partitions the catalogs to be processed in a generic way: it automatically groups the catalogs and distributes the processing across the Hadoop cluster. It does not need to know how the catalogs are processed; it only manages the data flow and guarantees that the input and output catalogs follow the Google Shopping product specification, are stored at advertiser level, and are persisted properly. The processing logic, on the other hand, only cares about how the products should be manipulated. It is injected into the framework dynamically through arguments. We also map the processing counters to Hadoop counters, which are automatically logged and monitored.
A processing job is implemented in a three-level architecture: workflow, strategy, and factory. The workflow, the first level, manages the input catalogs and chooses the strategy. The strategy examines the input catalogs to determine which ones should be processed, then partitions them into multiple small groups using a customizable partitioning strategy. After that, it starts a thread pool, submits each group to the factory so the tasks execute concurrently, and monitors these tasks until they finish or time out. The strategy also performs post-processing to generate events for downstream jobs. The processing logic lives in the third level, the factory. By default, the factory creates M/R jobs and submits them to the Hadoop cluster; it is only responsible for constructing the M/R jobs and does not know what processing they perform.
Inside the factory, we expose an interface called ProductBusinessLogicInterface, which is the only part developers need to implement. The implementation is specified in the job arguments and instantiated by a processor manager. So it is simply “you implement it, we run it”. Figure 1 below illustrates the job execution plan.
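The strategy level described above (partition, run groups on a thread pool, wait until done or timed out) can be sketched with the standard java.util.concurrent API. This is a minimal, hypothetical illustration: StrategySketch, Factory, runJob and the fixed-size partitioning are our own names and assumptions, not the framework's actual classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class StrategySketch {
    // Hypothetical factory contract: runs one job for a group of catalogs and
    // returns how many catalogs it processed.
    interface Factory {
        int runJob(List<String> group) throws Exception;
    }

    // Fixed-size partitioning, standing in for a customizable partitioning strategy.
    static List<List<String>> partition(List<String> catalogs, int groupSize) {
        List<List<String>> groups = new ArrayList<>();
        for (int i = 0; i < catalogs.size(); i += groupSize) {
            groups.add(new ArrayList<>(catalogs.subList(i, Math.min(i + groupSize, catalogs.size()))));
        }
        return groups;
    }

    // Submits every group to the factory on a thread pool and waits for all of them.
    // invokeAll cancels any task still running when the timeout expires.
    static int runAll(List<String> catalogs, int groupSize, Factory factory, int threads, long timeoutSeconds) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        int processed = 0;
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (List<String> group : partition(catalogs, groupSize)) {
                tasks.add(() -> factory.runJob(group));
            }
            for (Future<Integer> future : pool.invokeAll(tasks, timeoutSeconds, TimeUnit.SECONDS)) {
                if (future.isCancelled()) {
                    continue; // timed out: a real strategy would report or retry this group
                }
                try {
                    processed += future.get();
                } catch (ExecutionException e) {
                    // The job failed: a real strategy would log it; the sketch skips it.
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        } finally {
            pool.shutdownNow();
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> catalogs = new ArrayList<>();
        for (int i = 0; i < 10; i++) catalogs.add("catalog-" + i);
        // A dummy factory that "processes" a group by counting its catalogs.
        System.out.println(runAll(catalogs, 3, group -> group.size(), 4, 60)); // 10
    }
}
```

Post-processing (emitting events for downstream jobs) would follow the loop over futures; it is omitted here.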
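The “you implement it, we run it” idea amounts to loading a class by name from the job arguments and calling it through a fixed interface. The post names ProductBusinessLogicInterface but does not show its methods, so the `process` signature, ProcessorManager.load and the UpperCaseBrand example below are hypothetical; only the reflection calls are standard Java.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical processor manager: instantiates the business-logic class whose
// name is passed in the job arguments.
class ProcessorManager {
    static ProductBusinessLogicInterface load(String className) {
        try {
            return Class.forName(className)
                    .asSubclass(ProductBusinessLogicInterface.class) // fails fast on a wrong type
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        // In a real job the class name would come from the job arguments;
        // here we pass the example implementation's name directly.
        ProductBusinessLogicInterface logic = load(UpperCaseBrand.class.getName());
        Map<String, String> product = new HashMap<>();
        product.put("id", "42");
        product.put("brand", "acme");
        System.out.println(logic.process(product).get("brand")); // ACME
    }
}

// Hypothetical shape of the plug-in contract (the real signature is not shown
// in the post). Returns the transformed product, or null to drop it.
interface ProductBusinessLogicInterface {
    Map<String, String> process(Map<String, String> product);
}

// Example developer implementation: normalizes the brand to upper case.
class UpperCaseBrand implements ProductBusinessLogicInterface {
    @Override
    public Map<String, String> process(Map<String, String> product) {
        Map<String, String> out = new HashMap<>(product);
        out.computeIfPresent("brand", (k, v) -> v.toUpperCase(Locale.ROOT));
        return out;
    }
}
```

The developer only writes a class like UpperCaseBrand; everything about Hadoop stays on the framework side.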
How to make the processing scalable?
Scaling the processing to all Criteo catalogs is challenging. We use an internal workflow manager in which a catalog-processing unit is defined as a job, and jobs are chained to form pipelines. Jobs with no data dependency between them can be freely reordered, and pipelines can branch out to share the same upstream jobs.
We can start multiple instances of the same pipeline, and they are all isolated from each other. Within each instance, starting one M/R job per catalog would be wasteful, since some catalogs are small while others are huge. Instead, we group several catalogs into one M/R job based on their size, and combine some files into a single mapper to save resources on the Hadoop cluster. For example, we can group ~50 small (or even smaller) partners into one M/R job and start only a few mappers to process all of these catalogs. This cuts cluster resource usage by roughly 10 times.
The job itself does not need to know how the products are processed. It only manages the input and output catalogs, partitioning and grouping them appropriately. Figure 2 shows an example of a pipeline and how the catalogs are handled inside a job.
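The workflow manager is internal and its API is not described here. As a hypothetical sketch, a pipeline can be modeled as a dependency graph between jobs: any topological order of that graph is a valid run order, which is why jobs without a data dependency can be swapped, and why two export branches can share one upstream job. The job names and PipelineSketch are invented for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PipelineSketch {
    // Returns one valid execution order (Kahn's algorithm) given "job -> jobs it depends on".
    static List<String> executionOrder(Map<String, List<String>> dependsOn) {
        Map<String, Integer> pending = new LinkedHashMap<>();       // unmet upstream count per job
        Map<String, List<String>> downstream = new HashMap<>();
        for (Map.Entry<String, List<String>> e : dependsOn.entrySet()) {
            pending.putIfAbsent(e.getKey(), 0);
            for (String up : e.getValue()) {
                pending.putIfAbsent(up, 0);
                pending.merge(e.getKey(), 1, Integer::sum);
                downstream.computeIfAbsent(up, k -> new ArrayList<>()).add(e.getKey());
            }
        }
        Deque<String> ready = new ArrayDeque<>();
        pending.forEach((job, n) -> { if (n == 0) ready.add(job); });
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String job = ready.poll();
            order.add(job);
            for (String down : downstream.getOrDefault(job, Collections.emptyList())) {
                if (pending.merge(down, -1, Integer::sum) == 0) ready.add(down);
            }
        }
        if (order.size() != pending.size()) throw new IllegalArgumentException("cycle in pipeline");
        return order;
    }

    public static void main(String[] args) {
        Map<String, List<String>> pipeline = new LinkedHashMap<>();
        pipeline.put("import", List.of());
        pipeline.put("enrich", List.of("import"));
        pipeline.put("exportFacebook", List.of("enrich")); // two branches share "enrich"
        pipeline.put("exportGoogle", List.of("enrich"));
        System.out.println(executionOrder(pipeline)); // [import, enrich, exportFacebook, exportGoogle]
    }
}
```

The two export jobs have no dependency on each other, so swapping them (or running them in parallel) yields an equally valid order.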
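The exact grouping rule is not given, so the following is an assumption: first-fit decreasing bin packing, with a byte budget per M/R job and a cap on catalogs per job (50 here, echoing the example above). Catalog, groupIntoJobs and the limits are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class CatalogGrouping {
    // Hypothetical catalog descriptor: id plus size on HDFS in bytes.
    static final class Catalog {
        final String id;
        final long bytes;
        Catalog(String id, long bytes) { this.id = id; this.bytes = bytes; }
    }

    // First-fit decreasing: place each catalog (largest first) into the first job
    // that still has room, otherwise open a new job. A catalog larger than
    // maxBytesPerJob ends up alone in its own job.
    static List<List<Catalog>> groupIntoJobs(List<Catalog> catalogs, long maxBytesPerJob, int maxCatalogsPerJob) {
        List<Catalog> sorted = new ArrayList<>(catalogs);
        sorted.sort(Comparator.comparingLong((Catalog c) -> c.bytes).reversed());
        List<List<Catalog>> jobs = new ArrayList<>();
        List<Long> jobBytes = new ArrayList<>();
        for (Catalog c : sorted) {
            int target = -1;
            for (int j = 0; j < jobs.size(); j++) {
                if (jobs.get(j).size() < maxCatalogsPerJob && jobBytes.get(j) + c.bytes <= maxBytesPerJob) {
                    target = j;
                    break;
                }
            }
            if (target < 0) {
                jobs.add(new ArrayList<>());
                jobBytes.add(0L);
                target = jobs.size() - 1;
            }
            jobs.get(target).add(c);
            jobBytes.set(target, jobBytes.get(target) + c.bytes);
        }
        return jobs;
    }

    public static void main(String[] args) {
        List<Catalog> catalogs = new ArrayList<>();
        catalogs.add(new Catalog("big-partner", 10L << 30));       // ~10 GiB
        for (int i = 0; i < 100; i++) {
            catalogs.add(new Catalog("small-" + i, 1L << 20));     // ~1 MiB each
        }
        List<List<Catalog>> jobs = groupIntoJobs(catalogs, 1L << 30, 50);
        System.out.println(jobs.size() + " M/R jobs instead of " + catalogs.size()); // 3 M/R jobs instead of 101
    }
}
```

On the Hadoop side, packing many small files into one mapper is what CombineFileInputFormat (for example CombineTextInputFormat) is designed for; whether the framework relies on it is not stated.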
Pipeline performance tuning
Another way to make the processing scale is to improve its performance, thereby reducing processing time and resource consumption. We try to improve performance wherever we can.
For Facebook DPA, we use multiple feeds to support big catalogs, some of which have 200+ million products. Initially, we generated a single file and uploaded this big file to Facebook, which could take up to 4-5 hours. Now we split such a catalog into multiple, much smaller feeds and upload them simultaneously. This enabled us to process almost all catalogs within 1 hour.
Also for Facebook DPA, processing billions of products daily can cause scalability problems for other Criteo services, such as the image service. Facebook might request new images for millions of products at once and overload our image servers, which, unfortunately, did happen. To resolve this problem, we implemented throttling at the feed level to slow down the upload speed, which significantly reduced the peak traffic to our Pix servers.
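A minimal sketch of the multi-feed idea, under two assumptions of ours: products are assigned to feeds by hashing their id (so a product stays in the same feed from one run to the next), and the upload callback is a hypothetical stand-in for the real publisher call. In practice the feed count would be derived from the catalog size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class MultiFeedSketch {
    // Assigns each product to one of feedCount feeds by hashing its id
    // (an assumed design choice: stable feed membership across runs).
    static List<List<String>> splitIntoFeeds(List<String> productIds, int feedCount) {
        List<List<String>> feeds = new ArrayList<>();
        for (int i = 0; i < feedCount; i++) feeds.add(new ArrayList<>());
        for (String id : productIds) {
            feeds.get(Math.floorMod(id.hashCode(), feedCount)).add(id);
        }
        return feeds;
    }

    // Uploads all feeds concurrently and waits until every upload has finished.
    // `upload` is a hypothetical stand-in for the real upload call.
    static void uploadAll(List<List<String>> feeds, Consumer<List<String>> upload, int parallelism) {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            List<CompletableFuture<Void>> uploads = new ArrayList<>();
            for (List<String> feed : feeds) {
                uploads.add(CompletableFuture.runAsync(() -> upload.accept(feed), pool));
            }
            // join() rethrows (wrapped in CompletionException) if any upload failed.
            CompletableFuture.allOf(uploads.toArray(new CompletableFuture<?>[0])).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < 1_000; i++) ids.add("product-" + i);
        List<List<String>> feeds = splitIntoFeeds(ids, 8);
        AtomicInteger uploaded = new AtomicInteger();
        uploadAll(feeds, feed -> uploaded.addAndGet(feed.size()), 4);
        System.out.println(uploaded.get()); // 1000
    }
}
```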
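The post does not say how the throttle is implemented. One common way to cap per-feed upload speed is a token bucket, sketched below under that assumption; FeedThrottle and the limits in main are hypothetical. Guava's RateLimiter is a ready-made alternative.

```java
class FeedThrottle {
    private final double ratePerSecond; // sustained rate, e.g. products per second
    private final double capacity;      // largest burst allowed at once
    private double tokens;
    private long lastNanos;

    FeedThrottle(double ratePerSecond, double capacity, long nowNanos) {
        if (ratePerSecond <= 0 || capacity <= 0) {
            throw new IllegalArgumentException("rate and capacity must be positive");
        }
        this.ratePerSecond = ratePerSecond;
        this.capacity = capacity;
        this.tokens = capacity; // start full so the first chunk goes out immediately
        this.lastNanos = nowNanos;
    }

    // Non-blocking: refills tokens for the elapsed time, then takes `amount` if available.
    // The time is passed in explicitly so the behavior is easy to test.
    synchronized boolean tryAcquire(double amount, long nowNanos) {
        double elapsedSeconds = Math.max(0, nowNanos - lastNanos) / 1e9;
        tokens = Math.min(capacity, tokens + elapsedSeconds * ratePerSecond);
        lastNanos = Math.max(lastNanos, nowNanos);
        if (tokens >= amount) {
            tokens -= amount;
            return true;
        }
        return false;
    }

    // Blocking: waits until `amount` tokens are available.
    void acquire(double amount) throws InterruptedException {
        if (amount > capacity) throw new IllegalArgumentException("amount exceeds burst capacity");
        while (!tryAcquire(amount, System.nanoTime())) {
            Thread.sleep(10);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Hypothetical limits: at most 2,000 products/s per feed, bursts of 500.
        FeedThrottle throttle = new FeedThrottle(2_000, 500, System.nanoTime());
        int chunks = 0;
        for (int i = 0; i < 6; i++) {
            throttle.acquire(500); // wait before sending each 500-product chunk
            chunks++;              // the real upload call would go here
        }
        System.out.println("uploaded " + chunks + " chunks");
    }
}
```

Giving each feed its own bucket bounds the rate at which new image URLs reach Facebook per feed, which is what flattens the peak on the image side.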
Product processing performance tuning
Each product takes less than 1 ms to process, but across billions of products every day, that adds up to many hours, even days, of CPU time. Shaving a fraction of a millisecond off the per-product processing time could save about an hour of CPU time on each run, and overall possibly more than one CPU-day each day. And it is always nice to make our planet a little greener by consuming less electricity.
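To make the orders of magnitude concrete, here is a back-of-the-envelope estimate. The 5 billion products per day is the lower end of the Facebook DPA figure quoted earlier; the 0.02 ms saving per product is purely illustrative, not a measured number.

```latex
\begin{aligned}
T_{\text{saved}} &= N \cdot \Delta t \\
  &= (5 \times 10^{9}\ \text{products/day}) \times (2 \times 10^{-5}\ \text{s/product}) \\
  &= 10^{5}\ \text{s/day} \approx 27.8\ \text{CPU-hours/day} \approx 1.16\ \text{CPU-days/day}
\end{aligned}
```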
First things first: we must understand the complexity and overhead of the algorithms used inside M/R jobs. Product processing involves a lot of string manipulation, and most string processing problems can be solved with a good hash function; Java provides one in the String class. Moreover, a String object is immutable, but its hash is not calculated when the object is created. Instead, the hash value is lazily computed on the first call to hashCode(), and all subsequent calls return the cached value. This means that if the same string is used as a HashMap key more than once, it is a good idea to reuse the same String object, since its hash will be computed only once. Avoiding excessive string concatenation also improves run time.
Of course, string hashing alone is not enough to make product processing efficient; sometimes we need more advanced algorithms. We use the Aho-Corasick algorithm for dictionary matching, which finds all occurrences of words from a predefined dictionary in a text. Crucially, its complexity is linear: once the automaton has been built from the dictionary (in time linear in the dictionary's total length), matching takes time proportional to the length of the input text plus the number of matches found, regardless of how many words the dictionary contains. This makes it far more efficient than the naïve approaches of tokenizing the text and looking each token up in a HashSet, or running the Knuth-Morris-Pratt algorithm once per dictionary word. A decent Java implementation of Aho-Corasick can be found at ahocorasick.org.
It’s worth mentioning that Hadoop supplies string data to the mappers as objects of class Text, which stores data as UTF-8 encoded bytes, and all output data must also be UTF-8 encoded. A Java String, however, represents its data as UTF-16, so loading a Text into a String requires an encoding conversion, and converting a large string from one encoding to another can be expensive. In universal catalogs, both input and output data are in JSON format, and we use the Jackson library to serialize and deserialize them. Jackson can read and write UTF-8 encoded data directly, without converting to UTF-16. Taking advantage of this to avoid encoding conversions saves us about 10% of product processing time.
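A minimal illustration of both tips, with invented method names: build a lookup key once and reuse the same String object across several HashMap lookups, and use a StringBuilder instead of repeated `+=` in a loop. The hash caching itself happens inside java.lang.String and cannot be observed directly here; the comments describe the standard JDK behavior.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class StringReuseSketch {
    // Looks up one product key in several dictionaries. The key String is built
    // once and reused, so its hashCode() is computed on the first lookup and
    // served from the String's cached hash afterwards. Building
    // partnerId + ":" + productId inside the loop instead would allocate a new
    // String and rehash it on every iteration.
    static int countHits(String partnerId, String productId, List<Map<String, String>> dictionaries) {
        String key = partnerId + ":" + productId;
        int hits = 0;
        for (Map<String, String> dictionary : dictionaries) {
            if (dictionary.containsKey(key)) hits++;
        }
        return hits;
    }

    // Joins titles with a StringBuilder; `result += title` in a loop would copy
    // the whole growing string on every iteration.
    static String joinTitles(List<String> titles, char separator) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < titles.size(); i++) {
            if (i > 0) sb.append(separator);
            sb.append(titles.get(i));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> brands = new HashMap<>();
        brands.put("partner1:sku-42", "Acme");
        Map<String, String> colors = new HashMap<>();
        colors.put("partner1:sku-42", "red");
        System.out.println(countHits("partner1", "sku-42", Arrays.asList(brands, colors))); // 2
        System.out.println(joinTitles(Arrays.asList("Shoe", "Boot"), '|'));               // Shoe|Boot
    }
}
```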
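For readers unfamiliar with the algorithm, here is a compact, self-contained sketch of the textbook construction: a trie of dictionary words plus failure links computed breadth-first, followed by a single left-to-right pass over the text. It is not the ahocorasick.org library, and the names AhoCorasick, Match and search are our own.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class AhoCorasick {
    // One occurrence of a dictionary word in the text.
    static final class Match {
        final int start;
        final String word;
        Match(int start, String word) { this.start = start; this.word = word; }
        @Override public String toString() { return word + "@" + start; }
    }

    private static final class Node {
        final Map<Character, Node> next = new HashMap<>();
        final List<String> outputs = new ArrayList<>(); // words ending at this state
        Node fail;                                       // longest proper suffix that is also a trie state
    }

    private final Node root = new Node();

    AhoCorasick(Collection<String> dictionary) {
        // 1. Build the trie of dictionary words (empty words are ignored).
        for (String word : dictionary) {
            if (word.isEmpty()) continue;
            Node node = root;
            for (int i = 0; i < word.length(); i++) {
                node = node.next.computeIfAbsent(word.charAt(i), c -> new Node());
            }
            node.outputs.add(word);
        }
        // 2. Compute failure links breadth-first, so shallower states are finished first.
        Deque<Node> queue = new ArrayDeque<>();
        root.fail = root;
        for (Node child : root.next.values()) {
            child.fail = root;
            queue.add(child);
        }
        while (!queue.isEmpty()) {
            Node node = queue.poll();
            for (Map.Entry<Character, Node> e : node.next.entrySet()) {
                char c = e.getKey();
                Node child = e.getValue();
                Node f = node.fail;
                while (f != root && !f.next.containsKey(c)) f = f.fail;
                child.fail = f.next.getOrDefault(c, root);
                // Words that end at the failure state also end here.
                child.outputs.addAll(child.fail.outputs);
                queue.add(child);
            }
        }
    }

    // Single pass over the text: cost is linear in text length plus number of matches.
    List<Match> search(String text) {
        List<Match> matches = new ArrayList<>();
        Node node = root;
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            while (node != root && !node.next.containsKey(c)) node = node.fail;
            node = node.next.getOrDefault(c, root);
            for (String word : node.outputs) matches.add(new Match(i - word.length() + 1, word));
        }
        return matches;
    }

    public static void main(String[] args) {
        AhoCorasick ac = new AhoCorasick(Arrays.asList("he", "she", "his", "hers"));
        System.out.println(ac.search("ushers")); // [she@1, he@2, hers@2]
    }
}
```

Note that overlapping matches ("she", "he" and "hers" inside "ushers") are all found in the same pass, which a tokenize-and-lookup approach would miss.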
Post written by:
Our lovely Community Manager / Event Manager is updating you about what's happening at Criteo Labs.