Improving System Efficiency by Taking Meaningful Actions From Data
Author: Shawn Hillyer, Software Engineer
This is the story of how we identified and fixed an issue in a key component of our business, improving its efficiency by over 70%.
Software engineers have the vital responsibility of ensuring the stability of the systems they design and maintain. This is especially important at Skillz, a massive marketplace platform that services over 4 million mobile game tournaments every day. We tailor the user experience with a data-driven approach that creates a record for each player in each game played.
We want to cache important information about every player, so we use Amazon S3 to stage that information and then index it in ElasticSearch for quick retrieval.
However, in August, we noticed this data was taking so long to load that we were unable to finish loading records before the next scheduled load began. To preserve the positive player experience, we had to quickly identify and act on solutions.
Architecture and Design
Our Java microservices often interact with user data, which helps us define and optimize the experience of each competitor on the Skillz platform. This data is transient, and we refresh it on a regular schedule for all players on the system. As player behavior evolves, we provide experiences to match those behaviors and preferences.
Before 2017, user data was stored in MySQL. However, inserting records into MySQL while simultaneously reading from those same records had an adverse impact on the system as a whole. Concerned about performance, we began searching to identify data solutions that could ingest large amounts of records while minimizing impact on query or search times.
By early 2017, we had migrated the data from MySQL into ElasticSearch. At times, the total number of records created per hour can top 20 million. The records are written to files in Amazon S3 buckets, each bucket corresponding to an ElasticSearch index into which the records are inserted.
After calculating the initial 20 million records and loading them into ElasticSearch, subsequent data queries calculate only the differences from the previous results and send those changes to S3. While copying records from S3 into ElasticSearch, we also compare each record against any existing record in ElasticSearch and conditionally act on the difference, for example by enqueuing a push notification.
These records are processed by a Spring Boot microservice that receives a request to load the JSON records. The service begins by fetching a list of S3 file paths for an index, then downloads each file locally, enqueues any required push notifications, and upserts the records to ElasticSearch via the Bulk API.
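As a rough illustration of the final upsert step, the sketch below builds a newline-delimited Bulk API body with a StringBuilder. The `update` action with `doc_as_upsert` is standard Bulk API syntax; the class name, index name, and record ids are made up for the example, ids are assumed to need no JSON escaping, and older ElasticSearch releases additionally expect a document type (in the URL or as `_type`).

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative helper; not the service's actual code. */
final class BulkUpsertPayload {

    /**
     * Builds a Bulk API body with one "update" action per record.
     * Each action is two lines: action metadata, then a partial document
     * flagged with doc_as_upsert so a missing record is created.
     *
     * @param index    hypothetical index name
     * @param docsById record id -> already-serialized JSON document
     */
    static String build(String index, Map<String, String> docsById) {
        StringBuilder body = new StringBuilder();
        for (Map.Entry<String, String> entry : docsById.entrySet()) {
            body.append("{\"update\":{\"_index\":\"").append(index)
                .append("\",\"_id\":\"").append(entry.getKey()).append("\"}}\n");
            body.append("{\"doc\":").append(entry.getValue())
                .append(",\"doc_as_upsert\":true}\n");
        }
        return body.toString(); // the Bulk API requires the trailing newline
    }

    public static void main(String[] args) {
        Map<String, String> docs = new LinkedHashMap<>();
        docs.put("player-1", "{\"skill\":42}"); // made-up record
        System.out.print(build("player_data", docs));
    }
}
```

Sending many such action pairs in one request is what makes the batch size, discussed later, a meaningful tuning knob.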
Identifying the Problem
We were already recording the number of records inserted into ElasticSearch as part of this data pipeline. An AWS CloudWatch alarm on the associated metric would trigger if no data points were received within a threshold. Additional alarms would alert us if something failed earlier in the pipeline, but in this case the problem wasn’t a failure; it was a delay in receiving the expected data.
- An AWS CloudWatch alarm was triggered when the number of updated records was not reported in the expected timeframe.
- The loading process had historically taken about 35 to 50 minutes leading up to this event, but an increased number of records had pushed the load time past the one-hour window available to an hourly loading process.
We uncovered interesting data in our logs and metrics:
- ElasticSearch monitoring tools showed that our Central Processing Unit (CPU) Utilization was occasionally peaking at 100% and boosting to 110% for short periods of time. Normal utilization began low at 30%, grew to about 80%, then spiked for short periods to 100-110%.
- AWS monitoring tools showed that the microservice loading the records was not under any excessive load, sitting comfortably at 10% CPU utilization with no symptoms of memory, storage, or other resource constraints.
- Microservice logs showed the start and finish times for each load. We noticed that occasionally two “Started elastic search loader” messages were logged before the first one’s corresponding “Finished elastic search loader” message. The overlap was about 5 to 10 minutes.
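The overlap check described in the last bullet can be sketched as follows, assuming the start and finish timestamps have already been parsed out of the log messages. The class names and the timestamps are made up for illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Illustrative helper for spotting overlapping loader runs. */
final class LoadOverlapDetector {

    /** One loader run, reconstructed from a "Started"/"Finished" log message pair. */
    static final class Run {
        final Instant started;
        final Instant finished;

        Run(Instant started, Instant finished) {
            this.started = started;
            this.finished = finished;
        }
    }

    /** Returns the overlap for every run that started before the run preceding it had finished. */
    static List<Duration> overlaps(List<Run> runs) {
        List<Run> sorted = new ArrayList<>(runs);
        sorted.sort(Comparator.comparing((Run r) -> r.started));
        List<Duration> result = new ArrayList<>();
        for (int i = 1; i < sorted.size(); i++) {
            Instant previousFinish = sorted.get(i - 1).finished;
            Instant start = sorted.get(i).started;
            if (start.isBefore(previousFinish)) {
                result.add(Duration.between(start, previousFinish));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up timestamps: the second hourly run starts 7 minutes before the first ends.
        List<Run> runs = new ArrayList<>();
        runs.add(new Run(Instant.parse("2000-01-01T10:00:00Z"), Instant.parse("2000-01-01T11:07:00Z")));
        runs.add(new Run(Instant.parse("2000-01-01T11:00:00Z"), Instant.parse("2000-01-01T11:50:00Z")));
        System.out.println(overlaps(runs)); // prints [PT7M]
    }
}
```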
We graphed the count of certain messages in our logs and confirmed that the number of updates into ElasticSearch doubled during that time frame, which suggested our ElasticSearch cluster was a likely bottleneck.
As soon as we were confident that runs of the data pipeline were beginning to overlap, we quickly identified a list of action items to address the problem. Some action items could be addressed immediately, but others would take time to re-engineer. It was important to brainstorm solutions as a cross-discipline team with members from data analytics, product, and engineering.
Data Analytics and Product
These teams acted immediately by eliminating queries from the data aggregation process at the top of the pipeline. They focused solely on queries that generated records for testing and research purposes. This had an almost immediate impact, reducing our record count enough that our pipeline runs no longer overlapped.
CPU utilization was exceeding our allotted virtual CPU on the cloud provider for periods of time. We realized that our trend of holding at 80% utilization across two nodes would become a problem if one of the two nodes failed.
Thus, we planned to upgrade the ElasticSearch hardware in a staging environment to test the system with two major changes:
- Add another node in a third availability zone to evenly split our index across three primary shards (one per node).
- Upgrade the hardware profile by doubling the memory and virtual CPU allotment.
Configurable changes are also a great place to find optimizations. The size of the batches being sent to ElasticSearch’s Bulk API was presumably load tested and optimized at some point in the past, but we took another pass at this. We made the batch size configurable via a JMX command and discovered that our old batch size of 1,000 records was not optimal. Increasing to 5,000 showed significant gains in our load time, and 10,000 resulted in marginal gains with some increased search latency.
Taken together, these changes reduced peak CPU load to 40% and reduced total loading times by 35% to 45% in our tests. Rolling out the same changes to our production environment bought us more time to make further improvements.
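To make the effect of the batch size concrete, here is a minimal sketch of a setting that can be changed at runtime, together with the number of Bulk requests it implies for 20 million records. The class is hypothetical; the JMX exposure itself is not shown, and `setBatchSize` simply stands in for whatever method a JMX-managed bean would call.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical holder for a batch size that can be changed while the service runs. */
final class BulkBatchSetting {
    private final AtomicInteger batchSize;

    BulkBatchSetting(int initialSize) {
        this.batchSize = new AtomicInteger(requirePositive(initialSize));
    }

    /** Stand-in for the setter a JMX-exposed bean would delegate to. */
    void setBatchSize(int newSize) {
        batchSize.set(requirePositive(newSize));
    }

    int getBatchSize() {
        return batchSize.get();
    }

    /** Number of Bulk requests needed for recordCount records (ceiling division). */
    long requestsFor(long recordCount) {
        int size = batchSize.get();
        return (recordCount + size - 1) / size;
    }

    private static int requirePositive(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("batch size must be positive");
        }
        return size;
    }

    public static void main(String[] args) {
        BulkBatchSetting setting = new BulkBatchSetting(1_000);
        for (int size : new int[] {1_000, 5_000, 10_000}) {
            setting.setBatchSize(size);
            System.out.println(size + " -> " + setting.requestsFor(20_000_000L));
        }
        // prints:
        // 1000 -> 20000
        // 5000 -> 4000
        // 10000 -> 2000
    }
}
```

Fewer, larger requests cut per-request overhead, which is consistent with the gains seen at 5,000; the arithmetic alone does not predict where search latency starts to suffer, so that still has to be measured.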
Our next task involved refactoring the code in our Java Spring Boot microservice to facilitate multithreading of the process without changing the side effects or final state. A fair amount of refactoring was involved, as the original implementation was not written with parallelization in mind. However, the core algorithm could definitely be reorganized to work in a multithreaded environment.
When refactoring the Java classes responsible for performing ElasticSearch loading, we first had to identify their dependencies and verify that they were thread-safe. We also aimed to break the algorithm down to identify where the application spent more time waiting than processing records.
The existing class had a number of data structures managing shared state during the serial processing of records. Rather than pass data from method to method, these variables were all modified as side effects during the algorithm:
- A shared Set used to ensure one-time delivery of delta events
- A shared StringBuilder used to build up the JSON payload for ElasticSearch bulk update requests
- A shared List of records to retrieve from ElasticSearch for diffing purposes
- A shared List of records in the current batch for diffing purposes
We reorganized the structure and logic of the classes so that, wherever possible, no state was shared across threads, falling back to shared variables only where necessary. Our preference was to minimize side effects on member variables and instead pass data structures to the methods that needed them.
Ultimately, we determined that each file could be processed in its own thread with only one piece of shared state: the Set of users who had already been sent a push notification during this loading event. To safely maintain a Set across multiple threads, we replaced the HashSet with a Java ConcurrentHashMap and passed it into each thread as a method parameter. A ConcurrentHashMap was necessary because a single user may have multiple records with similar or identical changes in each. We wouldn’t want a user receiving 10 identical, or even similar, notifications for a single event simply because they play 10 games on the platform.
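A minimal sketch of this deduplication, assuming the shared state is exposed as a concurrent Set view (`ConcurrentHashMap.newKeySet()`), which is one way to back a Set with a ConcurrentHashMap and not necessarily the one used in the service. The class and method names are hypothetical, and "enqueuing a notification" is reduced to a counter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Illustrative only: one notification per user across concurrently processed files. */
final class NotificationDedup {

    /** Processes one file's user ids; the shared set arrives as a method parameter. */
    static int processFile(List<String> userIds, Set<String> alreadyNotified) {
        int enqueued = 0;
        for (String userId : userIds) {
            // add() is atomic: it returns true for exactly one caller per user id.
            if (alreadyNotified.add(userId)) {
                enqueued++;
            }
        }
        return enqueued;
    }

    /** Processes every file on a fixed-size pool and returns the total notifications enqueued. */
    static int runConcurrently(List<List<String>> files, int threads) {
        Set<String> alreadyNotified = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (List<String> file : files) {
                Callable<Integer> task = () -> processFile(file, alreadyNotified);
                futures.add(pool.submit(task));
            }
            int total = 0;
            for (Future<Integer> future : futures) {
                total += future.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

With ten files that all mention the same 100 users, the total comes out to 100 regardless of how the threads interleave, because only the first `add` for each user succeeds.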
Our algorithm relies on taking data from one store (Amazon S3) and writing it to another (ElasticSearch). These steps require that we interact with libraries from third parties, which also needed to be thread-safe. The clients included:
- An ElasticSearch REST client, which is used for Bulk indexing records into ElasticSearch.
- Our own convenience wrapper around the ElasticSearch REST client for safe searches against our ElasticSearch cluster, which is used to fetch records from ElasticSearch and compare against the new records as we processed each one.
- An AmazonS3 client used for fetching file names and downloading files.
Fortunately, each of these clients was already thread-safe. Amazon and ElasticSearch recommend using a single client across threads. Our own wrapper, meanwhile, simply added convenience methods but no state beyond the client itself.
We opted to manage the threads using Spring’s ThreadPoolTaskExecutor. One limitation of Spring’s @Async annotation is that a class cannot invoke an async-annotated method on itself, because the call bypasses the proxying mechanism used to achieve multithreading. For that reason, we decided to rewrite the JSON Indexer by breaking it into distinct classes. We also aimed to reduce the complexity of the class by removing its knowledge about how files were retrieved.
First, we moved every method that accessed the Amazon S3 Client into a new Remote File Provider Service. This allowed the JSON Indexer to be ignorant of the source of its data. Reducing the cognitive complexity of a class can make it much easier to maintain, unit test, and debug.
Next, we split the JSON Indexer class apart. We moved the per-file processing logic from the original JSON Indexer into a new class, the JSON Indexer Service. The goal was to make the original JSON Indexer responsible only for creating and managing the threads.
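The self-invocation limitation that motivates splitting the class can be illustrated without Spring. The sketch below uses a plain JDK dynamic proxy as an analogy for Spring's proxying (Spring's actual mechanism is more involved); the interface and class names are hypothetical. A call that an object makes on itself never passes through the proxy, while a call made from a separate object does.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical interface standing in for the indexer's public methods. */
interface FileIndexer {
    void indexAll();
    void indexFile();
}

/** Calls its own indexFile() directly, as a single combined class would. */
final class SelfCallingIndexer implements FileIndexer {
    @Override
    public void indexAll() {
        indexFile(); // a plain call on "this": the proxy never sees it
    }

    @Override
    public void indexFile() {
        // per-file work would go here
    }
}

final class ProxySelfInvocationDemo {

    /** Wraps target in a JDK dynamic proxy that records each intercepted method name. */
    static FileIndexer proxied(FileIndexer target, List<String> intercepted) {
        InvocationHandler handler = (proxy, method, args) -> {
            intercepted.add(method.getName()); // an async proxy would hand off to an executor here
            return method.invoke(target, args);
        };
        return (FileIndexer) Proxy.newProxyInstance(
                FileIndexer.class.getClassLoader(), new Class<?>[] {FileIndexer.class}, handler);
    }

    /** indexAll() goes through the proxy, but its inner indexFile() call does not. */
    static List<String> callThroughSelf() {
        List<String> intercepted = new ArrayList<>();
        proxied(new SelfCallingIndexer(), intercepted).indexAll();
        return intercepted;
    }

    /** A separate caller invoking indexFile() on the proxied worker is intercepted. */
    static List<String> callFromSeparateObject() {
        List<String> intercepted = new ArrayList<>();
        FileIndexer worker = proxied(new SelfCallingIndexer(), intercepted);
        Runnable orchestrator = worker::indexFile;
        orchestrator.run();
        return intercepted;
    }

    public static void main(String[] args) {
        System.out.println(callThroughSelf());        // prints [indexAll]
        System.out.println(callFromSeparateObject()); // prints [indexFile]
    }
}
```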
After refactoring the code, the JSON Indexer algorithm looks like this:
- Ask JSON Indexer Service for a list of files that it should process.
- For each file, add an invocation of the @Async method on JSON Indexer Service to a thread pool.
- Await the results of the thread pool.
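The three steps above can be sketched with plain java.util.concurrent in place of Spring's ThreadPoolTaskExecutor and @Async. The `listFiles` and `processFile` parameters are hypothetical stand-ins for the calls into the JSON Indexer Service, and the return value (records indexed) is an assumption made so the example has something to aggregate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Supplier;

/** Illustrative orchestration: list files, fan out one task per file, await all. */
final class IndexerOrchestrator {

    /**
     * @param listFiles   stand-in for asking the service which files to process
     * @param processFile stand-in for the per-file async method; returns records indexed
     * @param threads     size of the worker pool
     * @return total records indexed across all files
     */
    static long run(Supplier<List<String>> listFiles,
                    Function<String, Integer> processFile,
                    int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Step 1 and 2: fetch the file list, then submit one task per file.
            List<CompletableFuture<Integer>> pending = new ArrayList<>();
            for (String path : listFiles.get()) {
                pending.add(CompletableFuture.supplyAsync(() -> processFile.apply(path), pool));
            }
            // Step 3: await every task; join() rethrows a task failure as an unchecked exception.
            long total = 0;
            for (CompletableFuture<Integer> future : pending) {
                total += future.join();
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }
}
```

Keeping the orchestrator this small is the point of the split: it knows about threads and nothing about files, records, or ElasticSearch.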
The only point where the JSON Indexer itself waits synchronously is when fetching the list of file names from Amazon S3. This is a single call for each of the two buckets we are interested in, and thus not a viable candidate for multithreading.
The core algorithm for asynchronously handling a single file is also straightforward:
- Download a JSON file from the remote file provider service.
- For each row in the file:
  - Add the row to a batch.
  - Search ElasticSearch to compare the new record to the existing record.
  - If the batch is full, send the batch to ElasticSearch.
- Send any remaining records in the batch.
In this algorithm, there is a lot of waiting on clients to complete requests. In Step 1, the process waits while the file downloads to the local file system. In Step 2, the process waits once when we search ElasticSearch and occasionally when a batch is sent to ElasticSearch. Finally, in Step 3, we wait for the final remainder batch to be sent to ElasticSearch.
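A minimal sketch of the per-file loop, starting after the download in Step 1. The `lookup` and `sendBatch` parameters are hypothetical stand-ins for the ElasticSearch search and Bulk requests, rows are treated as opaque strings, and "differs from the existing record" is reduced to a simple equality check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Illustrative per-file processing; the real clients are replaced by function parameters. */
final class FileBatchProcessor {

    /**
     * @param rows      rows of one already-downloaded file
     * @param batchSize maximum number of rows per Bulk request
     * @param lookup    stand-in for searching ElasticSearch; returns the existing record or null
     * @param sendBatch stand-in for sending one Bulk request
     * @return number of rows that differ from the existing record
     */
    static int process(List<String> rows,
                       int batchSize,
                       Function<String, String> lookup,
                       Consumer<List<String>> sendBatch) {
        List<String> batch = new ArrayList<>(batchSize);
        int changed = 0;
        for (String row : rows) {
            batch.add(row);
            String existing = lookup.apply(row); // the thread waits here on every row
            if (!row.equals(existing)) {
                changed++; // where a notification might be enqueued
            }
            if (batch.size() >= batchSize) {
                sendBatch.accept(new ArrayList<>(batch)); // occasional wait: full batch
                batch.clear();
            }
        }
        if (!batch.isEmpty()) {
            sendBatch.accept(new ArrayList<>(batch)); // final wait: remainder batch
        }
        return changed;
    }
}
```

Every commented wait is time a single-threaded loader spends idle, which is why running several files in parallel raises throughput even though each file is still processed serially.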
The effort of refactoring the algorithm and reorganizing the class hierarchy resulted in code that was thread-safe, easier to understand and maintain, and more modular. We tested the changes in our staging environment again and were pleased to see that run times were reduced significantly.
The following changes resulted in notable system improvements:
- Reducing the number of records and upgrading hardware as a stopgap reduced average load times from 65 minutes to 50 minutes.
- Increasing the size of Bulk requests to ElasticSearch from 1,000 to 5,000 records reduced a 60-minute process to about 40 minutes.
- Refactoring the Spring application to leverage multithreading and asynchronous logic led to a 45% increase in the average number of records indexed per second when going from 1 to 2 threads and an additional 25% increase from 2 to 3 threads.
The graph below depicts the number of records processed per second over a month’s time in our staging environment. We enabled multithreading on September 13 using just two threads, then added a third thread on September 20. Both changes noticeably improved throughput in our staging environment.