QuickStart: Table API
This document is a short introduction to the PyFlink Table API, intended to help new users quickly learn its basic usage.
You can run the latest version of these examples yourself in ‘Live Notebook: Table’ on the quickstart page.
For advanced usage, refer to the latest version of the PyFlink Table API documentation.
TableEnvironment Creation
TableEnvironment is the entry point and central context for creating Table and SQL API programs. Flink is a unified streaming and batch computing engine, and it provides a unified streaming and batch API for creating a TableEnvironment. TableEnvironment is responsible for:
- Table management: Table creation, listing Tables, conversion between Table and DataStream, etc.
- User-defined function management: user-defined function registration, dropping, listing, etc.
- Executing SQL queries
- Job configuration
- Job submission
For more details on how to create a TableEnvironment, refer to the latest version of Create a TableEnvironment.
[1]:
# Create a batch TableEnvironment
from pyflink.table import EnvironmentSettings, TableEnvironment
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
table_env
[1]:
<pyflink.table.table_environment.TableEnvironment at 0x7fcd16342ac8>
[2]:
# Create a streaming TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
table_env
[2]:
<pyflink.table.table_environment.TableEnvironment at 0x7fcd1ad0c0f0>
Table Creation
A Table is a core component of the Python Table API. A Table object describes a pipeline of data transformations; it does not contain the data itself. Instead, it describes how to read data from a table source, how to apply computations to the data, and how to eventually write the data to a table sink. The declared pipeline can be printed, optimized, and eventually executed in a cluster. The pipeline can work with bounded or unbounded streams, which enables both streaming and batch scenarios.
A Table is always bound to a specific TableEnvironment. It is not possible to combine tables from different TableEnvironments in the same query, e.g., to join or union them.
First, you can create a Table from a Python list object:
[3]:
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')])
table.get_schema()
[3]:
root
|-- _1: BIGINT
|-- _2: STRING
Create a Table with an explicit schema.
[4]:
from pyflink.table import DataTypes
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')],
                                DataTypes.ROW([DataTypes.FIELD("id", DataTypes.TINYINT()),
                                               DataTypes.FIELD("data", DataTypes.STRING())]))
table.get_schema()
[4]:
root
|-- id: TINYINT
|-- data: STRING
Create a Table from a pandas DataFrame
[5]:
import pandas as pd
df = pd.DataFrame({'id': [1, 2], 'data': ['Hi', 'Hello']})
table = table_env.from_pandas(df)
table.get_schema()
[5]:
root
|-- id: BIGINT
|-- data: STRING
Create a Table from DDL statements
[6]:
table_env.execute_sql("""
    CREATE TABLE random_source (
        id TINYINT,
        data STRING
    ) WITH (
        'connector' = 'datagen',
        'fields.id.kind' = 'sequence',
        'fields.id.start' = '1',
        'fields.id.end' = '2',
        'fields.data.kind' = 'random'
    )
""")
table = table_env.from_path("random_source")
table.get_schema()
[6]:
root
|-- id: TINYINT
|-- data: STRING
Create a Table from a TableDescriptor
[7]:
from pyflink.table import DataTypes
from pyflink.table.schema import Schema
from pyflink.table.table_descriptor import TableDescriptor
schema = (Schema.new_builder()
          .column('id', DataTypes.TINYINT())
          .column('data', DataTypes.STRING())
          .build())
table = table_env.from_descriptor(
    TableDescriptor
    .for_connector('datagen')
    .option('fields.id.kind', 'sequence')
    .option('fields.id.start', '1')
    .option('fields.id.end', '2')
    .option('fields.data.kind', 'random')
    .schema(schema)
    .build())
table.get_schema()
[7]:
root
|-- id: TINYINT
|-- data: STRING
Create a Table from a DataStream
[8]:
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
# create a StreamExecutionEnvironment, which is the entry point of a DataStream program
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
ds = env.from_collection([(1, 'Hi'), (2, 'Hello')],
                         type_info=Types.ROW_NAMED(
                             ["id", "data"],
                             [Types.BYTE(), Types.STRING()]))
table = t_env.from_data_stream(ds,
                               Schema.new_builder()
                               .column("id", DataTypes.TINYINT())
                               .column("data", DataTypes.STRING())
                               .build())
table.get_schema()
[8]:
root
|-- id: TINYINT
|-- data: STRING
Create a Table from a catalog
[9]:
# prepare the catalog
# register Table API tables in the catalog
old_table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table_env.create_temporary_view('source_table', old_table)
# create Table API table from catalog
table = table_env.from_path('source_table')
table.get_schema()
[9]:
root
|-- id: BIGINT
|-- data: STRING
Viewing Data in a Table
You can get the schema of a Table as follows:
[10]:
table.get_schema()
[10]:
root
|-- id: BIGINT
|-- data: STRING
[11]:
table.print_schema()
(
`id` BIGINT,
`data` STRING
)
Table.execute() executes the Table's pipeline and returns a TableResult; calling collect() on that result fetches the rows to the local client.
[12]:
list(table.execute().collect())
[12]:
[<Row(1, 'Hi')>, <Row(2, 'Hello')>]
[13]:
table.execute().print()
+----+----------------------+--------------------------------+
| op | id | data |
+----+----------------------+--------------------------------+
| +I | 1 | Hi |
| +I | 2 | Hello |
+----+----------------------+--------------------------------+
2 rows in set
A PyFlink Table can also be converted back to a pandas DataFrame with Table.to_pandas(), which lets you use the pandas API on the results.
[14]:
table.to_pandas()
[14]:
|   | id | data |
|---|---|---|
| 0 | 1 | Hi |
| 1 | 2 | Hello |
Selecting and Accessing Data in a Table
A PyFlink Table is lazily evaluated: selecting a column does not trigger any computation; it returns a column Expression instance instead.
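[15]:
from pyflink.table.expressions import col
# both table.id and col('id') return a column Expression instance
type(table.id) == type(col('id'))
[15]:
True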
These column Expressions can be used to select columns from a Table. For example, Table.select() takes column Expression instances and returns another Table.
[16]:
table.select(table.id).to_pandas()
[16]:
|   | id |
|---|---|
| 0 | 1 |
| 1 | 2 |
[17]:
table.select(col('id')).to_pandas()
[17]:
|   | id |
|---|---|
| 0 | 1 |
| 1 | 2 |
To add a new column, pass a new column Expression to Table.add_columns():
[18]:
table.add_columns(col('data').upper_case.alias('upper_data')).to_pandas()
[18]:
|   | id | data | upper_data |
|---|---|---|---|
| 0 | 1 | Hi | HI |
| 1 | 2 | Hello | HELLO |
To select a subset of rows, use Table.filter().
[19]:
table.filter(col('id') == 1).to_pandas()
[19]:
|   | id | data |
|---|---|---|
| 0 | 1 | Hi |
Applying a Function to a Table
PyFlink supports various UDFs and APIs that allow users to execute native Python functions. See also the latest User-defined Functions and Row-based Operations documentation.
The first example shows UDFs used in the Table API and SQL:
[20]:
from pyflink.table.expressions import col
from pyflink.table.udf import udf
# create a general Python UDF, which is called once per row
@udf(result_type=DataTypes.BIGINT())
def plus_one(i):
    return i + 1
table.select(plus_one(col('id'))).to_pandas()
[20]:
|   | _c0 |
|---|---|
| 0 | 2 |
| 1 | 3 |
[21]:
# create a vectorized Python UDF (pandas UDF), which receives pandas.Series batches
@udf(result_type=DataTypes.BIGINT(), func_type='pandas')
def pandas_plus_one(series):
    return series + 1
table.select(pandas_plus_one(col('id'))).to_pandas()
[21]:
|   | _c0 |
|---|---|
| 0 | 2 |
| 1 | 3 |
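With func_type='pandas', the UDF body is called with a whole batch of values as a pandas.Series rather than one value per call. Because the body is ordinary pandas code, one way to check its logic is to call the plain function on a Series locally before wrapping it. A minimal sketch that needs only pandas; the function name `plus_one_series` is hypothetical.

```python
import pandas as pd

def plus_one_series(series: pd.Series) -> pd.Series:
    # Vectorized: one call handles a whole batch of values, not a single value
    return series + 1

batch = pd.Series([1, 2], dtype='int64')
out = plus_one_series(batch)
```

The checked function can then be wrapped without the decorator syntax, e.g. udf(plus_one_series, result_type=DataTypes.BIGINT(), func_type='pandas').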
[ ]:
# use the Python function in SQL API
table_env.create_temporary_function("plus_one", plus_one)
table_env.sql_query("SELECT plus_one(id) FROM {}".format(table)).to_pandas()
Another example shows a UDF used in row-based operations:
[23]:
from pyflink.common.types import Row
@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
                               DataTypes.FIELD("data", DataTypes.STRING())]))
def func(data: Row):
    return Row(data.id, data.data * 2)
table.map(func).execute().print()
+----+----------------------+--------------------------------+
| op | id | data |
+----+----------------------+--------------------------------+
| +I | 1 | HiHi |
| +I | 2 | HelloHello |
+----+----------------------+--------------------------------+
2 rows in set
Emitting Results of a Table
There are many connectors and formats available in Flink. See also the latest Table & SQL Connectors documentation.
[24]:
# write the Table to a sink that uses the `print` connector
schema = (Schema.new_builder()
          .column('id', DataTypes.BIGINT())
          .column('data', DataTypes.STRING())
          .build())
table.execute_insert(
    TableDescriptor
    .for_connector('print')
    .schema(schema)
    .build())
2> +I[1, Hi]
2> +I[2, Hello]
[24]:
<pyflink.table.table_result.TableResult at 0x7fcd1ba83be0>