谢谢解答!
稍微用代码写了一下, 如下:
public class JoinMain1 {
private static final String host = "127.0.0.1";
private static final int port = 9000;
private static final int port1 = 9001;
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
SingleOutputStreamOperator<Student> student =
env.socketTextStream(host, port, "\n")
.map(new MapFunction<String, Student>() {
@Override
public Student map(String s) throws Exception {
String[] tokens = s.toLowerCase().split(",");
Student ss = new Student();
ss.setId(Integer.valueOf(tokens[0]));
ss.setName(tokens[1]);
return ss;
}
}
);
SingleOutputStreamOperator<Score> score =
env.socketTextStream(host, port1, "\n")
.map(new MapFunction<String, Score>() {
@Override
public Score map(String s) throws Exception {
String[] tokens = s.toLowerCase().split(",");
Score ss = new Score();
ss.setId(Integer.valueOf(tokens[0]));
ss.setName(tokens[1]);
ss.setSid(Integer.valueOf(tokens[2]));
ss.setScore(Integer.valueOf(tokens[3]));
return ss;
}
}
);
/// sql
tEnv.registerDataStream("student", student, "id, name");
tEnv.registerDataStream("score", score, "id, name, sid, score");
Table t = tEnv.sqlQuery("select sum(t2.score) from student t1
inner join score t2 on t1.id = t2.sid");
tEnv.toRetractStream(t, Integer.class).print("result");
env.execute("table join");
}
}
测试数据如下(红色数据为操作顺序):
[image: image.png]
执行结果如下:
[image: image.png]
1 2 3步骤计算出195(1个studen join 2个分数),
4步骤计算出390(原来195 + 新的student join 2个分数)
5步计算出196(390 + 新分数 join 2个student, 其实是 390 - 97 -97)
6步骤计算出390(196 + 新分数 join 2个student)
在第4步的时候, 分数就不太对了, 因为流中同一个student对应了2个事件
xin Destiny <[email protected]> 于2020年1月14日周二 下午6:39写道:
> Hi,
> 如果说插入两条update操作呢,一次分数是-97,一次是97
>
>
>
>
> Ren Xie <[email protected]> 于2020年1月14日周二 下午6:20写道:
>
> > 实际场景还是有点复杂的, 便于理解 我简化成这样的, 简化后的这个, 没有实际的代码, 抱歉
> >
> > 大致 写一下 也就是这样了
> > ```sql
> > select sum(score)
> > from
> > student t1 inner join score t2 on t1.student_id = t2.std_id
> > where
> > t1.student_id = 11
> > ```
> > 然后
> >
> > ```Java
> > String sql = ↑;
> > Table t = tEnv.sqlQuery(sql);
> > DataStream<Integer> stream1 = tEnv.toAppendStream(t, Integer.class);
> > stream1.keyBy("xxxx").sum("xxxx");
> > ```
> >
> > 这样的一个sql, 在student表插入一个数据, score表插入2个数据后, 会执行一次计算出一个结果97 + 98
> >
> > update 学生表的name后, 一个新事件进入student的流, 还会触发一次计算, 得到97 + 98
> >
> > 因为可能有新的成绩插入, 所以对 stream1进行sum操作, 导致 97和98 都被重复计算了一次
> >
> >
> > Caizhi Weng <[email protected]> 于2020年1月14日周二 下午5:49写道:
> >
> > > Hi,
> > >
> > > 有可能的话,是否方便提供一下代码呢?
> > >
> > > Ren Xie <[email protected]> 于2020年1月14日周二 下午5:38写道:
> > >
> > > > 学生
> > > > student_id name
> > > > 11 foo
> > > >
> > > > 学科分数
> > > > id name score std_id
> > > > 100 math 97 11
> > > > 101 english 98 11
> > > >
> > > > 有如下一个场景(假设只有一个学生)
> > > >
> > > > 基于binlog检测这2个表的变化, 计算这个学生的总分数, 使用了Table/SQL API join操作计算
> > > >
> > > > 假设insert以上数据后到达某时刻, 以上数据都进入了flink, 计算出这个学生总分数 97 + 98 = 195
> > > >
> > > > 但此时发现学生姓名登记错误, 于是进行了修改,
> > > > 结果此时Flink中学生流中有2个事件(insert的一个+update的一个), 分数流中有2个事件, 计算的总分数就会是 2 *
> (97
> > +
> > > > 98) = 390
> > > >
> > > > Q: 请问这种场景下使用什么能够解决, 计算出正确的结果 97 + 98 = 193
> > > >
> > > > 接触flink不久, 不是太了解, 请大佬给个提示, 谢谢!!
> > > >
> > >
> >
>