官方例子:
https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html
按照例子写了程序,也安装了pyflink
|
python -m pip install apache-flink
|
代码:
|
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.udf import udf


env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)


add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], 
DataTypes.BIGINT())


t_env.register_function("add", add)


t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/src.txt'))
 \
.with_format(OldCsv()
.field('a', DataTypes.BIGINT())
.field('b', DataTypes.BIGINT())) \
.with_schema(Schema()
.field('a', DataTypes.BIGINT())
.field('b', DataTypes.BIGINT())) \
.create_temporary_table('mySource')


t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/tar.txt'))
 \
.with_format(OldCsv()
.field('sum', DataTypes.BIGINT())) \
.with_schema(Schema()
.field('sum', DataTypes.BIGINT())) \
.create_temporary_table('mySink')


t_env.from_path('mySource')\
.select("add(a, b)") \
.insert_into('mySink')


t_env.execute("tutorial_job")
|

执行:

|
python test_pyflink.py
|

报错:


|
Traceback (most recent call last):
  File 
"C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
 line 147, in deco
    return f(*a, **kw)
  File 
"C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py",
 line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
: org.apache.flink.table.api.TableException: The configured Task Off-Heap 
Memory 0 bytes is less than the least required Python worker Memory 79 mb. The 
Task Off-Heap Memory can be configured using the configuration key 
'taskmanager.memory.task.off-heap.size'.
        at 
org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.checkPythonWorkerMemory(CommonPythonBase.scala:158)
        at 
org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getMergedConfiguration(CommonPythonBase.scala:119)
        at 
org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getConfig(CommonPythonBase.scala:102)
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.getConfig(StreamExecPythonCalc.scala:35)
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:61)
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:35)
        at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:106)
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
        at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
        at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
        at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
        at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1240)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:745)




During handling of the above exception, another exception occurred:


Traceback (most recent call last):
  File "test_pyflink.py", line 34, in <module>
    t_env.execute("tutorial_job")
  File 
"C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table_environment.py",
 line 1057, in execute
    return JobExecutionResult(self._j_tenv.execute(job_name))
  File 
"C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py",
 line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File 
"C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
 line 154, in deco
    raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: "The configured Task Off-Heap Memory 0 
bytes is less than the least required Python worker Memory 79 mb. The Task 
Off-Heap Memory can be configured using the configuration key 
'taskmanager.memory.task.off-heap.size'."
|




里面提到还要配置taskmanager.memory.task.off-heap.size这个属性吗 , 

我找到..\Python\Python37\Lib\site-packages\pyflink\conf下面的flink-conf.yaml

增加了taskmanager.memory.task.off-heap.size: 100m

但是还是报一样的错误

请问用python安装的flink去哪里配置属性


回复