附件将于2024年11月15日过期 环境为:pyflink版本 1.20.0 Jdk11 具体报错如下: /Users/xiaozhang/anaconda3/envs/pyflink/bin/python /Users/xiaozhang/Documents/临时/pyflink2/socketFlinkTest.py Traceback (most recent call last): File "/Users/xiaozhang/Documents/临时/pyflink2/socketFlinkTest.py", line 27, in <module> ds = env.from_source(source=myTestSourceFunction, File "/Users/xiaozhang/anaconda3/envs/pyflink/lib/python3.10/site-packages/pyflink/datastream/stream_execution_environment.py", line 972, in from_source j_data_stream = self._j_stream_execution_environment.fromSource( File "/Users/xiaozhang/anaconda3/envs/pyflink/lib/python3.10/site-packages/py4j/java_gateway.py", line 1322, in __call__ return_value = get_return_value( File "/Users/xiaozhang/anaconda3/envs/pyflink/lib/python3.10/site-packages/pyflink/util/exceptions.py", line 146, in deco return f(*a, **kw) File "/Users/xiaozhang/anaconda3/envs/pyflink/lib/python3.10/site-packages/py4j/protocol.py", line 330, in get_return_value raise Py4JError( py4j.protocol.Py4JError: An error occurred while calling o10.fromSource. Trace: org.apache.flink.api.python.shaded.py4j.Py4JException: Method fromSource([class MyTestSourceFunction, class org.apache.flink.api.common.eventtime.WatermarkStrategy$$Lambda$184/0x000000080026fc40, class java.lang.String, null]) does not exist at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:321) at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:329) at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:274) at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) at java.base/java.lang.Thread.run(Thread.java:834)
Java实现如下: import org.apache.flink.streaming.api.functions.source.SourceFunction; public class MyTestSourceFunction implements SourceFunction<String> { private boolean runningFlag = true; @Override public void run(SourceContext<String> ctx) throws Exception { while (runningFlag){ ctx.collect("hi"); ctx.collect("world"); Thread.sleep(30000); } } @Override public void cancel() { runningFlag = false; } } Python 调用代码如下: if __name__ == '__main__': env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1) # websocket_builder = WebSocketBuilder() # websocket_builder.set_websocket_url('wss://fstream.binance.com/ws/btcusdt@aggTrade') # websocket_source = websocket_builder.build() # 注意:仅支持本地文件URL(以"file:"开头)。 # env.get_config().set("pipeline.jars", 'file:///Users/xiaozhang/Documents/ <file:///Users/xiaozhang/Documents/%E4%B8%B4%E6%97%B6/pyflink2/data/jar/MyFlinkJava2.jar>临时 <file:///Users/xiaozhang/Documents/%E4%B8%B4%E6%97%B6/pyflink2/data/jar/MyFlinkJava2.jar>/pyflink2/data/jar/MyFlinkJava2.jar <file:///Users/xiaozhang/Documents/%E4%B8%B4%E6%97%B6/pyflink2/data/jar/MyFlinkJava2.jar>') # # # 注意:路径必须指定协议(例如:文件——"file"),并且用户应确保在客户端和群集上都可以访问这些URL。 # env.get_config().set("pipeline.classpaths", 'file:///Users/xiaozhang/Documents/ <file:///Users/xiaozhang/Documents/%E4%B8%B4%E6%97%B6/pyflink2/data/jar/MyFlinkJava2.jar>临时 <file:///Users/xiaozhang/Documents/%E4%B8%B4%E6%97%B6/pyflink2/data/jar/MyFlinkJava2.jar>/pyflink2/data/jar/MyFlinkJava2.jar <file:///Users/xiaozhang/Documents/%E4%B8%B4%E6%97%B6/pyflink2/data/jar/MyFlinkJava2.jar>') env.add_jars('file:///Users/xiaozhang/Documents/ <file:///Users/xiaozhang/Documents/%E4%B8%B4%E6%97%B6/pyflink2/data/jar/MyFlinkJava3.jar>临时 <file:///Users/xiaozhang/Documents/%E4%B8%B4%E6%97%B6/pyflink2/data/jar/MyFlinkJava3.jar>/pyflink2/data/jar/MyFlinkJava3.jar <file:///Users/xiaozhang/Documents/%E4%B8%B4%E6%97%B6/pyflink2/data/jar/MyFlinkJava3.jar>') websocket_source = MyWebSocketSourceFunction('wss://fstream.binance.com/ws/btcusdt@aggTrade') myTestSourceFunction = MyTestSourceFunction() ds = env.from_source(source=myTestSourceFunction, watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(), source_name="websocket source") # 这里可以对 DataStream 进行进一步的处理或输出 env.execute() 点按以下载 <https://www.icloud.com/attachment/?u=https%3A%2F%2Fcvws.icloud-content.com.cn%2FB%2FAS_hPICVuwMyqRslSuaVxafzE28BAQfrbb9zEnSqBKJNQPFq52CKy3jJ%2F%24%7Bf%7D%3Fo%3DAlMPOFyDuAUnklB6z1WxH3Goppe3dCVz045kgO4v4FWZ%26v%3D1%26x%3D3%26a%3DCAog5b0lBmoLgON8MghDYAm5-GzyY_0ektnm6f0deRTf0AMSehDdlbKvqTIY3aWtg7MyIgEAKgkC6AMA_0qFfGBSBPMTbwFaBIrLeMlqJ9fmE8i8lV-QQKOdxk6DpYiwmTqaaNcRQJOZwoWKypKimqxRXG-asXInndmhkw6eiqABj61DFSJ5_EE3oesXGq59O8qZsuhs1iwX6M_7gXEC%26e%3D1731684160%26fl%3D%26r%3D90C67D72-75BF-4407-B862-CB77E2B20EA2-1%26k%3D%24%7Buk%7D%26ckc%3Dcom.apple.largeattachment%26ckz%3D90FDAF6E-4E3F-4528-B0D9-1CEC40207386%26p%3D203%26s%3DSVOlLV6Fb-d8rlZG2BdKPQb6cwQ&uk=eFXBf3JGovxD2MX0bJlPhw&f=MyFlinkJava3.jar&sz=66618227>MyFlinkJava3.jar 66.6 MB 不知报错的原因是什么,或有哪些排查方向,附件为自己打包的jar包 还有就是看这个接口他的一些其他实现如RichParallelSourceFunction已经过时,那现在想自定义source该用什么呀,