If I remove the GROUP BY, the following exception is thrown. Is there any way to resolve it?
Exception in thread "main" org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
Current node is TableSourceScan(table=[[default_catalog, default_database, ddd]], fields=[id, age])
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitChildren$2(FlinkChangelogModeInferenceProgram.scala:626)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:614)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitSink$1(FlinkChangelogModeInferenceProgram.scala:690)
    at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:240)
Jark wrote
> Hi,
>
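> Could you confirm that Kafka holds the complete data set, i.e. that this DELETE message is preceded by a corresponding INSERT message?
> If not, this can happen: the GROUP BY node treats such a DELETE as dirty data and drops it.
> Alternatively, as godfrey suggested, skip the GROUP BY and INSERT all fields directly INTO the sink; then the DELETE is not dropped.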
>
> Best,
> Jark
>
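Jark's point about dirty data can be illustrated with a simplified model of a GROUP BY node that keeps per-key state. This is a hypothetical sketch, not Flink's actual aggregate operator: the class `GroupByModel`, its `Kind` enum, and the string-encoded output ("+I key", "-D key") are made up for illustration. It shows that a DELETE for a key the node has never seen (for example, because the matching INSERT is missing from Kafka or the state was not restored) produces no output, while the same DELETE arriving after an INSERT is forwarded.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of a GROUP BY node that keeps per-key state. */
public class GroupByModel {

    /** Only INSERT and DELETE changes are modelled. */
    public enum Kind { INSERT, DELETE }

    // How many live input rows currently belong to each group key.
    private final Map<String, Integer> counts = new HashMap<>();

    /** Processes one input change and returns the changes forwarded downstream. */
    public List<String> process(Kind kind, String key) {
        List<String> out = new ArrayList<>();
        Integer count = counts.get(key);
        if (kind == Kind.INSERT) {
            if (count == null) {
                counts.put(key, 1);
                out.add("+I " + key); // a new group appears
            } else {
                counts.put(key, count + 1);
            }
        } else if (count == null) {
            // No state for this key (the matching INSERT was never seen, e.g. it is
            // missing from Kafka or the state was not restored): drop the DELETE.
        } else if (count == 1) {
            counts.remove(key);
            out.add("-D " + key); // the last row of the group is gone
        } else {
            counts.put(key, count - 1);
        }
        return out;
    }

    public static void main(String[] args) {
        GroupByModel seenInsert = new GroupByModel();
        seenInsert.process(Kind.INSERT, "1");
        System.out.println(seenInsert.process(Kind.DELETE, "1")); // [-D 1]

        GroupByModel noHistory = new GroupByModel();
        System.out.println(noHistory.process(Kind.DELETE, "1")); // []
    }
}
```

A GROUP BY over all columns with no aggregate functions only emits when a group appears or disappears, so a per-key count is enough for this model.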
> On Thu, 16 Jul 2020 at 21:56, godfrey he <godfreyhe@> wrote:
>
>> Why GROUP BY id, name, description, weight?
>> Wouldn't a plain "INSERT INTO sink SELECT id, name, description, weight FROM
>> debezium_source" meet your needs?
>>
>> On Thu, 16 Jul 2020 at 21:30, 曹武 <14701319164@> wrote:
>>
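>> > I am using the DDL API in Flink 1.11.0 with the debezium-json format for CDC.
>> > After restoring from a checkpoint, newly arriving records with op=d fail to delete the corresponding rows.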
>> > Restart command:
>> > ./bin/flink run -m yarn-cluster -s hdfs://prehadoop01:8020/flink/checkpoints/4cc5df8b96e90c1c2a4d3719a77f51d1/chk-819/_metadata /root/bigdata-flink-1.0.jar
>> > Code:
>> > import java.net.URI;
>> > import org.apache.flink.runtime.state.filesystem.FsStateBackend;
>> > import org.apache.flink.streaming.api.CheckpointingMode;
>> > import org.apache.flink.streaming.api.environment.CheckpointConfig;
>> > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> > import org.apache.flink.table.api.EnvironmentSettings;
>> > import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>> >
>> > // The statements below run inside main(String[] args) throws Exception.
>> > EnvironmentSettings settings = EnvironmentSettings.newInstance()
>> >         .useBlinkPlanner()
>> >         .inStreamingMode()
>> >         .build();
>> >
>> > StreamExecutionEnvironment env =
>> >         StreamExecutionEnvironment.getExecutionEnvironment();
>> >
>> > env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
>> > env.getCheckpointConfig().setCheckpointTimeout(6000L);            // checkpoint timeout
>> > env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);         // max number of concurrent checkpoints
>> > env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10L);     // minimum pause between checkpoints
>> > env.getCheckpointConfig().setPreferCheckpointForRecovery(true);   // prefer checkpoints for failure recovery
>> > env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1); // number of tolerated checkpoint failures
>> > // Checkpoint cleanup policy: keep externalized checkpoints when the job is cancelled
>> > env.getCheckpointConfig().enableExternalizedCheckpoints(
>> >         CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>> > // External checkpoint storage path
>> > env.setStateBackend(new FsStateBackend(
>> >         new URI("hdfs://172.22.20.205:8020/flink/checkpoints"), false));
>> > // Restart strategy (truncated in the original message):
>> > // ... TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));
>> > StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
>> > String sourceDDL = String.format(
>> >         "CREATE TABLE debezium_source (" +
>> >         "  id INT NOT NULL," +
>> >         "  name STRING," +
>> >         "  description STRING," +
>> >         "  weight DOUBLE" +
>> >         ") WITH (" +
>> >         "  'connector' = 'kafka-0.11'," +
>> >         "  'topic' = '%s'," +
>> >         "  'properties.bootstrap.servers' = '%s'," +
>> >         "  'scan.startup.mode' = 'group-offsets'," +
>> >         "  'format' = 'debezium-json'" +
>> >         ")", "ddd", "172.22.20.206:9092");
>> > String sinkDDL = "CREATE TABLE sink (" +
>> >         "  id INT NOT NULL," +
>> >         "  name STRING," +
>> >         "  description STRING," +
>> >         "  weight DOUBLE," +
>> >         "  PRIMARY KEY (id, name, description, weight) NOT ENFORCED" +
>> >         ") WITH (" +
>> >         "  'connector' = 'jdbc'," +
>> >         "  'url' = 'jdbc:mysql://172.27.4.22:3306/test?autoReconnect=true'," +
>> >         "  'table-name' = 'products'," +
>> >         "  'driver' = 'com.mysql.cj.jdbc.Driver'," +
>> >         "  'username' = 'DataPip'," +
>> >         "  'password' = 'DataPip'" +
>> >         ")";
>> > String dml = "INSERT INTO sink SELECT id, name, description, weight " +
>> >         "FROM debezium_source GROUP BY id, name, description, weight";
>> > tEnv.executeSql(sourceDDL);
>> > tEnv.executeSql(sinkDDL);
>> > tEnv.executeSql(dml);
>> >
>> >
>> >
>> > --
>> > Sent from: http://apache-flink.147419.n8.nabble.com/
>> >
>>
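The debezium-json format used in the job above turns each Debezium change event into one or more changelog rows. The sketch below is a simplified, hypothetical model of that mapping: the class `DebeziumOpMapping`, the method `toChangelog`, and the string encoding of rows are assumptions, and snapshot reads (op=r) are deliberately not modelled. The prefixes "+I", "-U", "+U" and "-D" are the short names Flink uses for INSERT, UPDATE_BEFORE, UPDATE_AFTER and DELETE. An update yields both an UPDATE_BEFORE and an UPDATE_AFTER row, which is consistent with the [BEFORE_AND_AFTER] trait the exception reports for the TableSourceScan, and an op=d event yields a DELETE built from the "before" image, i.e. the message a downstream GROUP BY drops when it holds no state for that row.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of how a Debezium event becomes changelog rows. */
public class DebeziumOpMapping {

    /**
     * Maps one Debezium event (op plus before/after images) to changelog rows.
     * "+I", "-U", "+U" and "-D" stand for INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE.
     */
    public static List<String> toChangelog(String op, String before, String after) {
        List<String> out = new ArrayList<>();
        switch (op) {
            case "c": // create: only the "after" image is set
                out.add("+I " + after);
                break;
            case "u": // update: retract the old image, then emit the new one
                out.add("-U " + before);
                out.add("+U " + after);
                break;
            case "d": // delete: only the "before" image carries the deleted row
                out.add("-D " + before);
                break;
            default: // e.g. snapshot reads ("r") are not modelled in this sketch
                throw new IllegalArgumentException("Unsupported op: " + op);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(toChangelog("c", null, "1,a,x,1.0"));        // [+I 1,a,x,1.0]
        System.out.println(toChangelog("u", "1,a,x,1.0", "1,a,x,2.0")); // [-U 1,a,x,1.0, +U 1,a,x,2.0]
        System.out.println(toChangelog("d", "1,a,x,2.0", null));        // [-D 1,a,x,2.0]
    }
}
```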