At Observe.AI, we work with some of the largest contact centers in the world, empowering them with AI-based voice insights to better understand and enhance agent performance. In a nutshell, our typical workload includes pulling our customers’ stored calls, processing those calls, and reporting the call analysis and findings to our customers through our app.
To run queries and searches on call transcriptions, we use Elasticsearch as our secondary datastore. We also use Elasticsearch to run keyword searches while processing customer calls.
This article describes how we scaled this search step to suit our workflow.
Currently, the processing of a call is divided into multiple steps (see Figure 1 below). Each step is a separate service that consumes messages from a message queue, processes them, and pushes each one to the appropriate queue for the next processing step.
Note: We omitted steps that aren't relevant to the scope of this post (like redaction).
For every call pulled from partner integrations, we create a pipeline processing object containing the ordered list of pipeline steps. This object is first sent to the queue for the first step of the pipeline (i.e., ASR). Once a step finishes processing, the object is sent to the next step, which is determined by looking at the list of pipeline steps.
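The routing described above can be sketched as follows, assuming the pipeline processing object carries its remaining steps as an ordered list. The class, the step names other than ASR, and the queue naming scheme (`queue_for`) are hypothetical and only illustrate the idea.

```python
from dataclasses import dataclass, field
from typing import List, Optional

# Hypothetical step names; the post only names ASR and keyword searching.
DEFAULT_STEPS = ["asr", "keyword_search", "analysis"]


@dataclass
class PipelineObject:
    call_id: str
    # Remaining steps, in processing order.
    steps: List[str] = field(default_factory=lambda: list(DEFAULT_STEPS))


def queue_for(step: str) -> str:
    """Map a pipeline step to its (hypothetical) queue name."""
    return f"{step}-queue"


def initial_queue(obj: PipelineObject) -> str:
    """A newly pulled call goes to the queue of its first pipeline step."""
    return queue_for(obj.steps[0])


def next_queue(obj: PipelineObject) -> Optional[str]:
    """Call after the current step succeeds.

    Removes the finished step and returns the queue for the next one,
    or None when no steps remain (the call can then be marked Complete).
    """
    obj.steps.pop(0)
    return queue_for(obj.steps[0]) if obj.steps else None
```

Because the object carries its own step list, each service only needs to pop its own step and forward the object; no service has to know the full pipeline layout.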
Depending on the requirement, we use MongoDB, Amazon S3, or Elasticsearch to save the results of each step. If processing fails, the message is retried a certain number of times before being sent to a dead-letter queue. When the pipeline steps array is empty, the call is marked as Complete in Elasticsearch, which means its processing has finished and it is now available for reporting to end users.
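A minimal sketch of the retry and dead-letter handling, assuming a dict of in-memory lists stands in for the real message queues. `MAX_ATTEMPTS`, `Message`, and the queue names are hypothetical; the post only says a message is retried "a certain number of times".

```python
from dataclasses import dataclass
from typing import Callable, Dict, List

MAX_ATTEMPTS = 3  # hypothetical total number of processing attempts


@dataclass
class Message:
    call_id: str
    attempts: int = 0  # failed attempts so far


def handle(msg: Message, process: Callable[[str], None],
           queues: Dict[str, List[Message]], source: str) -> str:
    """Run one processing attempt and route the message by outcome.

    Returns "done" on success, "retry" if the message was re-enqueued on
    its source queue, or "dlq" if it was moved to the dead-letter queue.
    """
    try:
        process(msg.call_id)
        return "done"
    except Exception:
        msg.attempts += 1
        if msg.attempts < MAX_ATTEMPTS:
            queues[source].append(msg)  # try again later
            return "retry"
        queues["dead-letter"].append(msg)  # give up after MAX_ATTEMPTS failures
        return "dlq"
```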
A brief explanation of each step:
Because each service reads from a queue, we can easily monitor the processing rate of each of the above steps by looking at the rate at which messages are deleted from its queue.
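As a small illustration of this monitoring idea, the processing rate of a step can be derived from two samples of a cumulative "messages deleted" counter for its queue. The sample format is an assumption; real queue services expose this metric in their own ways.

```python
from typing import List, Tuple


def deletion_rate(samples: List[Tuple[float, int]]) -> float:
    """Messages deleted per second between the first and last sample.

    Each sample is assumed to be (unix_timestamp_seconds,
    cumulative_messages_deleted) for one step's queue.
    """
    if len(samples) < 2:
        raise ValueError("need at least two samples")
    (t0, c0), (t1, c1) = samples[0], samples[-1]
    if t1 <= t0:
        raise ValueError("samples must be in increasing time order")
    return (c1 - c0) / (t1 - t0)
```

Comparing this rate across steps makes the slowest step, and therefore the pipeline bottleneck, easy to spot.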
As the number of customers, and as a result the number of agents, grew, we identified the “keyword searching” step as the bottleneck in the above architecture because it limited our processing speed (number of calls processed per hour).
There are two main reasons we conduct phrase searches on a call’s transcript using Elasticsearch:
1. We can easily add and remove the analyzers and tokenizers that Elasticsearch provides on our indexes as requirements change.
2. It makes the phrase search for an individual document consistent with the search results that end-users receive when they search phrases within calls (which is also powered by Elasticsearch).
The number of phrase searches done for a call varies by customer. On average, however, assume around 1500 phrases are searched on each call. We cap each Elasticsearch query at 100 phrase match queries, which comes to around 15 Elasticsearch queries per call.
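One way to batch phrases into queries of at most 100 clauses is sketched below using the standard Elasticsearch query DSL (`bool`, `ids`, `match_phrase`, and named queries via `_name`). The post does not describe its exact query shape, so the field name `transcript` and the use of named queries to tell which phrases matched are assumptions.

```python
from typing import Any, Dict, List

BATCH_SIZE = 100  # phrase match clauses per Elasticsearch query, as in the post


def build_phrase_queries(doc_id: str, phrases: List[str],
                         field: str = "transcript") -> List[Dict[str, Any]]:
    """Split phrases into batches and build one bool query per batch.

    Each match_phrase clause is tagged with _name, so every hit reports
    which phrases matched in its "matched_queries" list.
    """
    queries = []
    for start in range(0, len(phrases), BATCH_SIZE):
        batch = phrases[start:start + BATCH_SIZE]
        queries.append({
            "bool": {
                # Restrict the search to the single call being processed.
                "filter": [{"ids": {"values": [doc_id]}}],
                "should": [
                    {"match_phrase": {field: {"query": p, "_name": p}}}
                    for p in batch
                ],
                # With a filter present, should clauses are optional by
                # default, so require at least one phrase to match.
                "minimum_should_match": 1,
            }
        })
    return queries
```

For 1500 phrases this yields 15 query bodies, matching the per-call estimate above.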
If we process around a million calls a day, this means around 15 million search calls to Elasticsearch each day (~170 searches/second on average). However, the call processing load varies throughout the day, and at peak times our Elasticsearch cluster handles 600–700 searches per second.
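The load estimate above can be reproduced with a few lines of arithmetic, using the averages stated in this post:

```python
import math

PHRASES_PER_CALL = 1500     # average phrases searched per call
PHRASES_PER_QUERY = 100     # cap on phrase match clauses per Elasticsearch query
CALLS_PER_DAY = 1_000_000
SECONDS_PER_DAY = 24 * 60 * 60

queries_per_call = math.ceil(PHRASES_PER_CALL / PHRASES_PER_QUERY)
searches_per_day = CALLS_PER_DAY * queries_per_call
average_rate = searches_per_day / SECONDS_PER_DAY

print(queries_per_call, searches_per_day, round(average_rate))
# → 15 15000000 174
```

The observed peak of 600–700 searches/second is therefore roughly 3.5 to 4 times the daily average, which is why sizing for the average alone is not enough.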
We use a hosted Elasticsearch service with three machines (AWS M5), each with 60 GB of RAM. We had already doubled the size of each machine in our cluster, which increased keyword search throughput from around 400 to 600 searches/second but also increased the cost of the service by ~4x. At this point, we started looking for other ways to increase our throughput.
We realized that we needed to make the keyword searching step “horizontally scalable” to increase our pipeline processing throughput as more customers are added to the system.
In the “keyword searching” step, each search query runs against a single document. We realized we could use this constraint to our advantage: we can run local, dockerized Elasticsearch instances, each with an empty temporary index. The keyword searching step indexes the document, runs the searches on it, and then deletes it and moves on.
Suppose we have an Elasticsearch cluster of N machines (8 GB RAM, 2 CPUs), each running an independent Elasticsearch deployment (1 shard, no replicas). A wrapper service in front of them exposes a private API called getESHost, which returns one of the N hosts using round-robin selection.
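A thread-safe round-robin picker is one simple way a service like getESHost could choose hosts. This is a hypothetical sketch: `HostPool`, its method names, and the host names are illustrative, and the `add_host`/`remove_host` methods only stand in for the wrapper's scaling APIs.

```python
import threading
from typing import List


class HostPool:
    """Round-robin host selection, a sketch of what getESHost might do."""

    def __init__(self, hosts: List[str]) -> None:
        self._hosts = list(hosts)
        self._next = 0
        self._lock = threading.Lock()  # callers may request hosts concurrently

    def get_es_host(self) -> str:
        with self._lock:
            if not self._hosts:
                raise RuntimeError("no Elasticsearch hosts registered")
            # Modulo keeps the index valid even after hosts are removed.
            idx = self._next % len(self._hosts)
            self._next = idx + 1
            return self._hosts[idx]

    def add_host(self, host: str) -> None:
        with self._lock:
            self._hosts.append(host)

    def remove_host(self, host: str) -> None:
        with self._lock:
            self._hosts.remove(host)
```

Because every host holds only short-lived documents, any host can serve any call, so plain round-robin is enough; no sticky routing is needed.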
Before processing any call, the “keyword searching” service requests a host from the getESHost API. Once it has a host, it indexes the call on that Elasticsearch instance, searches all phrases on it, and, when done, deletes the document (deletion can be done asynchronously by pushing to a separate queue). As a result, the index stays nearly empty at all times and searches run much faster. This solution uses separate, local, temporary Elasticsearch deployments to power the “keyword searching” service.
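The index, search, and delete cycle could look like the sketch below. It assumes `es` exposes methods shaped like the elasticsearch-py 8.x client (`index`, `search`, `delete` with keyword arguments), that each query batch uses named queries so hits carry `matched_queries`, and that `TEMP_INDEX` is a hypothetical index name. The optional `delete_queue` models the asynchronous deletion mentioned above.

```python
import queue
from typing import Any, Dict, List, Optional, Set

TEMP_INDEX = "calls-temp"  # hypothetical name for the temporary index


def search_call(
    es: Any,
    call_id: str,
    transcript: str,
    query_batches: List[Dict[str, Any]],
    delete_queue: Optional[queue.Queue] = None,
) -> Set[str]:
    """Index one call, run every phrase-query batch on it, then remove it.

    Returns the names of the phrase clauses that matched.
    """
    # refresh="wait_for" makes the document searchable before index() returns.
    es.index(index=TEMP_INDEX, id=call_id,
             document={"transcript": transcript}, refresh="wait_for")
    matched: Set[str] = set()
    try:
        for q in query_batches:
            resp = es.search(index=TEMP_INDEX, query=q, size=1)
            for hit in resp["hits"]["hits"]:
                matched.update(hit.get("matched_queries", []))
    finally:
        # Always clean up so the temporary index stays nearly empty.
        if delete_queue is not None:
            delete_queue.put(call_id)  # a separate worker deletes it later
        else:
            es.delete(index=TEMP_INDEX, id=call_id)
    return matched
```

Deleting in a `finally` block keeps the index small even when a search fails partway through the batches.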
Once deployed to production, each Elasticsearch machine handled around 200 phrase search requests/sec. With just 4 machines, we reached ~800 requests/sec, surpassing our previous production throughput of around 600 phrase searches per second.
More importantly, this removed the keyword search load from our production Elasticsearch clusters, which serve the reporting APIs. This allowed us to shrink our Elasticsearch cluster to half its previous size (back to 30 GB RAM machines). We added APIs to the Elasticsearch wrapper to increase or decrease the number of Elasticsearch machines as required, and we're now working on adding alerts so we can autoscale this Elasticsearch cluster.
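An autoscaling alert needs some rule for how many machines to run. The sketch below is a hypothetical capacity rule, not the approach described in this post: it divides the expected peak load by the ~200 requests/sec per machine observed in production and adds an assumed 20% headroom.

```python
import math


def machines_needed(peak_searches_per_sec: float,
                    per_machine_rate: float = 200.0,
                    headroom: float = 0.2) -> int:
    """Smallest machine count that serves the peak with spare capacity.

    per_machine_rate defaults to the ~200 requests/sec per machine seen in
    production; headroom is a hypothetical safety margin.
    """
    if per_machine_rate <= 0:
        raise ValueError("per_machine_rate must be positive")
    required = peak_searches_per_sec * (1 + headroom)
    return max(1, math.ceil(required / per_machine_rate))
```

With no headroom, an 800 requests/sec peak maps to the 4 machines reported above; with the default 20% margin, a 700 requests/sec peak asks for 5.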
We also experimented with Elasticsearch’s rolling indexes functionality, but our workflow often requires updating Elasticsearch documents (typically for calls less than a month old) as our machine learning algorithms are experimented on and improved.
The above changes also added some complexity to the system:
However, the above two are easy to handle and occur infrequently, which makes this a reasonable tradeoff.
The code for the Elasticsearch wrapper is business-agnostic, but it uses MongoDB to store data. We will look to open source it in the future.
Aakash Bhardwaj is a senior engineer and leads the Call Processing Pipeline team at Observe.AI. He has previously worked on scaling architectures for multiple B2B companies and has a passion for solving engineering problems.