Open-source big data computing engine Apache Flink, or Flink for short, has gained popularity in recent years as a powerful framework for both batch processing and stream processing that can be used to create a number of event-based applications. Flink is first of all a pure stream computing engine, with the data stream as its basic data model. A stream can be infinite and unbounded, which describes stream processing in the general sense, or it can be a finite, bounded stream, as in the case of batch processing. Flink thus uses a single architecture to support both stream and batch processing. As conveyed by its slogan “Stateful Computations Over Streams”, Flink has the additional strength of supporting stateful computing. Processing is called stateless when the result of processing an event or piece of data depends only on the content of that event itself; if the result also depends on previously processed events, it is called stateful processing. More complicated data processing, such as aggregations and joins between two streams, is stateful processing. As well as relying on Flink for its massive data operations, Alibaba Group has contributed significantly to Flink’s development over the past several years, including the addition of important components such as the Async I/O feature. Drawing on insights from Alibaba developer Chong (Jark) Wu, this article introduces key functions available in the framework’s current version before looking at the historical changes it has undergone to improve specific performance areas.
Flink’s powerful framework has benefited from a number of groundbreaking API enhancements in each of its previous versions. In its 1.0.0 period, Flink introduced the State API, consisting of ValueState, ReducingState, ListState, and so on. The State API can be considered the biggest innovation of Flink: it allows users to work with Flink’s managed state as easily as with the Java Collections API, while automatically enjoying consistency guarantees so that no state is lost on failure. As a sign of its significance, even the subsequent Apache Beam State API borrowed extensively from this component. Flink’s 1.1.0 period saw the start of support for session windows and correct handling of out-of-order and late-arriving data, assuring the accuracy of final results. In the 1.2.0 period, the ProcessFunction API was provided, a lower-level API for implementing more advanced and complex functions. As well as registering various State types, it supports registering timers (both event time and processing time) and is often used to develop event-driven, time-based applications. In the 1.3.0 period, Flink received the Side Output function. This is important because an operator ordinarily has only one output type, while another type may sometimes be needed. For example, as well as outputting main stream data, it may also be advantageous to output abnormal or late-arriving data as side streams and then hand them to different downstream nodes for processing. Side Output specifically supports this kind of multiplexed output. Flink’s 1.5.0 phase saw the addition of BroadcastState, a State API extension used to store data broadcast from upstream. Because the data is broadcast, the BroadcastState in every task of the operator is exactly the same. Building on this state, dynamic rule functions in CEP and non-equi join scenarios in SQL can be resolved more effectively. Lastly, in its 1.6.0 period Flink received its State TTL and DataStream Interval Join functions. State TTL lets users specify a time-to-live (TTL) parameter when declaring a state and guarantees that the system automatically cleans the state up after the given interval. Prior to this version, users had to register a timer within ProcessFunction and then manually clear the state in the timer’s callback; from 1.6.0 onward, the Flink framework solves this problem natively based on TTL. DataStream Interval Join makes interval joins possible: for example, with a five-minute interval join, each record in the left stream joins the right-stream data falling within the five minutes before and after it.
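To make these building blocks concrete, here is a minimal sketch in Java of how the State API, ProcessFunction timers, and Side Output fit together. It assumes a stream of Long values keyed by a String key; the one-hour cleanup timeout, the abnormal-value threshold, and the class and tag names are illustrative assumptions, not part of Flink itself.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Per-key event counter with a one-hour event-time cleanup timer and a
// side output for unusually large values (all thresholds illustrative).
public class CountWithCleanup extends KeyedProcessFunction<String, Long, Long> {

    // Tag for the side-output stream of abnormal records.
    public static final OutputTag<Long> ABNORMAL =
            new OutputTag<Long>("abnormal-values") {};

    // Flink-managed state: checkpointed automatically, restored on failure.
    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) {
        countState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<Long> out)
            throws Exception {
        if (value > 1_000_000L) {
            // Route abnormal records to the side output, not the main stream.
            ctx.output(ABNORMAL, value);
            return;
        }
        Long count = countState.value();
        count = (count == null) ? 1L : count + 1;
        countState.update(count);
        // Event-time timer one hour after this element (assumes timestamps
        // and watermarks have been assigned upstream).
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 3_600_000L);
        out.collect(count);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) {
        // Manual state cleanup in the timer callback: exactly the pattern
        // that State TTL (1.6.0) automates.
        countState.clear();
    }
}
```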
Parallel to the changes discussed in the previous section, Flink underwent a number of changes to its high-level APIs as it progressed through versions. In Flink’s 1.0.0 period, its Table API (a structured data processing API) and CEP (complex event processing) frameworks were added to the library. The Table API is a structured, high-level API that supports the Java and Scala languages in a similar way to Spark’s DataFrame API, and it is also highly similar to SQL. Since the Table API and SQL are both relational APIs, they can share much of their implementation. Therefore, in Flink 1.1.0, the community refactored the entire Table module based on Apache Calcite, supporting both the Table API and SQL and allowing them to share most of the code. In the 1.2.0 period, the Flink community added rich built-in window operations to the Table API and SQL, including tumbling, sliding, and session windows. During the 1.3.0 period, the Flink community proposed the Dynamic Table concept: a stream and a table are duals of each other and can be converted into one another without losing any information. This provides a foundation for the unification of stream and batch processing. At the core of implementing Dynamic Table is the Retraction mechanism, which is what allows multi-level aggregates and multi-level joins to be implemented correctly and ensures the correctness of the semantics and results of streaming SQL. Additionally, in this version Flink made the CEP operator rescalable; in other words, the parallelism of the CEP operator can now be changed without loss of state. Finally, in Flink version 1.5.0, joins were supported in the Table API and SQL, including regular stream joins and time-windowed stream joins, and the SQL Client was added. The SQL Client provides an easy way of writing, debugging, and submitting table programs to a Flink cluster without a single line of Java or Scala code, and its CLI allows users to retrieve and visualize real-time results from the running distributed application on the command line.
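As a hedged illustration of what the windowed SQL support looks like in practice, the following Java snippet runs a tumbling-window aggregation through the Table API’s SQL entry point. The table name clicks, its columns, and the import paths (which have moved between Flink releases) are assumptions made for this sketch.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TumblingWindowSqlJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Assumes a table "clicks" with columns (user_id, url, rowtime) has
        // already been registered, with rowtime declared as the event-time
        // attribute of the stream.
        Table result = tEnv.sqlQuery(
                "SELECT user_id, " +
                "       TUMBLE_END(rowtime, INTERVAL '1' HOUR) AS window_end, " +
                "       COUNT(url) AS clicks " +
                "FROM clicks " +
                "GROUP BY user_id, TUMBLE(rowtime, INTERVAL '1' HOUR)");

        // Windowed aggregates emit append-only results, so the table can be
        // converted back to a DataStream without retractions.
        tEnv.toAppendStream(result, Row.class).print();
        env.execute("tumbling-window-sql");
    }
}
```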
Flink’s checkpoint mechanism has been supported since its early stages and has become a core feature, and the Flink community has been working hard to improve the efficiency of checkpointing and recovery. Flink’s 1.0.0 period added support for the RocksDB state backend. Prior to this version, all state data could only be stored in memory; because JVM memory is fixed in size, growing data volumes lead to full-GC and OOM problems, which made it difficult to use Flink stably in production environments. To store more data or a larger state, the RocksDB state backend is needed. RocksDB is a file-based embedded database that stores data on disk while providing efficient read and write performance, so using it avoids the OOM problem. In its 1.1.0 period, Flink gained support for asynchronous RocksDB snapshots. In previous versions the snapshot process was synchronous, which blocked processing of the main data stream and hurt throughput; with asynchronous snapshots, Flink’s throughput improved greatly. In the 1.2.0 period, support was added for rescalable KeyedState and OperatorState with the introduction of the KeyGroup mechanism, meaning that the parallelism of operators can be changed without losing state. In its 1.3.0 period, Flink added support for incremental checkpoints, a strong indication that Flink’s stream computing had reached a production-ready state. An incremental checkpoint stores and persists only the new, incremental state snapshot. In typical stream computing, gigabyte-level and even terabyte-level states are quite common, and if the full state were written to distributed storage on every checkpoint, the cost to the network would become exceedingly high. In this version, Flink also supported fine-grained recovery: instead of restarting the complete ExecutionGraph in the event of a task failure, Flink restarts only the affected subgraph, significantly decreasing recovery time. Lastly, in the Flink 1.5.0 period, a mechanism for local state recovery was introduced. Because of the checkpoint mechanism, state is persisted in distributed storage such as HDFS, so when failover occurs the data has to be re-downloaded from the remote HDFS; if the state is especially large, the download takes a long time and failover recovery lengthens accordingly. The local state recovery mechanism backs up the state files locally in advance, so that when a job experiences failover it can be restored locally instead of downloading the state files from remote HDFS, improving recovery efficiency.
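The following minimal Java sketch shows how a job might opt into the RocksDB state backend with incremental checkpoints enabled. The checkpoint interval and HDFS path are placeholders, and note that in recent Flink releases this configuration style has been superseded by newer state backend APIs.

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbCheckpointedJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 60 seconds so state can be restored on failure.
        env.enableCheckpointing(60_000L);

        // Keep working state in RocksDB on local disk and snapshots in HDFS.
        // The second constructor argument enables incremental checkpoints
        // (1.3.0+), so each checkpoint uploads only the new RocksDB files.
        env.setStateBackend(
                new RocksDBStateBackend("hdfs:///flink/checkpoints", true));

        // ... define sources, transformations, and sinks here ...

        env.execute("rocksdb-checkpointed-job");
    }
}
```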
Notable developments in the Flink runtime begin in the version 1.2.0 period, when Alibaba directly contributed the high-profile Async I/O feature to the Flink community. Async I/O’s main purpose is to solve system bottlenecks caused by network latency when interacting with external systems, for example when querying an external HBase table to fill in some columns. A synchronous approach blocks on each query operation, and the data stream gets stuck due to frequent I/O requests. Using asynchronous I/O, N asynchronous queries can be in flight simultaneously without blocking the main data stream, improving the throughput of the entire job and the utilization of the CPU. Flink version 1.3.0 introduced the HistoryServer component. The HistoryServer allows users to query the status and statistics of completed jobs that have been archived by a JobManager, which is useful for in-depth troubleshooting after a job has completed. In the Flink 1.4.0 period, an end-to-end exactly-once semantic guarantee was added. Exactly-once means that each input record takes effect on the final result exactly once even if software or hardware failures occur, ensuring that data is neither lost nor computed twice. Prior to version 1.4.0, Flink provided the exactly-once guarantee within the Flink program but not for outputs to external systems, making it possible for duplicate data to be written to an external system when failover happened; at that time the general workaround was to use an idempotent external database. In 1.4.0, Flink delivered the end-to-end exactly-once guarantee using the two-phase commit protocol. It further shipped built-in end-to-end support for Kafka and provided the TwoPhaseCommitSinkFunction for users to create their own exactly-once data sinks. During its 1.5.0 period, Flink released a new deployment and process model. The rewrite of Flink’s deployment and process model (internally known as FLIP-6) had been in the works for more than a year and was a substantial effort from the Flink community; it has been the most significant improvement of a Flink core component since the project’s inception. In a nutshell, the improvements add support for dynamic allocation and dynamic release of resources on YARN and Mesos schedulers for better resource utilization, failure recovery, and dynamic scaling. Version 1.5.0 additionally refactored the network stack, because in the old version communications between multiple upstream and downstream tasks shared a single TCP connection, so when a task was backpressured, all tasks sharing its connection would be blocked. For Flink 1.5, the community worked on two efforts to improve the network stack: credit-based flow control and improved transfer latency. Credit-based flow control reduces the amount of data “on the wire” to a minimum while preserving high throughput, which significantly reduces the time to complete a checkpoint under backpressure. Moreover, Flink is now able to achieve much lower latencies without a reduction in throughput.
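A compact sketch of the Async I/O pattern in Java follows. The DatabaseClient class is a hypothetical stand-in for a real asynchronous client (HBase, Redis, and so on), and the capacity and timeout values in the wiring comment are illustrative.

```java
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Enriches each key with a value fetched from an external store without
// blocking the main data stream.
public class AsyncEnrichFunction extends RichAsyncFunction<String, String> {

    // Hypothetical asynchronous lookup client; substitute a real client.
    static class DatabaseClient {
        CompletableFuture<String> lookup(String key) {
            return CompletableFuture.supplyAsync(() -> "value-for-" + key);
        }
    }

    private transient DatabaseClient client;

    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) {
        client = new DatabaseClient();
    }

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        // Fire the query without blocking; the callback completes this
        // record once the external system responds.
        client.lookup(key).thenAccept(value ->
                resultFuture.complete(Collections.singleton(key + "," + value)));
    }
}

// Wiring it into a pipeline, with at most 100 in-flight requests and a
// 10-second timeout per request:
//
// DataStream<String> enriched = AsyncDataStream.unorderedWait(
//         input, new AsyncEnrichFunction(), 10, TimeUnit.SECONDS, 100);
```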
(Original article by Wu Chong 伍翀)