The attachment expires on November 15, 2024.
Environment: PyFlink 1.20.0, JDK 11
The full error is as follows:
/Users/xiaozhang/anaconda3/envs/pyflink/bin/python /Users/xiaozhang/Documents/临时/pyflink2/socketFlinkTest.py
Traceback (most recent call last):
  File "/Users/xiaozhang/Documents/临时/pyflink2/socketFlinkTest.py", line 27, in <module>
    ds = env.from_source(source=myTestSourceFunction,
  File "/Users/xiaozhang/anaconda3/envs/pyflink/lib/python3.10/site-packages/pyflink/datastream/stream_execution_environment.py", line 972, in from_source
    j_data_stream = self._j_stream_execution_environment.fromSource(
  File "/Users/xiaozhang/anaconda3/envs/pyflink/lib/python3.10/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/Users/xiaozhang/anaconda3/envs/pyflink/lib/python3.10/site-packages/pyflink/util/exceptions.py", line 146, in deco
    return f(*a, **kw)
  File "/Users/xiaozhang/anaconda3/envs/pyflink/lib/python3.10/site-packages/py4j/protocol.py", line 330, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o10.fromSource. Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Method fromSource([class MyTestSourceFunction, class org.apache.flink.api.common.eventtime.WatermarkStrategy$$Lambda$184/0x000000080026fc40, class java.lang.String, null]) does not exist
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:321)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:329)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:274)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:834)


The Java implementation is as follows:
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MyTestSourceFunction implements SourceFunction<String> {
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean runningFlag = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // Emit two records every 30 seconds until the source is cancelled
        while (runningFlag) {
            ctx.collect("hi");
            ctx.collect("world");
            Thread.sleep(30000);
        }
    }

    @Override
    public void cancel() {
        runningFlag = false;
    }
}
The Python calling code is as follows:
from pyflink.common import WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment

# MyTestSourceFunction and MyWebSocketSourceFunction are defined elsewhere
# in the script (their definitions are not shown in this message).

if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    # websocket_builder = WebSocketBuilder()
    # websocket_builder.set_websocket_url('wss://fstream.binance.com/ws/btcusdt@aggTrade')
    # websocket_source = websocket_builder.build()

    # Note: only local file URLs (starting with "file:") are supported.
    # env.get_config().set("pipeline.jars", 'file:///Users/xiaozhang/Documents/临时/pyflink2/data/jar/MyFlinkJava2.jar')
    #
    # # Note: paths must specify a protocol (e.g. "file"), and the user should ensure
    # # that the URLs are accessible on both the client and the cluster.
    # env.get_config().set("pipeline.classpaths", 'file:///Users/xiaozhang/Documents/临时/pyflink2/data/jar/MyFlinkJava2.jar')

    env.add_jars('file:///Users/xiaozhang/Documents/临时/pyflink2/data/jar/MyFlinkJava3.jar')

    websocket_source = MyWebSocketSourceFunction('wss://fstream.binance.com/ws/btcusdt@aggTrade')
    myTestSourceFunction = MyTestSourceFunction()

    ds = env.from_source(source=myTestSourceFunction,
                         watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
                         source_name="websocket source")

    # The DataStream can be processed further or written to a sink here

    env.execute()
Attachment: MyFlinkJava3.jar (66.6 MB)
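I don't know what is causing this error or which directions to investigate. The attachment is a jar that I packaged myself.

Also, looking at this interface, some of its other implementations, such as RichParallelSourceFunction, are already deprecated. What should be used now to write a custom source?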
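
One direction worth checking, offered as a sketch rather than a confirmed diagnosis: the Java class above implements the legacy `SourceFunction` interface, whereas the Java `fromSource` method named in the trace takes an `org.apache.flink.api.connector.source.Source`, which would explain the "does not exist" lookup failure. PyFlink exposes `add_source` for legacy source functions, and its `SourceFunction` wrapper accepts a Java class name. The helper name `build_stream` and the `Types.STRING()` output type are assumptions made for illustration, and the snippet has not been run against the attached jar. The pyflink imports sit inside the function only so that the file can be loaded on a machine without Flink installed.

```python
def build_stream(env, jar_url, java_class_name):
    """Attach a legacy Java SourceFunction to a PyFlink environment.

    env             -- a pyflink StreamExecutionEnvironment
    jar_url         -- "file:" URL of the jar that contains the Java class
    java_class_name -- fully qualified name of the SourceFunction class
    """
    # Imported lazily; see the note above the code block.
    from pyflink.common import Types
    from pyflink.datastream.functions import SourceFunction

    # Make the user jar visible to the job and to the Py4J gateway.
    env.add_jars(jar_url)

    # Wrap the Java class; PyFlink instantiates it through its
    # no-argument constructor when given a class name.
    java_source = SourceFunction(java_class_name)

    # add_source (not from_source) is the entry point for SourceFunction.
    # Types.STRING() is assumed because the Java class emits String.
    return env.add_source(
        java_source,
        source_name="test source",
        type_info=Types.STRING(),
    )
```

For example, `build_stream(env, 'file:///Users/xiaozhang/Documents/临时/pyflink2/data/jar/MyFlinkJava3.jar', 'MyTestSourceFunction')` would stand in for the `from_source` call in the script. On the deprecation question, the interface that Flink documents as the replacement for `SourceFunction` is the `Source` API in `org.apache.flink.api.connector.source` (FLIP-27), which is the type `fromSource` expects on the Java side.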
