

Building a Scalable ML Inference Pipeline with Celery and Redis

Generic dashboard of an inference pipeline

Our goal was to build an inference pipeline that could scale without breaking the bank and connect to multiple ML models (mostly classifiers, extractors, and semantic search). We needed a pipeline with multiple gated conditions that was also highly scalable.


The pipeline had to process millions of requests, categorizing thousands of products across a diverse range of categories, each with its unique set of requirements. The backbone of our solution was a sophisticated orchestration of Celery and Redis, combined with the scalability of Google Kubernetes Engine (GKE).



Initial Requirements and Challenges


Our primary goal was to design a system capable of:

  1. Handling Multiple Models: Our solution needed to seamlessly integrate and manage multiple machine learning models to cater to the diverse requirements of different product categories.

  2. Dynamic Request Routing: The system was required to intelligently route requests to various models based on the outcomes of preceding model inferences.

  3. Queue Management: Each model and process in the pipeline had its own processing time, so queues built up at the slower stages. We needed to push more processing capacity at a stage whenever this happened (a minimal queue-depth check is sketched after this list).

  4. Scalability: With the expectation of handling millions of requests, scalability was not just a requirement but a necessity.
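To make the queue-management point concrete: with Redis as the broker, Celery keeps each named queue as a Redis list, so the backlog can be read directly with LLEN. The sketch below is illustrative only; the queue names and threshold are hypothetical, not the ones from our system.

```python
import redis

# Connect to the Redis instance Celery uses as its broker.
r = redis.Redis(host="localhost", port=6379, db=0)

# Celery stores each named queue as a Redis list, so LLEN gives the backlog.
for queue in ["classify", "extract", "validate"]:  # hypothetical queue names
    backlog = r.llen(queue)
    print(f"{queue}: {backlog} pending tasks")
    if backlog > 1000:  # illustrative threshold
        print(f"  -> consider adding workers for '{queue}'")
```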


The complexity of the problem we were tackling—extracting precise product information from unstructured text across thousands of categories—necessitated a multifaceted ML approach. Each category had its own set of unique requirements, demanding a highly adaptable and dynamic pipeline.


ML Inference Pipeline Architecture


High level architecture

Our solution architecture comprised several key components. The models and supporting processes were all wrapped as Celery workers, with Redis as the messaging bus. We used multiple named Celery queues for routing requests, and kept separate queues for passing data versus requests. We also created centralized processes for logging and database access, each served by its own dedicated queue.


  1. Classification Entry Point: The entry point for any text blob was our custom-built classification model. Its primary function was to ascertain the category of the product mentioned in the text, thereby determining the subsequent processing path.

  2. Models: We had a collection of models covering preprocessing, extraction, and classification.

  3. Dynamic Routing Mechanism: Based on the classification results, the data was then directed to one of several specialized models designed for extracting category-specific product information (a rough Celery sketch of this flow follows this list).

  4. Validation and Data Storage: Extracted data featuring Manufacturer Part Numbers (MPNs) or Global Trade Item Numbers (GTINs) were forwarded to a validation model. Conversely, data lacking these identifiers were sent directly to our data store.

  5. Final Aggregation: Post-validation, the enriched and verified product information was consolidated into our data store, ready for downstream consumption and analysis.
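As a rough sketch of how this flow maps onto Celery tasks handing off to one another (this is illustrative rather than our production code; the task names, queue names, and placeholder model logic are all hypothetical):

```python
from celery import Celery

app = Celery("pipeline", broker="redis://localhost:6379/0")

@app.task(name="classify_product")
def classify_product(text_blob: str) -> None:
    # Placeholder for the classification model; returns a category label.
    category = "electronics" if "gpu" in text_blob.lower() else "general"
    # Route the blob to a category-specific extractor queue.
    extract_product.apply_async(args=[text_blob, category],
                                queue=f"extract_{category}")

@app.task(name="extract_product")
def extract_product(text_blob: str, category: str) -> None:
    # Placeholder for a category-specific extraction model.
    record = {"category": category, "raw": text_blob, "mpn": None, "gtin": None}
    if record["mpn"] or record["gtin"]:
        validate_record.apply_async(args=[record], queue="validate")
    else:
        store_record.apply_async(args=[record], queue="data")

@app.task(name="validate_record")
def validate_record(record: dict) -> None:
    # Placeholder: check MPN/GTIN against reference data, then persist.
    store_record.apply_async(args=[record], queue="data")

@app.task(name="store_record")
def store_record(record: dict) -> None:
    # Placeholder: write the enriched record to the data store.
    print("storing", record)
```

The key point is the routing step: the queue for the next hop is chosen at call time with apply_async(queue=...), which is what makes the routing dynamic rather than fixed at configuration time.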



Celery and Redis for Distributed Task Management


At the heart of our pipeline’s operational efficiency was Celery, an asynchronous task queue/job queue based on distributed message passing. Celery’s flexibility in handling a multitude of tasks concurrently, coupled with its compatibility with various messaging systems, made it an ideal choice for our needs.


Celery is not the first thing that comes to mind when building model pipelines - but hey, it works.

Redis Cloud served as the backbone for Celery, acting as the message broker and ensuring high availability, persistence, and instantaneous access to data.


Given the need for efficient task routing based on model inference results, we used multiple Celery queues. This setup enabled us to categorize tasks into distinct pathways, ensuring each request was processed by the appropriate model, in the correct sequence. We also used separate queues for data and logging: any worker or process in the pipeline could save or retrieve data through the data queue, and the same applied to logging information. The plan is to eventually replace this custom logging with standard Google Cloud logging.
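A hedged sketch of what this multi-queue setup looks like in Celery configuration; the queue names, broker URL, and task names below are placeholders rather than our actual ones:

```python
from celery import Celery
from kombu import Queue

# In production the broker URL points at Redis Cloud; localhost is a stand-in.
app = Celery("pipeline", broker="redis://localhost:6379/0")

app.conf.task_queues = (
    Queue("classify"),             # entry-point classification requests
    Queue("extract_electronics"),  # one queue per category-specific extractor
    Queue("extract_apparel"),
    Queue("validate"),
    Queue("data"),                 # centralized data store reads/writes
    Queue("logs"),                 # centralized logging
)

# Static routes for tasks whose queue never changes; routing that depends on
# inference results is done per call with apply_async(queue=...).
app.conf.task_routes = {
    "classify_product": {"queue": "classify"},
    "store_record": {"queue": "data"},
    "log_event": {"queue": "logs"},
}
```

Workers are then pinned to queues at start-up, for example `celery -A pipeline worker -Q extract_electronics`, so each model scales independently of the others.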


Scaling with Google Kubernetes Engine (GKE)


To encapsulate our models and their dependencies, we opted for Docker, a platform that enables applications to be bundled into containers—lightweight, standalone, and executable software packages. This approach not only facilitated ease of deployment but also ensured consistency across development, testing, and production environments. The containerized models were then stored in Google Cloud Container Registry, a private storage for Docker images on the Google Cloud platform.


To achieve the desired level of scalability and manageability, we deployed our Dockerized model workers on GKE, a managed environment for deploying containerized applications. GKE allowed us to automatically scale our resources based on CPU utilization and the number of requests in the queue. This elasticity was crucial for handling the unpredictable volumes of incoming requests, ensuring that the pipeline could dynamically adapt to varying workloads without compromising performance.



Takeaways


Celery is pretty robust and easy to work with. Hand-offs between FastAPI and Celery are pretty easy. The beast with any distributed system is really testing all of it. We started with scripts to monitor Redis queues and then adopted Flower - a pretty nifty Celery monitoring tool. It was the antacid we needed on some late nights.
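For reference, the FastAPI-to-Celery hand-off really is just an enqueue call; a minimal sketch, assuming the hypothetical classify_product task and module name from the earlier snippets:

```python
from fastapi import FastAPI
from pydantic import BaseModel

from pipeline import classify_product  # hypothetical module holding the Celery tasks

api = FastAPI()

class InferenceRequest(BaseModel):
    text_blob: str

@api.post("/infer")
def submit(request: InferenceRequest):
    # Enqueue the work and return immediately; Celery workers do the rest.
    result = classify_product.delay(request.text_blob)
    return {"task_id": result.id}
```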


We love Redis. We have used it as an in-memory vector store before, and now as the message queue here. It's a workhorse, and Redis Cloud makes it easy to deploy and manage. One issue we faced: Celery workers don't release their Redis connections fast enough, and Redis Cloud caps the number of connections, so new workers could not connect and we started seeing massive queue build-ups. We implemented a mechanism for Celery workers to close their Redis connections, but this needs some care as it may lead to data loss.
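One way to keep the connection count in check is to cap the pools on the worker side; a hedged sketch of the relevant Celery settings (the values are illustrative, the exact behaviour depends on the Celery/kombu version, and - as noted above - aggressive connection closing needs care):

```python
from celery import Celery

app = Celery("pipeline", broker="redis://localhost:6379/0")

# Keep each worker's Redis footprint small so the Redis Cloud connection
# cap is not exhausted as more workers come online.
app.conf.broker_pool_limit = 1                               # minimal broker connection pool
app.conf.broker_transport_options = {"max_connections": 20}  # cap broker-side connections
app.conf.redis_max_connections = 20                          # cap backend connections (if the result backend is Redis)
```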


It's not possible to scale GKE natively on Celery queue length or worker counts - which would be ideal in our case, since a growing queue is exactly the signal that a stage needs more workers.
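Purely as a hedged sketch of how that gap is sometimes bridged (not something we run in production), an external loop can read the Celery backlog from Redis and resize the worker Deployment through the official kubernetes Python client; every name and number below is hypothetical:

```python
import redis
from kubernetes import client, config

config.load_kube_config()  # use config.load_incluster_config() when running inside GKE
apps = client.AppsV1Api()
r = redis.Redis(host="localhost", port=6379, db=0)

QUEUE = "extract_electronics"      # hypothetical Celery queue name
DEPLOYMENT = "extractor-workers"   # hypothetical GKE Deployment running those workers
TASKS_PER_WORKER = 500             # illustrative sizing assumption

# Size the Deployment to the current backlog, within sane bounds.
backlog = r.llen(QUEUE)
replicas = max(1, min(20, backlog // TASKS_PER_WORKER + 1))

apps.patch_namespaced_deployment_scale(
    name=DEPLOYMENT,
    namespace="default",
    body={"spec": {"replicas": replicas}},
)
```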


Airflow was an option, but the team already had hands-on experience with Celery, and the Celery-plus-Redis combination gave us the flexibility to achieve our intended goal.
