Instana

Monitor ClickHouse ingestion with Instana

By Timo Hillerns posted Fri March 22, 2024 06:39 AM


Co-authored by Yiquan Zhou

The challenge

Instana uses ClickHouse to store distributed traces. In our largest region, we ingest up to 1 million calls per second and store about 100 TB of tracing data per day. Scaling and fine-tuning a cluster to ingest that amount of data with high insert throughput and minimal latency is a big challenge, and we have run into various issues over time. To better understand where the bottlenecks are and what can be optimized, we have improved the Instana ClickHouse sensor to provide insights into key metrics on how data is written to ClickHouse. In this blog post, you can find out how easily you can monitor your own ingestion pipeline with Instana.

How do inserts work in ClickHouse?

Distributed and local insert

When running a ClickHouse cluster with multiple shards, we can create a Distributed table, which does not store any data on its own. When we insert data into the Distributed table, it splits the data according to the defined sharding key and forwards it to the local tables of the different shards. Alternatively, you can handle the sharding logic in your application and insert directly into the local tables.
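
To make this concrete, here is a minimal sketch of the two-table setup, assuming a cluster named my_cluster and a simplified, hypothetical schema (the real Instana schema is not shown in this post):

  -- A local MergeTree table that physically stores data on each shard:
  CREATE TABLE shared.calls_local ON CLUSTER my_cluster
  (
      timestamp   DateTime,
      trace_id    String,
      duration_ms UInt32
  )
  ENGINE = MergeTree
  ORDER BY (timestamp, trace_id);

  -- A Distributed table that stores no data itself and routes inserts
  -- to the shards by the sharding key (here: a hash of trace_id):
  CREATE TABLE shared.calls ON CLUSTER my_cluster AS shared.calls_local
  ENGINE = Distributed(my_cluster, shared, calls_local, cityHash64(trace_id));

  -- Inserting into the Distributed table forwards each row to the
  -- local table of the matching shard:
  INSERT INTO shared.calls VALUES (now(), 'abc123', 42);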

In both cases, it’s worth monitoring how much data is inserted into each ClickHouse node and whether it is well balanced. Balanced data ensures an even distribution of load and optimal latency for distributed queries.

Parts creation and merges

Each insert results in the creation of a new data part, which is one or more physical files storing the values of the columns of the inserted rows. A data part can also be fetched from another replica when data is not inserted directly into the target node. In the background, ClickHouse periodically merges smaller data parts belonging to the same partition into bigger ones.

If inserts are too small and too frequent, the background merge process may not be able to keep up with newly inserted data. This can result in the "Too many parts" error, a critical condition that can slow down or even crash the entire cluster and may lead to data loss.

Monitoring the number of created parts helps detect insert spikes or data balancing issues. Keeping an eye on merge performance also allows us to detect merge issues early and act quickly before the cluster runs into an unrecoverable state.
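
One way to keep an eye on this outside of Instana is to query system.parts directly. A minimal sketch, reusing the hypothetical schema from above:

  -- Count active parts per partition; consistently high counts suggest
  -- that background merges are not keeping up with inserts.
  SELECT
      database,
      table,
      partition,
      count() AS active_parts
  FROM system.parts
  WHERE active AND (database = 'shared') AND (table = 'calls_local')
  GROUP BY database, table, partition
  ORDER BY active_parts DESC
  LIMIT 10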

How does Instana gather metrics from ClickHouse?

Instana auto-discovers and monitors ClickHouse server processes with the ClickHouse sensor. The sensor feeds built-in dashboards for ClickHouse clusters and nodes, and it can also provide useful metrics down to the table level. To get table-level metrics, you need to make two changes:

1. The sensor relies on data from two system tables provided by the ClickHouse server:

  • system.query_log - contains detailed information about each executed query.
  • system.part_log - contains detailed information about events with data parts, such as creation, merges, and downloads.

Both need to be enabled in the ClickHouse global server settings. Once enabled, the ClickHouse sensor can provide full insight into INSERT and MERGE operations.
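
Both system tables are configured via the query_log and part_log sections of the ClickHouse server configuration (config.xml); query_log is enabled by default, while part_log may need to be enabled explicitly depending on your ClickHouse version. As a quick sanity check that both tables exist and receive data, you can query them directly:

  -- Log entries are flushed to these tables in intervals, so the most
  -- recent events may lag behind by a few seconds.
  SELECT count() FROM system.query_log WHERE event_date = today();
  SELECT count() FROM system.part_log WHERE event_date = today();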

2. In the Instana agent configuration, you need to specify the tables you want to monitor. The simple configuration shown below enables detailed metrics for the calls table:

...
com.instana.plugin.clickhouse:
  monitorTables:
    - shared.calls

Once configured, the ClickHouse sensor queries the system tables to gather 25 detailed metrics concerning INSERT and MERGE performance for the calls table across all nodes in the monitored ClickHouse cluster. Behind the scenes we fetch metrics such as average durations, sums of rows, and total bytes for read, write, merge, and download operations. The metrics are queried every 5 seconds for a sliding window over the last minute.

Why is that? One minute is the default timeout for ClickHouse queries, and most internal operations, such as merges, usually finish within that timeframe. The one-minute window guarantees that we catch metrics for short-running queries that would otherwise fall between metric fetches. Additionally, one minute is an easy mental model for comparisons.

Example SQL query

The following SQL query shows how to fetch the average duration and the number of read rows for INSERT queries:

  SELECT
    arrayJoin(tables) AS table,
    avg(query_duration_ms) AS avg_duration_ms,
    sum(read_rows) AS sum_read_rows
  FROM system.query_log
  WHERE (event_time > subtractMinutes(now(), 1)) AND (query_kind = 'Insert') AND (type != 'QueryStart')
  GROUP BY table
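
For MERGE metrics, a companion query can be run against system.part_log over the same one-minute window. A sketch (column names as documented for system.part_log; this is not necessarily the exact query the sensor uses):

  SELECT
      table,
      avg(duration_ms) AS avg_merge_duration_ms,
      sum(read_rows) AS sum_read_rows,
      max(peak_memory_usage) AS max_memory,
      count() AS merged_parts
  FROM system.part_log
  WHERE (event_time > subtractMinutes(now(), 1)) AND (event_type = 'MergeParts')
  GROUP BY table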

How can you use the metrics?

You can use these table metrics to create custom dashboards. Here is one example with some key metrics for one of our clusters over a 1-hour timeframe:

In one of the Instana SaaS regions, we insert 1.174M calls per second into 40 ClickHouse nodes. This results in 8.5M written rows and 10.5 GiB of inserted data per minute on each ClickHouse node. The number of written rows is higher than the number of inserted rows because the calls table has additional downstream materialized views.

Column 3 shows distributed INSERT metrics per minute and per node, broken down into new and downloaded parts for the calls table. Detailed MERGE metrics, such as the slowest merging nodes, are shown in column 4.

You can make use of the more detailed metrics to create dedicated custom dashboards per table. One example is to create charts with per-node metrics to identify misbehaviours such as imbalances between shards or at the node level. Time-shift comparisons can also be very helpful to spot changes in load behaviour.
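
If you want to cross-check such imbalances directly in ClickHouse, a query over all replicas can compare the insert volume per node. A sketch, assuming a cluster named my_cluster:

  -- Compare written rows per node over the last hour to spot shard or
  -- node imbalances; requires permission to query system tables on all
  -- replicas.
  SELECT
      hostName() AS node,
      sum(written_rows) AS written_rows_last_hour
  FROM clusterAllReplicas('my_cluster', system.query_log)
  WHERE (event_time > subtractHours(now(), 1))
    AND (query_kind = 'Insert')
    AND (type != 'QueryStart')
  GROUP BY node
  ORDER BY written_rows_last_hour DESC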

Conclusion

In this blog post, we demonstrated how the Instana ClickHouse sensor gathers detailed performance metrics at the table level. We also showed how we at Instana make use of the ClickHouse sensor ourselves.

In addition to SDK tracing and existing infrastructure information, the ClickHouse sensor provides powerful insights for an end-to-end view of our data ingestion pipeline.

Appendix

Table level INSERT metrics in detail

  • <table>.insert.avg_duration_ms
  • <table>.insert.p85_duration_ms
  • <table>.insert.max_duration_ms
  • <table>.insert.sum_read_rows
  • <table>.insert.sum_written_rows
  • <table>.insert.sum_written_bytes
  • <table>.insert.sum_queries
  • <table>.insert.error_rate

Table level new and download metrics in detail

  • <table>.new_download.avg_rows
  • <table>.new_download.min_rows
  • <table>.new_download.max_rows
  • <table>.new_download.p85_rows
  • <table>.new_download.sum_rows
  • <table>.new_download.count_parts

Table level MERGE metrics

  • <table>.merge.avg_duration_ms
  • <table>.merge.p85_duration_ms
  • <table>.merge.max_duration_ms
  • <table>.merge.sum_read_rows
  • <table>.merge.unique_partitions
  • <table>.merge.count_parts
  • <table>.merge.max_memory
  • <table>.merge.sum_errors
  • <table>.merge.merge_rate
