Hi,
可以看到你的问题,你需要订阅 user-zh 的邮件列表才能收到相关的回复,可以参考:
https://flink.apache.org/community.html
Best,
Weihua
On Tue, Feb 21, 2023 at 12:17 PM 知而不惑 wrote:
> 有收到我的问题吗
>
>
>
>
> --原始邮件--
> 发件人:
> "user-zh"
>
> 发送时间:2023年2月21日(星期二) 上午9:37
> 收件人:"user-zh"
> 主题:广播流与非广播流 数据先后问题
>
>
>
> 各位大佬好
> 我使用广播流与非广播流进行connet,随后继承实现了BroadcastProcessFunction,运行时发现在自定义实现的BroadcastProcessFunction
> 中,广播流数据会先到,导致processElement() 中获取广播流数据为空,请问有什么写法或机制解决该问题?我尝试在谷歌和chatgpt
> 找寻答案,给到的回复是 用list 缓存元素 ,在open中初始化,但是我在open中初始化得到了一个 必须在keyby() 之后使用的报错
> 以下是processElement 的最小工作单元代码示例 和 main 方法的使用:
> @Override
> public void processElement(FileEventOuterClass.FileEvent value,
> BroadcastProcessFunction List FileEventOuterClass.FileEventgt;.ReadOnlyContext ctx,
> Collector try {
> ReadOnlyBroadcastState List ctx.getBroadcastState(ruleDescriptor);
>
> List sensitiveRules = broadcastState.get(null);
> if
> (CollectionUtils.isEmpty(sensitiveRules)) {
> return;
> }
>
> } catch (Exception e) {
>
> log.error("SensitiveDataClassify err:", e);
> }
> }
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
>
> MapStateDescriptor List new
> MapStateDescriptor<gt;("ruleBroadcastState", Types.VOID, new
> ListTypeInfo<gt;(SensitiveRule.class));
>
> // 广播流
> BroadcastStream broadcast = sensitiveRule.broadcast(ruleDescriptor);
>
> DataStreamSource env.socketTextStream("localhost", 11451);
>
> SingleOutputStreamOperator localhost.map((MapFunction value -gt;
> FileEventOuterClass.FileEvent.newBuilder().setChannel("").build());
>
>
> SingleOutputStreamOperator streamOperator = stream.connect(broadcast).process(new
> SensitiveDataClassify());
> streamOperator.print("qqq");
> env.execute();
>
> }