Rebuilding our next-gen ML Platform with the best of Spark and Tensorflow

Ronny Mathew
17 min readOct 17, 2021

--

Rue Gilt Groupe is a fashion eCommerce company located in Boston, MA, that has 50M+ members and daily flash sales on millions of products. Our Data Science team is a tight-knit group of Data Scientists and Machine Learning Engineers who work full-stack on cloud-native architectures to deliver DS and ML services, heavily utilizing Apache Spark and AWS.

This post focuses on some recent updates we incorporated into one of our stacks built for big data applications to add support for running the latest and greatest deep learning based algorithms and models. This architecture provides us with the flexibility to pick the right framework at any step of Machine Learning and unlock scalable deep learning pipelines with minimal MLOps code. At the same time, it also provides the flexibility to transition to any MLOps platform without a lot of future ML code changes.

For businesses like ours, fast prototyping and quick experimentations are key to building completely new experiences in an efficient and iterative way. It is always preferred to have tangible results before putting more resources into a certain project. This architecture provides us with that capability and lets us spend more time on research, build models, test quickly, and rapidly iterate.

We hope this article will help folks who are on the fence about moving to Deep Learning based algorithms and tackle some of the common concerns and questions while taking on such a big project.

Why do we need Deep Learning?

Deep learning is everywhere these days and we believe recent innovations in DL-based Recommenders [6,7], Sequential Models, Transformers, CNNs, Machine Translation, Named Entity Recognition, among a lot more, would unlock untapped opportunities in the retail and fashion technology space.

Why distributed Deep Learning?

Previously, we have developed DL-based models without distributed training. In this case, our approach was to use a bigger GPU instance to speed up training. When we bump up GPU, the cost is a big concern (meatier GPUs are expensive), in addition, it's also possible that we change hyperparameters (like batch_size) which might inadvertently affect training (we learned this the hard way).

Most clickstream datasets especially for training recommender systems are millions (even billions) of rows of data, this gets difficult to train on a single instance (unless you are ready to wait days for a single epoch to complete). Also in use cases like recommenders, it's not easy to learn useful models with a smaller dataset as getting a proper sample representative of the population is harder.

Even if you don't agree with any of the above, wouldn’t it be nice to have multiple cheap GPUs or even a cluster of nodes each with multiple GPUs doing the training? Granted that this comes with its own issues and complexities, these are some things we want to address in this article and make this transition as easy as possible.

So let’s get started!

Overview

The general structure of the article is to quickly go over what we have currently, describe some greatness of the two main technologies we will discuss, and how we merged them. (Feel free to skip to any section if you feel the content is already familiar)

Where we are now

Let’s take a quick look at our existing big data stack before diving further into the deep end. We leverage sci-kit learn, OpenCV, XGBoost, Spark ML, among others for our ML workloads. Spark connectors provide read and write data capabilities to/from S3, Data Warehouses, parquet files, Kinesis, DynamoDB, Elasticsearch, etc.

current stack and technologies used at different ML stages

At RGG, we use Databricks Lakehouse as our platform to run Spark jobs but the concepts in this article can be applied to any platform that can run Apache Spark jobs with tensorflow.

With that out of our way, I'll quickly explain my justification for why both Spark and Tensorflow are great ecosystems and how their marriage is going to be a happily ever after one!

Spark Awesomeness

Data Pipelines with an easy SQL interface

Spark gives us the ability to build data pipelines with easy-to-use APIs in SQL, Python, Java, R, and/or Scala and supports most commands and functions from popular SQL systems. If a certain functionality is missing in the built-in pyspark.sql.functions package, we can write our own user-defined function and register it as a spark SQL function which can then be used in any query similar to a built-in function.

In short, this allows engineers to write a UDF in any language they prefer (java/python/scala/R) and have Data Analysts/Data Scientists access it using the SQL API.

# in python
import pyspark.sql.functions as f
@f.udf("string")
def say_hello(name: str) -> str:
return f"Hello {name}"
sqlContext.udf.register("say_hello", say_hello)-- in sql
SELECT say_hello('Bob')

Pandas UDFs

This is a fairly recent addition to Spark, but in short, it allows pandas Dataframe based manipulations inside a UDF including a UDAF (an aggregate function). In this case, instead of your method getting a bunch of spark columns as input it gets a pandas DataFrame which we can then manipulate just like we would any pandas DataFrame.

@f.pandas_udf("string")
def simple_udf(iterator: Iterator[pd.Series])->Iterator[pd.Series]:
for x in iterator:
yield pd.Series(list(map(lambda r: r + "1", x)))

I point this feature out here because we used this to do batch inference on spark. This enabled us to load our deep learning model into a UDAF and run inference on a batch of the dataset much much faster on a CPU spark cluster.

This is a much bigger topic than this article, read more about pandas UDFs here.

Spark ML

Spark ML provides scalable versions of most of the common ML algorithms that perform really well on huge datasets including Regression, Trees, ALS-based Collaborative Filtering, Word2Vec, TF-IDF, Multilayer Perceptron, Naive Bayes, FP-Growth, Locality Sensitive Hashing, and Bucketed Random Projection (ANN and approx joins) among others.

It also has support for integrating with XGBoost, sklearn, and most other popular libraries.

Tensorflow Awesomeness

Tensorflow is one of the most popular frameworks for Machine Learning and Deep Learning maintained and used at Google. It has a great developer community and an amazing ecosystem of tools and libraries that extends its capabilities far beyond. TensorFlow also has amazing official guides and tutorials to get you started and work your way up to more complicated models, all on hands-on colab notebooks.

Now let's quickly talk about, some of the tensorflow tools and libraries that were of interest to us.

TF Data

TF data is a set of APIs that performs data manipulation and data loading into ML models using Tensorflow datasets which is the recommended way to load data into a tensorflow 2.x model. We can also load the tfrecord files into a dataset using a TFRecordDataset. It comes with its own version of useful methods for loading data into a dataset, batching, shuffling, caching, etc, and also does a lot of great optimizations behind the scenes. For instance, if you are training on a GPU, it uses the CPU to load the data from files and prefetches the next n batches of data ready to use in memory. But how do we pick n for prefetching, you ask? Use tf.data.AUTOTUNE to automatically adapt to the number of workers in your cluster, it is as simple as that!

dataset = (tf.data.TFRecordDataset(filenames=[list of filenames]))
.batch(1000)
.shuffle(10000)
.prefetch(tf.data.AUTOTUNE))

Another benefit of using tf.data is while running in distributed mode, you just need to add a one-line code change to enable your tf dataset to work for multiple workers.

# There are two shard policies, DATA and FILE
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset.with_options(options)

Learn more about tf.data here.

TF Distribute

tf.distribute is part of core TensorFlow which deserves a mention as its key to a lot of things we are going to discuss next. Tensorflow uses Distribution Strategies to enable different types of distributed training like synchronous/asynchronous, on GPU/TPU, etc. If you don't care about strategies while developing on a single-node or local, Tensorflow automatically assigns the default strategy to your code under the hood.

This means that to make your single-node development code to later work with distributed training is as simple as adding these few lines initially.

# we can also add more conditions here, eg check for num workers
if tf.config.list_physical_devices('GPU'):
my_strategy = tf.distribute.MirroredStrategy()
else: # Use the Default Strategy
my_strategy = tf.distribute.get_strategy()
with my_strategy:
# wrap all the training code within this scope
...

This is just the tip of the tf.distribute iceberg. For a full walkthrough, check this guide out.

TFx

TFx is an end-to-end machine learning platform for ML applications that handles everything from data engineering to modeling to batch inference and/or online model serving. We will only discuss very few components of TFx in this article as we primarily use Spark to perform Data Engineering.

TF Transform: tf.transform is a great TFx component that can be used instead of Spark, in case you want to use TFx for feature engineering and manage the workload on kubeflow or similar platforms and/or stay in Tensorflow land without crossing over to Spark.

Tensorflow Model Analysis (TFMA): Another interesting library in TFx that is very useful for model validations both in an offline setting and for monitoring issues and drifts in an online production model. In an offline use case, we can utilize this library’s metrics for model management and automated promotion of a model to Production. We are more interested in using this library in the future for the online use case to monitor model performance and drift once we move to online model serving for real-time use cases.

TF Serving: Serving is now a part of TFx and is the core Tensorflow component responsible for deploying and serving an ML model online. It reads a SavedModel file and serves it at a given high-performance REST endpoint. The whole thing can be dockerized and served as a scalable endpoint using container services, an autoscaler and an API gateway.

Check out the tf serving docs here.

There is also a docker container provided from Tensorflow that we used to prototype our online serving component here. This Docker-based approach is perfect for us as it fits seamlessly into our existing AWS ECS-based architecture for serving offline recommendations.

Learn more about TFx here. As expected, you will find really good tutorials and guides in this section.

TF Recommenders

TF recommenders is a fairly new but great TF extension library that simplifies building Deep learning based recommender systems. It provides friendly wrappers for models, metrics, and losses. This library simplifies building models for Retrieval (narrowing the entire corpus to a few thousand candidates), ranking (ranking the top candidates to a handful with more complex architecture), multitask recommenders (learn from multiple objectives), Deep Cross Networks (capture explicit feature interactions between items), and more.

This library uses google’s scann approximate nearest neighbor engine for multiple purposes. A scann engine is provided as a layer in this library which can then be used to build approximate nearest neighbor lookup indexes saved as a SavedModel. This SavedModel can now be loaded into an online service using TF serving described above for real-time lookups of vectors. This layer is also used within the library to compute top K Accuracy metrics directly from the embeddings by comparing the query embedding with all product embeddings.

This is a developing library so things are still changing a lot. We noticed some issues already, but we really like the simplified usage of the scann indexes and the top K metrics features.

Learn more about TF recommenders here.

TF Agents

TF Agents is another extension library for Tensorflow that enables building bandit and Reinforcement learning based agents. We are developing a new smart experimentation platform using contextual bandit algorithms using this library.

Check out more on TF agents here.

TF Hub

A collection of the latest pre-trained deep learning models for text, image, video and audio, and more. Once you decide on a model (eg, BERT), you can easily pull it into any existing model as a layer with a simple command like,

# model path can be found in tf hub 
# sometimes we also need to load preprocessing layers
bert_path="https://tfhub.dev/tensorflow/bert_en_uncased_L-12_H-768_A-12/4"
bert_embeddings = hub.KerasLayer(bert_path, trainable=False)

Check out the entire model hub here.

This concludes our section on Tensorflow. You can see the whole plethora of Tensorflow extension libraries here. Every time I look at this page it keeps growing and the Tensorflow community is really active and amazing!

MLFlow

MLFlow is a great tool for experiment tracking and model management among other things. It makes it easy to compare models with different hyperparameters and simplifies the process of picking the best model. We also use MLFlow Models and Model Registry to manage models and automagically promote them to production based on metrics making sure the newly trained models meet the minimum requirements. MLFlow model registry is our single source of truth for the models and all their versions. All downstream applications query MLFlow to fetch the latest production model and use as needed during inference.

Bridging the gap between worlds

We have talked about these great tools and how beneficial it would be to include them in our stack. In this section, we will start exploring some smaller components that would help us close the gap between them.

Read and write to TFRecords from Spark

It is now possible to write Spark DataFrame as tfrecords files and read tfrecords into a Spark DataFrame. TFRecord is a Tensorflow specific file format that stores a sequence of binary records as Examples. This switch from writing parquet files to tfrecords provides us with the capability of plugging in Spark at any point of our ML lifecycle and plays seamlessly with TFx since we are now dealing with the same file format at inputs and outputs from various stages.

# Write tfRecords as Example from a Spark Dataframe
some_df.write.format(“tfrecords”)
.option(“recordType”, “Example”)
.mode(“overwrite”).save(save_path))
# You need to define a schema for Example
def read_tfrecord_sample(example):
feature_description = {
'feature0': tf.io.FixedLenFeature([], tf.int32),
'feature1': tf.io.FixedLenFeature([], tf.int32)
}
return tf.io.parse_single_example(example, feature_description)
# Then we can load data into a TF Dataset using the above
# This method takes in a list of filenames to read from
dataset = tf.data.TFRecordDataset(filenames=list_of_files)
.map(read_tfrecord_sample)
# You can also read the tfRecords back into a Dataframe
read_df = spark.read.format(“tfrecords”)
.option(“recordType”, “Example”)
.mode(“overwrite”).load(save_path))

Read more on this package here.

Spark Tensorflow Distributor

This package is part of the TensorFlow ecosystem that lets us run tf.distribute inside of Spark jobs. In this approach, we delegate cluster management and communication between nodes and distributed TensorFlow Ops to Spark.

Usage of this is pretty straightforward, after installing the package to your cluster, define Tensorflow data loading and model building code in a train method. This train method can be defined outside of the current file and imported as needed allowing us to develop our single node code first and then migrate to a distributed runner quickly. We just need to specify the number of workers in the cluster and the train method to the distributed runner.

num_workers = 10
# add local_mode=True for local testing
# you can also specify another strategy in the train method and turn on custom_strategy=True
MirroredStrategyRunner(num_slots=num_workers).run(train)

The greatness of this library is the amount of work that is automated under the hood including the designation of chief and worker nodes and communication between nodes. It also supports custom_strategy to use other tf.distribute strategies (in case you have multiple GPUs on each node or if you want to use TPUs). To turn on custom_strategy, add the new strategy in the train method and add custom_strategy=True as a parameter to the runner.

how spark tensorflow distributor works
how Spark tensorflow distributor works under the hood

This library also lets us do distributed training on CPU nodes (obviously it will be slower but you can now use a lot of cheaper nodes and spot instances). In our experiments, performance was 3x faster with a base GPU instance (AWS g4dn.xlarge) than on a similar priced CPU (c4.2xlarge) instance. This would be a place where we need to tune the type and number of nodes to optimize the performance of the cluster depending on budget and other requirements.

Read more about Spark Tensorflow distributor here. This guide is a great resource to learn more about all distribution strategies supported by TensorFlow and how to use them as a custom strategy in the distributed runner (Spoiler Alert: It’s very simple to switch strategies).

tf.distribute works by specifying a tf_config environment variable on each node (read more about it here), this config also includes a node index and node 0 is the chief node. There are cases when we need to figure out what the role of a node is, this will become important when we want to treat callbacks and other operations on chief different than on workers. We don’t want all the workers (over)writing the model checkpoints to the same path! Recommendations from the tensorflow guide is to have the chief write to a persistent store like S3 and all other workers write to some temp path).

Since Spark is handling the setting up of the variable and the node index in the case of the spark distributor, we can figure out which one is the chief using these lines of python code.

if 'TF_CONFIG' in os.environ:    
tf_config = json.loads(os.environ['TF_CONFIG'])
node_index = tf_config['task']['index']
is_chief = node_index == 0
print(f"Node Index: {node_index}, Is Chief: {is_chief}")

This is still not perfect and I want to point out some issues we tackled

Since the training happens on the cluster and the code is completely wrapped inside the runner method, we don’t have access to the final model at the end of training. This means that if you don't explicitly handle this, you might be running training for days but there would be no way to retrieve your model (yikes!). Tensorflow recommends using the ModelCheckpoint or BackupAndRestore callbacks for this. We used ModelCheckpoint callback,

best_model_save_callback = ModelCheckpoint(      filepath=best_filepath, # Somewhere on s3 or persistent storage
monitor=monitor_metric,
save_best_only=True)

Another thing we noticed while saving the model was that if we added the callback to just the chief node, distributed training crashes as the workers move on from the callback stage and start training for the next epoch while the chief node is still saving the model and the workers think it’s dead and crashes the whole training.

callbacks need to be similar for chief and workers

A workaround we found for this issue was to add a dummy callback to all workers which will save the model to some temp path while the chief saves to a persistent store (like S3), this makes sure all nodes take a similar time to complete an epoch.

best_model_save_callback_dummy = ModelCheckpoint(    filepath="some_random_path",    
monitor=monitor_metric,
save_best_only=True)

callbacks = [best_model_save_callback if is_chief \
else best_model_save_callback_dummy]

The training would also crash in the event that a worker dies or the cluster loses a worker as there is no built-in recovery mechanism in this distribution strategy. Tensorflow recommends saving checkpoints using the same approach as above at each epoch to make sure we can restart training if this happens. Read more about it here.

In addition to this approach using tf distributor (works only for TensorFlow), we can also run Distributed deep learning training on spark using horovod which supports both TensorFlow and PyTorch. If you want to learn more about it, these are good starting points horovod and petastorm.

MLFlow Callback

We wrote our own simple MLFlow callback to make sure we log metrics after each epoch. At the time of writing this article, MLFlow autologging (enables automatic logging of metrics and params, duh.) did not work for us in distributed mode. Once again due to the distributed nature of training, we need to make sure only one of the nodes (chief) will write data to MLFlow which can be accomplished with the same technique we used for model callbacks.

class MlflowLogging(tf.keras.callbacks.Callback):  
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def on_train_end(self, logs=None):
super().on_train_end(logs)
def on_epoch_end(self, epoch, logs=None):
super().on_epoch_end(epoch, logs)
keys = list(logs.keys())
for key in keys:
mlflow.log_metric(key, float(logs[key].numpy()), epoch)

Completing the picture

With all of these new pieces ready, we integrated them into our existing stack adding the capability to run any deep learning based workloads in a scalable and flexible manner.

updated stack now extended with Tensorflow

In our initial use case, we decided to keep using our existing Spark-based Data Engineering pipelines with the key difference being the training and inference datasets written now as tfrecords (vs parquet) files, which would then allow tf.data to pick these up and load into the model using a TFRecordDataset.

I'll go over some pros and cons of this approach to wrap up,

Pros:

  • Leverage existing read/write connectors in Spark (Most data warehouses, MySQL, S3, Kinesis, etc)
  • Parallelized batch inference in Spark with CPU clusters
  • Combine the best of both ecosystems with the flexibility to choose one over the other at any stage of the ML lifecycle
  • Run distributed deep learning loads on Spark clusters with fault tolerance
  • Ease to move from single-node development to distributed training
  • Use of any extension libraries from the TF ecosystem
  • Central storage, versioning, and management of models using MLflow
  • Any downstream application can now fetch the latest model from MLFlow
  • Great for smaller teams who want to focus on ML and less infrastructure

Cons:

  • Distributed training with MirroredStrategy is not perfect especially with recovery (in the case of a dead worker). Parameter server strategy could be a better alternative in this case
  • Cryptic and messy Tensorflow error messages
  • Tensorflow is still a work in progress and a lot of things mentioned in this article are still experimental (especially with distribution strategies)
  • APIs between Keras and Tensorflow are in a state of flux right now

Conclusion and Next Steps

We believe the addition of this new key piece of technology into our stack enables us to build the next generation of our great products and services that will put our members in the driver's seat. Some of the initial use cases are,

  • Personalization using Deep learning based Recommenders
  • Better text representation with BERT (and other transformers)
  • Product Tagging and Catalog Management with NER and text
  • Image embeddings with Autoencoders and CNN based CV models
  • Bandits and Online recommendations with Reinforcement Learning

We are primarily a Tensorflow shop at the moment and like how tf.distribute handles distribution strategies and (almost) how well it integrates with Spark.

We are currently exploring more additional pieces of technology that we believe would be great additions to our stack including Online inference using scann indexes deployed on ECS and streaming pipelines which would unlock real-time recommendations.

I will follow up with another article with a sneak peek at the sequential (LSTM based) model we recently built and deployed using this stack and share some of our learnings from this experience. This model has already won the hearts of our members (>30% lift in click-through rate) proving that deep learning based solutions are indeed the way to go and should be factored into the next generation of our products.

The future is exciting and with endless possibilities!

Read the next part of this series where we implemented real-time streaming recommendations!

PS: This is my first ever medium post, feel free to reach out if you have any questions or any comments and suggestions on this article!

--

--

Ronny Mathew

Reinventing member experiences with cutting edge AI/ML at Rue Gilt Groupe!