Enhancing Personalization with Realtime Streaming Inference

Ronny Mathew
13 min read · Oct 31, 2022


Combining the power of streaming data with online deep-learning inference to elevate and personalize the member session experience in real-time

Previously, we talked about how to run a simplified distributed deep-learning training process on Spark. This enabled us to build a custom sequential deep learning model that takes member activities and provides personalized recommendations (even with just one product click), unlocking personalization for our new members from their second session on the site.

Recapping some key components from my previous article, we have already built training and inference data pipelines on Spark including custom UDFs (user-defined functions), deep learning modeling capabilities with TF recommenders (and other cool Tensorflow features), distributed training of Tensorflow models on Spark (with Spark Tensorflow distributor), MLFlow model registry (as our single source of truth for all models), and offline inference pipelines that pull the latest models from MLFlow to generate recommendations. This article focuses specifically on the inference side to generate recommendations in real time using these same models!

We found more untapped opportunities after deploying the offline version of our models. Because the existing approach could only reach members from their second session onwards, we missed out on wowing new members during their initial session with us. As the saying goes, you never get a second chance to make a first impression, so it’s very important that we provide our new members with relevant products and experiences that get them excited to use our site and keep them coming back. This led us to expand our offline models and infrastructure to provide recommendations to members right from their first visit, paving the way for our platform to handle many more real-time opportunities!

In our org, Data Science functions as a service provider to our frontend and marketing counterparts, serving all data through API endpoints that any downstream process can consume. This was a key factor in our decision to pick streaming technology for real-time recommendations, as it requires only a minimal amount of engineering work from other teams and gives us full control over personalizing the member experience.

Without further ado, let's get started and talk about how we got it done!

Up Next,

Realtime Data and Streams
Processing a Stream
Integrating personalization into Streaming
ML Lookup Models
Serving the model as endpoints with TF serving
Online Inference Endpoints
Final streaming architecture on AWS
Conclusion
References

Realtime Data and Streams

Almost every system in place generates data records: log events, clickstream data, order data, and so on. Streams capture these real-time data records (events) from their sources as soon as they are generated. These events are then pushed to a streaming platform like Kafka or AWS Kinesis.

Connecting to an activity stream gives direct and instant access to everything that is happening live on the site

This is a very powerful concept as it unlocks tons of real-time use cases where we can adapt the member experience as they browse the site, including applications like trending algorithms, session-based recommendations, session-based experiences, and more.

At Rue Gilt Groupe (RGG), we use AWS Kinesis to gather all collected clickstream data into streams. Connecting to these Kinesis streams gives us access to the latest click data from a user. These streams are also consumed by downstream applications, including data warehouses that ingest the data for offline analytics. Almost all concepts in this article can be easily adapted for Kafka streams as well.

Processing a Stream

Streams give us access to events as they happen, but these events arrive as raw JSON payloads with complicated schemas, so we still need to wrangle, clean, and transform them (aka process them into the clean, structured data that we all love to work with). This can be achieved by connecting these streams to popular stream processing frameworks and libraries (or by building it yourself!).
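To make the wrangling step concrete, here is a minimal sketch in plain Python of flattening one nested clickstream payload into a structured record. The payload layout and field names (`user.id`, `context.product.id`, `event`, `ts`) are hypothetical and only stand in for whatever schema your stream actually carries.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass
class ClickEvent:
    """Flat, typed view of one clickstream event (hypothetical fields)."""
    member_id: str
    product_id: str
    event_type: str
    timestamp_ms: int


def parse_click(raw: str) -> Optional[ClickEvent]:
    """Flatten one raw JSON payload into a ClickEvent.

    Returns None when the payload is not valid JSON or a required
    field is missing or has an unusable type.
    """
    try:
        payload = json.loads(raw)
        return ClickEvent(
            member_id=str(payload["user"]["id"]),
            product_id=str(payload["context"]["product"]["id"]),
            event_type=payload["event"].strip().lower(),
            timestamp_ms=int(payload["ts"]),
        )
    except (KeyError, TypeError, ValueError, AttributeError):
        # json.JSONDecodeError is a subclass of ValueError.
        return None
```

Returning None instead of raising keeps one malformed event from stopping the whole stream; in practice you would probably also count or log the rejected payloads.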

Stream processing has a lot of benefits but, at the same time, requires more care while setting up. Since everything is real-time, if we miss something, we mess up in real-time! So we need additional monitoring and alerts to make sure we capture any errors or issues as soon as they happen.

In this article, we will talk about two processing approaches with which we expanded our ML platform,

  1. Structured streaming on Apache Spark
  2. Native services on AWS (Amazon Web Services)

Structured Streaming on Apache Spark

Structured streaming on Spark enjoys almost all the benefits and features of batch processing in Spark with a few exceptions. Spark includes connectors to most of the popular streaming technologies including Kinesis and Kafka, using which we can connect to a stream just like we connect to any other data source. In addition, we can also easily reuse all of our custom UDFs, pipelines, and transformations from the offline codebase.

Structured streaming works by processing micro-batches of the streaming data into a Spark table (or DataFrame), on top of which we can then use Spark APIs to create streaming pipelines. As new data arrives, it gets appended automatically to the bottom of the table, and the downstream transformations in the pipeline are applied to it as well.

source: Spark docs

We also get access to streaming-specific functions like windowing and watermarking, which let us bound the amount of aggregation state we keep and create timed or rolling aggregation windows.
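As a rough illustration of what windowing and watermarking do, here is a simplified model in plain Python. It is not Spark's API or its exact semantics (Spark advances the watermark per micro-batch), just a sketch of the idea: events are counted per tumbling window, and an event is dropped when its window closed before the watermark. The function name and parameters are made up for this example.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple


def windowed_counts(
    events: Iterable[Tuple[int, str]],
    window_s: int = 60,
    watermark_s: int = 120,
) -> Tuple[Dict[Tuple[str, int], int], int]:
    """Count events per (key, window_start) over tumbling windows.

    events: (event_time_seconds, key) pairs in arrival order.
    The watermark trails the largest event time seen so far by
    watermark_s; an event whose window ended at or before the
    watermark is treated as too late and dropped.
    """
    counts: Dict[Tuple[str, int], int] = defaultdict(int)
    max_event_time = 0
    dropped = 0
    for event_time, key in events:
        max_event_time = max(max_event_time, event_time)
        watermark = max_event_time - watermark_s
        window_start = event_time - (event_time % window_s)
        if window_start + window_s <= watermark:
            dropped += 1  # the window for this event is already closed
            continue
        counts[(key, window_start)] += 1
    return dict(counts), dropped
```

A real engine would also evict windows once the watermark passes them, which is what keeps the aggregation state bounded.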

Structured streaming also keeps track of processing progress using checkpoints, which provide fault recovery in case of issues. Using these checkpoints, the streaming job resumes from the point of failure, so each data point is processed exactly once (provided the source can be replayed and the sink is idempotent).

Stream processing on Spark

Benefits of using structured streaming,

  • Reusability of almost all offline code, connectors, etc
  • Fault tolerance and recovery
  • Requires less infrastructure code
  • Schema enforcement (also easy to auto-infer schema from raw payload)
  • Write streams to any sink (including custom sinks using foreachBatch)
  • Data and Aggregation DataFrames can be cached for reuse

We also faced some challenges with this approach,

  • Stream-to-stream joins are not possible when using certain stream output modes (but we made it work with pandas UDFs)
  • End-to-end latency was 1–2s higher than the AWS approach

If interested, read more about Structured Streaming here.

Stream processing on AWS

In this approach, we use native AWS services and components and wrapped everything up into a neatly packaged AWS CloudFormation template. Since we use AWS Kinesis for creating streams, we also get a very convenient AWS feature with it, namely events.

Every new record that arrives on the kinesis stream triggers an event, which then any service can subscribe to

That means we can now have a serverless function (Lambda) trigger on each and every incoming event to process the newly arriving data. This function performs all the required transformations, runs inference, and surfaces the final recommendations. Since a Lambda function is stateless by design, we need an additional data store component here to keep track of the existing state of aggregation. This data store should support superfast reads and non-blocking I/O, something like a NoSQL database (AWS DynamoDB) or a really efficient cache (like Redis).
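Here is a minimal sketch of what such a Lambda handler could look like. Kinesis delivers records to Lambda in batches, with each payload base64-encoded under `record["kinesis"]["data"]`. Everything else is an assumption for illustration: the payload fields (`member_id`, `product_id`), the "last N products" aggregate, and the in-memory `STATE` dict that stands in for DynamoDB or Redis.

```python
import base64
import json
from typing import Any, Dict, List

# Hypothetical in-memory stand-in for the external state store.
# A real Lambda must use DynamoDB, Redis, or similar, because
# function memory does not reliably survive between invocations.
STATE: Dict[str, List[str]] = {}
MAX_RECENT = 20  # keep only the most recent product ids per member


def handler(event: Dict[str, Any], context: Any = None) -> Dict[str, int]:
    """Update each member's recent-products aggregate from a Kinesis batch."""
    processed = 0
    for record in event.get("Records", []):
        # Kinesis record payloads arrive base64-encoded.
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
        member_id = str(payload["member_id"])
        recent = STATE.get(member_id, [])
        recent.append(str(payload["product_id"]))
        STATE[member_id] = recent[-MAX_RECENT:]
        processed += 1
    return {"processed": processed}
```

Keeping the aggregate small and keyed by member makes each update a single read and write against the store.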

Stream processing on AWS

Things that we really liked with this approach,

  • Entirely event-driven (only runs when there are new events)
  • Super efficient and fast as we process every event as it happens
  • Serverless and very cost-effective (only pay for what we use)
  • Uses all built-in AWS pieces

Some challenges that we had to overcome,

  • Keeping track of the state of aggregations had to be handled explicitly with an extra component. This could get more complicated when we have to keep multiple aggregates
  • The schema of the input data had to be enforced explicitly, with extra unit tests to capture all edge cases
  • Alarms and dashboards had to be set up to make sure we were alerted in case of issues in processing or inference
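Explicit schema enforcement can be as simple as a small validation function with unit tests around it. The sketch below is one way to do it in plain Python; the schema and its field names are hypothetical.

```python
from typing import Any, Dict, List

# Hypothetical schema: field name -> expected Python type.
CLICK_SCHEMA: Dict[str, type] = {
    "member_id": str,
    "product_id": str,
    "timestamp_ms": int,
}


def schema_errors(
    record: Dict[str, Any], schema: Dict[str, type] = CLICK_SCHEMA
) -> List[str]:
    """Return a list of schema violations; an empty list means valid."""
    errors: List[str] = []
    for field, expected in schema.items():
        if field not in record:
            errors.append(f"missing field: {field}")
            continue
        value = record[field]
        # bool is a subclass of int in Python, so reject it explicitly
        # (this sketch assumes the schema has no bool fields).
        if isinstance(value, bool) or not isinstance(value, expected):
            errors.append(
                f"{field}: expected {expected.__name__}, got {type(value).__name__}"
            )
    return errors
```

Returning all violations at once, rather than failing on the first, makes the alerts and logs more useful when an upstream schema changes.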

Integrating personalization into Streaming

Now that we have streaming all set up, we need to do some more work to run an ML model inference in real time. Final pieces of the puzzle,

  1. ML model (and a lookup model)
  2. Serving the model as endpoints
  3. A service to host these endpoints and models

ML Lookup Models

For our first version, we use the same model that we trained for our offline use case and it lives in MLFlow Model Registry. In order to have this model run inference in real-time, we also added some extra tweaks. (I won't be focusing much on the ML side of things in this series, but want to point out a quick optimization we implemented).

During offline inference, we use two models to generate the final recommendations, a query (member) model and a lookup model.

Query (member) Model

The query model gets all the latest query level (or member level) features as input and generates a query embedding vector that encodes the input features.

Lookup model

The lookup model has a vector lookup index for all eligible live products on the site. The index is built with product embedding vectors that are created by passing product-level features through the product model for each product.

We can then query the index with the query (member) embedding vector and find the products with the closest matching embedding vectors. We used Google's ScaNN index as our lookup layer, as it's already conveniently baked into the TF recommenders library and is really efficient and quick at serving up recommendations (about 30–40ms per query).
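Conceptually, the lookup scores every product embedding against the member embedding and returns the best matches. The sketch below does this with an exact brute-force dot product in plain Python. It is only a stand-in to show the idea: ScaNN performs an approximate search that scales to far larger catalogs, and the product ids and vectors here are made up.

```python
import heapq
from typing import Dict, List, Sequence, Tuple


def top_k(
    query: Sequence[float],
    product_vectors: Dict[str, Sequence[float]],
    k: int = 3,
) -> List[Tuple[str, float]]:
    """Return the k products whose embeddings have the largest
    dot product with the query embedding, best match first."""
    scored = (
        (sum(q * p for q, p in zip(query, vector)), product_id)
        for product_id, vector in product_vectors.items()
    )
    return [(product_id, score) for score, product_id in heapq.nlargest(k, scored)]
```

For example, with a tiny index of three products:

```python
index = {"shoes": [1.0, 0.0], "bag": [0.0, 1.0], "scarf": [0.7, 0.7]}
print([pid for pid, _ in top_k([0.9, 0.1], index, k=2)])
```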

Optimization: Combining the two into one tensorflow model

If we were to implement these two models as they are set up in our offline version, there would be two inference endpoints: one to create the member embedding vector and another to look up that embedding and generate product recommendations. We decided to combine these two models into one joint model that takes features as inputs and returns recommendations as output. This approach let us deploy just one inference endpoint that can be queried with streaming data, reducing network latency and removing extra points of failure.

Note: there might be other use cases where it makes sense to keep these as two endpoints instead, for example, if you were to use the member embedding as input to multiple models.
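The shape of the joint model can be sketched as a simple composition. This is a plain-Python illustration, not TensorFlow code: `query_model` and `lookup` are hypothetical callables standing in for the member model and the lookup index.

```python
from typing import Any, Callable, Dict, List, Sequence

Features = Dict[str, Any]
Embedding = Sequence[float]


class JointModel:
    """Wraps the query model and the lookup so that callers send
    features and get recommendations back in a single call."""

    def __init__(
        self,
        query_model: Callable[[Features], Embedding],
        lookup: Callable[[Embedding], List[str]],
    ) -> None:
        self.query_model = query_model
        self.lookup = lookup

    def __call__(self, features: Features) -> List[str]:
        # The embedding never leaves the process, so there is no
        # second network hop between the two steps.
        embedding = self.query_model(features)
        return self.lookup(embedding)
```

The trade-off is the one described in the note: the embedding is no longer exposed, so other models cannot reuse it without a separate endpoint.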

Serving the model as endpoints with TF serving

Tensorflow Serving provides a flexible, production-ready, high-performance serving system for machine learning models. Google also conveniently provides docker images for multiple flavors of Tensorflow serving, which can be quickly pulled in as needed to build our inference image.

FROM google/tf-serving-scann:2.5.1
COPY ./<local_path>/models/lookup/1 models/lookup/1

This is all we need in our Dockerfile to serve the TF recommender model using TF serving and when we build and run this docker image, we are all set with an efficient REST endpoint to run inference on inputs using our custom model. It can't get any simpler than this!
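Once the container is running, the model can be queried over TF Serving's REST API, which accepts a POST to `/v1/models/<model_name>:predict` with a JSON body of the form `{"instances": [...]}` and listens on port 8501 by default. The sketch below uses only the standard library. The model name `lookup` (which assumes the container was started with `MODEL_NAME=lookup`) and the input feature names are assumptions, not details from our setup.

```python
import json
import urllib.request
from typing import Any, Dict, List


def predict_url(host: str, model_name: str, port: int = 8501) -> str:
    """Build the TF Serving REST predict URL for a model."""
    return f"http://{host}:{port}/v1/models/{model_name}:predict"


def predict_body(instances: List[Dict[str, Any]]) -> bytes:
    """Encode inputs in TF Serving's row ("instances") format."""
    return json.dumps({"instances": instances}).encode("utf-8")


def predict(
    host: str,
    model_name: str,
    instances: List[Dict[str, Any]],
    timeout_s: float = 2.0,
) -> Any:
    """POST the instances and return the "predictions" field."""
    request = urllib.request.Request(
        predict_url(host, model_name),
        data=predict_body(instances),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout_s) as response:
        return json.loads(response.read())["predictions"]
```

A call might then look like `predict("localhost", "lookup", [{"recent_product_ids": ["p1", "p2"]}])`, where the feature name is hypothetical and has to match the model's serving signature.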

You can use this approach if you want to own the infrastructure and stand up your own online inference endpoints using Kubernetes or ECS (AWS Elastic Container Service). If that's the case, I highly recommend reading up on TF serving with docker here.

Online Inference Endpoints

Now that we have our model and serving ready, we need to host our serving images on a really efficient service. We will talk about AWS Sagemaker endpoints for the rest of the article, as that is what we implemented for our first use case. There are other options, like MLFlow model serving, which is well integrated with MLFlow Model Registry and can accomplish the same purpose. But for our custom model, we faced a few challenges with it and decided to go the Sagemaker way.

AWS Sagemaker Inference Endpoints

AWS Sagemaker provides a scalable way to host these TF serving endpoints and handles most of the infrastructure heavy-lifting including model deployment, environment configuration, availability, load balancing, metrics, and logs collection. The endpoints can also be invoked directly from Python code using the boto3 library, which means we can easily integrate them into the Spark structured streaming-based pipelines as well.
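Invoking an endpoint from Python comes down to one `invoke_endpoint` call on boto3's `sagemaker-runtime` client. In the sketch below the client is passed in as a parameter so the function can be tested without AWS access. The endpoint name, the feature names, and the `{"instances": [...]}` payload shape are assumptions that depend on how the serving container is set up.

```python
import json
from typing import Any, Dict


def recommend(
    runtime_client: Any, endpoint_name: str, features: Dict[str, Any]
) -> Any:
    """Send one member's features to a Sagemaker endpoint and
    return the decoded JSON response.

    runtime_client is expected to behave like
    boto3.client("sagemaker-runtime").
    """
    response = runtime_client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType="application/json",
        Body=json.dumps({"instances": [features]}),
    )
    # The response body is a stream, so it has to be read before decoding.
    return json.loads(response["Body"].read())
```

In a real pipeline, the client would be created once with `boto3.client("sagemaker-runtime")` and reused across calls rather than rebuilt per event.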

Sagemaker also provides the option of setting up serverless endpoints. With serverless endpoints, we don't need to manage instances and we get billed by requests, so it won't cost us anything for idle time.

In order to use our custom models in Sagemaker, we need to first convert them into Sagemaker models using docker containers built by AWS or custom ones. We used the custom container approach as our models using ScaNN were not yet supported by AWS out of the box. Custom docker containers should implement minimal Sagemaker SPI requirements, i.e. an HTTP ping endpoint, and the ability to consume inference API requests in a pre-defined format. A basic example of implementing a custom container is provided by AWS here.
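To give a feel for that contract, here is a bare-bones sketch of a server that answers `GET /ping` with a 200 and accepts inference requests on `POST /invocations`, listening on port 8080 as Sagemaker expects. It uses only the standard library, and `predict` is a hypothetical placeholder. A TF serving based container would forward the request to the model server instead.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Any, Dict, Tuple


def predict(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical placeholder for the real model call."""
    return {"predictions": []}


def route(method: str, path: str, body: bytes = b"") -> Tuple[int, bytes]:
    """Map a request to (status, response body) per the container contract."""
    if method == "GET" and path == "/ping":
        return 200, b""  # health check: any 200 response means healthy
    if method == "POST" and path == "/invocations":
        result = predict(json.loads(body or b"{}"))
        return 200, json.dumps(result).encode("utf-8")
    return 404, b""


class InferenceHandler(BaseHTTPRequestHandler):
    def _reply(self, status: int, body: bytes) -> None:
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self) -> None:
        self._reply(*route("GET", self.path))

    def do_POST(self) -> None:
        length = int(self.headers.get("Content-Length", 0))
        self._reply(*route("POST", self.path, self.rfile.read(length)))


def serve(port: int = 8080) -> None:
    """Start the server; Sagemaker sends traffic to port 8080."""
    HTTPServer(("0.0.0.0", port), InferenceHandler).serve_forever()
```

Keeping the routing in a plain function makes the contract easy to unit test without starting a server.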

MLFlow Sagemaker Deployment

In order to pull the models from MLFlow Model Registry and integrate them with AWS Sagemaker, we used MLFlow’s Sagemaker deployment client. This client, which is already baked into MLFlow, can deploy our MLFlow model directly to a Sagemaker inference endpoint.

The MLFlow Sagemaker client provides a very convenient create_deployment method that runs all of these steps in one go: fetch the model from the MLFlow model registry, re-package it into the format expected by Sagemaker, upload the package to S3, create or update the Sagemaker model configuration, and deploy the model and environment configuration to the Sagemaker endpoint. We can also run all of these operations one by one using AWS APIs in the boto3 client, if that is more your cup of tea.
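A hedged sketch of what the one-call deployment could look like is below. The client is passed in so the function runs without MLFlow installed. With MLFlow available, it would typically come from `mlflow.deployments.get_deploy_client("sagemaker")`. The endpoint name, model name, stage, and instance settings are placeholders, and the set of supported `config` keys depends on your MLFlow version, so check its documentation before relying on them.

```python
from typing import Any, Dict


def deploy_registered_model(
    client: Any,
    endpoint_name: str,
    model_name: str,
    stage: str = "Production",
    instance_type: str = "ml.m5.large",
    instance_count: int = 1,
) -> Any:
    """Deploy a registered MLFlow model to a Sagemaker endpoint.

    client is expected to behave like the MLFlow Sagemaker
    deployment client (an assumption in this sketch).
    """
    # Registry URIs take the form models:/<name>/<stage or version>.
    model_uri = f"models:/{model_name}/{stage}"
    config: Dict[str, Any] = {
        "instance_type": instance_type,
        "instance_count": instance_count,
    }
    return client.create_deployment(
        name=endpoint_name, model_uri=model_uri, config=config
    )
```

This sketch does not cover the custom container and re-packaging steps that a non-standard serving flavor needs.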

Since our model required some custom flavor of serving, we had to add some additional configurations and re-packaging of the MLFlow model to make it into the pre-defined format that the MLFlow client expected.

We also found that the MLFlow Sagemaker deployment client did not support Serverless Sagemaker endpoints yet, so we had to use the instance-based endpoints for our initial deployment.

Wrapping up our TF model in Sagemaker

Final streaming architecture on AWS

Our final version for this project was an almost Serverless approach, using two Serverless (Lambda) functions, DynamoDB as our data stores and Sagemaker endpoints for real-time inference. This design was also non-blocking as the final data store always has the most up-to-date data and API calls did not have to wait for it to be ready.

Serverless system for real-time recommendations

Quick summary of the key components here,

Static Data Sources: All extra data that is required to enrich the stream data including product details, additional query details, etc.

Sagemaker Endpoints: Sagemaker endpoints host our custom Tensorflow model and provide recommendations on the given query.

Activity Aggregator: Function that processes the incoming record including data cleansing and preprocessing, calculates the new aggregate using the data point, and finally saves the new aggregate to the store.

Model Input Store: Data store that keeps track of the aggregation state for each member.

Inference Runner: Function that grabs the latest aggregate data, performs feature transformations, and invokes the Sagemaker endpoint. It then performs additional post-processing on the recommendations and finally stores them.

Recommendations Store: Data store which can be queried by an API gateway to fetch the latest recommendation for a member.
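The flow through the Inference Runner can be sketched in a few lines of plain Python. Everything here is illustrative: the two stores are plain dicts standing in for DynamoDB tables, `invoke` stands in for the Sagemaker endpoint call, the feature name is made up, and the post-processing rule (dropping products the member has already viewed) is just one plausible example.

```python
from typing import Any, Callable, Dict, List


def run_inference(
    member_id: str,
    input_store: Dict[str, List[str]],
    recommendations_store: Dict[str, List[str]],
    invoke: Callable[[Dict[str, Any]], List[str]],
    max_items: int = 10,
) -> List[str]:
    """Read a member's aggregate, call the model, post-process,
    and save the result for the API layer to serve."""
    recent = input_store.get(member_id, [])
    if not recent:
        return []  # nothing to personalize on yet
    candidates = invoke({"recent_product_ids": recent})
    seen = set(recent)
    # Example post-processing: skip already-viewed products and cap the list.
    recommendations = [p for p in candidates if p not in seen][:max_items]
    recommendations_store[member_id] = recommendations
    return recommendations
```

Because the API only ever reads from the recommendations store, a slow or failed inference call leaves the previous recommendations in place instead of blocking the request.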

Here is a really quick video of all of this coming alive on our website as soon as a member registers. You can see the recommendations updating in near real-time as I click on more and more products!

Realtime recommendations in action!

Go check out ruelala.com and gilt.com today to test-drive this yourself!

Conclusion

This entirely event-driven approach integrating streaming with personalization proved to be very efficient and cost-effective for us as it took only about 1–3s roundtrip from the time someone clicked on the site to the recommendations updating.

With these new additions to our platform, we have now unlocked new capabilities using streaming and combining streaming with personalization. We can now expand our existing personalization algorithms and provide a more responsive and tailored experience to all our members. Streaming-based applications like trending by location, urgency recommendations, etc, are now a stone’s throw away.


Ronny Mathew

Reinventing member experiences with cutting edge AI/ML at Rue Gilt Groupe!