The Foundations for Building an Apache Flink Application

Understanding stream processing with Flink from the bottom up. Cloud services and other platforms provide stream processing solutions (some with Flink integrated under the hood); if you are missing the basics, this guide is for you.

Our monolith solution does not cope with the increased load of incoming data, and thus it has to evolve. This is the time for the next generation of our product. Stream processing is the new data ingestion paradigm, as compared to the batch processing we’ve implemented so far.

So, my team embarked on processing information using Flink. There is an abundance of articles about Flink’s features and benefits, and Cloudera shared an excellent slide about Flink. This article, by contrast, is a practical hands-on guide on how to build a simple stream processing application, starting from the basics.

Apache Flink in Two Sentences

Apache Flink is a scalable, distributed stream-processing framework, meaning it can process continuous streams of data. The framework provides a variety of functionalities: sources, stream transformations, parallel processing, scheduling, resource assignment, and a wide range of destinations. Its connectors include HDFS, Kafka, Amazon Kinesis, RabbitMQ, and Cassandra.

Flink is known for its high throughput and low latency. It supports exactly-once consistency (all data is processed once, without duplication) as well as high availability. Like any other successful open-source product, it has a broad community that cultivates and extends its features.

Flink can process unbounded data streams or bounded data sets. This blog focuses on the former (working with DataStream objects).

Streams Processing — The Challenges

Nowadays, when IoT devices and other sensors are ubiquitous, data is flowing endlessly from many sources. This endless flow of data forces the traditional batch computation to adapt.

  • This data is unbounded; there is no start and end.
  • Unpredictable and inconsistent intervals of new data.
  • Data can be out of order, with various timestamps.

Due to these unique characteristics, processing and querying data are intricate tasks. Results change rapidly, and it is almost impossible to obtain definite conclusions; at times, the computation may be hindered when trying to produce valid results. Moreover, the results are not repeatable since the data keeps on changing. Lastly, the latency is a factor as it impacts the accuracy of the results.

Apache Flink copes with these problems by processing based on timestamps in the source of the incoming data. It has a mechanism to accumulate events based on their timestamp before applying the processing execution. It eliminates the use of micro-batches, and with that, it improves the accuracy of the results.

Flink implements exactly once consistency, which ensures the correctness of computations without the developer programming for it.

The Foundations — Flink Packages Building blocks

Flink predominantly ingests streams from various sources. The basic object is DataStream<T>, which represents a stream of elements of the same type; the element type is defined at compile time by setting the generic type T (the Flink documentation describes the DataStream object in detail).

The DataStream object contains many useful methods to transform, split, and filter its data[1]. Familiarity with the methods map, reduce, and filter is a good start; these are the main transformation methods:

  • Map: receives a T object and returns an object of type R; the MapFunction is applied exactly once to each element of the DataStream object.
SingleOutputStreamOperator<R> map(MapFunction<T,R> mapper)
  • Reduce: receives two consecutive values and returns one object after combining them into the same object type; this method runs on all values in the group until only a single value remains.
T reduce(T value1, T value2)
  • Filter: receives a T object and returns a stream of T objects; this method runs on each element in the DataStream but keeps only those for which the function returns true.
SingleOutputStreamOperator<T> filter(FilterFunction<T> filter)
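
To make the three signatures above more tangible, here is a minimal sketch of their semantics in plain Java (java.util.stream), not in Flink itself. The class name TransformSketch and the sample values are made up for illustration; a real Flink job would call these methods on a DataStream instead of a List.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Plain-Java stand-in for the three transformations; this is NOT Flink code.
final class TransformSketch {

    // map: applied exactly once to every element; may change the type (T -> R).
    static <T, R> List<R> map(List<T> in, Function<T, R> mapper) {
        return in.stream().map(mapper).collect(Collectors.toList());
    }

    // filter: keeps only the elements for which the predicate returns true.
    static <T> List<T> filter(List<T> in, Predicate<T> keep) {
        return in.stream().filter(keep).collect(Collectors.toList());
    }

    // reduce: combines values pairwise until a single value of type T remains.
    static <T> Optional<T> reduce(List<T> in, BinaryOperator<T> combiner) {
        return in.stream().reduce(combiner);
    }

    public static void main(String[] args) {
        List<String> raw = Arrays.asList("3", "8", "5", "12");
        List<Integer> scores = map(raw, Integer::parseInt);   // String -> Integer
        List<Integer> high = filter(scores, s -> s >= 5);     // drops the 3
        Optional<Integer> total = reduce(high, Integer::sum); // 8 + 5 + 12
        System.out.println(scores + " " + high + " " + total);
    }
}
```

Note that reduce keeps the element type (T in, T out), while map is the only one of the three that can change it.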

Data Sink

Besides transforming the data, Flink’s main purpose is to steer streams, after processing them, into different destinations. These destinations are called “sinks”. Flink has built-in sinks (text, CSV, socket), as well as out-of-the-box connectors to other systems (such as Apache Kafka)[2].

Flink Event Timestamps

The notion of time is paramount for processing data streams. There are three options to define a timestamp:

  • Processing time (the default option): refers to the system time of the machine that executes the stream processing operation, and thus it is the simplest notion of time; it does not require any coordination between streams and machines. Since it is based on the machine’s time, it provides the best performance and the lowest latency.
    The drawback of using processing time is significant in distributed and asynchronous environments since it is not a deterministic method. The timestamp of the stream’s events can go out of sync if there’s a gap between machines’ clocks; network latency can also create a gap between the time an event left one machine and arrived at the other.
// Setting the Processing Time attribute of the StreamExecutionEnvironment object
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  • Event time: refers to the time at which each individual event occurred on its producing source, before entering Flink. The event time is embedded in the event itself and can be extracted so Flink can process it properly.
    Since the timestamp is not set by Flink, there must be a mechanism that signals whether an event should still be processed or not; this mechanism is called a watermark. This topic is beyond the scope of this blog-post (since I wanted to keep it concise); you can find more information in the Flink documentation.
// Defining the Event Time as the timestamp method
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<String> dataStream
   = streamEnv.readFile(auditFormat,
          dataDir)    // the origin of the events
          .assignTimestampsAndWatermarks(new TimestampExtractor());
// ... more code ...

// Defining a class to extract the timestamp from the stream events
public class TimestampExtractor implements AssignerWithPeriodicWatermarks<String> {
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(System.currentTimeMillis() - maxTimeFrame);
    }
    @Override
    public long extractTimestamp(String str, long l) {
        return InputData.getDataObject(str).timestamp;
    }
}
  • Ingestion time: refers to the time at which the event enters Flink; it is assigned once at the source and is thus considered more stable than processing time, which is assigned when processing commences.
    Ingestion time cannot handle out-of-order events or late data since the timestamp is set once the ingestion starts, as opposed to event time that has the feature to identify delayed events and handle them based on the watermarking mechanism.
// Setting the Ingestion Time attribute of StreamExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
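
Since watermarks are only mentioned in passing, here is a small sketch of the idea in plain Java, without any Flink classes. It uses a common variation in which the watermark trails the highest event timestamp seen so far by a tolerated delay; that differs from a watermark derived from the machine's clock. The class name WatermarkSketch and the field maxOutOfOrdernessMs are hypothetical.

```java
// Plain-Java illustration of the watermark idea; this is NOT Flink's API.
final class WatermarkSketch {
    private final long maxOutOfOrdernessMs;          // assumed tolerated delay
    private long maxTimestampSeen = Long.MIN_VALUE;  // highest event time so far

    WatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // The watermark claims: no event with a timestamp at or below it is expected.
    long currentWatermark() {
        return maxTimestampSeen == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestampSeen - maxOutOfOrdernessMs;
    }

    // Returns true if the event is on time, false if it is behind the watermark.
    boolean accept(long eventTimestampMs) {
        boolean onTime = eventTimestampMs > currentWatermark();
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMs);
        return onTime;
    }
}
```

With a tolerated delay of 2000 ms, an event stamped 9000 that arrives after one stamped 10000 is still accepted (out of order, but within the tolerance), while an event stamped 7000 is flagged as late.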

You can read more about timestamps and how they affect stream processing in the Flink documentation.

Flink Windowing


By definition, a stream is endless; therefore, processing is done by defining frames (a time-based window, for example). With that, the stream is divided into buckets for aggregation and analysis. The window definition is an operation on a DataStream object or one of its subclasses.

There are several time-based windows:

  • Tumbling Window (the default configuration):
    The stream is divided into equal-sized windows without any overlap. As long as the stream flows, Flink continuously computes results over this fixed time frame.
Tumbling Window Illustration

Code implementation:

// To be used for a non-keyed stream
public AllWindowedStream<T,TimeWindow> timeWindowAll(Time size)

// Tumbling window for a key-based stream
public WindowedStream<T,KEY,TimeWindow> timeWindow(Time size)
  • Sliding window
    An overlapping window defined by a window size and a slide interval (when to start the next window). With that, an event can be processed in more than one window.
Sliding Window Illustration

This is how it looks in code:

// sliding time window of 1 minute length and 30 secs trigger interval
dataStreamObject.timeWindow(Time.minutes(1), Time.seconds(30))
  • Session window
    Includes all events within the session’s boundary. A session ends when there is no activity (no events) for a defined time frame, the session gap. This gap can be fixed or dynamic, based on the processed events. Theoretically, if the gap between consecutive events always stays smaller than the session gap, the session never ends.
Session Window Illustration

The first code snippet below exemplifies a fixed time-based session (2 seconds). The second snippet implements a dynamic session window, based on the stream’s events.

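// Defining a fixed session window of 2 seconds
dataStreamObject.window(EventTimeSessionWindows.withGap(Time.seconds(2)));

// Defining a dynamic window session, which can be set by the stream elements
dataStreamObject.window(EventTimeSessionWindows.withDynamicGap((elem) -> {
        // return the session gap, which can be based on the stream's events
        return 2000L; // placeholder value: the gap in milliseconds
}));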
  • Global window
    Treats the entire stream as one single window.
Global Window Illustration
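
To see how events end up in the time-based windows described above, the following plain-Java sketch computes the window(s) an event timestamp falls into. It is an illustration under simplifying assumptions (timestamps in milliseconds, windows aligned to zero, session input already sorted), not Flink's implementation; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of time-window assignment; NOT Flink's implementation.
final class WindowSketch {

    // Tumbling: each timestamp belongs to exactly one window [start, start + size).
    static long tumblingWindowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // Sliding: a timestamp belongs to every window of length sizeMs that starts
    // on a multiple of slideMs and still covers it. Returns starts, latest first.
    static List<Long> slidingWindowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    // Session: sorted timestamps stay in one session until the silence between
    // two consecutive events is longer than gapMs.
    static List<List<Long>> sessions(List<Long> sortedTimestampsMs, long gapMs) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTimestampsMs) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gapMs) {
                result.add(current);          // gap exceeded: close the session
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }
}
```

For a window of one minute sliding every 30 seconds, an event at second 75 belongs to the windows starting at seconds 60 and 30, which is why the same event can be processed twice.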

Flink also enables the implementation of custom windows with user-defined logic, which will be a topic for another blog-post 😎.

Besides time-based windows, there are other windows such as the Count Window, which sets its boundary by the number of incoming events; once a threshold of X events is reached, Flink processes those X events.

A count window of three events
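
The count-window behaviour can be sketched in a few lines of plain Java: buffer events, and hand them over for processing once the threshold is reached. This is only an illustration of the idea, not Flink code; CountWindowSketch and its callback are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java sketch of a count window; NOT Flink's implementation.
final class CountWindowSketch<T> {
    private final int size;                    // the X threshold
    private final Consumer<List<T>> onWindow;  // called with each full window
    private final List<T> buffer = new ArrayList<>();

    CountWindowSketch(int size, Consumer<List<T>> onWindow) {
        this.size = size;
        this.onWindow = onWindow;
    }

    // Buffers the event; once `size` events are collected, fires and resets.
    void add(T event) {
        buffer.add(event);
        if (buffer.size() == size) {
            onWindow.accept(new ArrayList<>(buffer)); // pass a copy downstream
            buffer.clear();
        }
    }
}
```

With a size of three, seven incoming events produce two windows; the seventh event stays buffered until two more arrive.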

After the theoretical introduction, let’s dive into a practical data flow. You can find more information about Apache Flink and stream processes on the official website.

Streaming Flow Description

To recap the theoretical part, the diagram below portrays the main data flow of the code samples in this blog-post. The flow starts from a source (files are written into a folder) and continues with processing the events into objects.

The implementation depicted below is composed of two processing tracks. The one at the top splits a stream into two side streams and then merges them to form a third type of stream. The scenario at the bottom processes a stream and then transfers the results into a sink.

The next part aims to convert the theoretical stream processing into tangible practice; you can find the full source code on GitHub.

Basic Stream Handling (example #1)

Starting with a basic application makes it much easier to grasp the concepts of Flink. In this application, the producer writes files into a folder, which simulates a flowing stream. Flink reads files from this folder, processes them, and writes a summary into a destination folder; this is the sink.

Now let’s focus on the process part:

  1. Converting the raw data into an object:
// Convert each record to an InputData object; each new line is considered a new record
DataStream<InputData> inputDataObjectStream
        = dataStream
        .map((MapFunction<String, InputData>) inputStr -> {
            System.out.println("-- Received Record : " + inputStr);
            return InputData.getDataObject(inputStr);
        });

2. The code sample below converts the stream object (InputData) into a Tuple of string and integer. It extracts only certain fields from a stream of objects and groups them by one field in windows of two seconds.

// Convert each record to a Tuple with name and score
DataStream<Tuple2<String, Integer>> userCounts
        = inputDataObjectStream
        .map(new MapFunction<InputData, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(InputData item) {
                return new Tuple2<String, Integer>(item.getName(), item.getScore());
            }
        })
        .returns(Types.TUPLE(Types.STRING, Types.INT))
        .keyBy(0) // returns KeyedStream<T, Tuple> based on the first item ('name' field)
        //.timeWindowAll(Time.seconds(windowInterval)) // DO NOT use timeWindowAll for a key-based stream
        .timeWindow(Time.seconds(2)) // returns WindowedStream<T, KEY, TimeWindow>
        .reduce((x, y) -> new Tuple2<String, Integer>(x.f0 + "-" + y.f0, x.f1 + y.f1));

3. Creating a destination for the stream (implementing data sink):
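
Conceptually, a row-format file sink encodes each record as a line of text and appends it to a file under the destination folder. The sketch below shows that idea in plain Java without any Flink dependency; it is a stand-in for illustration and not Flink's sink API. The class FileSinkSketch, the file name, and the encoder are all assumptions.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.function.Function;

// Plain-Java stand-in for a row-format file sink; this is NOT Flink's sink API.
final class FileSinkSketch<T> {
    private final Path outputFile;
    private final Function<T, String> encoder; // turns one record into one line

    FileSinkSketch(Path outputDir, String fileName, Function<T, String> encoder) {
        try {
            Files.createDirectories(outputDir); // make sure the folder exists
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        this.outputFile = outputDir.resolve(fileName);
        this.encoder = encoder;
    }

    // Appends one encoded record as a UTF-8 line to the output file.
    void write(T record) {
        String line = encoder.apply(record) + System.lineSeparator();
        try {
            Files.write(outputFile, line.getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    Path outputFile() {
        return outputFile;
    }
}
```

In a real job the sink is attached to the stream and Flink calls it for every record; here the caller invokes write directly.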

Splitting Streams (example #2)

In this example, we demonstrate how to split the main stream using side output streams. Flink enables producing multiple side streams from the main DataStream. The type of data residing in each side stream can differ from that of the main stream and from that of the other side streams.

So, using a side output stream can kill two birds with one stone: it splits the stream and converts the main stream type into multiple data types (which can be unique for each side output stream).

The code sample below calls the ProcessFunction that splits a stream into two side streams based on a property of the input. Without side outputs, obtaining the same result would require calling the filter function more than once.

The ProcessFunction collects certain objects (based on criteria) to the main output collector (captured in the SingleOutputStreamOperator), while adding other events to side outputs. The DataStream is split vertically and publishes a different format for each side stream.

Notice the side output stream definition is based on a unique output tag (OutputTag object).
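
The routing idea behind side outputs can be sketched in plain Java: one pass over the input, a main output, and tagged side outputs whose element types differ. This is an illustration only and not Flink's ProcessFunction or OutputTag API; the record format ("name,score"), the tags, and the threshold are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the side-output idea; NOT Flink's ProcessFunction API.
final class SideOutputSketch {

    // A typed tag identifying one side output (loosely modelled on the tag idea).
    static final class Tag<T> {
        final String id;
        Tag(String id) { this.id = id; }
    }

    // Holds the main output plus any number of tagged side outputs.
    static final class Outputs<M> {
        final List<M> main = new ArrayList<>();
        private final Map<String, List<Object>> side = new HashMap<>();

        <T> void emit(Tag<T> tag, T value) {
            side.computeIfAbsent(tag.id, k -> new ArrayList<>()).add(value);
        }

        @SuppressWarnings("unchecked")
        <T> List<T> get(Tag<T> tag) {
            return (List<T>) (List<?>) side.getOrDefault(tag.id, new ArrayList<>());
        }
    }

    static final Tag<String> NAMES = new Tag<>("names");            // hypothetical
    static final Tag<Integer> LOW_SCORES = new Tag<>("low-scores"); // hypothetical

    // Routes "name,score" records in a single pass: records at or above the
    // threshold stay on the main output; names and low scores go to side outputs.
    static Outputs<String> split(List<String> records, int threshold) {
        Outputs<String> out = new Outputs<>();
        for (String record : records) {
            String[] parts = record.split(",");
            int score = Integer.parseInt(parts[1].trim());
            out.emit(NAMES, parts[0].trim());      // side output of type String
            if (score >= threshold) {
                out.main.add(record);              // main output, unchanged
            } else {
                out.emit(LOW_SCORES, score);       // side output of type Integer
            }
        }
        return out;
    }
}
```

The single loop does what two or more filter passes would otherwise do, and each side output carries its own type.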

Merging Streams (example #3)

The last operation in this blog-post demonstrates merging streams. The idea is to combine two different streams, which can differ in their data format, and produce one stream with a unified data structure. As opposed to an SQL merge operation, which merges data horizontally, merging streams is vertical since the events continue to flow without any bounded time frame.

Merging streams is done by calling the method connect, and then defining the map operation on each element in each individual stream. The result is a merged stream.

// The returned stream definition includes both streams' data types
ConnectedStreams<Tuple2<String, Integer>, Tuple2<String, String>> mergedStream
        = singerStream
        .connect(playerStream); // playerStream: the second stream (name assumed from map2 below)

DataStream<Tuple4<String, String, String, Integer>> combinedStream
        = mergedStream.map(new CoMapFunction<
                Tuple2<String, Integer>, // Stream 1
                Tuple2<String, String>, // Stream 2
                Tuple4<String, String, String, Integer> // Output
                >() {
            @Override
            public Tuple4<String, String, String, Integer> // Process Stream 1
            map1(Tuple2<String, Integer> singer) throws Exception {
                return new Tuple4<String, String, String, Integer>
                        ("Source: singer stream", singer.f0, "", singer.f1);
            }
            @Override
            public Tuple4<String, String, String, Integer> // Process Stream 2
            map2(Tuple2<String, String> player) throws Exception {
                return new Tuple4<String, String, String, Integer>
                        ("Source: player stream", player.f0, player.f1, 0);
            }
        });

Building a Workable Project

Bringing it all together: I uploaded a demo project to GitHub. You can follow the instructions on how to build and compile it. This is a good start to play with Flink.

I hope you find this repo useful. Do not hesitate to contact me if you have any issues.

The Takeaways

This article focused on the essential foundations to build a working stream processing application based on Flink. Its purpose is to provide a basic understanding of stream processing challenges and set the foundations for building a stand-alone Flink application.

Since stream processing has many aspects and complexities, many topics were not covered. For example, Flink execution and task management, using watermarks to set Event time into the stream events, planting states in the stream’s events, running stream iterations, executing SQL-like queries on streams, and much more. I hope to cover some of these topics in subsequent articles.

I hope this blog equipped you with essential information to start using Flink.

Keep on coding 😎.

— Lior
