[jira] [Updated] (FLINK-30935) Add KafkaSerializer deserialize check when using SimpleVersionedSerializer
[ https://issues.apache.org/jira/browse/FLINK-30935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-30935: --- Labels: auto-deprioritized-major pull-request-available (was: pull-request-available stale-major) Priority: Minor (was: Major) This issue was labeled "stale-major" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion. > Add KafkaSerializer deserialize check when using SimpleVersionedSerializer > -- > > Key: FLINK-30935 > URL: https://issues.apache.org/jira/browse/FLINK-30935 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Reporter: Ran Tao >Priority: Minor > Labels: auto-deprioritized-major, pull-request-available > > Current kafka many implemented serializers do not deal with version check > while other implementations of SimpleVersionedSerializer supports it. > we can add it like many other connectors's implementation in case of > incompatible or corrupt state when restoring from checkpoint. > > {code:java} > @Override > public int getVersion() { > return CURRENT_VERSION; > } > @Override > public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws > IOException { > try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); > DataInputStream in = new DataInputStream(bais)) { > String topic = in.readUTF(); > int partition = in.readInt(); > long offset = in.readLong(); > long stoppingOffset = in.readLong(); > return new KafkaPartitionSplit( > new TopicPartition(topic, partition), offset, stoppingOffset); > } > } {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30935) Add KafkaSerializer deserialize check when using SimpleVersionedSerializer
[ https://issues.apache.org/jira/browse/FLINK-30935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-30935: --- Labels: pull-request-available stale-major (was: pull-request-available) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issues has been marked as Major but is unassigned and neither itself nor its Sub-Tasks have been updated for 60 days. I have gone ahead and added a "stale-major" to the issue". If this ticket is a Major, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized. > Add KafkaSerializer deserialize check when using SimpleVersionedSerializer > -- > > Key: FLINK-30935 > URL: https://issues.apache.org/jira/browse/FLINK-30935 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Reporter: Ran Tao >Priority: Major > Labels: pull-request-available, stale-major > > Current kafka many implemented serializers do not deal with version check > while other implementations of SimpleVersionedSerializer supports it. > we can add it like many other connectors's implementation in case of > incompatible or corrupt state when restoring from checkpoint. > > {code:java} > @Override > public int getVersion() { > return CURRENT_VERSION; > } > @Override > public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws > IOException { > try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); > DataInputStream in = new DataInputStream(bais)) { > String topic = in.readUTF(); > int partition = in.readInt(); > long offset = in.readLong(); > long stoppingOffset = in.readLong(); > return new KafkaPartitionSplit( > new TopicPartition(topic, partition), offset, stoppingOffset); > } > } {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30935) Add KafkaSerializer deserialize check when using SimpleVersionedSerializer
[ https://issues.apache.org/jira/browse/FLINK-30935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ran Tao updated FLINK-30935: Description: Current kafka many implemented serializers do not deal with version check while other implementations of SimpleVersionedSerializer supports it. we can add it like many other connectors's implementation in case of incompatible or corrupt state when restoring from checkpoint. {code:java} @Override public int getVersion() { return CURRENT_VERSION; } @Override public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException { try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); DataInputStream in = new DataInputStream(bais)) { String topic = in.readUTF(); int partition = in.readInt(); long offset = in.readLong(); long stoppingOffset = in.readLong(); return new KafkaPartitionSplit( new TopicPartition(topic, partition), offset, stoppingOffset); } } {code} was: {code:java} @Override public int getVersion() { return CURRENT_VERSION; } @Override public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException { try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); DataInputStream in = new DataInputStream(bais)) { String topic = in.readUTF(); int partition = in.readInt(); long offset = in.readLong(); long stoppingOffset = in.readLong(); return new KafkaPartitionSplit( new TopicPartition(topic, partition), offset, stoppingOffset); } } {code} Current kafka many implemented serializers do not deal with version check. I think we can add it like many other connectors's implementation in case of incompatible or corrupt state when restoring from checkpoint. > Add KafkaSerializer deserialize check when using SimpleVersionedSerializer > -- > > Key: FLINK-30935 > URL: https://issues.apache.org/jira/browse/FLINK-30935 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Reporter: Ran Tao >Priority: Major > Labels: pull-request-available > > Current kafka many implemented serializers do not deal with version check > while other implementations of SimpleVersionedSerializer supports it. > we can add it like many other connectors's implementation in case of > incompatible or corrupt state when restoring from checkpoint. > > {code:java} > @Override > public int getVersion() { > return CURRENT_VERSION; > } > @Override > public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws > IOException { > try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); > DataInputStream in = new DataInputStream(bais)) { > String topic = in.readUTF(); > int partition = in.readInt(); > long offset = in.readLong(); > long stoppingOffset = in.readLong(); > return new KafkaPartitionSplit( > new TopicPartition(topic, partition), offset, stoppingOffset); > } > } {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30935) Add KafkaSerializer deserialize check when using SimpleVersionedSerializer
[ https://issues.apache.org/jira/browse/FLINK-30935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-30935: --- Labels: pull-request-available (was: ) > Add KafkaSerializer deserialize check when using SimpleVersionedSerializer > -- > > Key: FLINK-30935 > URL: https://issues.apache.org/jira/browse/FLINK-30935 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Reporter: Ran Tao >Priority: Major > Labels: pull-request-available > > {code:java} > @Override > public int getVersion() { > return CURRENT_VERSION; > } > @Override > public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws > IOException { > try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); > DataInputStream in = new DataInputStream(bais)) { > String topic = in.readUTF(); > int partition = in.readInt(); > long offset = in.readLong(); > long stoppingOffset = in.readLong(); > return new KafkaPartitionSplit( > new TopicPartition(topic, partition), offset, stoppingOffset); > } > } {code} > Current kafka many implemented serializers do not deal with version check. I > think we can add it like many other connectors's implementation in case of > incompatible or corrupt state when restoring from checkpoint. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30935) Add KafkaSerializer deserialize check when using SimpleVersionedSerializer
[ https://issues.apache.org/jira/browse/FLINK-30935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ran Tao updated FLINK-30935: Description: {code:java} @Override public int getVersion() { return CURRENT_VERSION; } @Override public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException { try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); DataInputStream in = new DataInputStream(bais)) { String topic = in.readUTF(); int partition = in.readInt(); long offset = in.readLong(); long stoppingOffset = in.readLong(); return new KafkaPartitionSplit( new TopicPartition(topic, partition), offset, stoppingOffset); } } {code} Current kafka many implemented serializers do not deal with version check. I think we can add it like many other connectors's implementation in case of incompatible or corrupt state when restoring from checkpoint. was: {code:java} @Override public int getVersion() { return CURRENT_VERSION; } @Override public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException { try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); DataInputStream in = new DataInputStream(bais)) { String topic = in.readUTF(); int partition = in.readInt(); long offset = in.readLong(); long stoppingOffset = in.readLong(); return new KafkaPartitionSplit( new TopicPartition(topic, partition), offset, stoppingOffset); } } {code} Current kafka many implemented serializers do not deal with version check. I think we can add it like many other connectors's implementation in case of incompatible or corrupt state when restoring from checkpoint. e.g. {code:java} @Override public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException { switch (version) { case 0: return deserializeV0(serialized); default: throw new IOException("Unrecognized version or corrupt state: " + version); } } private KafkaPartitionSplit deserializeV0(byte[] serialized) throws IOException { try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); DataInputStream in = new DataInputStream(bais)) { String topic = in.readUTF(); int partition = in.readInt(); long offset = in.readLong(); long stoppingOffset = in.readLong(); return new KafkaPartitionSplit( new TopicPartition(topic, partition), offset, stoppingOffset); } } {code} > Add KafkaSerializer deserialize check when using SimpleVersionedSerializer > -- > > Key: FLINK-30935 > URL: https://issues.apache.org/jira/browse/FLINK-30935 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Reporter: Ran Tao >Priority: Major > > {code:java} > @Override > public int getVersion() { > return CURRENT_VERSION; > } > @Override > public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws > IOException { > try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); > DataInputStream in = new DataInputStream(bais)) { > String topic = in.readUTF(); > int partition = in.readInt(); > long offset = in.readLong(); > long stoppingOffset = in.readLong(); > return new KafkaPartitionSplit( > new TopicPartition(topic, partition), offset, stoppingOffset); > } > } {code} > Current kafka many implemented serializers do not deal with version check. I > think we can add it like many other connectors's implementation in case of > incompatible or corrupt state when restoring from checkpoint. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30935) Add KafkaSerializer deserialize check when using SimpleVersionedSerializer
[ https://issues.apache.org/jira/browse/FLINK-30935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ran Tao updated FLINK-30935: Description: {code:java} @Override public int getVersion() { return CURRENT_VERSION; } @Override public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException { try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); DataInputStream in = new DataInputStream(bais)) { String topic = in.readUTF(); int partition = in.readInt(); long offset = in.readLong(); long stoppingOffset = in.readLong(); return new KafkaPartitionSplit( new TopicPartition(topic, partition), offset, stoppingOffset); } } {code} Current kafka many implemented serializers do not deal with version check. I think we can add it like many other connectors's implementation in case of incompatible or corrupt state when restoring from checkpoint. e.g. {code:java} @Override public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException { switch (version) { case 0: return deserializeV0(serialized); default: throw new IOException("Unrecognized version or corrupt state: " + version); } } private KafkaPartitionSplit deserializeV0(byte[] serialized) throws IOException { try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); DataInputStream in = new DataInputStream(bais)) { String topic = in.readUTF(); int partition = in.readInt(); long offset = in.readLong(); long stoppingOffset = in.readLong(); return new KafkaPartitionSplit( new TopicPartition(topic, partition), offset, stoppingOffset); } } {code} was: {code:java} @Override public int getVersion() { return CURRENT_VERSION; } @Override public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException { try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); DataInputStream in = new DataInputStream(bais)) { String topic = in.readUTF(); int partition = in.readInt(); long offset = in.readLong(); long stoppingOffset = in.readLong(); return new KafkaPartitionSplit( new TopicPartition(topic, partition), offset, stoppingOffset); } } {code} Current kafka many implemented serializers do not deal with version check. I think we can add it like many other connectors in case of incompatible or corrupt state when restoring from checkpoint.. > Add KafkaSerializer deserialize check when using SimpleVersionedSerializer > -- > > Key: FLINK-30935 > URL: https://issues.apache.org/jira/browse/FLINK-30935 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Reporter: Ran Tao >Priority: Major > > {code:java} > @Override > public int getVersion() { > return CURRENT_VERSION; > } > @Override > public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws > IOException { > try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); > DataInputStream in = new DataInputStream(bais)) { > String topic = in.readUTF(); > int partition = in.readInt(); > long offset = in.readLong(); > long stoppingOffset = in.readLong(); > return new KafkaPartitionSplit( > new TopicPartition(topic, partition), offset, stoppingOffset); > } > } {code} > Current kafka many implemented serializers do not deal with version check. I > think we can add it like many other connectors's implementation in case of > incompatible or corrupt state when restoring from checkpoint. > e.g. > {code:java} > @Override > public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws > IOException { > switch (version) { > case 0: > return deserializeV0(serialized); > default: > throw new IOException("Unrecognized version or corrupt state: " + > version); > } > } > private KafkaPartitionSplit deserializeV0(byte[] serialized) throws > IOException { > try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); > DataInputStream in = new DataInputStream(bais)) { > String topic = in.readUTF(); > int partition = in.readInt(); > long offset = in.readLong(); > long stoppingOffset = in.readLong(); > return new KafkaPartitionSplit( > new TopicPartition(topic, partition), offset, stoppingOffset); > } > } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30935) Add KafkaSerializer deserialize check when using SimpleVersionedSerializer
[ https://issues.apache.org/jira/browse/FLINK-30935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ran Tao updated FLINK-30935: Description: {code:java} @Override public int getVersion() { return CURRENT_VERSION; } @Override public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException { try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); DataInputStream in = new DataInputStream(bais)) { String topic = in.readUTF(); int partition = in.readInt(); long offset = in.readLong(); long stoppingOffset = in.readLong(); return new KafkaPartitionSplit( new TopicPartition(topic, partition), offset, stoppingOffset); } } {code} Current kafka many implemented serializers do not deal with version check. I think we can add it like many other connectors in case of incompatible or corrupt state when restoring from checkpoint.. was: {code:java} @Override public int getVersion() { return CURRENT_VERSION; } @Override public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException { try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); DataInputStream in = new DataInputStream(bais)) { String topic = in.readUTF(); int partition = in.readInt(); long offset = in.readLong(); long stoppingOffset = in.readLong(); return new KafkaPartitionSplit( new TopicPartition(topic, partition), offset, stoppingOffset); } } {code} Current kafka many implemented serializers do not deal with version check. I think we can add it like many other connectors in case of incompatible or corrupt state when restoring from checkpoint.. > Add KafkaSerializer deserialize check when using SimpleVersionedSerializer > -- > > Key: FLINK-30935 > URL: https://issues.apache.org/jira/browse/FLINK-30935 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Reporter: Ran Tao >Priority: Major > > {code:java} > @Override > public int getVersion() { > return CURRENT_VERSION; > } > @Override > public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws > IOException { > try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); > DataInputStream in = new DataInputStream(bais)) { > String topic = in.readUTF(); > int partition = in.readInt(); > long offset = in.readLong(); > long stoppingOffset = in.readLong(); > return new KafkaPartitionSplit( > new TopicPartition(topic, partition), offset, stoppingOffset); > } > } {code} > Current kafka many implemented serializers do not deal with version check. I > think we can add it like many other connectors in case of incompatible or > corrupt state when restoring from checkpoint.. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30935) Add KafkaSerializer deserialize check when using SimpleVersionedSerializer
[ https://issues.apache.org/jira/browse/FLINK-30935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser updated FLINK-30935: --- Affects Version/s: (was: 1.16.1) > Add KafkaSerializer deserialize check when using SimpleVersionedSerializer > -- > > Key: FLINK-30935 > URL: https://issues.apache.org/jira/browse/FLINK-30935 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Reporter: Ran Tao >Priority: Major > > {code:java} > @Override > public int getVersion() { > return CURRENT_VERSION; > } > @Override > public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws > IOException { > try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); > DataInputStream in = new DataInputStream(bais)) { > String topic = in.readUTF(); > int partition = in.readInt(); > long offset = in.readLong(); > long stoppingOffset = in.readLong(); > return new KafkaPartitionSplit( > new TopicPartition(topic, partition), offset, stoppingOffset); > } > } {code} > Current kafka many implemented serializers do not deal with version check. I > think we can add it like many other connectors in case of incompatible or > corrupt state when restoring from checkpoint.. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30935) Add KafkaSerializer deserialize check when using SimpleVersionedSerializer
[ https://issues.apache.org/jira/browse/FLINK-30935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ran Tao updated FLINK-30935: Description: {code:java} @Override public int getVersion() { return CURRENT_VERSION; } @Override public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException { try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); DataInputStream in = new DataInputStream(bais)) { String topic = in.readUTF(); int partition = in.readInt(); long offset = in.readLong(); long stoppingOffset = in.readLong(); return new KafkaPartitionSplit( new TopicPartition(topic, partition), offset, stoppingOffset); } } {code} Current kafka many implemented serializers do not deal with version check. I think we can add it like many other connectors in case of incompatible or corrupt state. was: {code:java} @Override public int getVersion() { return CURRENT_VERSION; } @Override public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException { try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); DataInputStream in = new DataInputStream(bais)) { String topic = in.readUTF(); int partition = in.readInt(); long offset = in.readLong(); long stoppingOffset = in.readLong(); return new KafkaPartitionSplit( new TopicPartition(topic, partition), offset, stoppingOffset); } } {code} Current kafka serializers do not deal with version check. I think we can add it like many other connectors in case of incompatible or corrupt state. > Add KafkaSerializer deserialize check when using SimpleVersionedSerializer > -- > > Key: FLINK-30935 > URL: https://issues.apache.org/jira/browse/FLINK-30935 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: 1.16.1 >Reporter: Ran Tao >Priority: Major > > {code:java} > @Override > public int getVersion() { > return CURRENT_VERSION; > } > @Override > public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws > IOException { > try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); > DataInputStream in = new DataInputStream(bais)) { > String topic = in.readUTF(); > int partition = in.readInt(); > long offset = in.readLong(); > long stoppingOffset = in.readLong(); > return new KafkaPartitionSplit( > new TopicPartition(topic, partition), offset, stoppingOffset); > } > } {code} > Current kafka many implemented serializers do not deal with version check. I > think we can add it like many other connectors in case of incompatible or > corrupt state. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30935) Add KafkaSerializer deserialize check when using SimpleVersionedSerializer
[ https://issues.apache.org/jira/browse/FLINK-30935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ran Tao updated FLINK-30935: Description: {code:java} @Override public int getVersion() { return CURRENT_VERSION; } @Override public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException { try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); DataInputStream in = new DataInputStream(bais)) { String topic = in.readUTF(); int partition = in.readInt(); long offset = in.readLong(); long stoppingOffset = in.readLong(); return new KafkaPartitionSplit( new TopicPartition(topic, partition), offset, stoppingOffset); } } {code} Current kafka many implemented serializers do not deal with version check. I think we can add it like many other connectors in case of incompatible or corrupt state when restoring from checkpoint.. was: {code:java} @Override public int getVersion() { return CURRENT_VERSION; } @Override public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException { try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); DataInputStream in = new DataInputStream(bais)) { String topic = in.readUTF(); int partition = in.readInt(); long offset = in.readLong(); long stoppingOffset = in.readLong(); return new KafkaPartitionSplit( new TopicPartition(topic, partition), offset, stoppingOffset); } } {code} Current kafka many implemented serializers do not deal with version check. I think we can add it like many other connectors in case of incompatible or corrupt state. > Add KafkaSerializer deserialize check when using SimpleVersionedSerializer > -- > > Key: FLINK-30935 > URL: https://issues.apache.org/jira/browse/FLINK-30935 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: 1.16.1 >Reporter: Ran Tao >Priority: Major > > {code:java} > @Override > public int getVersion() { > return CURRENT_VERSION; > } > @Override > public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws > IOException { > try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); > DataInputStream in = new DataInputStream(bais)) { > String topic = in.readUTF(); > int partition = in.readInt(); > long offset = in.readLong(); > long stoppingOffset = in.readLong(); > return new KafkaPartitionSplit( > new TopicPartition(topic, partition), offset, stoppingOffset); > } > } {code} > Current kafka many implemented serializers do not deal with version check. I > think we can add it like many other connectors in case of incompatible or > corrupt state when restoring from checkpoint.. -- This message was sent by Atlassian Jira (v8.20.10#820010)