
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed, and at any scale. Developers build Flink applications with its APIs (for example, the Java DataStream API or SQL), and the framework executes them on a Flink cluster.
Apache Flink reduces the complexity faced by other distributed data-driven engines. It does so by combining query optimization and other concepts from database systems, as well as efficient parallel in-memory and out-of-core algorithms, with a MapReduce-style programming model.
Flink is a true streaming engine. A crucial piece of a streaming infrastructure is a stream processor that can deliver high throughput, low latency, and strong consistency guarantees, even for stateful computations. Apache Flink is a scalable stream processing engine that provides exactly this combination of properties.
Flink uses one common runtime for both data streaming and batch processing applications. It provides the following features:
Exactly-once guarantees: after a failure, the state of stateful operators is correctly restored.
Low latency: many applications require sub-second latency; the lower, the better.
High throughput: as data grows, pushing large amounts of data through the pipeline is crucial.
Powerful computation model: the programming model does not restrict the user and supports a wide variety of applications.
Low overhead: the fault-tolerance mechanism adds little overhead when no failures occur.
Flow control: backpressure from slow operators is naturally absorbed by the system and the data sources, avoiding crashes or degraded performance caused by slow consumers.
Typical Flink use cases fall into three categories:
Event-driven Applications
Data Analytics Applications
Data Pipeline Applications
A Flink deployment consists of the following components:
Flink Client: compiles batch or streaming applications into a dataflow graph, which it then submits to the JobManager.
JobManager: the central work coordination component of Flink. It has implementations for different resource providers, which differ in high availability, resource allocation behavior, and supported job submission modes. Flink currently supports three deployment modes:
Application Mode: runs the cluster exclusively for one application
Per-Job Mode: runs the cluster exclusively for one job. The job's main method (or client) runs only prior to the cluster creation.
Session Mode: one JobManager instance manages multiple jobs sharing the same cluster of TaskManagers
TaskManager: TaskManagers are the services actually performing the work of a Flink job.

package com.alexwang.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import java.util.Arrays;

// Batch word count with the DataSet API (a legacy API, deprecated in recent Flink releases
// in favor of the DataStream API running in BATCH mode)
public class WordCountBatchProcess {
    public static void main(String[] args) throws Exception {
        //1. create the execution env.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //2. read data from batch files
        DataSource<String> dataSource = env.readTextFile("input/words.txt");
        //3. split each line into (word, 1) pairs
        FlatMapOperator<String, Tuple2<String, Long>> flatMapOperator = dataSource.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Long>> collector) throws Exception {
                Arrays.stream(line.split("\\s+")).forEach(
                        word -> collector.collect(Tuple2.of(word, 1L))
                );
            }
        });
        //4. group by the word (field 0) and sum the counts (field 1)
        UnsortedGrouping<Tuple2<String, Long>> unsortedGrouping = flatMapOperator.groupBy(0);
        AggregateOperator<Tuple2<String, Long>> aggregateOperator = unsortedGrouping.sum(1);
        // In the DataSet API, print() triggers execution, so no env.execute() call is needed
        aggregateOperator.print();
    }
}

package com.alexwang.flink;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class WordCountStreamProcess {
    public static void main(String[] args) throws Exception {
        //1. init context and env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //set the runtime mode
        //bin/flink run -Dexecution.runtime-mode=BATCH ....
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        //2. source define
        DataStreamSource<String> dataStreamSource = env.readTextFile("input/words.txt");
        //3. transformation
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndCount = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
                Arrays.stream(line.split("\\s+")).forEach(word -> {
                    out.collect(Tuple2.of(word, 1L));
                });
            }
        });
        KeyedStream<Tuple2<String, Long>, String> keyedStream = wordAndCount.keyBy(tuple -> tuple.f0);
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = keyedStream.sum(1);
        sum.print();
        env.execute();
    }
}

package com.alexwang.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class WordCountSocketStreamProcess {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.socketTextStream("hadoop151", 4567)
                .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
                        Arrays.stream(value.split("\\s+")).forEach(word -> out.collect(Tuple2.of(word, 1L)));
                    }
                }).keyBy(t -> t.f0)
                .sum(1)
                .print();
        env.execute();
    }
}

package com.alexwang.flink;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class WordCountLambdaExpressionProcess {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.socketTextStream("hadoop151", 4567)
                .flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {
                    Arrays.stream(line.split("\\s+"))
                            .map(w -> Tuple2.of(w, 1L))
                            .forEach(collector::collect);
                }, Types.TUPLE(Types.STRING, Types.LONG))
                //.returns(Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy(t -> t.f0)
                .sum(1)
                .print();
        env.execute();
    }
}

Flink can execute applications in one of three modes:
Application Mode
Session Mode
Per-Job Mode (deprecated)
The three modes differ in two aspects:
the cluster lifecycle and resource isolation guarantees
whether the application’s main() method is executed on the client or on the cluster.
The Session mode assumes an already running cluster and uses the resources of that cluster to execute any submitted application. Applications executed in the same (session) cluster use, and consequently compete for, the same resources. This has the advantage that you do not pay the resource overhead of spinning up a full cluster for every submitted job. But, if one of the jobs misbehaves or brings down a TaskManager, then all jobs running on that TaskManager will be affected by the failure.
Advantages
easy to understand
easy to set up
only one JobManager to manage for the whole cluster
Disadvantages
lack of resource isolation
tasks in the same TaskManager can affect each other
cluster resources are fixed
the JobManager carries the load of every job in the cluster
Suitable scenarios
jobs with short execution times
small jobs
development and testing
In a Flink Job Cluster, the available cluster manager (like YARN) is used to spin up a cluster for each submitted job, and this cluster is available to that job only. The client first requests resources from the cluster manager to start the JobManager and submits the job to the Dispatcher running inside this process. TaskManagers are then lazily allocated based on the resource requirements of the job. Once the job is finished, the Flink Job Cluster is torn down.
Advantages
better resource isolation: different Flink jobs do not affect each other
a misbehaving job can only bring down its own TaskManagers
Disadvantages
a Flink Standalone cluster does not support this deployment mode
heavy load on the client node, which runs each job's main() method and ships its dependencies
Per-Job Mode has been deprecated since Flink 1.15 in favor of Application Mode.
A Flink Application Cluster is a dedicated Flink cluster that only executes jobs from one Flink application, and the application's main() method runs on the cluster rather than on the client. Job submission is a one-step process: you don’t need to start a Flink cluster first and then submit a job to the existing cluster session.
Instead, you package your application logic and dependencies into an executable job JAR, and the cluster entrypoint (ApplicationClusterEntryPoint) is responsible for calling the main() method to extract the JobGraph. This allows you to deploy a Flink application like any other application on Hadoop YARN or Kubernetes.
Apache Hadoop YARN is a resource provider popular with many data processing frameworks. Flink services are submitted to YARN’s ResourceManager, which spawns containers on machines managed by YARN NodeManagers. Flink deploys its JobManager and TaskManager instances into such containers.
Flink can dynamically allocate and de-allocate TaskManager resources depending on the number of processing slots required by the job(s) running on the JobManager.
In YARN session mode, Flink starts a cluster with a single JobManager. When a job is submitted, the JobManager requests the required resources from the YARN ResourceManager and starts TaskManager containers.
Flink on YARN supports Per-Job mode, in which each submitted job gets its own cluster and the resources are released after the job completes. The Per-Job process is as follows:
A client submits a YARN application, such as a JobGraph or a JAR package.
The YARN ResourceManager allocates the first container, which starts the ApplicationMaster. The ApplicationMaster runs the Flink YARN ResourceManager and the JobManager.
The Flink YARN ResourceManager requests resources from the YARN ResourceManager and starts TaskManagers once the resources are allocated. After startup, each TaskManager registers with the Flink YARN ResourceManager, and the JobManager then assigns tasks to it for execution.
Application Mode creates a session cluster per application and executes the application’s main() method on the cluster. It thus comes with better resource isolation, as the resources are only used by the job(s) launched from a single main() method.
In all the other modes, the application’s main() method is executed on the client side. This includes downloading the application’s dependencies locally, executing main() to extract a representation of the application that Flink’s runtime can understand (i.e. the JobGraph), and shipping the dependencies and the JobGraph(s) to the cluster.
In YARN Application Mode, Flink launches a cluster on YARN, and the main() method of the application JAR is executed on the JobManager in YARN rather than on the client.
To unlock the full potential of Application Mode, consider using it with the yarn.provided.lib.dirs configuration option and pre-uploading your application JAR to a location accessible by all nodes in your cluster. This makes job submission extra lightweight, because the needed Flink JARs and the application JAR are picked up from the specified remote locations rather than being shipped to the cluster by the client.
The Flink runtime consists of two types of processes: a JobManager and one or more TaskManagers.
Master Node (JobManager)
The JobManager daemon runs on the master node of the cluster and acts as the coordinator of the Flink system. It receives the program from the client and assigns its tasks to the worker nodes for processing.
Slave Nodes (TaskManager)
The TaskManager daemons run on the worker (slave) nodes of the cluster and perform the actual work in the Flink system. They receive commands from the JobManager and execute the assigned tasks.
Each worker (TaskManager) is a JVM process, and may execute one or more subtasks in separate threads. To control how many tasks a TaskManager accepts, it has so-called task slots (at least one).
Each task slot represents a fixed subset of resources of the TaskManager. A TaskManager with three slots, for example, will dedicate 1/3 of its managed memory to each slot. Slotting the resources means that a subtask will not compete with subtasks from other jobs for managed memory, but instead has a certain amount of reserved managed memory. Note that no CPU isolation happens here; currently slots only separate the managed memory of tasks.
By adjusting the number of task slots, users can define how subtasks are isolated from each other. Having one slot per TaskManager means that each task group runs in a separate JVM (which can be started in a separate container, for example). Having multiple slots means more subtasks share the same JVM. Tasks in the same JVM share TCP connections (via multiplexing) and heartbeat messages. They may also share data sets and data structures, thus reducing the per-task overhead.
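Parallelism precedence, from highest to lowest: operator level (setParallelism() on an operator) > execution environment (env.setParallelism()) > client (the -p option of bin/flink run) > the parallelism.default setting in flink-conf.yaml.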
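A minimal sketch of the first two precedence levels, assuming the Flink 1.x DataStream API; the class name `ParallelismPrecedence` and the element values are made up for illustration. The command-line and flink-conf.yaml levels only take effect when the code sets nothing itself.

```java
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismPrecedence {

    // Builds a tiny pipeline and returns the map operator so its parallelism can be inspected
    public static SingleOutputStreamOperator<Integer> build(StreamExecutionEnvironment env) {
        // Environment level: overrides -p and parallelism.default for this job
        env.setParallelism(2);
        return env.fromElements(1, 2, 3, 4)
                .map(x -> x * 10)
                // Operator level: wins over the environment default, for this operator only
                .setParallelism(4);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator<Integer> mapped = build(env);
        System.out.println("environment parallelism: " + env.getParallelism()); // 2
        System.out.println("map parallelism: " + mapped.getParallelism());      // 4
        mapped.print(); // the print sink falls back to the environment parallelism
        env.execute("parallelism-precedence");
    }
}
```

Setting the parallelism on one operator does not change its neighbours: the print sink added afterwards still runs with the environment value.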
Flink features an optimization technique called operator chaining that reduces the overhead of local communication under certain conditions.
For distributed execution, Flink chains operator subtasks together into tasks. Each task is executed by one thread. Chaining operators together into tasks is a useful optimization: it reduces the overhead of thread-to-thread handover and buffering, and increases overall throughput while decreasing latency.
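Operators are chained only when, among other conditions:
they have the same parallelism
they are in the same slot sharing group
they are connected by a forward channel (no redistribution such as keyBy() or rebalance() between them)
chaining has not been disabled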
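Chaining can also be influenced from code. The following is a sketch assuming the Flink 1.x DataStream API (the class name and numbers are illustrative): `startNewChain()` and `disableChaining()` break chains around a single operator, and `env.disableOperatorChaining()` switches chaining off for the whole job.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingControl {

    public static DataStream<Long> build(StreamExecutionEnvironment env) {
        // env.disableOperatorChaining(); // would switch chaining off for the whole job
        return env.fromSequence(1, 100)
                .map(x -> x * 2)          // same parallelism + forward channel: can be chained to the source
                .filter(x -> x % 3 == 0)
                .startNewChain()          // the filter starts a new chain: not chained to the map before it
                .map(x -> x + 1)
                .disableChaining();       // this map is never chained to its neighbours
        // .slotSharingGroup("name") would move an operator into another slot sharing group,
        // which also prevents chaining across the group boundary and needs extra free slots.
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        build(env).print();
        env.execute("chaining-control");
    }
}
```

The resulting chains show up as separate boxes in the job graph of the Flink web UI.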
A few basic data sources and sinks are built into Flink and are always available. The predefined data sources include reading from files, directories, and sockets, and ingesting data from collections and iterators. The predefined data sinks support writing to files, to stdout and stderr, and to sockets.
Java collections (reading in-memory data into Flink for processing)
DataGeneratorSource
Socket source
Local file source
HDFS file source
Apache Kafka topic source
Custom sources
fromCollection: Creates a data stream from the given non-empty collection. The type of the data stream is that of the elements in the collection.
fromElements: Creates a new data stream that contains the given elements. The elements must all be of the same type, for example, all of the String or Integer.
fromSequence: Creates a new data stream that contains a sequence of numbers (longs) and is useful for testing and for cases that just need a stream of N events of any kind. The generated source splits the sequence into as many parallel sub-sequences as there are parallel source readers. Each sub-sequence will be produced in order. If the parallelism is limited to one, the source will produce one sequence in order.
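The three collection-based sources can be tried in a small local program. This is a sketch assuming the Flink 1.x DataStream API, where these methods live on `StreamExecutionEnvironment`; the class and method names are hypothetical. `executeAndCollect(limit)` runs the job and returns the records to the client, which is convenient for demos and tests.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CollectionSources {

    // fromCollection: the stream's type is the element type of the (non-empty) collection
    public static List<String> fromCollectionDemo(StreamExecutionEnvironment env) throws Exception {
        return env.fromCollection(Arrays.asList("flink", "spark", "kafka")).executeAndCollect(100);
    }

    // fromElements: all elements must have the same type
    public static List<Integer> fromElementsDemo(StreamExecutionEnvironment env) throws Exception {
        return env.fromElements(1, 2, 3).executeAndCollect(100);
    }

    // fromSequence: the longs 1..5 (both ends inclusive), split across parallel readers
    public static List<Long> fromSequenceDemo(StreamExecutionEnvironment env) throws Exception {
        List<Long> result = new ArrayList<>(env.fromSequence(1, 5).executeAndCollect(100));
        Collections.sort(result); // sub-sequences from different readers may interleave
        return result;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        System.out.println(fromCollectionDemo(env));
        System.out.println(fromElementsDemo(env));
        System.out.println(fromSequenceDemo(env));
    }
}
```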
The DataGen connector provides a Source implementation that allows for generating input data for Flink pipelines. It is useful when developing locally or demoing without access to external systems such as Kafka. The DataGen connector is built-in, no additional dependencies are required.
Flink provides a MongoDB connector for reading and writing data from and to MongoDB collections with at-least-once guarantees.

package com.alexwang.flink.chapter04.source;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

// Emits random longs in [0, 2000) and sleeps that many milliseconds after each one.
// Note: SourceFunction is deprecated in recent Flink releases in favor of the newer Source API.
public class SimpleSourceFunction extends RichSourceFunction<Long> {
    // Per-instance flag (not static): cancelling one source instance must not stop the others
    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        int subtasks = getRuntimeContext().getNumberOfParallelSubtasks();
        int indexSubTask = getRuntimeContext().getIndexOfThisSubtask();
        System.out.println("total tasks: " + subtasks + ", current task index: " + indexSubTask);
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (running) {
            final long value = ThreadLocalRandom.current().nextLong(2000);
            // Emit under the checkpoint lock so that emitting records and taking checkpoints do not interleave
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(value);
            }
            TimeUnit.MILLISECONDS.sleep(value);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

There are seven different categories of data types:
Java Tuples and Scala Case Classes
Java POJOs
Primitive Types
Regular Classes
Values
Hadoop Writables
Special Types
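Whether Flink treats a class as a POJO (and serializes it efficiently) or falls back to generic Kryo serialization can be checked directly. This is a sketch based on the POJO rules from the Flink documentation (public class, public no-argument constructor, fields public or reachable through getters and setters); `WordCount` is a hypothetical example type.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;

public class PojoTypeCheck {

    // Public, static, public no-argument constructor, public non-final fields: treated as a POJO
    public static class WordCount {
        public String word;
        public long count;

        public WordCount() {}

        public WordCount(String word, long count) {
            this.word = word;
            this.count = count;
        }
    }

    public static void main(String[] args) {
        TypeInformation<WordCount> info = TypeInformation.of(WordCount.class);
        // true if the POJO rules hold; otherwise Flink returns a GenericTypeInfo and uses Kryo
        System.out.println(info instanceof PojoTypeInfo);
    }
}
```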
The DataStream API provides transformations for the most common data transformation operations. We present the transformations of the DataStream API in four categories:
Basic transformations are transformations on individual events.
KeyedStream transformations are transformations that are applied to events in the context of a key
Multistream transformations merge multiple streams into one stream or split one stream into multiple streams
Distribution transformations reorganize stream events.
The map transformation is specified by calling the DataStream.map() method and produces a new DataStream. It passes each incoming event to a user-defined mapper that returns exactly one output event, possibly of a different type.
The filter transformation drops or forwards events of a stream by evaluating a boolean condition on each input event. A return value of true keeps the input event and forwards it to the output; false drops the event.
The flatMap transformation is similar to map, but it can produce zero, one, or more output events for each incoming event. In fact, the flatMap transformation is a generalization of filter and map and can be used to implement both those operations.
The keyBy transformation converts a DataStream into a KeyedStream by specifying a key. Based on the key, the events of the stream are assigned to partitions, so that all events with the same key are processed by the same task of the subsequent operator. Events with different keys can be processed by the same task, but the keyed state of a task’s function is always accessed in the scope of the current event’s key.
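The four transformations above combine naturally in a word count. This is a sketch assuming the Flink 1.x DataStream API; the input lines and the class name are made up. The lambda passed to `flatMap` gets explicit type information because Java erases the generic `Collector` type.

```java
import java.util.List;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class BasicTransformations {

    public static List<Tuple2<String, Integer>> run(StreamExecutionEnvironment env) throws Exception {
        return env.fromElements("a bb", "  ", "ccc a")
                // map: exactly one output per input (strip surrounding blanks)
                .map(line -> line.trim())
                // filter: keep only non-empty lines
                .filter(line -> !line.isEmpty())
                // flatMap: zero or more outputs per input, one (word, 1) pair per word
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String word : line.split("\\s+")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                }, Types.TUPLE(Types.STRING, Types.INT))
                // keyBy: all pairs with the same word are routed to the same subtask
                .keyBy(pair -> pair.f0)
                // rolling sum per key: emits an updated count for every input pair
                .sum(1)
                .executeAndCollect(100);
    }

    public static void main(String[] args) throws Exception {
        run(StreamExecutionEnvironment.getExecutionEnvironment()).forEach(System.out::println);
    }
}
```

Because `sum` is a rolling aggregation, the word "a" is emitted twice, first with count 1 and then with count 2.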
The DataStream.union() method merges two or more DataStreams of the same type and produces a new DataStream of the same type.
The events are merged in FIFO fashion; the operator does not produce a specific order of events. Moreover, the union operator does not perform duplicate elimination: every input event is emitted to the next operator.
In addition to the main stream that results from DataStream operations, you can also produce any number of additional side output result streams. The type of data in the result streams does not have to match the type of data in the main stream and the types of the different side outputs can also differ. This operation can be useful when you want to split a stream of data where you would normally have to replicate the stream and then filter out from each stream the data that you don’t want to have.
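A sketch of both multistream transformations, assuming the Flink 1.x DataStream API: `union()` merges two `Integer` streams, and a `ProcessFunction` routes negative numbers to a `String`-typed side output identified by an `OutputTag`. The class name, tag name and data are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class UnionAndSideOutput {

    // The anonymous subclass ({}) lets Flink capture the generic type of the side output
    static final OutputTag<String> NEGATIVE = new OutputTag<String>("negative") {};

    public static List<String> run(StreamExecutionEnvironment env) throws Exception {
        DataStream<Integer> first = env.fromElements(1, -2, 3);
        DataStream<Integer> second = env.fromElements(-4, 5);

        // union: same element type, no ordering guarantee, no duplicate elimination
        DataStream<Integer> all = first.union(second);

        // Main output: non-negative numbers; side output: negatives, as Strings
        SingleOutputStreamOperator<Integer> nonNegative = all.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                if (value >= 0) {
                    out.collect(value);
                } else {
                    ctx.output(NEGATIVE, "negative: " + value);
                }
            }
        });
        DataStream<String> negatives = nonNegative.getSideOutput(NEGATIVE);

        // Merge both outputs only to collect them into one list for inspection
        List<String> result = new ArrayList<>(
                nonNegative.map(v -> "main: " + v).union(negatives).executeAndCollect(100));
        Collections.sort(result);
        return result;
    }

    public static void main(String[] args) throws Exception {
        run(StreamExecutionEnvironment.getExecutionEnvironment()).forEach(System.out::println);
    }
}
```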
forward() (ForwardPartitioner): forwards elements only to the locally running downstream operation.
shuffle() (ShufflePartitioner): distributes the data equally by selecting one output channel randomly.
rebalance() (RebalancePartitioner): distributes the data equally by cycling through the output channels.
broadcast() (BroadcastPartitioner): selects all the output channels.
global() (GlobalPartitioner): sends all elements to the downstream operator with subtask ID=0.
keyBy() (KeyGroupStreamPartitioner): selects the target channel based on the key group index.
rescale() (RescalePartitioner): distributes the data equally by cycling through the output channels. This distributes only to a subset of downstream nodes because org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator instantiates a DistributionPattern.POINTWISE distribution pattern when encountering RescalePartitioner.
The subset of downstream operations to which the upstream operation sends elements depends on the degree of parallelism of both the upstream and downstream operation. For example, if the upstream operation has parallelism 2 and the downstream operation has parallelism 4, then one upstream operation would distribute elements to two downstream operations while the other upstream operation would distribute to the other two downstream operations. If, on the other hand, the downstream operation has parallelism 2 while the upstream operation has parallelism 4 then two upstream operations will distribute to one downstream operation while the other two upstream operations will distribute to the other downstream operations.
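These partitioners are selected with methods on `DataStream`. The sketch below assumes the Flink 1.x API and uses a source of parallelism 2 feeding identity maps of parallelism 4, the first case described above for `rescale()`. Because `broadcast()` copies every record to all four downstream subtasks, the job emits 8 + 8 + 8 + 32 + 8 = 64 records in total. The class name is hypothetical.

```java
import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitioningDemo {

    public static List<Long> run(StreamExecutionEnvironment env) throws Exception {
        env.setParallelism(4);                                        // downstream maps: 4 subtasks
        DataStream<Long> source = env.fromSequence(1, 8).setParallelism(2);

        DataStream<Long> shuffled   = source.shuffle().map(x -> x);   // random channel per record
        DataStream<Long> rebalanced = source.rebalance().map(x -> x); // round-robin over all 4 channels
        DataStream<Long> rescaled   = source.rescale().map(x -> x);   // round-robin over 2 of the 4 channels
        DataStream<Long> broadcast  = source.broadcast().map(x -> x); // every record to all 4 channels
        DataStream<Long> global     = source.global().map(x -> x);    // everything to subtask 0
        // forward() is not used: it requires equal upstream and downstream parallelism (2 vs 4 here)

        return shuffled.union(rebalanced, rescaled, broadcast, global).executeAndCollect(1000);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(StreamExecutionEnvironment.getExecutionEnvironment()).size());
    }
}
```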
Accumulators are simple constructs with an add operation and a final accumulated result, which is available after the job ended.
The most straightforward accumulator is a counter: You can increment it using the Accumulator.add(V value) method. At the end of the job Flink will sum up (merge) all partial results and send the result to the client. Accumulators are useful during debugging or if you quickly want to find out more about your data.
Flink currently has the following built-in accumulators. Each of them implements the Accumulator interface.
IntCounter, LongCounter and DoubleCounter: counters that sum int, long and double values, respectively.
Histogram : A histogram implementation for a discrete number of bins. Internally it is just a map from Integer to Integer. You can use this to compute distributions of values, e.g. the distribution of words-per-line for a word count program.
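A sketch of a counter accumulator, assuming the Flink 1.x API (where `RichFunction.open(Configuration)` is still available); the accumulator name `num-lines` and the class names are hypothetical. The merged value is read from the `JobExecutionResult` returned by `env.execute()` once the job has finished.

```java
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AccumulatorDemo {

    // Passes lines through unchanged while counting them
    public static class LineCounter extends RichMapFunction<String, String> {
        private final IntCounter numLines = new IntCounter();

        @Override
        public void open(Configuration parameters) {
            // Register under a job-wide name; every parallel instance registers its own partial counter
            getRuntimeContext().addAccumulator("num-lines", numLines);
        }

        @Override
        public String map(String line) {
            numLines.add(1);
            return line;
        }
    }

    public static int countLines(StreamExecutionEnvironment env) throws Exception {
        env.fromElements("first line", "second line", "third line")
                .map(new LineCounter())
                .print();
        JobExecutionResult result = env.execute("accumulator-demo");
        // Partial results of all subtasks are merged; the total is available after the job ends
        Integer total = result.getAccumulatorResult("num-lines");
        return total;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("lines: " + countLines(StreamExecutionEnvironment.getExecutionEnvironment()));
    }
}
```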
Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them.
Socket Sink
Local File System Sink
HDFS File System Sink
JDBC Sink
Apache Kafka Sink
MongoDB Sink
Custom Sink

package com.alexwang.flink.chapter04.sink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SocketSinkConnector {
    public static void main(String[] args) throws Exception {
        // Local environment with the web UI (requires flink-runtime-web on the classpath)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.socketTextStream("hadoop151", 4567)
                .map(String::toUpperCase, Types.STRING)
                .writeToSocket("hadoop151", 4568, new SimpleStringSchema());
        env.execute();
    }
}

The file sink writes incoming data into buckets. Given that the incoming streams can be unbounded, data in each bucket is organized into part files of finite size. The bucketing behavior is fully configurable; by default it is time-based and starts writing a new bucket every hour, so each resulting bucket contains the records received during a one-hour interval. A custom date format (for example, one that includes minutes) produces finer-grained buckets.
Example: querying one of the Parquet part files written by the sink with SQL:
select * from read_parquet("C:/WorkBench/Work/flink/2024-06-21-18/alex-7c14ba4b-d1d5-4872-bc86-a4f44da3c38f-0.parquet");
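A row-format file sink with hourly buckets might look like the sketch below, assuming the `FileSink` from Flink's file connector (`flink-connector-files`) in a 1.x release. The output directory, bucket format and class name are assumptions; Parquet files like the one queried above would instead need a bulk format from the `flink-parquet` module. In STREAMING mode, part files are only finalized on successful checkpoints, so the job enables checkpointing.

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

public class FileSinkDemo {

    public static FileSink<String> buildSink(String outputDir) {
        return FileSink
                // Row format: each record is encoded as one UTF-8 line
                .forRowFormat(new Path(outputDir), new SimpleStringEncoder<String>("UTF-8"))
                // One bucket (sub-directory) per hour, named like 2024-06-21-18
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd-HH"))
                // Roll over to a new part file on every checkpoint
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .build();
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // pending part files are committed when a checkpoint completes
        env.socketTextStream("hadoop151", 4567)
                .sinkTo(buildSink("output/file-sink"));
        env.execute("file-sink-demo");
    }
}
```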
In the case of keyed streams, any attribute of your incoming events can be used as a key. Having a keyed stream will allow your windowed computation to be performed in parallel by multiple tasks, as each logical keyed stream can be processed independently from the rest. All elements referring to the same key will be sent to the same parallel task.
In case of non-keyed streams, your original stream will not be split into multiple logical streams and all the windowing logic will be performed by a single task, i.e. with parallelism of 1.
The window assigner defines how elements are assigned to windows. This is done by specifying the WindowAssigner of your choice in the window(...) (for keyed streams) or the windowAll() (for non-keyed streams) call.
A WindowAssigner is responsible for assigning each incoming element to one or more windows. Flink comes with pre-defined window assigners for the most common use cases, namely tumbling windows, sliding windows, session windows and global windows.
The sliding windows assigner assigns elements to windows of fixed length. Similar to a tumbling windows assigner, the size of the windows is configured by the window size parameter. An additional window slide parameter controls how frequently a sliding window is started. Hence, sliding windows can be overlapping if the slide is smaller than the window size. In this case elements are assigned to multiple windows.
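The assignment rule can be illustrated without a cluster. The following plain-Java sketch reproduces, for a window offset of 0, the arithmetic that Flink's sliding event-time assigner applies to each timestamp: find the latest window start at or before the timestamp (a multiple of the slide), then step back by the slide while the window still covers the timestamp. It is a simplified model for illustration, not Flink code; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowAssignment {

    // Start of the window of length `size` containing `timestamp` (also handles negative remainders)
    static long windowStart(long timestamp, long offset, long size) {
        long remainder = (timestamp - offset) % size;
        return remainder < 0 ? timestamp - (remainder + size) : timestamp - remainder;
    }

    // Every [start, end) window that a record with this timestamp belongs to
    public static List<long[]> assign(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = windowStart(timestamp, 0, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        for (long[] w : assign(12, 10, 5)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

With a size of 10 and a slide of 5, the timestamp 12 falls into [10, 20) and [5, 15), which is what the program prints. With a slide equal to the size, the same loop yields exactly one window per timestamp, i.e. a tumbling window.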
The session windows assigner groups elements by sessions of activity. Session windows do not overlap and do not have a fixed start and end time, in contrast to tumbling windows and sliding windows. Instead a session window closes when it does not receive elements for a certain period of time, i.e., when a gap of inactivity occurred. A session window assigner can be configured with either a static session gap or with a session gap extractor function which defines how long the period of inactivity is. When this period expires, the current session closes and subsequent elements are assigned to a new session window.
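A sketch of event-time session windows with a static gap, assuming the Flink 1.x DataStream API (`Time` is replaced by `java.time.Duration` in newer releases); the user name, timestamps and the 3-second gap are made up. Two clicks one second apart share a session, while a click eight seconds later starts a new one.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SessionWindowDemo {

    // Records are (user, event time in ms, 1); returns the click count of each session, sorted
    public static List<Integer> sessionCounts(StreamExecutionEnvironment env) throws Exception {
        List<Tuple3<String, Long, Integer>> sessions = env
                .fromElements(
                        Tuple3.of("alice", 1_000L, 1),
                        Tuple3.of("alice", 2_000L, 1),    // 1 s later: same session
                        Tuple3.of("alice", 10_000L, 1))   // 8 s of inactivity > 3 s gap: new session
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, Long, Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner((event, previous) -> event.f1))
                .keyBy(event -> event.f0)
                .window(EventTimeSessionWindows.withGap(Time.seconds(3)))
                .sum(2)                                   // add up the 1s within each session
                .executeAndCollect(100);

        List<Integer> counts = new ArrayList<>();
        for (Tuple3<String, Long, Integer> session : sessions) {
            counts.add(session.f2);
        }
        Collections.sort(counts);
        return counts;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sessionCounts(StreamExecutionEnvironment.getExecutionEnvironment()));
    }
}
```

At the end of the bounded input Flink emits a final watermark, which closes the last open session.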
A global windows assigner assigns all elements with the same key to the same single global window. This windowing scheme is only useful if you also specify a custom trigger. Otherwise, no computation will be performed, as the global window does not have a natural end at which we could process the aggregated elements.
Global windows have the following features:
no window size
non-overlapping
no start and end timestamps
no computation result unless a custom trigger is attached in the code
A count window sets the window size by the number of elements it contains: a window fires when it has reached a certain number of elements. For example, if the count is fixed at 4, every window that fires contains exactly 4 elements, regardless of how much time it spans. The time span of each window varies, but the number of elements in a window is always the same. Count windows can be overlapping (sliding) or non-overlapping (tumbling). Count windows are usually applied to keyed streams, in which case elements are counted per key, that is, per logical group of the stream.
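The sketch below, assuming the Flink 1.x DataStream API, builds a tumbling count window of 4 on a single-key stream, and then the same logic by hand from a global window plus a purging count trigger, which also shows why a global window needs a trigger. The key `"all"`, the data and the class name are hypothetical; `countWindow(size, slide)` would give the overlapping variant.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;

public class CountWindowDemo {

    // Tumbling count window: per key, fire and clear the window every 4 elements
    public static List<Integer> countWindowSums(StreamExecutionEnvironment env) throws Exception {
        return sorted(env.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
                .keyBy(x -> "all")              // a single key, so all elements are counted together
                .countWindow(4)
                .reduce(Integer::sum)
                .executeAndCollect(100));
    }

    // The same behaviour built by hand: without the trigger, the global window would never fire
    public static List<Integer> globalWindowSums(StreamExecutionEnvironment env) throws Exception {
        return sorted(env.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
                .keyBy(x -> "all")
                .window(GlobalWindows.create())
                .trigger(PurgingTrigger.of(CountTrigger.of(4)))
                .reduce(Integer::sum)
                .executeAndCollect(100));
    }

    private static List<Integer> sorted(List<Integer> values) {
        List<Integer> copy = new ArrayList<>(values);
        Collections.sort(copy);
        return copy;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        System.out.println(countWindowSums(env));  // [10, 26]
        System.out.println(globalWindowSums(env)); // [10, 26]
    }
}
```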
After defining the window assigner, we need to specify the computation that we want to perform on each of these windows. This is the responsibility of the window function, which is used to process the elements of each (possibly keyed) window once the system determines that a window is ready for processing.
Rolling aggregation Functions
sum
min
max
minBy
maxBy
reduce -> ReduceFunction
aggregate -> AggregateFunction
process -> ProcessWindowFunction
ProcessWindowFunction with incremental aggregation
apply -> WindowFunction (legacy)
Side Output
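As an example of an `aggregate` window function, the following sketch computes an average incrementally, keeping only a (sum, count) accumulator per window instead of buffering all elements. It assumes records of type `Tuple2<String, Long>`; the class name is hypothetical. It would be used as `keyedStream.window(...).aggregate(new AverageAggregate())`.

```java
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Averages the Long field of (key, value) records; the accumulator is (sum, count)
public class AverageAggregate
        implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {

    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return Tuple2.of(0L, 0L);
    }

    @Override
    public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> acc) {
        return Tuple2.of(acc.f0 + value.f1, acc.f1 + 1L);
    }

    @Override
    public Double getResult(Tuple2<Long, Long> acc) {
        return acc.f1 == 0 ? 0.0 : (double) acc.f0 / acc.f1;
    }

    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        // Called when windows are merged, e.g. session windows
        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
    }
}
```

Because only the accumulator is stored, the state per window stays constant in size no matter how many elements arrive.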
Stateful operators and user functions are common building blocks of stream processing applications. In fact, most nontrivial operations need to memorize records or partial results because data is streamed and arrives over time. Many of Flink’s built-in DataStream operators, sources, and sinks are stateful and buffer records or maintain partial results or metadata. For instance, a window operator collects input records for a ProcessWindowFunction or the result of applying a ReduceFunction, a ProcessFunction memorizes scheduled timers, and some sink functions maintain state about transactions to provide exactly-once functionality. In addition to built-in operators and provided sources and sinks, Flink’s DataStream API exposes interfaces to register, maintain, and access state in user-defined functions.
Keyed State: to use keyed state, you first need to specify a key on a DataStream to partition the state, and then access the state in operators applied to the resulting KeyedStream. Five types of keyed state are available:
ValueState<T>: This keeps a value that can be updated and retrieved
ListState<T>: This keeps a list of elements. You can append elements and retrieve an Iterable over all currently stored elements.
ReducingState<T>: This keeps a single value that represents the aggregation of all values added to the state.
AggregatingState<IN, OUT>: This keeps a single value that represents the aggregation of all values added to the state, but its output type may differ from the input type.
MapState<UK, UV>: This keeps a list of mappings. You can put key-value pairs into the state and retrieve an Iterable over all currently stored mappings
Operator State: Operator State (or non-keyed state) is state that is bound to one parallel operator instance; each parallel instance of an operator has its own state.
Broadcast State: Broadcast State is a special type of Operator State. It was introduced to support use cases where records of one stream need to be broadcasted to all downstream tasks, where they are used to maintain the same state among all subtasks. This state can then be accessed while processing records of a second stream
ValueState<T>: This keeps a value that can be updated and retrieved (scoped to the key of the input element as mentioned above, so there will possibly be one value for each key that the operation sees). The value can be set using update(T) and retrieved using T value().
ReducingState<T>: This keeps a single value that represents the aggregation of all values added to the state. Elements are added using add(T), and the ReduceFunction is applied to each added element.
AggregatingState<IN, OUT>: This keeps a single value that represents the aggregation of all values added to the state. Contrary to ReducingState, the aggregate type may be different from the type of elements that are added to the state. Elements are added using add(IN), and the AggregateFunction is applied to each added element.
ListState<T>: This keeps a list of elements. You can append elements and retrieve an Iterable over all currently stored elements. Elements are added using add(T) or addAll(List<T>), the Iterable can be retrieved using Iterable<T> get(). You can also override the existing list with update(List<T>)
MapState<UK, UV>: This keeps a list of mappings. You can put key-value pairs into the state and retrieve an Iterable over all currently stored mappings. Mappings are added using put(UK, UV) or putAll(Map<UK, UV>). The value associated with a user key can be retrieved using get(UK). The iterable views for mappings, keys and values can be retrieved using entries(), keys() and values() respectively. You can also use isEmpty() to check whether this map contains any key-value mappings.
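A sketch of ValueState in a keyed function, assuming the Flink 1.x API (`open(Configuration)`); the state name `count` and the class names are hypothetical. Each word gets its own counter because the state is scoped to the current key.

```java
import java.util.List;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class KeyedCountWithValueState {

    // Emits (word, how many times this word has been seen so far)
    public static class RunningCount extends RichFlatMapFunction<String, Tuple2<String, Long>> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            // The descriptor names the state; Flink keeps one value per key behind this handle
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Types.LONG));
        }

        @Override
        public void flatMap(String word, Collector<Tuple2<String, Long>> out) throws Exception {
            Long current = count.value();                 // null the first time a key is seen
            long updated = (current == null ? 0L : current) + 1;
            count.update(updated);                        // scoped to the current key
            out.collect(Tuple2.of(word, updated));
        }
    }

    public static List<Tuple2<String, Long>> run(StreamExecutionEnvironment env) throws Exception {
        env.setParallelism(1);                            // deterministic output order for the demo
        return env.fromElements("a", "b", "a")
                .keyBy(word -> word)                      // keyed state needs a KeyedStream
                .flatMap(new RunningCount())
                .executeAndCollect(100);
    }

    public static void main(String[] args) throws Exception {
        run(StreamExecutionEnvironment.getExecutionEnvironment()).forEach(System.out::println);
    }
}
```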
A common usage pattern for broadcast state is to broadcast rules to the processing operator instead of hard-coding them. For example, a threshold can be broadcast at runtime instead of redeploying the Flink application to change it.
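The threshold pattern can be sketched with a `BroadcastProcessFunction`, assuming the Flink 1.x DataStream API. Broadcast state is always a map, so the sketch stores one entry under the hypothetical key `"threshold"`. The class name, the descriptor name and the second socket port (4568) are assumptions.

```java
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastThreshold {

    // Descriptor shared by broadcast() and both process methods
    static final MapStateDescriptor<String, Long> RULES =
            new MapStateDescriptor<>("rules", Types.STRING, Types.LONG);

    public static DataStream<String> build(DataStream<Long> values, DataStream<Long> thresholds) {
        BroadcastStream<Long> rules = thresholds.broadcast(RULES);
        return values.connect(rules).process(new BroadcastProcessFunction<Long, Long, String>() {
            @Override
            public void processElement(Long value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                // The main stream can only read the broadcast state
                ReadOnlyBroadcastState<String, Long> state = ctx.getBroadcastState(RULES);
                Long threshold = state.get("threshold");
                if (threshold != null && value > threshold) { // no alerts until a rule has arrived
                    out.collect("alert: " + value + " > " + threshold);
                }
            }

            @Override
            public void processBroadcastElement(Long threshold, Context ctx, Collector<String> out) throws Exception {
                // Every parallel instance receives and applies the same update
                BroadcastState<String, Long> state = ctx.getBroadcastState(RULES);
                state.put("threshold", threshold);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Long> values = env.socketTextStream("hadoop151", 4567).map(s -> Long.valueOf(s.trim()));
        DataStream<Long> thresholds = env.socketTextStream("hadoop151", 4568).map(s -> Long.valueOf(s.trim()));
        build(values, thresholds).print();
        env.execute("broadcast-threshold");
    }
}
```

Sending a new number to the threshold socket changes the alert level of the running job without a redeployment.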
This state backend holds the working state in the memory (JVM heap) of the TaskManagers, the Key/value state and window operators hold hash tables that store the values, triggers, etc.
If nothing else is configured, the system will use the HashMapStateBackend.
This backend can either be configured within the application (by creating the backend with the respective constructor parameters and setting it on the execution environment) or by specifying it in the Flink configuration.
The EmbeddedRocksDBStateBackend uses RocksDB, developed at Facebook, to store the data. If you are not familiar with RocksDB, it is an embeddable key-value store that offers ACID guarantees. It is based on Google's LevelDB but offers much better write performance.
Flink chose RocksDB over other popular embeddable stores such as SQLite because of its high write performance, which comes from its log-structured merge-tree (LSM) design. Since RocksDB also keeps an in-memory table (the memtable) along with Bloom filters, reads of recent data are also very fast.
Each TaskManager maintains its own local RocksDB instance, and snapshots of this state are checkpointed to a durable store such as HDFS.
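The read and write paths that make an LSM design fast can be illustrated with a toy model. This is not RocksDB: it omits durability, compaction and multiple levels, and the two hash functions are illustrative assumptions. Writes only touch an in-memory memtable that is flushed as an immutable sorted run when full. Reads check the memtable first and then search runs from newest to oldest, skipping any run whose Bloom filter says the key is definitely absent.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.TreeMap;

// Toy LSM-tree model (not RocksDB). Recent writes live in the memtable; older
// data lives in immutable sorted runs, each guarded by its own Bloom filter.
class ToyLsmStore {
    static final int BLOOM_BITS = 1024;

    static class SortedRun {
        final TreeMap<String, String> data;
        final BitSet bloom = new BitSet(BLOOM_BITS);

        SortedRun(TreeMap<String, String> data) {
            this.data = data;
            for (String k : data.keySet()) {
                for (int h : hashes(k)) bloom.set(h);
            }
        }

        boolean mightContain(String key) {
            for (int h : hashes(key)) {
                if (!bloom.get(h)) return false; // definitely not in this run
            }
            return true; // possibly present (false positives are allowed)
        }
    }

    // Two cheap hash functions derived from String.hashCode(); illustrative only.
    static int[] hashes(String key) {
        int h = key.hashCode();
        return new int[] {Math.floorMod(h, BLOOM_BITS), Math.floorMod(h * 31 + 17, BLOOM_BITS)};
    }

    private final int memtableLimit;
    private TreeMap<String, String> memtable = new TreeMap<>();
    private final List<SortedRun> runs = new ArrayList<>(); // index 0 = newest run
    int runsProbed = 0; // how many runs were actually searched by get()

    ToyLsmStore(int memtableLimit) {
        this.memtableLimit = memtableLimit;
    }

    void put(String key, String value) {
        memtable.put(key, value); // writes only touch memory...
        if (memtable.size() >= memtableLimit) {
            runs.add(0, new SortedRun(memtable)); // ...and are flushed in bulk as a new run
            memtable = new TreeMap<>();
        }
    }

    String get(String key) {
        String v = memtable.get(key); // recent data is served from memory
        if (v != null) return v;
        for (SortedRun run : runs) {
            if (!run.mightContain(key)) continue; // Bloom filter lets us skip this run
            runsProbed++;
            v = run.data.get(key);
            if (v != null) return v;
        }
        return null;
    }
}
```

Searching runs newest-first means an updated key always returns its latest value, even when an older version still sits in an earlier run.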
Checkpoints make state in Flink fault tolerant by allowing state and the corresponding stream positions to be recovered, thereby giving the application the same semantics as a failure-free execution.
Flink’s recovery mechanism is based on consistent checkpoints of application state. A consistent checkpoint of a stateful streaming application is a copy of the state of each of its tasks at a point when all tasks have processed exactly the same input.
When checkpointing is enabled, managed state is persisted to ensure consistent recovery in case of failures. During checkpointing, the state is persisted to a reliable storage system such as HDFS or Amazon S3.
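Why restoring both the state and the input position yields failure-free semantics can be shown with a single-threaded model. This is a hypothetical sketch, not Flink's implementation. Flink takes these snapshots with checkpoint barriers while tasks keep running, whereas here the consistent cut is trivial because both "tasks" (a running sum and a running count) advance in lockstep. The parameters checkpointAt and failAt are illustrative assumptions.

```java
import java.util.List;

// Hypothetical model of checkpoint-based recovery (not Flink's implementation):
// a checkpoint records the input position together with the state of every task,
// taken when all tasks have processed exactly the same input prefix. Recovery
// restores that state and replays the input from the recorded position.
class CheckpointRecoveryModel {
    static final class Snapshot {
        final int position;
        final long sum;
        final long count;

        Snapshot(int position, long sum, long count) {
            this.position = position;
            this.sum = sum;
            this.count = count;
        }
    }

    // Processes the input, takes a checkpoint before record `checkpointAt`,
    // and simulates one failure before record `failAt`. Returns {sum, count}.
    static long[] runWithFailure(List<Integer> input, int checkpointAt, int failAt) {
        long sum = 0;   // state of task 1
        long count = 0; // state of task 2
        Snapshot checkpoint = new Snapshot(0, 0, 0); // initial state acts as checkpoint zero
        int pos = 0;
        boolean failed = false;
        while (pos < input.size()) {
            if (pos == checkpointAt) {
                // Consistent cut: both tasks have processed exactly input[0..pos).
                checkpoint = new Snapshot(pos, sum, count);
            }
            if (!failed && pos == failAt) {
                failed = true;
                // Failure: in-memory state is lost; restore state AND stream position.
                pos = checkpoint.position;
                sum = checkpoint.sum;
                count = checkpoint.count;
                continue;
            }
            sum += input.get(pos);
            count += 1;
            pos++;
        }
        return new long[] {sum, count};
    }
}
```

For the input 1, 2, 3, 4, 5 with a checkpoint before record 2 and a failure before record 4, the result is still sum 15 and count 5. Restoring only the state without rewinding the position, or rewinding without restoring the state, would lose or double-count records.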
checkpoint storage: You can set the location where checkpoint snapshots are made durable. By default Flink will use the JobManager’s heap. For production deployments it is recommended to instead use a durable filesystem. See checkpoint storage for more details on the available options for job-wide and cluster-wide configuration.
exactly-once vs. at-least-once: You can optionally pass a mode to the enableCheckpointing(n) method to choose between the two guarantee levels. Exactly-once is preferable for most applications. At-least-once may be relevant for certain super-low-latency (consistently a few milliseconds) applications.
checkpoint timeout: The time after which a checkpoint in progress is aborted if it has not completed by then.
minimum time between checkpoints: To make sure that the streaming application makes a certain amount of progress between checkpoints, one can define how much time needs to pass between checkpoints. If this value is set for example to 5000, the next checkpoint will be started no sooner than 5 seconds after the previous checkpoint completed, regardless of the checkpoint duration and the checkpoint interval. Note that this implies that the checkpoint interval will never be smaller than this parameter.
tolerable checkpoint failure number: This defines how many consecutive checkpoint failures will be tolerated, before the whole job is failed over. The default value is 0, which means no checkpoint failures will be tolerated, and the job will fail on first reported checkpoint failure.
number of concurrent checkpoints: By default, the system will not trigger another checkpoint while one is still in progress.
externalized checkpoints: You can configure periodic checkpoints to be persisted externally. Externalized checkpoints write their metadata out to persistent storage and are not automatically cleaned up when the job fails.
unaligned checkpoints: You can enable unaligned checkpoints to greatly reduce checkpointing times under backpressure. This only works for exactly-once checkpoints and with one concurrent checkpoint.
checkpoints with finished tasks: By default Flink will continue performing checkpoints even if parts of the DAG have finished processing all of their records. Please refer to important considerations for details.
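In the DataStream API these options are set on the CheckpointConfig (for example setMinPauseBetweenCheckpoints, setCheckpointTimeout, setTolerableCheckpointFailureNumber and setMaxConcurrentCheckpoints). The sketch below does not use Flink. It is a hypothetical model of two of the rules above: when the next checkpoint may start, and when consecutive checkpoint failures fail the job over. The formula max(lastStart + interval, lastCompletion + minPause) is an assumption that restates the documented behaviour, not Flink's scheduler code.

```java
// Hypothetical model (not Flink code) of two checkpointing rules:
// 1) a new checkpoint starts no earlier than `interval` after the previous start
//    and no earlier than `minPause` after the previous checkpoint completed;
// 2) the job fails over once consecutive checkpoint failures exceed the tolerable number.
class CheckpointPolicyModel {
    final long intervalMs;
    final long minPauseMs;
    final int tolerableFailures;
    private int consecutiveFailures = 0;

    CheckpointPolicyModel(long intervalMs, long minPauseMs, int tolerableFailures) {
        this.intervalMs = intervalMs;
        this.minPauseMs = minPauseMs;
        this.tolerableFailures = tolerableFailures;
    }

    // Earliest start time of the next checkpoint, in milliseconds.
    long nextStart(long lastStartMs, long lastCompletionMs) {
        return Math.max(lastStartMs + intervalMs, lastCompletionMs + minPauseMs);
    }

    // Returns true if the job must fail over after this checkpoint outcome.
    boolean onCheckpointResult(boolean succeeded) {
        if (succeeded) {
            consecutiveFailures = 0; // only consecutive failures count
            return false;
        }
        consecutiveFailures++;
        return consecutiveFailures > tolerableFailures;
    }
}
```

With an interval of 1000 ms and a minimum pause of 5000 ms, a checkpoint that starts at 0 and completes at 3000 ms is followed by one at 8000 ms, i.e. 5 seconds after completion, which matches the example for the minimum pause. With a tolerable failure number of 0, the first failed checkpoint already returns true.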
An application consists of multiple operators. Each operator can define one or more keyed and operator states. Operators are executed in parallel by one or more operator tasks. Hence, a typical application consists of multiple states that are distributed across multiple operator tasks that can run on different TaskManager processes.
The state copies in the savepoint are organized by an operator identifier and a state name. The operator identifier and state name are required to be able to map the state data of a savepoint to the states of the operators of a starting application. When an application is started from a savepoint, Flink redistributes the savepoint data to the tasks of the corresponding operators.
The savepoint does not contain information about operator tasks. That is because the number of tasks might change when an application is started with different parallelism.
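The organization described above (state indexed by operator identifier and state name, with no record of tasks) can be modelled in plain Java. This is a hypothetical sketch, not Flink's savepoint format. In a real job, stable operator identifiers are usually assigned with uid(...) on each operator. For simplicity the sketch assigns keys to tasks with hash modulo parallelism, whereas Flink assigns keys to key groups and key groups to tasks.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model (not Flink's savepoint format): keyed state is stored per
// (operator id, state name) with no record of which task produced it, so it
// can be redistributed to however many tasks the restarted job uses.
class SavepointModel {
    // operatorId -> stateName -> (key -> value)
    final Map<String, Map<String, Map<String, Long>>> states = new HashMap<>();

    void put(String operatorId, String stateName, String key, long value) {
        states.computeIfAbsent(operatorId, o -> new HashMap<>())
              .computeIfAbsent(stateName, s -> new HashMap<>())
              .put(key, value);
    }

    // Splits one operator's state across `parallelism` tasks by hashing keys.
    // Simplification: real Flink assigns keys to key groups, and key groups to tasks.
    List<Map<String, Long>> redistribute(String operatorId, String stateName, int parallelism) {
        List<Map<String, Long>> perTask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            perTask.add(new HashMap<>());
        }
        Map<String, Long> entries =
            states.getOrDefault(operatorId, Map.of()).getOrDefault(stateName, Map.of());
        for (Map.Entry<String, Long> e : entries.entrySet()) {
            int task = Math.floorMod(e.getKey().hashCode(), parallelism);
            perTask.get(task).put(e.getKey(), e.getValue());
        }
        return perTask;
    }
}
```

The same stored state can be restored with parallelism 2 or 4 without change, because nothing in it refers to the tasks that wrote it.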
The primary purpose of checkpoints is to provide a recovery mechanism in case of unexpected job failures. A checkpoint’s lifecycle is managed by Flink, i.e. a checkpoint is created, owned, and released by Flink, without user interaction. Because checkpoints are triggered often and are relied upon for failure recovery, the two main design goals for the checkpoint implementation are:
i) being as lightweight to create as possible
ii) being as fast to restore from as possible.
Although savepoints are created internally with the same mechanisms as checkpoints, they are conceptually different and can be a bit more expensive to produce and restore from. Their design focuses more on portability and operational flexibility, especially with respect to changes to the job. The use case for savepoints is planned, manual operations, for example upgrading your Flink version or changing your job graph.
Savepoints are created, owned and deleted solely by the user. That means Flink deletes savepoints neither after job termination nor after restore.
beginTransaction: a transaction bundles all writes between two checkpoints, so writes are always within a scope of a transaction. This function is called at the beginning of a new checkpoint. So here you can open a DB transaction if your DB supports it, or create a temporary file in the file system. All subsequent event processing will use it until the next checkpoint.
preCommit: the pre-commit is called by the sink once it receives the checkpoint barrier, after it has successfully persisted its internal state. Every sink performs the pre-commit, so that the Flink JobManager (the coordinator) can commit the checkpoint only after all sinks have pre-committed successfully. Here, you can flush the file, close it and never write to it again, or alternatively start a new DB transaction for any subsequent writes that belong to the next checkpoint.
commit: the commit will be called by every sink only once the JobManager notifies them that the checkpoint is completed. In this phase, you can atomically move the pre-committed file to the actual destination directory or alternatively commit the DB transaction.
abort: abort is called when a distributed checkpoint is aborted, or when a transaction is rejected by the coordinator after a failure. Here, for example, you can delete the temporary file or roll back the DB transaction.
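The file-based variant of these four steps can be sketched with java.nio.file. This is a plain-Java model, not Flink's TwoPhaseCommitSinkFunction (which exposes hooks with these names). The class name TwoPhaseFileSink and the ".tmp"/".txt" naming are hypothetical, and the caller plays the role of the checkpoint barrier and the JobManager notification. The sketch assumes the temporary and target directories are on the same file system, because ATOMIC_MOVE fails otherwise.

```java
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Plain-Java model of the two-phase-commit hooks described above, using one
// temporary file per transaction. Not Flink's TwoPhaseCommitSinkFunction:
// checkpoint barriers and JobManager notifications are left to the caller.
class TwoPhaseFileSink {
    private final Path tmpDir;    // open and pre-committed transactions live here
    private final Path targetDir; // only committed files appear here
    private Path currentTxn;
    private Writer writer;

    TwoPhaseFileSink(Path tmpDir, Path targetDir) {
        this.tmpDir = tmpDir;
        this.targetDir = targetDir;
    }

    // beginTransaction: open a temporary file that collects all writes until the next checkpoint.
    Path beginTransaction() throws IOException {
        currentTxn = Files.createTempFile(tmpDir, "txn-", ".tmp");
        writer = Files.newBufferedWriter(currentTxn, StandardCharsets.UTF_8);
        return currentTxn;
    }

    // Each incoming record is written into the currently open transaction.
    void invoke(String record) throws IOException {
        writer.write(record);
        writer.write('\n');
    }

    // preCommit: flush and close the file (never written again), then open the
    // transaction that will hold records belonging to the next checkpoint.
    Path preCommit() throws IOException {
        writer.close(); // close() flushes any buffered data first
        Path preCommitted = currentTxn;
        beginTransaction();
        return preCommitted;
    }

    // commit: atomically move the pre-committed file into the destination directory.
    Path commit(Path preCommitted) throws IOException {
        String name = preCommitted.getFileName().toString().replace(".tmp", ".txt");
        return Files.move(preCommitted, targetDir.resolve(name), StandardCopyOption.ATOMIC_MOVE);
    }

    // abort: delete a pre-committed file so none of its records ever become visible.
    void abort(Path preCommitted) throws IOException {
        Files.deleteIfExists(preCommitted);
    }
}
```

Readers of the target directory only ever see complete files: records become visible all at once at commit, or never if the transaction is aborted.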
A Split is a portion of data consumed by the source, like a file or a log partition. Splits are the granularity by which the source distributes the work and parallelizes reading data.
The SourceReader requests Splits and processes them, for example by reading the file or log partition represented by the Split. The SourceReaders run in parallel on the Task Managers in the SourceOperators and produce the parallel stream of events/records.
The SplitEnumerator generates the Splits and assigns them to the SourceReaders. It runs as a single instance on the Job Manager and is responsible for maintaining the backlog of pending Splits and assigning them to the readers in a balanced manner.
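The interplay between one enumerator and several readers can be modelled in plain Java. This is a hypothetical sketch of one pull-based assignment strategy, not Flink's Source API. Flink's SplitEnumerator interface has a handleSplitRequest method, but with a different signature, and concrete sources may assign splits differently. Here readers ask for a new split whenever they become free, so faster readers take more splits and the work stays balanced. The per-reader costs are illustrative assumptions.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java model of pull-based split assignment (not Flink's Source API):
// a single enumerator owns the backlog of pending splits and hands one out
// whenever a reader asks for work, so faster readers naturally take more.
class SplitAssignmentModel {
    private final Deque<String> backlog = new ArrayDeque<>();
    final List<List<String>> assigned = new ArrayList<>(); // splits per reader

    SplitAssignmentModel(List<String> splits, int readers) {
        backlog.addAll(splits);
        for (int i = 0; i < readers; i++) {
            assigned.add(new ArrayList<>());
        }
    }

    // A reader requests a split; returns null when the backlog is empty.
    String handleSplitRequest(int readerId) {
        String split = backlog.pollFirst();
        if (split != null) {
            assigned.get(readerId).add(split);
        }
        return split;
    }

    // Simulates readers that need costPerSplit[r] time units per split and ask
    // for the next one as soon as they are free (ties go to the lower reader id).
    static SplitAssignmentModel simulate(List<String> splits, int[] costPerSplit) {
        SplitAssignmentModel enumerator = new SplitAssignmentModel(splits, costPerSplit.length);
        long[] freeAt = new long[costPerSplit.length];
        boolean[] done = new boolean[costPerSplit.length];
        while (true) {
            int next = -1;
            for (int r = 0; r < freeAt.length; r++) {
                if (!done[r] && (next == -1 || freeAt[r] < freeAt[next])) {
                    next = r;
                }
            }
            if (next == -1) {
                return enumerator; // every reader has finished
            }
            if (enumerator.handleSplitRequest(next) == null) {
                done[next] = true; // no work left for this reader
            } else {
                freeAt[next] += costPerSplit[next];
            }
        }
    }
}
```

With six splits and two readers that take 1 and 2 time units per split, reader 0 ends up with four splits and reader 1 with two, and both finish at the same time.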
Thank you very much for enrolling in my course. This is a complete video course about Apache Flink 1.17.x. In it, I introduce Flink in great detail, from the basics to advanced topics, and practice the usage details of each part of the Apache Flink framework.
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink is designed to run in all common cluster environments and to perform computations at in-memory speed and at any scale.
[Course Features]
Code- and case-driven
A large number of cases
From shallow to deep
Compact course content
Covers most of the Apache Flink framework
Rich, comprehensive cases
[Course Outline]
Apache Flink 1.17.2 Quick Start
Detailed explanation of Apache Flink 1.17.2 Job Deployment Mode
A detailed explanation of the runtime architecture of Apache Flink 1.17.2
Detailed explanation of Apache Flink 1.17.2 DataStream API and real-time application development
Detailed explanation of Apache Flink 1.17.2 Window API and real-time application development
In-depth interpretation of Apache Flink 1.17.2 Watermarks: Processing Late and Out-of-Order Data
Detailed explanation of Apache Flink 1.17.2 Stateful Operators and Applications
Detailed explanation of Apache Flink 1.17.2 Checkpoints, Savepoints and Exactly-Once Semantics
I hope you like this course. After studying this course, you will become an expert in Apache Flink and be able to build complex real-time event processing applications based on Apache Flink.