QuickStart: Table API#
This document is a short introduction to the PyFlink Table API, intended to help new users quickly understand its basic usage.
You can run the latest version of these examples yourself in ‘Live Notebook: Table’ on the quickstart page.
For advanced usage, refer to the latest version of the PyFlink Table API doc.
TableEnvironment Creation#
TableEnvironment is the entry point and central context for creating Table and SQL API programs. Flink is a unified streaming and batch computing engine and provides a unified streaming and batch API for creating a TableEnvironment. A TableEnvironment is responsible for:
- Table management: creating Tables, listing Tables, converting between Table and DataStream, etc.
- User-defined function management: registering, dropping, and listing user-defined functions, etc.
- Executing SQL queries
- Job configuration
- Job submission
For more details on how to create a TableEnvironment, refer to the latest version of Create a TableEnvironment.
[1]:
# Create a batch TableEnvironment
from pyflink.table import EnvironmentSettings, TableEnvironment
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
table_env
[1]:
<pyflink.table.table_environment.TableEnvironment at 0x7fcd16342ac8>
[2]:
# Create a streaming TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
table_env
[2]:
<pyflink.table.table_environment.TableEnvironment at 0x7fcd1ad0c0f0>
Table Creation#
Table
is a core component of the Python Table API. A Table
object describes a pipeline of data transformations. It does not contain the data itself in any way. Instead, it describes how to read data from a table source, how to add some compute on data and how to eventually write data to a table sink. The declared pipeline can be printed, optimized, and eventually executed in a cluster. The pipeline can work with bounded or unbounded streams which enables both streaming and batch
scenarios.
A Table is always bound to a specific TableEnvironment. It is not possible to combine Tables from different TableEnvironments in the same query, e.g., to join or union them.
First, you can create a Table from a Python list object:
[3]:
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')])
table.get_schema()
[3]:
root
|-- _1: BIGINT
|-- _2: STRING
Create a Table
with an explicit schema.
[4]:
from pyflink.table import DataTypes
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')],
DataTypes.ROW([DataTypes.FIELD("id", DataTypes.TINYINT()),
DataTypes.FIELD("data", DataTypes.STRING())]))
table.get_schema()
[4]:
root
|-- id: TINYINT
|-- data: STRING
Create a Table
from a Pandas DataFrame
[5]:
import pandas as pd
df = pd.DataFrame({'id': [1, 2], 'data': ['Hi', 'Hello']})
table = table_env.from_pandas(df)
table.get_schema()
[5]:
root
|-- id: BIGINT
|-- data: STRING
Create a Table
from DDL statements
[6]:
table_env.execute_sql("""
CREATE TABLE random_source (
id TINYINT,
data STRING
) WITH (
'connector' = 'datagen',
'fields.id.kind' = 'sequence',
'fields.id.start' = '1',
'fields.id.end' = '2',
'fields.data.kind' = 'random'
)
""")
table = table_env.from_path("random_source")
table.get_schema()
[6]:
root
|-- id: TINYINT
|-- data: STRING
Create a Table
from TableDescriptor
[7]:
from pyflink.table import DataTypes
from pyflink.table.schema import Schema
from pyflink.table.table_descriptor import TableDescriptor
schema = (Schema.new_builder()
.column('id', DataTypes.TINYINT())
.column('data', DataTypes.STRING())
.build())
table = table_env.from_descriptor(
TableDescriptor
.for_connector('datagen')
.option('fields.id.kind', 'sequence')
.option('fields.id.start', '1')
.option('fields.id.end', '2')
.option('fields.data.kind', 'random')
.schema(schema)
.build())
table.get_schema()
[7]:
root
|-- id: TINYINT
|-- data: STRING
Create a Table
from a DataStream
[8]:
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
# create a StreamExecutionEnvironment which is the entry point of `DataStream` program.
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
ds = env.from_collection([(1, 'Hi'), (2, 'Hello')],
type_info=Types.ROW_NAMED(
["id", "data"],
[Types.BYTE(), Types.STRING()]))
table = t_env.from_data_stream(ds,
Schema.new_builder()
.column("id", DataTypes.TINYINT())
.column("data", DataTypes.STRING())
.build())
table.get_schema()
[8]:
root
|-- id: TINYINT
|-- data: STRING
Create a Table
from Catalog
[9]:
# prepare the catalog
# register Table API tables in the catalog
old_table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table_env.create_temporary_view('source_table', old_table)
# create Table API table from catalog
table = table_env.from_path('source_table')
table.get_schema()
[9]:
root
|-- id: BIGINT
|-- data: STRING
Viewing Data on Table#
You can get the schema of Table
as follows:
[10]:
table.get_schema()
[10]:
root
|-- id: BIGINT
|-- data: STRING
[11]:
table.print_schema()
(
`id` BIGINT,
`data` STRING
)
Table.execute() collects the contents of the current Table to the local client.
[12]:
list(table.execute().collect())
[12]:
[<Row(1, 'Hi')>, <Row(2, 'Hello')>]
[13]:
table.execute().print()
+----+----------------------+--------------------------------+
| op | id | data |
+----+----------------------+--------------------------------+
| +I | 1 | Hi |
| +I | 2 | Hello |
+----+----------------------+--------------------------------+
2 rows in set
PyFlink Table also provides conversion back to a pandas DataFrame to leverage the pandas API.
[14]:
table.to_pandas()
[14]:
|   | id | data  |
|---|----|-------|
| 0 | 1  | Hi    |
| 1 | 2  | Hello |
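Once converted, the result is an ordinary pandas DataFrame, so the full pandas API applies. A minimal standalone sketch (the DataFrame is constructed directly here, matching the output above, so the snippet does not require a running Flink job):

```python
import pandas as pd

# equivalent to the result of table.to_pandas() above
df = pd.DataFrame({'id': [1, 2], 'data': ['Hi', 'Hello']})

# leverage the pandas API on the converted result
print(df['data'].str.upper().tolist())  # ['HI', 'HELLO']
print(int(df['id'].sum()))              # 3
```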
Selecting and Accessing Data on Table#
PyFlink Table is lazily evaluated: simply selecting a column does not trigger any computation; instead, it returns an Expression instance.
[15]:
from pyflink.table import Expression
from pyflink.table.expressions import col
isinstance(table.id, Expression)
[15]:
True
These Expressions can be used to select columns from a Table. For example, Table.select() takes Expression instances and returns another Table.
[16]:
table.select(table.id).to_pandas()
[16]:
|   | id |
|---|----|
| 0 | 1  |
| 1 | 2  |
[17]:
table.select(col('id')).to_pandas()
[17]:
|   | id |
|---|----|
| 0 | 1  |
| 1 | 2  |
Add a new column computed from an Expression instance.
[18]:
table.add_columns(col('data').upper_case.alias('upper_data')).to_pandas()
[18]:
|   | id | data  | upper_data |
|---|----|-------|------------|
| 0 | 1  | Hi    | HI         |
| 1 | 2  | Hello | HELLO      |
To select a subset of rows, use Table.filter()
.
[19]:
table.filter(col('id') == 1).to_pandas()
[19]:
|   | id | data |
|---|----|------|
| 0 | 1  | Hi   |
Applying a Function on Table#
PyFlink supports various UDFs and APIs that allow users to execute native Python functions. See also the latest User-defined Functions and Row-based Operations.
The first example shows UDFs used in the Table API & SQL.
[20]:
from pyflink.table.udf import udf
# create a general Python UDF
@udf(result_type=DataTypes.BIGINT())
def plus_one(i):
return i + 1
table.select(plus_one(col('id'))).to_pandas()
[20]:
|   | _c0 |
|---|-----|
| 0 | 2   |
| 1 | 3   |
[21]:
# create a vectorized (Pandas) Python UDF
@udf(result_type=DataTypes.BIGINT(), func_type='pandas')
def pandas_plus_one(series):
return series + 1
table.select(pandas_plus_one(col('id'))).to_pandas()
[21]:
|   | _c0 |
|---|-----|
| 0 | 2   |
| 1 | 3   |
[ ]:
# use the Python function in SQL API
table_env.create_temporary_function("plus_one", plus_one)
table_env.sql_query("SELECT plus_one(id) FROM {}".format(table)).to_pandas()
Another example is UDFs used in Row-based Operations
[23]:
from pyflink.common.types import Row
@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("data", DataTypes.STRING())]))
def func(data: Row):
return Row(data.id, data.data * 2)
table.map(func).execute().print()
+----+----------------------+--------------------------------+
| op | id | data |
+----+----------------------+--------------------------------+
| +I | 1 | HiHi |
| +I | 2 | HelloHello |
+----+----------------------+--------------------------------+
2 rows in set
Emitting Results of Table#
There are many connectors and formats available in Flink. See also the latest Table & SQL Connectors.
[24]:
# create a `Print` connector
schema = (Schema.new_builder()
.column('id', DataTypes.BIGINT())
.column('data', DataTypes.STRING())
.build())
table.execute_insert(
TableDescriptor
.for_connector('print')
.schema(schema)
.build())
2> +I[1, Hi]
2> +I[2, Hello]
[24]:
<pyflink.table.table_result.TableResult at 0x7fcd1ba83be0>