{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "# QuickStart: DataStream API\n",
    "\n",
    "Apache Flink offers a DataStream API for building robust, stateful streaming applications. It provides fine-grained control over state and timer, which allows for the implementation of advanced event-driven systems.\n",
    "\n",
    "You can run the latest version of these examples by yourself in 'Live Notebook: DataStream' at [the quickstart page](https://pyflink.readthedocs.io/en/latest/getting_started/index.html).\n",
    "\n",
    "For advanced usage, you can refer to the latest version of [PyFlink DataStream API doc](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/datastream/intro_to_datastream_api/)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## StreamExecutionEnvironment Creation\n",
    "\n",
    "`StreamExecutionEnvironment` is the entry point and central concept for creating DataStream API programs. Flink is an unified streaming and batch computing engine, which provides unified streaming and batch API to create a `StreamExecutionEnvironment`.\n",
    "\n",
    "`StreamExecutionEnvironment` is responsible for:\n",
    "\n",
    "* `DataStream` Creation\n",
    "\n",
    "* [Python dependency management](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/dependency_management/)\n",
    "\n",
    "* Job configuration\n",
    "\n",
    "* Job submission\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<pyflink.datastream.stream_execution_environment.StreamExecutionEnvironment at 0x7fd9e8fc6e48>"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from pyflink.datastream import StreamExecutionEnvironment\n",
    "from pyflink.datastream import RuntimeExecutionMode\n",
    "\n",
    "env = StreamExecutionEnvironment.get_execution_environment()\n",
    "\n",
    "# Config the Program run in Streaming Mode\n",
    "env.set_runtime_mode(RuntimeExecutionMode.STREAMING)\n",
    "env"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## DataStream Creation\n",
    "\n",
    "`DataStream` is a core component of the Python DataStream API. A `DataStream` object describes a pipeline of data transformations.\n",
    "It does not contain the data itself in any way. Instead, it describes how to read data from a source, how to add some compute on data and how to eventually write data to a sink.\n",
    "The declared pipeline can be printed, optimized, and eventually executed in a cluster. The pipeline can work with bounded or unbounded streams which enables both streaming and batch scenarios.\n",
    "\n",
    "A `DataStream` can be created by a specific `StreamExecutionEnvironment`."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "Firstly, you can create a `DataStream` from a Python `List` Object"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "PickledByteArrayTypeInfo"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from pyflink.common.typeinfo import Types\n",
    "ds = env.from_collection([(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')])\n",
    "# if you don't specify the `type_info`, the default `type_info` is `PickledByteArrayTypeInfo`\n",
    "ds.get_type()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create a `DataStream` with an explicit `type_info`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "RowTypeInfo(f0: Integer, f1: String)"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "ds = env.from_collection(\n",
    "    collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],\n",
    "    type_info=Types.ROW([Types.INT(), Types.STRING()]))\n",
    "ds.get_type()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create a `DataStream` from DataStream Connectors"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "Long"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from pyflink.common.watermark_strategy import WatermarkStrategy\n",
    "from pyflink.datastream.connectors.number_seq import NumberSequenceSource\n",
    "\n",
    "env = StreamExecutionEnvironment.get_execution_environment()\n",
    "seq_num_source = NumberSequenceSource(1, 1000)\n",
    "ds = env.from_source(\n",
    "    source=seq_num_source,\n",
    "    watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),\n",
    "    source_name='seq_num_source',\n",
    "    type_info=Types.LONG())\n",
    "ds.get_type()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create a `DataStream` from a `Table`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "ExternalTypeInfo<RowTypeInfo(id: Integer, data: String)>"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from pyflink.table import DataTypes\n",
    "from pyflink.table import StreamTableEnvironment\n",
    "\n",
    "# create a `TableEnvironment` which is the entry point of `Table` & `SQL` program.\n",
    "t_env = StreamTableEnvironment.create(env)\n",
    "table = t_env.from_elements([(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],\n",
    "                            DataTypes.ROW([DataTypes.FIELD(\"id\", DataTypes.INT()),\n",
    "                                           DataTypes.FIELD(\"data\", DataTypes.STRING())]))\n",
    "ds = t_env.to_data_stream(table)\n",
    "ds.get_type()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Viewing Data on DataStream\n",
    "\n",
    "`DataStream.execute_and_collect()` collects the contents of the current `DataStream` to local client."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[<Row(1, 'aaa|bb')>, <Row(2, 'bb|a')>, <Row(3, 'aaa|a')>]"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "list(ds.execute_and_collect())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Print the data of `DataStream` to the console"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<pyflink.common.job_execution_result.JobExecutionResult at 0x7fd5dcc73550>"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "ds.print()\n",
    "env.execute()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Applying a Function on DataStream\n",
    "\n",
    "`DataStream` programs in Flink are regular programs that implement transformations on data streams\n",
    "(e.g., mapping, filtering, reducing). Please see [operators](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/)\n",
    "for an overview of the available transformations in Python DataStream API."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[<Row(1, 'aaa')>,\n",
       " <Row(1, 'bb')>,\n",
       " <Row(2, 'bb')>,\n",
       " <Row(2, 'a')>,\n",
       " <Row(3, 'aaa')>,\n",
       " <Row(3, 'a')>]"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from pyflink.common import Row\n",
    "from pyflink.datastream import FlatMapFunction\n",
    "\n",
    "class MyFlatMapFunction(FlatMapFunction):\n",
    "    def flat_map(self, value):\n",
    "        for s in str(value.data).split('|'):\n",
    "            yield Row(value.id, s)\n",
    "\n",
    "list(ds.flat_map(MyFlatMapFunction(), output_type=Types.ROW([Types.INT(), Types.STRING()])).execute_and_collect())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "## Emits Results of DataStream\n",
    "\n",
    "There are many connectors and formats available in Flink. See also the latest [DataStream Connectors](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/overview/).\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<pyflink.common.job_execution_result.JobExecutionResult at 0x7fd9ececa080>"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from pyflink.common import Encoder\n",
    "from pyflink.datastream.connectors.file_system import FileSink, RollingPolicy\n",
    "\n",
    "def split(s):\n",
    "    splits = s[1].split('|')\n",
    "    for sp in splits:\n",
    "        yield s[0], sp\n",
    "\n",
    "sink = (FileSink\n",
    "    .for_row_format('/tmp/sink', Encoder.simple_string_encoder(\"UTF-8\"))\n",
    "    .with_rolling_policy(RollingPolicy.default_rolling_policy(\n",
    "        part_size=1024 ** 3, rollover_interval=15 * 60 * 1000, inactivity_interval=5 * 60 * 1000))\n",
    "    .build())\n",
    "\n",
    "ds.map(lambda i: (i[0] + 1, i[1]), Types.TUPLE([Types.INT(), Types.STRING()])).sink_to(sink)\n",
    "# the result will be stored in the directory of /tmp/sink.\n",
    "env.execute()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.3"
  },
  "pycharm": {
   "stem_cell": {
    "cell_type": "raw",
    "source": [],
    "metadata": {
     "collapsed": false
    }
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}