I think this is related to the Iceberg Flink connector implementation and the connector's memory configuration, so I'm cc'ing @Jingsong, who is more familiar with this part.
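
For reference, a minimal sketch of the flink-conf.yaml memory options that are usually worth checking in a case like this; the values below are placeholders, not recommendations:

    # Total process memory per TaskManager (what -ytm sets on YARN).
    taskmanager.memory.process.size: 5120m
    # Managed memory is off-heap; for a job that does not use RocksDB, a
    # smaller fraction leaves more JVM heap for the writers and the GC
    # (the default is 0.4).
    taskmanager.memory.managed.fraction: 0.1
    # JVM options, e.g. the G1 flags already in use.
    env.java.opts: -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35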
Best,
Leonard

> On 2 Aug 2021, at 12:20, Ayush Chauhan <[email protected]> wrote:
>
> Hi Leonard,
>
> I am using Flink 1.11.2 and the debezium-json format to read the CDC data
> generated by Debezium.
>
> For each table, I convert the Kafka dynamic table to a retract stream, and
> that stream is finally converted to a DataStream<RowData>. Here is the
> function:
>
> private DataStream<RowData> getDataStream(String sql) {
>     LOGGER.debug(sql);
>     Table out = tEnv.sqlQuery(sql);
>     DataStream<Tuple2<Boolean, Row>> dsRow = tEnv.toRetractStream(out, Row.class);
>     return dsRow.map((MapFunction<Tuple2<Boolean, Row>, RowData>) t2 -> {
>         RowKind rowKind = t2.f1.getKind();
>         GenericRowData genericRowData = new GenericRowData(rowKind, t2.f1.getArity());
>         for (int pos = 0; pos < t2.f1.getArity(); pos++) {
>             Object object = t2.f1.getField(pos);
>             Object convertedType;
>             if (object instanceof String) {
>                 convertedType = RowDataUtil.convertConstant(Types.StringType.get(), object);
>             } else if (object instanceof LocalDateTime) {
>                 convertedType = TimestampData.fromLocalDateTime((LocalDateTime) object);
>             } else {
>                 convertedType = object;
>             }
>             genericRowData.setField(pos, convertedType);
>         }
>         return genericRowData;
>     });
> }
>
> I then pass this DataStream to the Flink sink:
>
> FlinkSink.forRowData(rowDataDataStream)
>     .table(icebergTable)
>     .tableSchema(FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergTable.schema())))
>     .tableLoader(tableLoader)
>     .equalityFieldColumns(tableConfig.getEqualityColumns())
>     .build();
>
> Please let me know if you need any other information.
>
>
> On Mon, Aug 2, 2021 at 7:48 AM Leonard Xu <[email protected]> wrote:
> Hi, Ayush
>
> Thanks for the detailed description.
>
> Before analyzing the issue, I have two questions: which Flink and Flink CDC
> versions are you using? And is Flink CDC used via SQL or the DataStream API?
> It would also help if you could post your Flink CDC connector parameters.
>
> Best,
> Leonard
>
>> On 29 Jul 2021, at 18:57, Ayush Chauhan <[email protected]> wrote:
>>
>> Hi all,
>>
>> We are using Flink + Iceberg to consume CDC data. We have combined all the
>> tables of a single DB in one job. Our job frequently runs into GC issues.
>> Earlier it was running on the default Parallel GC, and I have changed it to
>> G1GC. G1GC did bring some improvement, but I am still facing the same
>> problem.
>>
>> These are the params on my job:
>> -ytm 5120m -yjm 1024m -yD env.java.opts="-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35"
>>
>> This job runs CDC ingestion for 17 tables with a parallelism of 1, and the
>> throughput is around 10k messages per 10-minute checkpointing interval.
>>
>> I am attaching a part of the thread dump to this email.
>>
>> During old-generation GC, the job gets stuck, and its checkpointing time,
>> which is normally under 1 second, grows exponentially up to the timeout
>> threshold. The job then fails either due to a checkpoint timeout or because
>> the task manager's heartbeat is lost.
>>
>> <Screenshot 2021-07-29 at 16.09.19.png>
>> <Screenshot 2021-07-29 at 16.08.58.png>
>>
>> --
>> Ayush Chauhan
>> <thread_dump.txt>
>
> --
> Ayush Chauhan
> Data Platform
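
A side note on the checkpoint coupling described in the thread above: the Iceberg sink commits its data files when a checkpoint completes, so a checkpoint timeout also delays commits. While the GC issue is investigated, the checkpoint timeout and the number of tolerable checkpoint failures can be relaxed on the DataStream API. A minimal sketch against Flink 1.11; the class name and values are placeholders:

    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointTuning {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 10-minute interval, matching the job described above.
            env.enableCheckpointing(10 * 60 * 1000L);
            CheckpointConfig conf = env.getCheckpointConfig();
            // Give slow checkpoints more headroom than the 10-minute default
            // before they are declared failed.
            conf.setCheckpointTimeout(30 * 60 * 1000L);
            // Tolerate a few consecutive checkpoint failures instead of
            // failing the job on the first one.
            conf.setTolerableCheckpointFailureNumber(3);
            // ... build the pipeline and call env.execute(...) as in the job above.
        }
    }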
