QuickStart: Table API#

This document is a short introduction to the PyFlink Table API, intended to help new users quickly understand its basic usage.

You can run the latest version of these examples yourself in ‘Live Notebook: Table’ at the quickstart page.

For advanced usage, you can refer to the latest version of the PyFlink Table API documentation.

TableEnvironment Creation#

TableEnvironment is the entry point and central context for creating Table and SQL API programs. Flink is a unified streaming and batch computing engine, and a TableEnvironment can be created in either streaming or batch mode through the same unified API. TableEnvironment is responsible for:

  • Table management: table creation, listing tables, conversion between Table and DataStream, etc.

  • User-defined function management: user-defined function registration, dropping, listing, etc.

  • Executing SQL queries

  • Job configuration

  • Python dependency management

  • Job submission

For more details on how to create a TableEnvironment, you can refer to the latest version of Create a TableEnvironment.

[1]:
# Create a batch TableEnvironment
from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
table_env
[1]:
<pyflink.table.table_environment.TableEnvironment at 0x7fcd16342ac8>
[2]:
# Create a streaming TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
table_env

[2]:
<pyflink.table.table_environment.TableEnvironment at 0x7fcd1ad0c0f0>
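
The job configuration and Python dependency management mentioned above are also accessed through the TableEnvironment. A minimal sketch (the config key shown is illustrative and the referenced file is hypothetical; available options depend on your Flink version):

[ ]:
# set a job configuration option via the TableConfig
table_env.get_config().set('parallelism.default', '1')

# ship a Python file with the job so it can be imported inside UDFs
# ('my_udf_utils.py' is a hypothetical file used only for illustration)
# table_env.add_python_file('my_udf_utils.py')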

Table Creation#

Table is a core component of the Python Table API. A Table object describes a pipeline of data transformations. It does not contain the data itself in any way. Instead, it describes how to read data from a table source, how to add some compute on data and how to eventually write data to a table sink. The declared pipeline can be printed, optimized, and eventually executed in a cluster. The pipeline can work with bounded or unbounded streams which enables both streaming and batch scenarios.

A Table is always bound to a specific TableEnvironment. It is not possible to combine tables from different TableEnvironments in the same query, e.g., to join or union them.

First, you can create a Table from a Python list object.

[3]:
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')])
table.get_schema()
[3]:
root
 |-- _1: BIGINT
 |-- _2: STRING
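
Instead of relying on the auto-generated names `_1` and `_2`, you can also pass a list of field names, as used later in this notebook:

[ ]:
# name the columns explicitly with a list of field names
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table.get_schema()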

Create a Table with an explicit schema.

[4]:
from pyflink.table import DataTypes
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')],
                                DataTypes.ROW([DataTypes.FIELD("id", DataTypes.TINYINT()),
                                               DataTypes.FIELD("data", DataTypes.STRING())]))
table.get_schema()
[4]:
root
 |-- id: TINYINT
 |-- data: STRING

Create a Table from a Pandas DataFrame

[5]:
import pandas as pd
df = pd.DataFrame({'id': [1, 2], 'data': ['Hi', 'Hello']})
table = table_env.from_pandas(df)
table.get_schema()
[5]:
root
 |-- id: BIGINT
 |-- data: STRING
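
from_pandas also accepts an explicit schema if you want to control the field types rather than rely on type inference; a minimal sketch:

[ ]:
# pass an explicit row type instead of inferring BIGINT for the id column
table = table_env.from_pandas(df,
                              DataTypes.ROW([DataTypes.FIELD("id", DataTypes.TINYINT()),
                                             DataTypes.FIELD("data", DataTypes.STRING())]))
table.get_schema()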

Create a Table from DDL statements

[6]:
table_env.execute_sql("""
    CREATE TABLE random_source (
        id TINYINT,
        data STRING
    ) WITH (
        'connector' = 'datagen',
        'fields.id.kind' = 'sequence',
        'fields.id.start' = '1',
        'fields.id.end' = '2',
        'fields.data.kind' = 'random'
    )
""")
table = table_env.from_path("random_source")
table.get_schema()
[6]:
root
 |-- id: TINYINT
 |-- data: STRING

Create a Table from TableDescriptor

[7]:
from pyflink.table import DataTypes
from pyflink.table.schema import Schema
from pyflink.table.table_descriptor import TableDescriptor


schema = (Schema.new_builder()
          .column('id', DataTypes.TINYINT())
          .column('data', DataTypes.STRING())
          .build())

table = table_env.from_descriptor(
    TableDescriptor
        .for_connector('datagen')
        .option('fields.id.kind', 'sequence')
        .option('fields.id.start', '1')
        .option('fields.id.end', '2')
        .option('fields.data.kind', 'random')
        .schema(schema)
        .build())
table.get_schema()
[7]:
root
 |-- id: TINYINT
 |-- data: STRING
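
A TableDescriptor can also be registered in the catalog under a name and looked up later via from_path. A short sketch (the table name 'datagen_source' is chosen here for illustration):

[ ]:
# register the descriptor as a temporary table, then look it up by name
table_env.create_temporary_table(
    'datagen_source',
    TableDescriptor
        .for_connector('datagen')
        .option('fields.id.kind', 'sequence')
        .option('fields.id.start', '1')
        .option('fields.id.end', '2')
        .option('fields.data.kind', 'random')
        .schema(schema)
        .build())
table = table_env.from_path('datagen_source')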

Create a Table from a DataStream

[8]:
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

# create a StreamExecutionEnvironment, which is the entry point of a `DataStream` program
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

ds = env.from_collection([(1, 'Hi'), (2, 'Hello')],
                         type_info=Types.ROW_NAMED(
                             ["id", "data"],
                             [Types.BYTE(), Types.STRING()]))

table = t_env.from_data_stream(ds,
                               Schema.new_builder()
                               .column("id", DataTypes.TINYINT())
                               .column("data", DataTypes.STRING())
                               .build())
table.get_schema()
[8]:
root
 |-- id: TINYINT
 |-- data: STRING
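
The conversion also works in the other direction: a Table can be turned back into a DataStream for further processing with the DataStream API, e.g.:

[ ]:
# convert the Table back to an (insert-only) DataStream
ds = t_env.to_data_stream(table)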

Create a Table from Catalog

[9]:
# prepare the catalog
# register Table API tables in the catalog
old_table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table_env.create_temporary_view('source_table', old_table)

# create Table API table from catalog
table = table_env.from_path('source_table')
table.get_schema()
[9]:
root
 |-- id: BIGINT
 |-- data: STRING
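
The tables registered in the catalog can also be listed, which is part of the table management responsibilities mentioned at the beginning:

[ ]:
# list the tables registered in the current catalog and database
table_env.list_tables()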

Viewing Data on Table#

You can get the schema of a Table as follows:

[10]:
table.get_schema()
[10]:
root
 |-- id: BIGINT
 |-- data: STRING
[11]:
table.print_schema()
(
  `id` BIGINT,
  `data` STRING
)

Table.execute() collects the contents of the current Table to the local client.

[12]:
list(table.execute().collect())
[12]:
[<Row(1, 'Hi')>, <Row(2, 'Hello')>]
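
The iterator returned by collect() is closeable; iterating it inside a with block ensures that the underlying resources are released, e.g.:

[ ]:
# close the iterator automatically once iteration is done
with table.execute().collect() as results:
    for result in results:
        print(result)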
[13]:
table.execute().print()
+----+----------------------+--------------------------------+
| op |                   id |                           data |
+----+----------------------+--------------------------------+
| +I |                    1 |                             Hi |
| +I |                    2 |                          Hello |
+----+----------------------+--------------------------------+
2 rows in set

PyFlink Table also provides conversion back to a pandas DataFrame to leverage the pandas API.

[14]:
table.to_pandas()
[14]:
   id   data
0   1     Hi
1   2  Hello
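
Once converted, the full pandas API is available on the client side, e.g.:

[ ]:
# any pandas operation can be applied to the converted DataFrame
pdf = table.to_pandas()
pdf['data'].str.upper()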

Selecting and Accessing Data on Table#

PyFlink Table is lazily evaluated: simply selecting a column does not trigger any computation; instead, it returns a Column Expression instance.

[15]:
from pyflink.table.expressions import col
type(table.id) == type(col('id'))
[15]:
True
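
Column Expressions support operators such as arithmetic and aliasing, so they can be composed before being used in a query; a small sketch:

[ ]:
# build a composed expression and use it in a select
expr = (col('id') + 1).alias('id_plus_one')
table.select(expr).to_pandas()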

These Column Expressions can be used to select columns from a Table. For example, Table.select() takes Column Expression instances and returns another Table.

[16]:
table.select(table.id).to_pandas()
[16]:
   id
0   1
1   2
[17]:
table.select(col('id')).to_pandas()
[17]:
   id
0   1
1   2

Add a new column by assigning a Column Expression instance.

[18]:
table.add_columns(col('data').upper_case.alias('upper_data')).to_pandas()
[18]:
   id   data upper_data
0   1     Hi         HI
1   2  Hello      HELLO
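
Related column operations include dropping and renaming columns, e.g.:

[ ]:
# drop a column, or rename one via an alias expression
table.drop_columns(col('data')).to_pandas()
table.rename_columns(col('data').alias('text')).to_pandas()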

To select a subset of rows, use Table.filter().

[19]:
table.filter(col('id') == 1).to_pandas()
[19]:
   id data
0   1   Hi
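
Predicates can be combined with & (and), | (or) and ~ (not), e.g.:

[ ]:
# keep only the rows matching both conditions
table.filter((col('id') > 1) & (col('data') == 'Hello')).to_pandas()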

Applying a Function on Table#

PyFlink supports various UDFs and APIs that allow users to execute native Python functions. See also the latest User-defined Functions and Row-based Operations.

The first example shows UDFs used in the Table API & SQL.

[20]:
from pyflink.table.udf import udf

# create a general Python UDF
@udf(result_type=DataTypes.BIGINT())
def plus_one(i):
    return i + 1

table.select(plus_one(col('id'))).to_pandas()
[20]:
   _c0
0    2
1    3
[21]:
# create a vectorized (pandas) Python UDF
@udf(result_type=DataTypes.BIGINT(), func_type='pandas')
def pandas_plus_one(series):
    return series + 1

table.select(pandas_plus_one(col('id'))).to_pandas()
[21]:
   _c0
0    2
1    3
[ ]:
# use the Python function in SQL API
table_env.create_temporary_function("plus_one", plus_one)
table_env.sql_query("SELECT plus_one(id) FROM {}".format(table)).to_pandas()

Another example shows UDFs used in Row-based Operations.

[23]:
from pyflink.common.types import Row

@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
                                DataTypes.FIELD("data", DataTypes.STRING())]))
def func(data: Row):
    return Row(data.id, data.data * 2)

table.map(func).execute().print()
+----+----------------------+--------------------------------+
| op |                   id |                           data |
+----+----------------------+--------------------------------+
| +I |                    1 |                           HiHi |
| +I |                    2 |                     HelloHello |
+----+----------------------+--------------------------------+
2 rows in set
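
Row-based Operations also include Table.flat_map(), which applies a table function (udtf) that may emit zero or more rows per input row. A minimal sketch:

[ ]:
from pyflink.table.udf import udtf

# emit one row per character of the data field
@udtf(result_types=[DataTypes.BIGINT(), DataTypes.STRING()])
def split(data: Row):
    for ch in data.data:
        yield data.id, ch

table.flat_map(split).execute().print()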

Emitting Results of a Table#

There are many connectors and formats available in Flink. See also the latest Table & SQL Connectors.

[24]:
# emit the Table to a sink backed by the `print` connector
schema = (Schema.new_builder()
          .column('id', DataTypes.BIGINT())
          .column('data', DataTypes.STRING())
          .build())

table.execute_insert(
    TableDescriptor
        .for_connector('print')
        .schema(schema)
        .build())
2> +I[1, Hi]
2> +I[2, Hello]
[24]:
<pyflink.table.table_result.TableResult at 0x7fcd1ba83be0>
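
Alternatively, the sink can be registered under a name first and written to by name; TableResult.wait() blocks until the job finishes (the name 'print_sink' is chosen here for illustration):

[ ]:
# register the print sink in the catalog, then insert into it by name
table_env.create_temporary_table(
    'print_sink',
    TableDescriptor
        .for_connector('print')
        .schema(schema)
        .build())
table.execute_insert('print_sink').wait()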