QuickStart: DataStream API#

Apache Flink offers a DataStream API for building robust, stateful streaming applications. It provides fine-grained control over state and timer, which allows for the implementation of advanced event-driven systems.

You can run the latest version of these examples by yourself in ‘Live Notebook: DataStream’ at the quickstart page.

For advanced usage, you can refer to the latest version of PyFlink DataStream API doc

StreamExecutionEnvironment Creation#

StreamExecutionEnvironment is the entry point and central concept for creating DataStream API programs. Flink is an unified streaming and batch computing engine, which provides unified streaming and batch API to create a StreamExecutionEnvironment.

StreamExecutionEnvironment is responsible for:

[1]:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream import RuntimeExecutionMode

env = StreamExecutionEnvironment.get_execution_environment()

# Config the Program run in Streaming Mode
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
env
[1]:
<pyflink.datastream.stream_execution_environment.StreamExecutionEnvironment at 0x7fd9e8fc6e48>

DataStream Creation#

DataStream is a core component of the Python DataStream API. A DataStream object describes a pipeline of data transformations. It does not contain the data itself in any way. Instead, it describes how to read data from a source, how to add some compute on data and how to eventually write data to a sink. The declared pipeline can be printed, optimized, and eventually executed in a cluster. The pipeline can work with bounded or unbounded streams which enables both streaming and batch scenarios.

A DataStream can be created by a specific StreamExecutionEnvironment.

Firstly, you can create a DataStream from a Python List Object

[2]:
from pyflink.common.typeinfo import Types
ds = env.from_collection([(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')])
# if you don't specify the `type_info`, the default `type_info` is `PickledByteArrayTypeInfo`
ds.get_type()
[2]:
PickledByteArrayTypeInfo

Create a DataStream with an explicit type_info.

[3]:
ds = env.from_collection(
    collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],
    type_info=Types.ROW([Types.INT(), Types.STRING()]))
ds.get_type()
[3]:
RowTypeInfo(f0: Integer, f1: String)

Create a DataStream from DataStream Connectors

[4]:
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream.connectors.number_seq import NumberSequenceSource

env = StreamExecutionEnvironment.get_execution_environment()
seq_num_source = NumberSequenceSource(1, 1000)
ds = env.from_source(
    source=seq_num_source,
    watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
    source_name='seq_num_source',
    type_info=Types.LONG())
ds.get_type()
[4]:
Long

Create a DataStream from a Table

[5]:
from pyflink.table import DataTypes
from pyflink.table import StreamTableEnvironment

# create a `TableEnvironment` which is the entry point of `Table` & `SQL` program.
t_env = StreamTableEnvironment.create(env)
table = t_env.from_elements([(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],
                            DataTypes.ROW([DataTypes.FIELD("id", DataTypes.INT()),
                                           DataTypes.FIELD("data", DataTypes.STRING())]))
ds = t_env.to_data_stream(table)
ds.get_type()
[5]:
ExternalTypeInfo<RowTypeInfo(id: Integer, data: String)>

Viewing Data on DataStream#

DataStream.execute_and_collect() collects the contents of the current DataStream to local client.

[6]:
list(ds.execute_and_collect())
[6]:
[<Row(1, 'aaa|bb')>, <Row(2, 'bb|a')>, <Row(3, 'aaa|a')>]

Print the data of DataStream to the console

[7]:
ds.print()
env.execute()
[7]:
<pyflink.common.job_execution_result.JobExecutionResult at 0x7fd5dcc73550>

Applying a Function on DataStream#

DataStream programs in Flink are regular programs that implement transformations on data streams (e.g., mapping, filtering, reducing). Please see operators for an overview of the available transformations in Python DataStream API.

[7]:
from pyflink.common import Row
from pyflink.datastream import FlatMapFunction

class MyFlatMapFunction(FlatMapFunction):
    def flat_map(self, value):
        for s in str(value.data).split('|'):
            yield Row(value.id, s)

list(ds.flat_map(MyFlatMapFunction(), output_type=Types.ROW([Types.INT(), Types.STRING()])).execute_and_collect())
[7]:
[<Row(1, 'aaa')>,
 <Row(1, 'bb')>,
 <Row(2, 'bb')>,
 <Row(2, 'a')>,
 <Row(3, 'aaa')>,
 <Row(3, 'a')>]

Emits Results of DataStream#

There are many connectors and formats available in Flink. See also the latest DataStream Connectors.

[8]:
from pyflink.common import Encoder
from pyflink.datastream.connectors.file_system import FileSink, RollingPolicy

def split(s):
    splits = s[1].split('|')
    for sp in splits:
        yield s[0], sp

sink = (FileSink
    .for_row_format('/tmp/sink', Encoder.simple_string_encoder("UTF-8"))
    .with_rolling_policy(RollingPolicy.default_rolling_policy(
        part_size=1024 ** 3, rollover_interval=15 * 60 * 1000, inactivity_interval=5 * 60 * 1000))
    .build())

ds.map(lambda i: (i[0] + 1, i[1]), Types.TUPLE([Types.INT(), Types.STRING()])).sink_to(sink)
# the result will be stored in the directory of /tmp/sink.
env.execute()
[8]:
<pyflink.common.job_execution_result.JobExecutionResult at 0x7fd9ececa080>