Re: 广播流与非广播流 数据先后问题

2023-02-20 文章 Weihua Hu
Hi,

可以看到你的问题,你需要订阅 user-zh 的邮件列表才能收到相关的回复,可以参考:
https://flink.apache.org/community.html

Best,
Weihua


On Tue, Feb 21, 2023 at 12:17 PM 知而不惑  wrote:

> 有收到我的问题吗
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
>
>  发送时间:2023年2月21日(星期二) 上午9:37
> 收件人:"user-zh"
> 主题:广播流与非广播流 数据先后问题
>
>
>
> 各位大佬好
> 我使用广播流与非广播流进行connet,随后继承实现了BroadcastProcessFunction,运行时发现在自定义实现的BroadcastProcessFunction
> 中,广播流数据会先到,导致processElement() 中获取广播流数据为空,请问有什么写法或机制解决该问题?我尝试在谷歌和chatgpt
> 找寻答案,给到的回复是 用list 缓存元素 ,在open中初始化,但是我在open中初始化得到了一个 必须在keyby() 之后使用的报错
> 以下是processElement 的最小工作单元代码示例 和 main 方法的使用:
> @Override
> public void processElement(FileEventOuterClass.FileEvent value,
> BroadcastProcessFunction List FileEventOuterClass.FileEventgt;.ReadOnlyContext ctx,
> Collector  try {
>  ReadOnlyBroadcastState List ctx.getBroadcastState(ruleDescriptor);
>
>  List sensitiveRules = broadcastState.get(null);
>  if
> (CollectionUtils.isEmpty(sensitiveRules)) {
>  return;
>  }
>  
>  } catch (Exception e) {
> 
> log.error("SensitiveDataClassify err:", e);
>  }
> }
> public static void main(String[] args) throws Exception {
>  StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>  env.setParallelism(1);
>
>  MapStateDescriptor List  new
> MapStateDescriptor<gt;("ruleBroadcastState", Types.VOID, new
> ListTypeInfo<gt;(SensitiveRule.class));
>
>  // 广播流
>  BroadcastStream broadcast = sensitiveRule.broadcast(ruleDescriptor);
>
>  DataStreamSource env.socketTextStream("localhost", 11451);
> 
> SingleOutputStreamOperator localhost.map((MapFunction value -gt;
> FileEventOuterClass.FileEvent.newBuilder().setChannel("").build());
>
> 
> SingleOutputStreamOperator streamOperator = stream.connect(broadcast).process(new
> SensitiveDataClassify());
>  streamOperator.print("qqq");
>  env.execute();
>
> }


Re: 广播流与非广播流 数据先后问题

2023-02-20 文章 Weihua Hu
Hi,

可以把具体的报错信息贴一下,另外代码中没有看到使用 listState 缓存元素的部分

Best,
Weihua


On Tue, Feb 21, 2023 at 9:38 AM 知而不惑  wrote:

> 各位大佬好
> 我使用广播流与非广播流进行connet,随后继承实现了BroadcastProcessFunction,运行时发现在自定义实现的BroadcastProcessFunction
> 中,广播流数据会先到,导致processElement() 中获取广播流数据为空,请问有什么写法或机制解决该问题?我尝试在谷歌和chatgpt
> 找寻答案,给到的回复是 用list 缓存元素 ,在open中初始化,但是我在open中初始化得到了一个 必须在keyby() 之后使用的报错
> 以下是processElement 的最小工作单元代码示例 和 main 方法的使用:
> @Override
> public void processElement(FileEventOuterClass.FileEvent value,
> BroadcastProcessFunction List ctx, Collector try {
> ReadOnlyBroadcastState broadcastState = ctx.getBroadcastState(ruleDescriptor);
>
> List if (CollectionUtils.isEmpty(sensitiveRules)) {
> return;
> }
> 
> } catch (Exception e) {
> log.error("SensitiveDataClassify err:", e);
> }
> }
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
>
> MapStateDescriptor new MapStateDescriptor<("ruleBroadcastState", Types.VOID,
> new ListTypeInfo<(SensitiveRule.class));
>
> // 广播流
> BroadcastStream sensitiveRule.broadcast(ruleDescriptor);
>
> DataStreamSource env.socketTextStream("localhost", 11451);
> SingleOutputStreamOperator localhost.map((MapFunction -
> FileEventOuterClass.FileEvent.newBuilder().setChannel("").build());
>
> SingleOutputStreamOperator streamOperator = stream.connect(broadcast).process(new
> SensitiveDataClassify());
> streamOperator.print("qqq");
> env.execute();
>
> }