AppDynamics Application Intelligence provides business and operational insights into application performance, user experience, and the business impact of software applications. At the core of the Application Intelligence Platform is the metric processing engine that records, tracks, and compares performance metrics. As application complexity explodes and many businesses break their monolithic applications into microservices, the number of metrics we need to capture, store, and process explodes exponentially as well.
Metrics collection and processing from heterogeneous environments
AppDynamics Application Performance Management (APM) agents automatically instrument millions of lines of code across thousands of application tiers in production environments within minutes, all with minimal overhead. These APM agents support a wide variety of languages and frameworks, including Java, .NET, Node.js, PHP, and Python. Browser and mobile agents also collect end user monitoring metrics from browsers and mobile devices.
Additionally, server monitoring agents collect server metrics, and AppDynamics extensions bring in metrics from heterogeneous applications and infrastructure components.
The metrics gathered by these agents are sent periodically to a metric processing engine. The metric processing engine aggregates the metrics along different dimensions and creates multiple views. These views are used extensively for reporting, and a sophisticated rule engine continuously evaluates policies against these aggregated views.
Challenges with initial metric service implementation
MySQL powered our initial metric processing engine. The MySQL engine worked well for our customers and our SaaS deployment, but as metrics grew exponentially, we risked hitting the physical limits of MySQL with the number of metrics we needed to process. The cost of processing also skyrocketed, so to reduce storage costs we rolled up data in the time dimension, providing less resolution for older metrics. That allowed us to expire highly granular metrics much earlier and retain the time-rolled-up data for longer periods. However, customers started asking us to extend these retention periods even further to accommodate different use cases.
Requirements for metric service re-architecture
To address these challenges, the AppDynamics Application Intelligence Platform’s metric service needed to:
- increase total metric processing capacity many-fold.
- increase the data retention period for all resolution levels.
Once these requirements solidified, we realized that the best solution would be to move to a new platform that could support us at a much greater scale. In addition to addressing these two key requirements, we also had to enhance the metric service in the following areas:
Real-time Cluster Rollup
Cluster-wide performance metric aggregation is a crucial feature of the AppDynamics APM solution. Take, for example, an e-commerce application that consists of multiple services such as an inventory service, an order processing system, a web tier service, an invoicing system, and so on. Each of these services could be hosted independently on a single server/node or on a distributed cluster consisting of hundreds or thousands of physical nodes. Using the information about the application, services, tiers, and nodes, AppDynamics creates a topological visualization of the entire application as shown below:
AppDynamics agents are deployed on each physical node or machine. As part of initialization, each agent registers its node or service, informing the metric processing engine which tier or service and which application it belongs to. Once registration is complete, agents start collecting metrics on the node and periodically send the gathered information to the metric processing engine. Each metric payload from each node identifies the tier and application it belongs to. Based on this cluster (node, tier, and application) information, the metric processing engine aggregates the metrics coming from different nodes to their parent tier and then to their parent application. In other words, metrics collected from each node must be aggregated cluster-wide based on the logical topology of the customer's application, and the reporting views must be available in real time.
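To make the rollup concrete, here is a minimal Java sketch of how node-level metric values could be summed up to their parent tier and application. The class, field, and key names are illustrative assumptions, not the actual AppDynamics implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: roll node-level metric values up to tier and application.
// NodeMetric and the key layout are hypothetical, not the actual AppDynamics model.
public class ClusterRollup {

    static class NodeMetric {
        final String application;  // e.g. "ecommerce"
        final String tier;         // e.g. "inventory-service"
        final String node;         // e.g. "inventory-node-7"
        final String metricName;   // e.g. "callsPerMinute"
        final long value;

        NodeMetric(String application, String tier, String node, String metricName, long value) {
            this.application = application;
            this.tier = tier;
            this.node = node;
            this.metricName = metricName;
            this.value = value;
        }
    }

    // Aggregated views keyed by scope and metric name.
    final Map<String, Long> tierView = new HashMap<>();
    final Map<String, Long> appView = new HashMap<>();

    // Each incoming payload already carries its tier and application,
    // so the rollup is a simple keyed sum at each level of the topology.
    void ingest(NodeMetric m) {
        tierView.merge(m.application + "|" + m.tier + "|" + m.metricName, m.value, Long::sum);
        appView.merge(m.application + "|" + m.metricName, m.value, Long::sum);
    }
}
```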
Support for batch processing
The platform itself had to support batch-processing jobs on the collected metrics in order to roll them up in the time dimension. Beyond that, we needed to be able to run analytics on the collected metrics.
Fault tolerance with no single point of failure
The platform needed to work in High Availability mode from day one.
Zero downtime software upgrade
As part of metric processing, we also calculate seasonality-based baselines and standard deviations. If we brought the system down for software upgrades, it would create data gaps, corrupting the baseline and standard deviation calculations.
The new metric service also needed to accommodate the behavior of metric data and the types of operations required to create the different views:
- Stream of time series, append-only, immutable data sets
- Requires ingesting and processing time-stamped metric data that is appended to existing records rather than overwriting them
- State is determined by the natural time-based ordering of the data
- Requires multiple views in different dimensions
- All operations on the data, including view creation, are idempotent
- View creation should be near real time (NRT)
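As a rough illustration of these properties, the sketch below models a metric sample as an immutable, time-stamped record and shows why rebuilding a view from such records is idempotent: replaying the same sample leaves the view unchanged. All names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: an immutable, append-only metric sample and an
// idempotent per-minute view. Replaying the same sample twice leaves
// the view in exactly the same state.
public class MetricSample {
    final String metricId;
    final long epochMinute;  // natural time-based ordering
    final long value;

    MetricSample(String metricId, long epochMinute, long value) {
        this.metricId = metricId;
        this.epochMinute = epochMinute;
        this.value = value;
    }

    // View keyed by minute; putting the same (key, value) pair again is a no-op.
    static void applyToView(Map<Long, Long> perMinuteView, MetricSample s) {
        perMinuteView.put(s.epochMinute, s.value);
    }

    public static void main(String[] args) {
        Map<Long, Long> view = new TreeMap<>();
        MetricSample s = new MetricSample("m1", 24_000_000L, 42L);
        applyToView(view, s);
        applyToView(view, s);  // replay: idempotent, view unchanged
        System.out.println(view);  // {24000000=42}
    }
}
```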
Options for Metric Service Re-architecture
Keeping all these requirements in mind, we started exploring various NoSQL databases and looked at a couple of time series database implementations.
Notable time series databases include OpenTSDB from StumbleUpon and Druid from MetaMarkets. We didn't select these solutions because our requirements were more specialized and could not be met by them as is. Both are excellent, however, and worth trying out before writing anything new.
Next, we started evaluating key-value store databases that provide fault tolerance and scale-out capabilities. Two mainstream open source NoSQL databases are used widely throughout the industry: HBase and Cassandra.
HBase vs. Cassandra
Based on our evaluation of the use cases, we selected HBase over Cassandra. The main reason for this choice was HBase's sharding strategy of ordered partitioning of its key ranges, which allows us to run queries over longer time ranges and apply aggregates to the results efficiently within a single shard.
It's critical to understand what a continuous key range is and how it affects the design and behavior of the system. HBase key-values are stored in tables, and each table is split into multiple regions. A region is a continuous range within the key space, meaning all the rows in the table that sort between the region's start key and end key are stored in the same region. More details are available in this helpful article from Hortonworks.
Cassandra employs a different strategy: it arranges the Cassandra nodes into a consistent hash ring, and any key stored in the system is converted to a Murmur3 hash code and then placed on a node in the ring. This article from DataStax provides additional information on sharding strategies.
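The practical difference is easy to see in code. The sketch below uses Guava's Murmur3 implementation as a rough stand-in for Cassandra's Murmur3Partitioner: row keys for consecutive minutes of the same metric sort next to each other lexicographically, which is how HBase keeps them in one region, while their hash tokens land far apart on the ring.

```java
import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;

// Sketch: the same keys under HBase-style lexicographic ordering vs.
// Cassandra-style Murmur3 token placement. Guava's murmur3_128 is used
// here only as an approximation of Cassandra's Murmur3Partitioner.
public class KeyPlacement {
    public static void main(String[] args) {
        String[] rowKeys = {
            "metric-42:201601011200",
            "metric-42:201601011201",
            "metric-42:201601011202"
        };
        for (String key : rowKeys) {
            long token = Hashing.murmur3_128()
                    .hashString(key, StandardCharsets.UTF_8)
                    .asLong();
            // Lexicographically these keys are adjacent (one HBase region);
            // their hash tokens are far apart (different Cassandra nodes).
            System.out.println(key + " -> token " + token);
        }
    }
}
```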
Why is it so important for us to have continuous key ranges?
Let's go over our use cases one more time. The AppDynamics Application Performance Dashboard (shown below) provides a topological view of an entire application and its performance metrics. The dashboard shows the call flow between the different tiers of the application, along with overall performance statistics such as calls per minute, average response time, errors per minute, and exceptions per minute.
The topological graph, the call flow, and the performance statistics are all pulled from the metric store based on the time range selected by the user.
A typical dashboard aggregate query would look like this:
Select SUM(total calls) from METRICS where metricID = m1 and time in range (t1, t2), where t1 is the start time and t2 is the end time.
Select AVG(response time) from METRICS where metricID = m1 and time in range (t1, t2).
Select SUM(total errors) from METRICS where metricID = m1 and time in range (t1, t2).
Let's start with a very simple design: make the metric ID the row key. Every metric value received would be stored as a column against that row key, with the column labeled by the time the value was received. Both HBase and Cassandra support this simple design, and in both systems all the data stored against a row key is kept in a single shard. If we query a metric for a time range, we can retrieve all the data points and apply our aggregates as discussed above.
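In HBase terms, this naive design could look something like the sketch below, where the metric ID is the row key and each data point becomes a column qualified by its timestamp. The table name, column family, and values are assumptions for illustration only.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Sketch of the naive design: row key = metric ID, one column per data point,
// qualified by the minute it was received. Table/family names are made up.
public class NaiveMetricWriter {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("METRICS"))) {

            long epochMinute = System.currentTimeMillis() / 60_000L;
            Put put = new Put(Bytes.toBytes("metric-42"));          // row key = metric ID
            put.addColumn(Bytes.toBytes("d"),                       // column family
                          Bytes.toBytes(epochMinute),               // qualifier = timestamp
                          Bytes.toBytes(1234L));                    // metric value
            table.put(put);
        }
    }
}
```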
However, there is a physical and practical limit on the maximum number of columns that can be stored against a single row key. In our metric processing system, we may receive one or more data points every minute; over a day that adds up to thousands of data points per metric, and over weeks and years it grows into the millions and billions. A design that uses the metric ID as the row key and stores every data point against it as a column is therefore not practical.
Another much-recommended design strategy is bucketing the row keys by time range: store all the data points received during a time period as columns against one row key, then create a new row key for the next time period. For example, if we bucket the row keys by one hour, then from 12:00 AM to 12:00 PM we would have 12 row keys, as sketched below.
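A minimal sketch of this bucketing scheme follows; the key format, which combines the metric ID with an hour bucket so that each hour starts a fresh row, is an assumption for illustration.

```java
// Sketch: bucket row keys by hour. The key format "metricId:hourBucket"
// is illustrative; any scheme that starts a new row per bucket works.
public class TimeBucketedKeys {
    static String rowKey(String metricId, long epochMillis) {
        long hourBucket = epochMillis / 3_600_000L;          // one bucket per hour
        return metricId + ":" + hourBucket;
    }

    public static void main(String[] args) {
        long start = 1_451_606_400_000L;                      // 2016-01-01 00:00 UTC
        // From 12:00 AM to 12:00 PM we get 12 distinct row keys for one metric.
        for (int hour = 0; hour < 12; hour++) {
            System.out.println(rowKey("metric-42", start + hour * 3_600_000L));
        }
    }
}
```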
Here is a helpful article on how to model time series data in Cassandra using the technique mentioned above.
One may argue that in Cassandra we could create a composite key with a partition key and a clustering key, using the metric ID as the partition key and the time bucket as the clustering key, and then store the received metrics as columns.
Cassandra’s storage engine uses composite columns under the hood to store clustered rows. All the logical rows with the same partition key get stored as a single, physical wide row. Using this design, Cassandra supports up to 2 billion columns per (physical) row.
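For completeness, here is a hedged sketch of what that Cassandra model might look like through the DataStax Java driver. The keyspace, table, and column names are assumptions, and a second clustering column (sample_time) is added so that each data point within a bucket gets its own cell; this is not our actual schema.

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

// Sketch: metric_id as the partition key, time_bucket (plus sample_time)
// as clustering keys. All rows for one metric_id form a single physical
// wide row in Cassandra. Keyspace/table/column names are illustrative only.
public class CassandraMetricModel {
    public static void main(String[] args) {
        try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
             Session session = cluster.connect()) {

            session.execute("CREATE KEYSPACE IF NOT EXISTS metrics "
                    + "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");

            session.execute("CREATE TABLE IF NOT EXISTS metrics.metric_data ("
                    + " metric_id    text,"
                    + " time_bucket  bigint,"     // clustering key: hour bucket
                    + " sample_time  timestamp,"  // added so each data point gets its own cell
                    + " metric_value bigint,"
                    + " PRIMARY KEY (metric_id, time_bucket, sample_time))");
        }
    }
}
```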
Storing the same keys and values is very different in HBase. There is no concept of a composite key in HBase; a key is simply an array of bytes. All keys are lexicographically sorted and stored by range in shards called regions. If we design the key carefully, we can store several years of metric data in a single shard, or region, in HBase.
This is a very powerful concept and proved very useful for our use cases: we are able to perform aggregate operations over significant time ranges, from six months to two years, inside the region server process itself, so that only the aggregated value is returned to the calling program, reducing network traffic significantly. Imagine how inefficient a query would be if we had to collect data from 10 different shards (machines) into a client program and then apply aggregates on the values before returning the final result to the caller.
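Here is a hedged sketch of that idea: the row key is the metric ID followed by the minute timestamp, so a time-range query for one metric becomes a single contiguous range scan, and the SUM can be pushed down to the region servers through HBase's AggregationClient. This assumes the AggregateImplementation coprocessor is enabled on the table; the key layout, table, and column names are illustrative.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.util.Bytes;

// Sketch: composite row key = metric ID bytes + minute timestamp bytes.
// Keys for one metric sort contiguously, so a time-range query is one
// range scan, and the aggregate runs inside the region server via the
// AggregateImplementation coprocessor (which must be enabled on the table).
public class RegionSideAggregate {

    static byte[] rowKey(String metricId, long epochMinute) {
        return Bytes.add(Bytes.toBytes(metricId), Bytes.toBytes(epochMinute));
    }

    public static void main(String[] args) throws Throwable {
        Configuration conf = HBaseConfiguration.create();

        long t1 = 24_000_000L;                   // start minute
        long t2 = t1 + 60L * 24 * 180;           // end minute, roughly 6 months later

        Scan scan = new Scan();
        scan.setStartRow(rowKey("metric-42", t1));
        scan.setStopRow(rowKey("metric-42", t2));
        scan.addColumn(Bytes.toBytes("d"), Bytes.toBytes("totalCalls"));

        // SUM(total calls) for metric-42 between t1 and t2, computed server-side.
        AggregationClient aggregationClient = new AggregationClient(conf);
        Long totalCalls = aggregationClient.sum(
                TableName.valueOf("METRICS"), new LongColumnInterpreter(), scan);
        System.out.println("SUM(total calls) = " + totalCalls);
    }
}
```

Because all the rows for one metric over that range sort into the same region, the region server can compute the sum locally and return a single number to the client.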
At the time of our evaluation, Cassandra also lacked several key features that were readily available in HBase, including:
- HBase allowed us to expire metrics of different resolutions after different retention periods: we put them in different column families and set the TTL at the column family level (see the sketch after this list). Cassandra stores the TTL information with every cell, repeating the same information for cells of the same type (i.e., with the same expiration or retention time), which makes storage very bulky. Cassandra 3.0, released in November 2015, fixes this.
- HBase provides consistency over availability by design. This was a critical consideration for our team because our policy engine evaluates metrics continuously. You can read more about the CAP theorem in this article on DZone.
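As referenced in the first bullet above, here is a rough sketch of per-resolution retention using column family TTLs with the HBase 1.x admin API. The table name, family names, and retention values are assumptions, not our production settings.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

// Sketch: one column family per resolution, each with its own TTL, so
// minute-level data expires early while hourly rollups are kept longer.
// Family names and retention periods are illustrative, not our real values.
public class MetricTableWithTtls {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {

            HTableDescriptor table = new HTableDescriptor(TableName.valueOf("METRICS"));

            HColumnDescriptor minuteData = new HColumnDescriptor("min");
            minuteData.setTimeToLive(8 * 24 * 3600);        // keep minute data ~8 days

            HColumnDescriptor hourlyRollup = new HColumnDescriptor("hour");
            hourlyRollup.setTimeToLive(365 * 24 * 3600);    // keep hourly rollups ~1 year

            table.addFamily(minuteData);
            table.addFamily(hourlyRollup);
            admin.createTable(table);
        }
    }
}
```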
Cassandra is catching up fast and has overtaken HBase in many areas. Here is another article comparing HBase and Cassandra.
In my next blog post, I'll address the challenges we face with HBase and suggest ideas for improvement. I look forward to your feedback and comments.