Re:Re: flink sql api, exception when setting "table.exec.state.ttl"

2022-05-25 Thread 李诗君



I have figured this out.
It was because I had earlier put a flink-connector-tidb-cdc.jar in my Flink
lib folder, and it is built against Scala 2.11, while my Flink distribution
is built for Scala 2.12.
Somehow, when I submit a job with a GroupAggregate operator, it needs to load
keyed RocksDB state, and that is where the conflict occurs.
I will look into it and post a solution.
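
One way to check a lib folder for this kind of clash is to list the class files that appear in more than one jar. The following is only a minimal sketch and not something from the thread: the class name DuplicateClassFinder, the default folder name "lib" and the package prefix "org/apache/flink/" are assumptions chosen for illustration, and it relies only on the JDK's java.util.jar API.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

public class DuplicateClassFinder {

    /**
     * Scans every *.jar directly inside libDir and returns the class entries
     * under the given path prefix that occur in at least two jars,
     * mapped to the sorted names of the jars containing them.
     */
    public static Map<String, List<String>> findDuplicates(Path libDir, String prefix)
            throws IOException {
        Map<String, List<String>> owners = new TreeMap<>();
        try (DirectoryStream<Path> jarPaths = Files.newDirectoryStream(libDir, "*.jar")) {
            for (Path jarPath : jarPaths) {
                try (JarFile jarFile = new JarFile(jarPath.toFile())) {
                    Enumeration<JarEntry> entries = jarFile.entries();
                    while (entries.hasMoreElements()) {
                        String name = entries.nextElement().getName();
                        if (name.startsWith(prefix) && name.endsWith(".class")) {
                            owners.computeIfAbsent(name, k -> new ArrayList<>())
                                    .add(jarPath.getFileName().toString());
                        }
                    }
                }
            }
        }
        // Keep only the classes that are present in two or more jars.
        owners.values().removeIf(jarNames -> jarNames.size() < 2);
        for (List<String> jarNames : owners.values()) {
            Collections.sort(jarNames);
        }
        return owners;
    }

    public static void main(String[] args) throws IOException {
        // Both defaults are illustrative assumptions, not values from the thread.
        Path libDir = Paths.get(args.length > 0 ? args[0] : "lib");
        String prefix = args.length > 1 ? args[1] : "org/apache/flink/";
        if (!Files.isDirectory(libDir)) {
            System.out.println("Not a directory: " + libDir);
            return;
        }
        findDuplicates(libDir, prefix)
                .forEach((cls, jarNames) -> System.out.println(cls + " -> " + jarNames));
    }
}
```

Run against the Flink lib folder (and, separately, against a folder holding the job jar), any line of output names a class that more than one jar provides. Which copy wins at runtime depends on class loading order, so such duplicates are worth resolving before looking further.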










Re: flink sql api, exception when setting "table.exec.state.ttl"

2022-05-23 Thread Chesnay Schepler

You're probably mixing Flink versions.

From the stack trace we can see that Flink classes are being loaded
from two different jars
(rocketmq-flink-1.0.0-SNAPSHOT.jar and flink-dist_2.12-1.13.5.jar); I'd
suggest resolving that first and seeing whether the error persists.
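
To confirm at runtime which jar a given class actually comes from, the JDK exposes the code source of a loaded class. The sketch below is an illustration under assumptions, not part of the original reply: the class name WhichJar and the helper locationOf are hypothetical, and the class names to inspect are passed as arguments (for example org.apache.flink.runtime.state.ttl.TtlStateFactory, which the stack trace attributes to rocketmq-flink-1.0.0-SNAPSHOT.jar).

```java
import java.security.CodeSource;

public class WhichJar {

    /**
     * Returns the location (usually a jar URL) that the class was loaded from.
     * Classes loaded by the bootstrap class loader have no code source.
     */
    public static String locationOf(Class<?> cls) {
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        return source == null
                ? "(bootstrap class loader)"
                : String.valueOf(source.getLocation());
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Each argument is a fully qualified class name to look up.
        for (String name : args) {
            System.out.println(name + " <- " + locationOf(Class.forName(name)));
        }
    }
}
```

The result only reflects the classpath of the JVM in which the code runs, so the same call would have to be made from inside the job (for instance in an operator's open method) to see what the TaskManager resolves.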



flink sql api, exception when setting "table.exec.state.ttl"

2022-05-23 Thread 李诗君
Flink version: 1.13.5

Java code:


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

env.enableCheckpointing(6);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(6);
env.getCheckpointConfig().setCheckpointTimeout(6);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
// env.getCheckpointConfig().setCheckpointStorage("hdfs://test-wh-hadoop-1:9000/flink-checkpoints");

env.setStateBackend(
        new RocksDBStateBackend("hdfs://test-wh-hadoop-1:9000/flink-checkpoints", true));

tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
Configuration configuration = tableEnv.getConfig().getConfiguration();
// configuration.setString("table.exec.resource.default-parallelism","16");
configuration.setString("table.exec.state.ttl","720");


When I submit this job, I get this:


Sink: Sink(table=[default_catalog.default_database.rts_board_trans_compute], fields=[mchnt_id, time_hour, channel, trans_count, trans_amount, average_amount]) (1/1) (f8649f8434775cbda10bcedce96c9ae3) switched from INITIALIZING to FAILED on container_1647420330066_0473_01_02 @ test-wh-hadoop-1 (dataPort=38604).
java.lang.UnsatisfiedLinkError: org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder()J
    at org.rocksdb.FlinkCompactionFilter.createNewFlinkCompactionFilterConfigHolder(Native Method) ~[flink-dist_2.12-1.13.5.jar:2.2.0]
    at org.rocksdb.FlinkCompactionFilter.access$000(FlinkCompactionFilter.java:13) ~[flink-dist_2.12-1.13.5.jar:2.2.0]
    at org.rocksdb.FlinkCompactionFilter$ConfigHolder.<init>(FlinkCompactionFilter.java:107) ~[flink-dist_2.12-1.13.5.jar:2.2.0]
    at org.rocksdb.FlinkCompactionFilter$FlinkCompactionFilterFactory.<init>(FlinkCompactionFilter.java:133) ~[flink-dist_2.12-1.13.5.jar:2.2.0]
    at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.createAndSetCompactFilterFactory(RocksDbTtlCompactFiltersManager.java:84) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(RocksDbTtlCompactFiltersManager.java:74) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyDescriptor(RocksDBOperationUtils.java:167) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createStateInfo(RocksDBOperationUtils.java:144) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:643) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:208) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:143) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:130) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:70) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:301) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:352) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60) ~[rocketmq-flink-1.0.0-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:203) ~[flink-dist_2.12-1.13.5.jar:1.13.5]
    at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.open(GroupAggFunction.java:119)