{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "# QuickStart: Table API\n",
    "\n",
    "This document is a short introduction to the PyFlink Table API, which is used to help novice users quickly understand the basic usage of PyFlink Table API. \n",
    "\n",
    "You can run the latest version of these examples by yourself in 'Live Notebook: Table' at [the quickstart page](https://pyflink.readthedocs.io/en/latest/getting_started/index.html).\n",
    "\n",
    "For advanced usage, you can refer to the latest version of [PyFlink Table API doc](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/table/intro_to_table_api/)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "## TableEnvironment Creation\n",
    "\n",
    "`TableEnvironment` is the entry point and central context for creating Table and SQL API programs. Flink is an unified streaming and batch computing engine, which provides unified streaming and batch API to create a `TableEnvironment`.\n",
    "`TableEnvironment` is responsible for:\n",
    "\n",
    "* `Table` management: `Table` Creation, listing `Table`s, Conversion between `Table` and `DataStream`, etc.\n",
    "\n",
    "* User-defined function management: User-defined function registration, dropping, listing, etc.\n",
    "\n",
    "* Executing `SQL` queries\n",
    "\n",
    "* Job configuration\n",
    "\n",
    "* [Python dependency management](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/dependency_management/)\n",
    "\n",
    "* Job submission\n",
    "\n",
    "For more details of how to create a `TableEnvironment`, you can refer to the latest version [Create a TableEnvironment](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/table/table_environment/#create-a-tableenvironment)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "pycharm": {
     "is_executing": false,
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<pyflink.table.table_environment.TableEnvironment at 0x7fcd16342ac8>"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Create a batch TableEnvironment\n",
    "from pyflink.table import EnvironmentSettings, TableEnvironment\n",
    "\n",
    "env_settings = EnvironmentSettings.in_batch_mode()\n",
    "table_env = TableEnvironment.create(env_settings)\n",
    "table_env"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<pyflink.table.table_environment.TableEnvironment at 0x7fcd1ad0c0f0>"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Create a streaming TableEnvironment\n",
    "env_settings = EnvironmentSettings.in_streaming_mode()\n",
    "table_env = TableEnvironment.create(env_settings)\n",
    "table_env\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Table Creation\n",
    "\n",
    "`Table` is a core component of the Python Table API. A `Table` object describes a pipeline of data transformations.\n",
    "It does not contain the data itself in any way. Instead, it describes how to read data from a table source, how to add some compute on data and how to eventually write data to a table sink.\n",
    "The declared pipeline can be printed, optimized, and eventually executed in a cluster. The pipeline can work with bounded or unbounded streams which enables both streaming and batch scenarios.\n",
    "\n",
    "A `Table` is always bound to a specific `TableEnvironment`. It is not possible to combine tables from different `TableEnvironment`s in same query, e.g., to join or union them."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "Firstly, you can create a `Table` from a Python `List` Object"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "root\n",
       " |-- _1: BIGINT\n",
       " |-- _2: STRING"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')])\n",
    "table.get_schema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create a `Table` with an explicit schema."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "root\n",
       " |-- id: TINYINT\n",
       " |-- data: STRING"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from pyflink.table import DataTypes\n",
    "table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')],\n",
    "                                DataTypes.ROW([DataTypes.FIELD(\"id\", DataTypes.TINYINT()),\n",
    "                                               DataTypes.FIELD(\"data\", DataTypes.STRING())]))\n",
    "table.get_schema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create a `Table` from a Pandas `DataFrame`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/Users/duanchen/sourcecode/flink/flink-python/dev/.conda/lib/python3.7/site-packages/pyflink/table/utils.py:55: FutureWarning: Schema passed to names= option, please pass schema= explicitly. Will raise exception in future\n",
      "  return pa.RecordBatch.from_arrays(arrays, schema)\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "root\n",
       " |-- id: BIGINT\n",
       " |-- data: STRING"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import pandas as pd\n",
    "df = pd.DataFrame({'id': [1, 2], 'data': ['Hi', 'Hello']})\n",
    "table = table_env.from_pandas(df)\n",
    "table.get_schema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create a `Table` from DDL statements"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "root\n",
       " |-- id: TINYINT\n",
       " |-- data: STRING"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "table_env.execute_sql(\"\"\"\n",
    "    CREATE TABLE random_source (\n",
    "        id TINYINT,\n",
    "        data STRING\n",
    "    ) WITH (\n",
    "        'connector' = 'datagen',\n",
    "        'fields.id.kind' = 'sequence',\n",
    "        'fields.id.start' = '1',\n",
    "        'fields.id.end' = '2',\n",
    "        'fields.data.kind' = 'random'\n",
    "    )\n",
    "\"\"\")\n",
    "table = table_env.from_path(\"random_source\")\n",
    "table.get_schema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create a `Table` from `TableDescriptor`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "root\n",
       " |-- id: TINYINT\n",
       " |-- data: STRING"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from pyflink.table import DataTypes\n",
    "from pyflink.table.schema import Schema\n",
    "from pyflink.table.table_descriptor import TableDescriptor\n",
    "\n",
    "\n",
    "schema = (Schema.new_builder()\n",
    "         .column('id', DataTypes.TINYINT())\n",
    "         .column('data', DataTypes.STRING())\n",
    "         .build())\n",
    "\n",
    "table = table_env.from_descriptor(\n",
    "    TableDescriptor\n",
    "        .for_connector('datagen')\n",
    "        .option('fields.id.kind', 'sequence')\n",
    "        .option('fields.id.start', '1')\n",
    "        .option('fields.id.end', '2')\n",
    "        .option('fields.data.kind', 'random')\n",
    "        .schema(schema)\n",
    "        .build())\n",
    "table.get_schema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create a `Table` from a `DataStream`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "root\n",
       " |-- id: TINYINT\n",
       " |-- data: STRING"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from pyflink.common import Types\n",
    "from pyflink.datastream import StreamExecutionEnvironment\n",
    "from pyflink.table import StreamTableEnvironment\n",
    "\n",
    "# create a StreamExecutionEnvironment which is the entry point of `DataStream` program.\n",
    "env = StreamExecutionEnvironment.get_execution_environment()\n",
    "t_env = StreamTableEnvironment.create(env)\n",
    "\n",
    "ds = env.from_collection([(1, 'Hi'), (2, 'Hello')],\n",
    "                              type_info=Types.ROW_NAMED(\n",
    "                                  [\"id\", \"data\"],\n",
    "                                  [Types.BYTE(), Types.STRING()]))\n",
    "\n",
    "table = t_env.from_data_stream(ds,\n",
    "                               Schema.new_builder()\n",
    "                               .column(\"id\", DataTypes.TINYINT())\n",
    "                               .column(\"data\", DataTypes.STRING())\n",
    "                               .build())\n",
    "table.get_schema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create a `Table` from `Catalog`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "root\n",
       " |-- id: BIGINT\n",
       " |-- data: STRING"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# prepare the catalog\n",
    "# register Table API tables in the catalog\n",
    "old_table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])\n",
    "table_env.create_temporary_view('source_table', old_table)\n",
    "\n",
    "# create Table API table from catalog\n",
    "table = table_env.from_path('source_table')\n",
    "table.get_schema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Viewing Data on Table\n",
    "\n",
    "You can get the schema of `Table` as follows:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "root\n",
       " |-- id: BIGINT\n",
       " |-- data: STRING"
      ]
     },
     "execution_count": 10,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "table.get_schema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "(\n",
      "  `id` BIGINT,\n",
      "  `data` STRING\n",
      ")\n"
     ]
    }
   ],
   "source": [
    "table.print_schema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`Table.execute()` collects the contents of the current `Table` to local client."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[<Row(1, 'Hi')>, <Row(2, 'Hello')>]"
      ]
     },
     "execution_count": 12,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "list(table.execute().collect())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+----------------------+--------------------------------+\n",
      "| op |                   id |                           data |\n",
      "+----+----------------------+--------------------------------+\n",
      "| +I |                    1 |                             Hi |\n",
      "| +I |                    2 |                          Hello |\n",
      "+----+----------------------+--------------------------------+\n",
      "2 rows in set\n"
     ]
    }
   ],
   "source": [
    "table.execute().print()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "PyFlink Table also provides the conversion back to a [pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) to leverage pandas API."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>id</th>\n",
       "      <th>data</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>1</td>\n",
       "      <td>Hi</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>2</td>\n",
       "      <td>Hello</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "   id   data\n",
       "0   1     Hi\n",
       "1   2  Hello"
      ]
     },
     "execution_count": 14,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "table.to_pandas()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Selecting and Accessing Data on Table\n",
    "\n",
    "PyFlink Table is lazily evaluated and simply selecting a column does not trigger the computation but it returns a Column `Expression` instance."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "True"
      ]
     },
     "execution_count": 15,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from pyflink.table.expressions import col\n",
    "type(table.id)==type(col('id'))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "These Column `Expression`s can be used to select the columns from a `Table`. For example, `Table.select()` takes the column `Expression` instances that returns another `Table`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>id</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>1</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>2</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "   id\n",
       "0   1\n",
       "1   2"
      ]
     },
     "execution_count": 16,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "table.select(table.id).to_pandas()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>id</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>1</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>2</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "   id\n",
       "0   1\n",
       "1   2"
      ]
     },
     "execution_count": 17,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "table.select(col('id')).to_pandas()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Assign new Column `Expression` instance."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>id</th>\n",
       "      <th>data</th>\n",
       "      <th>upper_data</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>1</td>\n",
       "      <td>Hi</td>\n",
       "      <td>HI</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>2</td>\n",
       "      <td>Hello</td>\n",
       "      <td>HELLO</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "   id   data upper_data\n",
       "0   1     Hi         HI\n",
       "1   2  Hello      HELLO"
      ]
     },
     "execution_count": 18,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "table.add_columns(col('data').upper_case.alias('upper_data')).to_pandas()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "To select a subset of rows, use `Table.filter()`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>id</th>\n",
       "      <th>data</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>1</td>\n",
       "      <td>Hi</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "   id data\n",
       "0   1   Hi"
      ]
     },
     "execution_count": 19,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "table.filter(col('id') == 1).to_pandas()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "## Applying a Function on Table\n",
    "\n",
    "PyFlink supports various UDFs and APIs to allow users to execute Python native functions. See also the latest\n",
    "[User-defined Functions](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/table/udfs/overview/) and\n",
    "[Row-based Operations](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/table/operations/row_based_operations/).\n",
    "\n",
    "The first example is UDFs used in `Table API` & `SQL`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>_c0</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>2</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>3</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "   _c0\n",
       "0    2\n",
       "1    3"
      ]
     },
     "execution_count": 20,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from pyflink.table.udf import udf\n",
    "\n",
    "# create a general Python UDF\n",
    "@udf(result_type=DataTypes.BIGINT())\n",
    "def plus_one(i):\n",
    "  return i + 1\n",
    "\n",
    "table.select(plus_one(col('id'))).to_pandas()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/Users/duanchen/sourcecode/flink/flink-python/dev/.conda/lib/python3.7/site-packages/pyflink/table/utils.py:55: FutureWarning: Schema passed to names= option, please pass schema= explicitly. Will raise exception in future\n",
      "  return pa.RecordBatch.from_arrays(arrays, schema)\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>_c0</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>2</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>3</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "   _c0\n",
       "0    2\n",
       "1    3"
      ]
     },
     "execution_count": 21,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# create a general Python UDF\n",
    "@udf(result_type=DataTypes.BIGINT(), func_type='pandas')\n",
    "def pandas_plus_one(series):\n",
    "  return series + 1\n",
    "table.select(pandas_plus_one(col('id'))).to_pandas()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "# use the Python function in SQL API\n",
    "table_env.create_temporary_function(\"plus_one\", plus_one)\n",
    "table_env.sql_query(\"SELECT plus_one(id) FROM {}\".format(table)).to_pandas()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "Another example is UDFs used in `Row-based Operations`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+----------------------+--------------------------------+\n",
      "| op |                   id |                           data |\n",
      "+----+----------------------+--------------------------------+\n",
      "| +I |                    1 |                           HiHi |\n",
      "| +I |                    2 |                     HelloHello |\n",
      "+----+----------------------+--------------------------------+\n",
      "2 rows in set\n"
     ]
    }
   ],
   "source": [
    "from pyflink.common.types import Row\n",
    "@udf(result_type=DataTypes.ROW([DataTypes.FIELD(\"id\", DataTypes.BIGINT()),\n",
    "                                DataTypes.FIELD(\"data\", DataTypes.STRING())]))\n",
    "def func(data: Row):\n",
    "  return Row(data.id, data.data * 2)\n",
    "table.map(func).execute().print()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Emits Results of Table\n",
    "\n",
    "There are many connectors and formats available in Flink. See also the latest [Table & SQL Connectors](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/overview/)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2> +I[1, Hi]\n",
      "2> +I[2, Hello]\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "<pyflink.table.table_result.TableResult at 0x7fcd1ba83be0>"
      ]
     },
     "execution_count": 24,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# create a `Print` connector\n",
    "schema = (Schema.new_builder()\n",
    "         .column('id', DataTypes.BIGINT())\n",
    "         .column('data', DataTypes.STRING())\n",
    "         .build())\n",
    "\n",
    "table.execute_insert(\n",
    "    TableDescriptor\n",
    "        .for_connector('print')\n",
    "        .schema(schema)\n",
    "        .build())"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.3"
  },
  "pycharm": {
   "stem_cell": {
    "cell_type": "raw",
    "metadata": {
     "collapsed": false
    },
    "source": []
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
