YARN#

Apache Hadoop YARN is a cluster resource management framework for managing resources and scheduling jobs in a Hadoop cluster. PyFlink jobs can be submitted to YARN for execution.

Set up Python environment#

Python 3.6 or above with PyFlink installed must be available on the nodes of the YARN cluster. It is recommended to use Python virtual environments to set up the Python environment. See Create a Python virtual environment for more details on how to prepare a Python virtual environment with PyFlink installed.
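As a minimal sketch (the path /path/to/venv is a placeholder and the unpinned PyFlink version is an assumption; adjust both to your setup), such an environment could be prepared as follows:

python3 -m venv /path/to/venv            # create the virtual environment
source /path/to/venv/bin/activate        # activate it
pip install apache-flink                 # install PyFlink into the environment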

Once the Python virtual environment is ready, it needs to be made available on the cluster. There are the following options for doing so:

  • Install Python virtual environments on all the cluster nodes in advance

You can install Python virtual environments with PyFlink pre-installed on all the cluster nodes before submitting PyFlink jobs. Note that if you have many jobs that use different Python and Flink versions, you can create multiple Python virtual environments to isolate them; each PyFlink job can then choose which of these Python virtual environments to use.

./bin/flink run-application -t yarn-application \
      -Djobmanager.memory.process.size=1024m \
      -Dtaskmanager.memory.process.size=1024m \
      -Dyarn.application.name=<ApplicationName> \
      -pyclientexec /path/to/venv/bin/python3 \
      -pyexec /path/to/venv/bin/python3 \
      -py word_count.py

The above example assumes that a Python virtual environment is already available at /path/to/venv on all the cluster nodes. Note that the options -pyclientexec and -pyexec are both required: they select the given Python virtual environment on the client side (for job compilation) and on the server side (for Python UDF execution), respectively.

  • Specify the Python virtual environment when submitting PyFlink jobs

It is also possible to distribute the Python virtual environment while submitting the PyFlink job. In this case, the Python virtual environment is shipped during job startup to the cluster nodes where the PyFlink job runs. This is more flexible and is useful when the Python environments cannot be set up on the cluster nodes in advance, or when the pre-installed Python environments cannot meet some special requirements.

./bin/flink run-application -t yarn-application \
      -Djobmanager.memory.process.size=1024m \
      -Dtaskmanager.memory.process.size=1024m \
      -Dyarn.application.name=<ApplicationName> \
      -Dyarn.ship-files=/path/to/shipfiles \
      -pyarch shipfiles/venv.zip \
      -pyclientexec venv.zip/venv/bin/python3 \
      -pyexec venv.zip/venv/bin/python3 \
      -py shipfiles/word_count.py

In the above example, the Python virtual environment is specified via the option -pyarch and is distributed to the cluster nodes during job execution. As before, the options -pyclientexec and -pyexec are required to select the given Python virtual environment on the client side (for job compilation) and on the server side (for Python UDF execution), respectively.

Note

The example assumes that the Python dependencies needed to execute the job are already placed in the directory /path/to/shipfiles; for the command above, it should contain venv.zip and word_count.py.
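As a rough sketch (all paths are placeholders), the ship directory could be prepared like this; the virtual environment is zipped from its parent directory so that the archive keeps a top-level venv/ directory, matching the venv.zip/venv/bin/python3 paths used above:

mkdir -p /path/to/shipfiles
cp word_count.py /path/to/shipfiles/                  # the job script to ship
# zip from the parent of venv so the archive contains venv/ at its top level
cd /path/to && zip -r /path/to/shipfiles/venv.zip venv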

As the job is executed on the JobManager in YARN application mode, the paths specified in -pyarch and -py are relative to shipfiles, the directory name of the shipped files.

The archive files specified via -pyarch are distributed to the TaskManagers through the blob server, which has a file size limit of 2 GB. If an archive file is larger than 2 GB, you can upload it to a distributed file system and then use that path in the command line option -pyarch.
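For example (a sketch only; the HDFS URI is a placeholder, and the remaining options simply mirror the earlier example), the command could then reference the archive directly from the distributed file system:

./bin/flink run-application -t yarn-application \
      -Dyarn.application.name=<ApplicationName> \
      -Dyarn.ship-files=/path/to/shipfiles \
      -pyarch hdfs:///path/to/venv.zip \
      -pyclientexec venv.zip/venv/bin/python3 \
      -pyexec venv.zip/venv/bin/python3 \
      -py shipfiles/word_count.py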

  • Mix the above options

You can also mix the above options, that is, pre-install a few commonly used Python virtual environments on the cluster nodes and ship a custom Python virtual environment only for jobs with special requirements that the pre-installed environments cannot meet.