

Getting into Low-Latency Gears with Apache Flink - Part One
source link: https://flink.apache.org/2022/05/18/latency-part1.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Getting into Low-Latency Gears with Apache Flink - Part One
18 May 2022 Jun Qin & Nico Kruber
Apache Flink is a stream processing framework well known for its low latency processing capabilities. It is generic and suitable for a wide range of use cases. As a Flink application developer or a cluster administrator, you need to find the right gear that is best for your application. In other words, you don’t want to be driving a luxury sports car while only using the first gear.
In this multi-part series, we will present a collection of low-latency techniques in Flink. Part one starts with types of latency in Flink and the way we measure the end-to-end latency, followed by a few techniques that optimize latency directly. Part two continues with a few more direct latency optimization techniques. Further parts of this series will cover techniques that improve latencies by optimizing throughput. For each optimization technique, we will clarify what it is, when to use it, and what to keep in mind when using it. We will also show experimental results to support our statements.
This series of blog posts is a write-up of our talk in Flink Forward Global 2021 and includes additional latency optimization techniques and details.
Latency
Types of latency
Latency can refer to different things. LatencyMarkers in Flink measure the time it takes for the markers to travel from each source operator to each downstream operator. As LatencyMarkers bypass user functions in operators, the measured latencies do not reflect the entire end-to-end latency but only a part of it. Flink also supports tracking the state access latency, which measures the response latency when state is read/written. One can also manually measure the time taken by some operators, or get this data with profilers. However, what users usually care about is the end-to-end latency, including the time spent in user-defined functions, in the stream processing framework, and when state is accessed. End-to-end latency is what we will focus on.
How we measure end-to-end latency
There are two scenarios to consider. In the first scenario, a pipeline does a simple transformation, and there are no timers or any other complex event time logic. For example, a pipeline that produces one output event for each input event. In this case, we measure the processing delay as the latency, that is, t2 - t1
as shown in the diagram.
The second scenario is where complex event time logic is involved (e.g., timers, aggregation, windowing). In this case, we measure the event time lag as the latency, that is, current processing time - current watermark
. The event time lag gives us the difference between the expected output time and the actual output time.
In both scenarios, we capture a histogram and show the 99th percentile of the end-to-end latency. The latency we measure here includes the time an event stays in the source message queue (e.g., Kafka). The reason for this is that it covers the scenarios where a source operator in a pipeline is backpressured by other operators. The more the source operator is backpressured, the longer the messages stay in the message queue. So, including the time events stay in the message queue in the latency gives us how slow or fast a pipeline is.
Low-latency optimization techniques
We will discuss low-latency techniques in two groups: techniques that optimize latency directly and techniques that improve latency by optimizing throughput. Each of these techniques can be as simple as a configuration change or may require code changes, or both. We have created a git repository containing the example jobs used in our experiments to support our statements. Keep in mind that all the experimental results we will show are specific to those jobs and the environment they run in. Your job may show different results depending on where the latency bottleneck is.
Direct latency optimization
Allocate enough resources
An obvious but often forgotten low-latency technique is to allocate enough resources to your job. Flink has some metrics (e.g., idleTimeMsPerSecond, busyTimeMsPerSecond, backPressureTimeMsPerSecond) to indicate whether an operator/subtask is busy or not. This can also be spotted easily in the job graph on Flink’s Web UI if you are using Flink 1.13 or later. If some operators in your job are 100% busy, they will backpressure upstream operators and the backpressure may propagate up to the source operators. Backpressure slows down the pipeline and results in high latency. If you scale up your job by adding more CPU/memory resources or scale out by increasing the parallelism, your job will be able to process events faster or process more events in parallel which leads to reduced latencies. We recommend having an average load below 70% under normal circumstances to accommodate load spikes that come from input data, timers, windowing, or other sources. You should adjust the threshold based on your job resource usage patterns and your latency requirements.
You can apply this optimization if your job or part of it is running at its total CPU/memory capacity and you have more resources that can be allocated to the job. In the case of scaling out with high parallelism, your streaming job must be able to make use of the additional resources. For example, the job should not have fixed parallelisms in the code, the job should not be bottlenecked on the source streams, and the input streams are partitionable by keys such that they can be processed in parallel and have no severe data skew, etc. In the case of scaling up by allocating more CPU cores, your streaming job must not be bottlenecked on a single thread or any other resources.
Keep in mind that allocating more resources may result in increased financial costs, especially when you are running jobs in the cloud.
Below are the experimental results of WindowingJob. As you can see from the graph at the left, when the parallelism was 2, the two subtasks were often 100% busy. After we increased the parallelism to 3, the three subtasks were around 75% busy. As a result, the 99th percentile latency reduces from around 3 seconds to 650 milliseconds.

Use applicable state backends
When using the filesystem
(Flink 1.12 or early) or hashmap
(Flink 1.13 or later) state backend, the state objects are stored in memory and can be accessed directly. In contrast, when using the rocksdb
state backend, every state access has to go through a (de-)serialization process which in addition may involve disk accesses. So using filesystem/hashmap
state backend can help reduce latency.
You can apply this optimization if your state size is very small compared to the memory you can allocate to your job and your state size will not grow beyond your memory capacity. You can set the managed memory size to 0 if not needed. Since Flink 1.13, you can always start with the hashmap
state backend and seamlessly switch to the rocksdb
state backend via savepoints when the state increases to the size that is close to your memory capacity. Note that you should closely monitor the memory usage and perform the switch before an out-of-memory happens. Please refer to this Flink blog post for best practices when using the rocksdb
state backend.
Keep in mind that heap-based state backends use more memory compared with RocksDB due to their copy-on-write data structure and Java’s on-heap object representation. Heap-based state backends can be affected by the garbage collector which makes them less predictable and may lead to high tail latencies. Also, as of now, there is no support for incremental checkpointing (this is being developed in FLIP-151). You should measure the difference before you make the switch.
Our experiments with the previously mentioned WindowingJob after switching the state backend from rocksdb
to hashmap
show a further reduction of the latency down to 500ms. Depending on your job’s state access pattern, you may see larger or smaller improvements. The graph on the right shows the garbage collection’s impact on the latency.

Emit watermarks quickly
When using a periodic watermark generator, Flink generates a watermark every 200 ms. This means that, by default, each parallel watermark generator does not produce watermark updates until 200 ms have passed. While this may be sufficient for many cases, if you are aiming for sub-second latencies, you could try reducing the interval even further, for example, to 100 ms.
You can apply this optimization if you use event time and a periodic watermark generator, and you are aiming for sub-second latencies.
Keep in mind that watermark generation that is too frequent may also degrade performance because more watermarks must be processed by the framework. Moreover, even though watermarks are only created every 200 milliseconds, watermarks may arrive at much higher frequencies further downstream in your job because tasks may receive watermarks from multiple parallel watermark generators.
We re-ran the previous WindowingJob experiment with the reduced watermark interval pipeline.auto-watermark-interval: 100ms
and reduced the latency further to 430ms.

Flush network buffers early
Flink uses buffers when sending data from one task to another over the network. Buffers are flushed and sent out when they are filled up or when the default timeout of 100ms has passed. Again, if you are aiming for sub-second latencies, you can lower the timeout to reduce latencies.
You can apply this optimization if you are aiming for sub-second latencies.
Keep in mind that network buffer timeout that is too low may reduce throughput.
As seen in the following experiment results, by using execution.buffer-timeout: 10 ms
in WindowingJob, we again reduced the latency (now to 370ms).

Summary
In part one of this multi-part series, we discussed types of latency in Flink and the way we measure end-to-end latency. Then we presented a few latency optimization techniques with a focus on direct latency optimization. For each technique, we explained what it is, when to use it, and what to keep in mind when using it. Part two will continue with a few more direct latency optimization techniques. Stay tuned!
Recommend
-
115
Apache Flink:Keyed Window与Non-Keyed Window Apache Flink中,Window操作在流式数据处理中是非常核心的一种抽象,它把一个无限流数据集分割成一个个有界的Window(或称为Bucket),然后...
-
53
README.md Apache Flink Apache Flink is an open source stream processing framework with powerful stream- and batch-processing capabilities. Learn...
-
55
-
50
什么是Flink Apache Flink是一个分布式大数据处理引擎,可以对 有限数据流 和 无限数据流 进行 有...
-
15
PyFlink: The integration of Pandas into PyFlink 04 Aug 2020 Jincheng Sun (@sunjincheng121) & Markos Sfikas (@MarkSfik) Pyt...
-
21
A Deep-Dive into Flink's Network Stack 05 Jun 2019 Nico Kruber Flink’s network stack is one of the core components that make up the flink-runtime module and sit at the heart of every Flink jo...
-
15
A Deep Dive into Rescalable State in Apache Flink 04 Jul 2017 by Stefan Richter (@StefanRRichter) Apache Flink 1.2.0, released in February 2017, introduced support f...
-
7
13 Mar 2015 by Fabian Hüske (@fhueske) Join Processing in Apache Flink Joins are prevalent operations in many data processing applications. Most data processing systems feature APIs...
-
7
Getting into Low-Latency Gears with Apache Flink - Part Two 23 May 2022 Jun Qin & Nico Kruber This series of blog posts present a collection of low-latency techniques in Flink. In
-
10
Xbox Cloud Gaming is getting mouse and keyboard support and latency improvements Microsoft preps game developers for xCloud improvements By...
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK