Frequently Asked Questions (FAQ)#

Installation issues#

Q1: Scala Dependency#

Before Flink 1.15, the official PyFlink installation packages only contain JAR packages built against Scala 2.11; in Flink 1.15 they contain JAR packages for Scala 2.12, and since Flink 1.16 they carry no Scala dependency at all. If you want to use a different Scala version, e.g. Scala 2.12 with a release older than 1.15, you can download the Flink binary distribution built with Scala 2.12, unzip it, and set the environment variable FLINK_HOME to point to the unzipped directory. PyFlink will then use the JAR packages under FLINK_HOME instead of the JAR packages bundled in the PyFlink installation package. You can refer to the PyFlink documentation for more details.
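To double-check which Flink distribution PyFlink will actually pick up, you can print the resolved Flink home. The snippet below is a minimal sketch; _find_flink_home is an internal helper shipped inside the PyFlink package (find_flink_home.py) and its name may change between versions:

import os

# If FLINK_HOME is set, PyFlink uses that distribution instead of its bundled JARs.
print(os.environ.get("FLINK_HOME", "FLINK_HOME not set"))

# Internal helper that resolves the Flink home PyFlink will use
# (assumption: this private function exists under this name in your PyFlink version).
from pyflink.find_flink_home import _find_flink_home
print(_find_flink_home())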

Q2: Java gateway process exited before sending its port number#

The exception stack is as follows:

Traceback (most recent call last):
  File "/Users/dianfu/code/src/github/pyflink-faq/testing/test_utils.py", line 122, in setUp
    self.t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
  File "/Users/dianfu/code/src/github/pyflink-faq/testing/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-10.9-x86_64.egg/pyflink/table/environment_settings.py", line 267, in in_streaming_mode
    get_gateway().jvm.EnvironmentSettings.inStreamingMode())
  File "/Users/dianfu/code/src/github/pyflink-faq/testing/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-10.9-x86_64.egg/pyflink/java_gateway.py", line 62, in get_gateway
    _gateway = launch_gateway()
  File "/Users/dianfu/code/src/github/pyflink-faq/testing/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-10.9-x86_64.egg/pyflink/java_gateway.py", line 112, in launch_gateway
    raise Exception("Java gateway process exited before sending its port number")
Exception: Java gateway process exited before sending its port number

This issue usually means that PyFlink is not installed correctly. You can verify the installation as follows:

Execute the following command:

python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__)))"

It will print something like the following:

/Users/duanchen/miniconda3/lib/python3.7/site-packages/pyflink

Execute the following command:

ls -lh /Users/duanchen/miniconda3/lib/python3.7/site-packages/pyflink

The directory structure should be as follows:

total 144
-rw-r--r--   1 duanchen  staff   1.3K Oct 19 16:01 README.txt
-rw-r--r--   1 duanchen  staff   1.9K Oct 19 16:01 __init__.py
drwxr-xr-x  11 duanchen  staff   352B Oct 19 16:03 __pycache__
drwxr-xr-x  25 duanchen  staff   800B Oct 19 16:03 bin
drwxr-xr-x  22 duanchen  staff   704B Oct 19 16:03 common
drwxr-xr-x  13 duanchen  staff   416B Oct 19 16:03 conf
drwxr-xr-x  20 duanchen  staff   640B Oct 19 16:03 datastream
drwxr-xr-x   4 duanchen  staff   128B Oct 19 16:03 examples
-rw-r--r--   1 duanchen  staff   3.2K Oct 19 16:01 find_flink_home.py
drwxr-xr-x  25 duanchen  staff   800B Oct 19 16:03 fn_execution
-rw-r--r--   1 duanchen  staff   9.1K Oct 19 16:01 gen_protos.py
-rw-r--r--   1 duanchen  staff   7.7K Oct 19 16:01 java_gateway.py
drwxr-xr-x  16 duanchen  staff   512B Oct 19 16:03 lib
drwxr-xr-x  26 duanchen  staff   832B Oct 19 16:03 licenses
drwxr-xr-x   4 duanchen  staff   128B Oct 19 16:04 log
drwxr-xr-x   5 duanchen  staff   160B Oct 19 16:03 metrics
drwxr-xr-x   4 duanchen  staff   128B Oct 19 16:03 opt
drwxr-xr-x  11 duanchen  staff   352B Oct 19 16:03 plugins
-rw-r--r--   1 duanchen  staff   1.3K Oct 19 16:01 pyflink_callback_server.py
-rw-r--r--   1 duanchen  staff    13K Oct 19 16:01 pyflink_gateway_server.py
-rw-r--r--   1 duanchen  staff   5.3K Oct 19 16:01 serializers.py
-rw-r--r--   1 duanchen  staff   7.9K Oct 19 16:01 shell.py
drwxr-xr-x  31 duanchen  staff   992B Oct 19 16:03 table
drwxr-xr-x   6 duanchen  staff   192B Oct 19 16:03 util
-rw-r--r--   1 duanchen  staff   1.1K Oct 19 16:01 version.py

Please check whether the lib and opt directories exist. The JAR packages under these directories should be as follows:

(base) ➜  pyflink ls -lh /Users/duanchen/miniconda3/lib/python3.7/site-packages/pyflink/lib
total 401216
-rw-r--r--  1 duanchen  staff   190K Oct 19 16:02 flink-cep-1.15.2.jar
-rw-r--r--  1 duanchen  staff   475K Oct 19 16:02 flink-connector-files-1.15.2.jar
-rw-r--r--  1 duanchen  staff    93K Oct 19 16:02 flink-csv-1.15.2.jar
-rw-r--r--  1 duanchen  staff   110M Oct 19 16:02 flink-dist-1.15.2.jar
-rw-r--r--  1 duanchen  staff   171K Oct 19 16:02 flink-json-1.15.2.jar
-rw-r--r--  1 duanchen  staff    20M Oct 19 16:02 flink-scala_2.12-1.15.2.jar
-rw-r--r--  1 duanchen  staff    10M Oct 19 16:02 flink-shaded-zookeeper-3.5.9.jar
-rw-r--r--  1 duanchen  staff    15M Oct 19 16:02 flink-table-api-java-uber-1.15.2.jar
-rw-r--r--  1 duanchen  staff    35M Oct 19 16:02 flink-table-planner-loader-1.15.2.jar
-rw-r--r--  1 duanchen  staff   2.9M Oct 19 16:02 flink-table-runtime-1.15.2.jar
-rw-r--r--  1 duanchen  staff   203K Oct 19 16:02 log4j-1.2-api-2.17.1.jar
-rw-r--r--  1 duanchen  staff   295K Oct 19 16:02 log4j-api-2.17.1.jar
-rw-r--r--  1 duanchen  staff   1.7M Oct 19 16:02 log4j-core-2.17.1.jar
-rw-r--r--  1 duanchen  staff    24K Oct 19 16:02 log4j-slf4j-impl-2.17.1.jar

(base) ➜  pyflink ls -lh /Users/duanchen/miniconda3/lib/python3.7/site-packages/pyflink/opt
total 76736
-rw-r--r--  1 duanchen  staff    37M Oct 19 16:02 flink-python_2.12-1.15.2.jar
-rw-r--r--  1 duanchen  staff   472K Oct 19 16:02 flink-sql-client-1.15.2.jar

Usage issues#

Q1: How to prepare a Python Virtual Environment#

You can execute the following script to prepare a zipped Python virtual environment that can be used on macOS and most Linux distributions. The script takes the desired apache-flink version as its first argument and produces a venv.zip file in the current directory.

set -e
# download miniconda.sh
if [[ `uname -s` == "Darwin" ]]; then
    wget "https://repo.continuum.io/miniconda/Miniconda3-4.7.10-MacOSX-x86_64.sh" -O "miniconda.sh"
else
    wget "https://repo.continuum.io/miniconda/Miniconda3-4.7.10-Linux-x86_64.sh" -O "miniconda.sh"
fi

# add the execution permission
chmod +x miniconda.sh

# create python virtual environment
./miniconda.sh -b -p venv

# activate the conda python virtual environment
source venv/bin/activate ""

# specify your apache-flink version and you can add other dependencies
pip install "apache-flink==$1"

# deactivate the conda python virtual environment
conda deactivate

# remove the cached packages
rm -rf venv/pkgs

# package the prepared conda python virtual environment
zip -r venv.zip venv

Q2: How to add Python Files#

You can use the command-line option --pyFiles (short form -pyfs) to attach Python files when submitting a job:

$ ./bin/flink run --python examples/python/table/word_count.py --pyFiles file:///user.txt,hdfs:///$namenode_address/username.txt

For example, if you have a directory named myDir which has the following hierarchy:

myDir
└── utils
    ├── __init__.py
    └── my_util.py

You can add the Python files under the directory myDir as follows:

# Attach the directory; it will be added to the PYTHONPATH of the Python UDF workers.
table_env.add_python_file('myDir')

def my_udf():
    # 'utils' can be imported because 'myDir' has been attached above
    from utils import my_util

JDK issues#

Q1: InaccessibleObjectException: Unable to make field private final byte[] java.lang.String.value accessible: module java.base does not "opens java.lang" to unnamed module @4e4aea35#

: java.lang.reflect.InaccessibleObjectException: Unable to make field private final byte[] java.lang.String.value accessible: module java.base does not "opens java.lang" to unnamed module @4e4aea35
    at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354)
    at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
    at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:178)
    at java.base/java.lang.reflect.Field.setAccessible(Field.java:172)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:106)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2138)
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.createSinkFunctionTransformation(CommonExecSink.java:331)
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.applySinkProvider(CommonExecSink.java:306)
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.createSinkTransformation(CommonExecSink.java:146)
    at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:140)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)

This issue occurs with Java 17, which Flink does not support yet. You can refer to FLINK-15736 for more details. To solve this issue, use JDK 1.8 or JDK 11.

Connector issues#

Q1: ClassNotFoundException: com.mysql.cj.jdbc.Driver#

py4j.protocol.Py4JJavaError: An error occurred while calling o13.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
...
Caused by: java.io.IOException: unable to open JDBC writer
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.open(JdbcOutputFormat.java:145)
    at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.open(GenericJdbcSinkFunction.java:52)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
    at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.loadDriver(SimpleJdbcConnectionProvider.java:90)
    at org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.getLoadedDriver(SimpleJdbcConnectionProvider.java:100)
    at org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.getOrEstablishConnection(SimpleJdbcConnectionProvider.java:117)
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.open(JdbcOutputFormat.java:143)

This indicates that the JDBC driver JAR package is missing. Note that when using the JDBC connector, the JDBC driver for the specific database is required in addition to the connector JAR itself. Download links for the JDBC driver JARs can be found on the JDBC connector page.
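If you submit the job directly from Python, one common way to make both JARs available is the pipeline.jars option. The snippet below is a minimal sketch; the JAR file names, versions and local paths are placeholders for the connector and driver JARs you downloaded:

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Register both the JDBC connector JAR and the MySQL driver JAR with the job.
# The paths and versions below are placeholders; multiple JARs are separated by ';'.
t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///path/to/flink-connector-jdbc-1.15.2.jar;"
    "file:///path/to/mysql-connector-java-8.0.30.jar")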

Runtime issues#

Q1: OverflowError: timeout value is too large#

File "D:\Anaconda3\envs\py37\lib\threading.py", line 926, in _bootstrap_inner
    self.run()
File "D:\Anaconda3\envs\py37\lib\site-packages\apache_beam\runners\worker\data_plane.py", line 218, in run
    while not self._finished.wait(next_call - time.time()):
File "D:\Anaconda3\envs\py37\lib\threading.py", line 552, in wait
    signaled = self._cond.wait(timeout)
File "D:\Anaconda3\envs\py37\lib\threading.py", line 300, in wait
    gotit = waiter.acquire(True, timeout)
OverflowError: timeout value is too large

This exception occurs only on Windows. It does not affect the execution of PyFlink jobs, so it can usually be ignored. Alternatively, you can upgrade PyFlink to 1.12.8, 1.13.7, 1.14.6, 1.15.2 or 1.16.0, where this issue has been fixed. You can refer to FLINK-25883 for more details.

Data type issues#

Q1: ‘tuple’ object has no attribute ‘_values’#

Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 4:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
    response = task()
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1000, in process_bundle
    element.data)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 158, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 174, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 104, in pyflink.fn_execution.beam.beam_operations_fast.IntermediateOutputProcessor.process_outputs
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 158, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 174, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 92, in pyflink.fn_execution.beam.beam_operations_fast.NetworkOutputProcessor.process_outputs
  File "pyflink/fn_execution/beam/beam_coder_impl_fast.pyx", line 101, in pyflink.fn_execution.beam.beam_coder_impl_fast.FlinkLengthPrefixCoderBeamWrapper.encode_to_stream
  File "pyflink/fn_execution/coder_impl_fast.pyx", line 271, in pyflink.fn_execution.coder_impl_fast.IterableCoderImpl.encode_to_stream
  File "pyflink/fn_execution/coder_impl_fast.pyx", line 399, in pyflink.fn_execution.coder_impl_fast.RowCoderImpl.encode_to_stream
  File "pyflink/fn_execution/coder_impl_fast.pyx", line 389, in pyflink.fn_execution.coder_impl_fast.RowCoderImpl.encode_to_stream
AttributeError: 'tuple' object has no attribute '_values'

This issue usually means that a Python user-defined function returns an object that is not of Row type even though its result type is declared as Row. Please double-check the return value of the Python user-defined function to make sure that it is consistent with the declared result type.
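A minimal sketch of the mistake and the fix (the function, field names and types are only for illustration):

from pyflink.common import Row
from pyflink.table import DataTypes
from pyflink.table.udf import udf

# The result type is declared as ROW, so the function must return a Row object.
@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
                                DataTypes.FIELD("name", DataTypes.STRING())]))
def build_row(i, s):
    # return (i, s)           # wrong: a tuple triggers "'tuple' object has no attribute '_values'"
    return Row(id=i, name=s)  # correct: a Row matching the declared result type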

Q2: AttributeError: ‘int’ object has no attribute ‘encode’#

File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 71, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 85, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
File "pyflink/fn_execution/coder_impl_fast.pyx", line 83, in pyflink.fn_execution.coder_impl_fast.TableFunctionRowCoderImpl.encode_to_stream
File "pyflink/fn_execution/coder_impl_fast.pyx", line 256, in pyflink.fn_execution.coder_impl_fast.FlattenRowCoderImpl._encode_one_row
File "pyflink/fn_execution/coder_impl_fast.pyx", line 260, in pyflink.fn_execution.coder_impl_fast.FlattenRowCoderImpl._encode_one_row_with_row_kind
File "pyflink/fn_execution/coder_impl_fast.pyx", line 244, in pyflink.fn_execution.coder_impl_fast.FlattenRowCoderImpl._encode_one_row_to_buffer
File "pyflink/fn_execution/coder_impl_fast.pyx", line 550, in pyflink.fn_execution.coder_impl_fast.FlattenRowCoderImpl._encode_field_simple
AttributeError: 'int' object has no attribute 'encode'

The reason for this issue is usually that the actual result value of a Python user-defined function is not consistent with its declared result type.
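A minimal sketch of the mismatch (the function and types are only for illustration): declaring the result type as STRING while returning an int produces exactly this error, because the coder tries to encode the returned value as a string:

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.STRING())
def add_one(i):
    # return i + 1       # wrong: an int cannot be encoded as STRING
    return str(i + 1)    # correct: the value matches the declared STRING result type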

Q3: Types.BIG_INT() vs Types.LONG()#

It should be noted that Types.BIG_INT() represents type information for Java BigInteger, while Types.LONG() represents type information for a 64-bit long integer. Several users mistakenly use Types.BIG_INT() when they actually need a long integer.
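For example, when building a DataStream of plain 64-bit integers, declare Types.LONG() rather than Types.BIG_INT(). A minimal sketch (the collection contents are only for illustration):

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Use Types.LONG() for ordinary 64-bit integers; Types.BIG_INT() maps to java.math.BigInteger.
ds = env.from_collection(
    [(1, 'hello'), (2, 'world')],
    type_info=Types.ROW([Types.LONG(), Types.STRING()]))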