QuickStart: DataStream API#
Apache Flink offers a DataStream API for building robust, stateful streaming applications. It provides fine-grained control over state and time, which allows for the implementation of advanced event-driven systems.
You can run the latest version of these examples yourself in 'Live Notebook: DataStream' on the quickstart page.
For advanced usage, you can refer to the latest version of the PyFlink DataStream API documentation.
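To give a flavor of that fine-grained control before diving into pipelines, here is a minimal sketch of a keyed function that holds per-key state and registers a processing-time timer. It is illustrative only: the class name, the (key, value) input shape, and the ten-second delay are assumptions, not part of this quickstart.

from pyflink.common.typeinfo import Types
from pyflink.datastream import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor

class CountWithTimer(KeyedProcessFunction):  # hypothetical example class

    def open(self, runtime_context: RuntimeContext):
        # per-key state: how many elements this key has seen so far
        self.count = runtime_context.get_state(
            ValueStateDescriptor('count', Types.INT()))

    def process_element(self, value, ctx):
        current = (self.count.value() or 0) + 1
        self.count.update(current)
        # fire on_timer ten seconds from now (assumed delay, for illustration)
        ctx.timer_service().register_processing_time_timer(
            ctx.timer_service().current_processing_time() + 10 * 1000)
        yield value[0], current

    def on_timer(self, timestamp, ctx):
        # emit the count accumulated for this key when the timer fires
        yield ctx.get_current_key(), self.count.value()

Such a function would be applied with ds.key_by(...).process(CountWithTimer(), output_type=...) on a keyed stream.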
StreamExecutionEnvironment Creation#
StreamExecutionEnvironment is the entry point and central concept for creating DataStream API programs. Flink is a unified streaming and batch computing engine and provides a unified streaming and batch API to create a StreamExecutionEnvironment.

StreamExecutionEnvironment is responsible for:

- DataStream creation
- Job configuration (a short sketch of common configuration calls follows the first cell below)
- Job submission
[1]:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream import RuntimeExecutionMode
env = StreamExecutionEnvironment.get_execution_environment()
# configure the program to run in streaming mode
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
env
[1]:
<pyflink.datastream.stream_execution_environment.StreamExecutionEnvironment at 0x7fd9e8fc6e48>
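Beyond the runtime mode, the StreamExecutionEnvironment carries the rest of the job configuration mentioned in the list above. A minimal sketch of other common configuration calls follows; the jar path is a placeholder, not a dependency of this quickstart:

env.set_parallelism(2)  # default parallelism for all operators of the job
# make third-party connector jars available to the job (placeholder path)
# env.add_jars("file:///path/to/connector.jar")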
DataStream Creation#
DataStream is a core component of the Python DataStream API. A DataStream object describes a pipeline of data transformations. It does not contain the data itself in any way. Instead, it describes how to read data from a source, how to apply computations to the data, and how to eventually write the data to a sink. The declared pipeline can be printed, optimized, and eventually executed in a cluster. The pipeline can work with bounded or unbounded streams, which enables both streaming and batch scenarios.

A DataStream is created from a specific StreamExecutionEnvironment.

First, you can create a DataStream from a Python list object:
[2]:
from pyflink.common.typeinfo import Types
ds = env.from_collection([(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')])
# if you don't specify `type_info`, it defaults to `PickledByteArrayTypeInfo`
ds.get_type()
[2]:
PickledByteArrayTypeInfo
Create a DataStream with an explicit type_info:
[3]:
ds = env.from_collection(
    collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],
    type_info=Types.ROW([Types.INT(), Types.STRING()]))
ds.get_type()
[3]:
RowTypeInfo(f0: Integer, f1: String)
Create a DataStream from DataStream connectors:
[4]:
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream.connectors.number_seq import NumberSequenceSource
env = StreamExecutionEnvironment.get_execution_environment()
seq_num_source = NumberSequenceSource(1, 1000)
ds = env.from_source(
    source=seq_num_source,
    watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
    source_name='seq_num_source',
    type_info=Types.LONG())
ds.get_type()
[4]:
Long
Create a DataStream from a Table:
[5]:
from pyflink.table import DataTypes
from pyflink.table import StreamTableEnvironment
# create a `TableEnvironment` which is the entry point of `Table` & `SQL` program.
t_env = StreamTableEnvironment.create(env)
table = t_env.from_elements(
    [(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],
    DataTypes.ROW([DataTypes.FIELD("id", DataTypes.INT()),
                   DataTypes.FIELD("data", DataTypes.STRING())]))
ds = t_env.to_data_stream(table)
ds.get_type()
[5]:
ExternalTypeInfo<RowTypeInfo(id: Integer, data: String)>
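The conversion also works in the other direction: a DataStream can be turned back into a Table and queried with SQL. A minimal sketch, reusing the t_env and ds from above; the view name my_table is an assumed name:

# convert the DataStream back into a Table and query it with SQL
table = t_env.from_data_stream(ds)
t_env.create_temporary_view('my_table', table)
t_env.sql_query("SELECT id, data FROM my_table").execute().print()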
Viewing Data on DataStream#
DataStream.execute_and_collect() collects the contents of the current DataStream to the local client.
[6]:
list(ds.execute_and_collect())
[6]:
[<Row(1, 'aaa|bb')>, <Row(2, 'bb|a')>, <Row(3, 'aaa|a')>]
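For a potentially unbounded stream you usually don't want to collect everything to the client; execute_and_collect also accepts a limit on the number of records to fetch, in which case it returns a list directly:

# fetch at most two records from the (possibly unbounded) stream
ds.execute_and_collect(limit=2)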
Print the data of the DataStream to the console:
[7]:
ds.print()
env.execute()
[7]:
<pyflink.common.job_execution_result.JobExecutionResult at 0x7fd5dcc73550>
Applying a Function on DataStream#
DataStream programs in Flink are regular programs that implement transformations on data streams (e.g., mapping, filtering, reducing). Please see operators for an overview of the available transformations in the Python DataStream API.
[7]:
from pyflink.common import Row
from pyflink.datastream import FlatMapFunction

class MyFlatMapFunction(FlatMapFunction):

    def flat_map(self, value):
        # emit one row per '|'-separated part of the 'data' field
        for s in str(value.data).split('|'):
            yield Row(value.id, s)

list(ds.flat_map(MyFlatMapFunction(),
                 output_type=Types.ROW([Types.INT(), Types.STRING()])).execute_and_collect())
[7]:
[<Row(1, 'aaa')>,
<Row(1, 'bb')>,
<Row(2, 'bb')>,
<Row(2, 'a')>,
<Row(3, 'aaa')>,
<Row(3, 'a')>]
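Transformations compose into pipelines. As a further sketch (not part of the original example), the split words can be counted with key_by and reduce; note that in streaming mode a keyed reduce emits an updated result for every input record:

word_counts = (
    ds.flat_map(MyFlatMapFunction(), output_type=Types.ROW([Types.INT(), Types.STRING()]))
      .map(lambda r: (r[1], 1), Types.TUPLE([Types.STRING(), Types.INT()]))
      .key_by(lambda t: t[0])
      .reduce(lambda a, b: (a[0], a[1] + b[1])))
list(word_counts.execute_and_collect())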
Emitting Results of a DataStream#
There are many connectors and formats available in Flink. See also the latest DataStream Connectors.
[8]:
from pyflink.common import Encoder
from pyflink.datastream.connectors.file_system import FileSink, RollingPolicy

def split(s):
    # emit one (id, word) pair per '|'-separated part of the second field
    splits = s[1].split('|')
    for sp in splits:
        yield s[0], sp

sink = (FileSink
        .for_row_format('/tmp/sink', Encoder.simple_string_encoder("UTF-8"))
        .with_rolling_policy(RollingPolicy.default_rolling_policy(
            part_size=1024 ** 3,
            rollover_interval=15 * 60 * 1000,
            inactivity_interval=5 * 60 * 1000))
        .build())

# split each record on '|' before shifting the id and writing to the sink
(ds.flat_map(split, output_type=Types.TUPLE([Types.INT(), Types.STRING()]))
   .map(lambda i: (i[0] + 1, i[1]), Types.TUPLE([Types.INT(), Types.STRING()]))
   .sink_to(sink))

# the result will be stored in the directory /tmp/sink.
env.execute()
[8]:
<pyflink.common.job_execution_result.JobExecutionResult at 0x7fd9ececa080>
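To read the written files back into a stream, a FileSource can be used. A minimal sketch, assuming the files produced above are still under /tmp/sink:

from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream.connectors.file_system import FileSource, StreamFormat

# read each line of the files under /tmp/sink as a string record
file_source = (FileSource
               .for_record_stream_format(StreamFormat.text_line_format(), '/tmp/sink')
               .build())
ds = env.from_source(file_source, WatermarkStrategy.no_watermarks(), 'file_source')
list(ds.execute_and_collect())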