// Kafka source that deserializes each record as a String; configured via
// setStartFromLatest() before being attached to the environment.
FlinkKafkaConsumer011<String> consumer =
        new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), properties);
consumer.setStartFromLatest();
DataStreamSource<String> stream = env.addSource(consumer);

// Map every record to (record, 1), key by tuple field 0 and keep a running
// sum of tuple field 1, i.e. a per-record-value count.
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream
        .map(new MapFunction<String, Tuple2<String, Integer>>() {
            // Number of map() calls handled by this function instance.
            int num = 0;

            @Override
            public Tuple2<String, Integer> map(String s) {
                num++;
                // Every tenth call prints a message to stdout and then fails
                // with a RuntimeException instead of emitting a tuple.
                if (num % 10 == 0) {
                    System.out.println("????????,????????");
                    throw new RuntimeException("????????????????????");
                }
                // Diamond instead of the raw Tuple2 constructor so the result
                // is a properly typed Tuple2<String, Integer> (no unchecked
                // conversion).
                return new Tuple2<>(s, 1);
            }
        })
        .keyBy(0)
        .sum(1);
sum.print();

// NOTE(review): a failure of execute() is only printed here, not rethrown,
// so the enclosing method returns normally even when the job fails.
try {
    env.execute();
} catch (Exception e) {
    e.printStackTrace();
}
}
??????????????
------------------ ???????? ------------------
??????: "Benchao Li"<[email protected]>;
????????: 2020??5??26??(??????) ????11:29
??????: "user-zh"<[email protected]>;
????: Re: flink 1.10webui??????print????
??????????????????????????print??????
smq <[email protected]> ??2020??5??26?????? ????11:20??????
> ??????????????????????webui????????????????????????????????client??
>
>
> ---????????---
> ??????: "Lijie Wang"<[email protected]>
> ????????: 2020??5??26??(????) ????10:14
> ??????:
"[email protected]"<[email protected]>;
> ????: ??????flink 1.10webui??????print????
>
>
> ?????????????????????????????????????????????????? taskmanager.out
???????????? ?????????????????????? print
> ???????????????? TM ?????????????????? client ????????????
>
>
>
>
> ??2020??05??26?? 21:39??smq<[email protected]> ??????
> Hi
>
??????????????????????????webui??stdout??????????????????????????????????????????????????????????1.10??????
> ????????????????????????????????????
--
Best,
Benchao Li