[jira] [Updated] (FLINK-30935) Add KafkaSerializer deserialize check when using SimpleVersionedSerializer

2023-09-10 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-30935:
---
  Labels: auto-deprioritized-major pull-request-available  (was: 
pull-request-available stale-major)
Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any 
updates, so it is being deprioritized. If this ticket is actually Major, please 
raise the priority and ask a committer to assign you the issue, or revive the 
public discussion.


> Add KafkaSerializer deserialize check when using SimpleVersionedSerializer
> --
>
> Key: FLINK-30935
> URL: https://issues.apache.org/jira/browse/FLINK-30935
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Ran Tao
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
>
> Many of the current Kafka serializer implementations do not perform a version 
> check, while other implementations of SimpleVersionedSerializer support it.
> We can add one, as many other connectors' implementations do, to guard against 
> incompatible or corrupt state when restoring from a checkpoint.
>  
> {code:java}
> @Override
> public int getVersion() {
>     return CURRENT_VERSION;
> }
>
> @Override
> public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
>     try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
>             DataInputStream in = new DataInputStream(bais)) {
>         String topic = in.readUTF();
>         int partition = in.readInt();
>         long offset = in.readLong();
>         long stoppingOffset = in.readLong();
>         return new KafkaPartitionSplit(
>                 new TopicPartition(topic, partition), offset, stoppingOffset);
>     }
> }
> {code}
>  
>  
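The guard the issue proposes can be sketched end to end. The following is a minimal, self-contained illustration of the pattern, not Flink's actual code: `Split` and `SplitSerializer` are hypothetical stand-ins for `KafkaPartitionSplit` and its serializer, with the same wire layout as the snippet above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Simplified stand-in for KafkaPartitionSplit; illustrative only.
record Split(String topic, int partition, long offset, long stoppingOffset) {}

public class SplitSerializer {
    static final int CURRENT_VERSION = 0;

    public byte[] serialize(Split split) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            out.writeUTF(split.topic());
            out.writeInt(split.partition());
            out.writeLong(split.offset());
            out.writeLong(split.stoppingOffset());
        }
        return baos.toByteArray();
    }

    // The proposed check: dispatch on the version recorded at checkpoint
    // time and fail fast on anything unrecognized, instead of silently
    // misreading bytes written by a different serializer version.
    public Split deserialize(int version, byte[] serialized) throws IOException {
        switch (version) {
            case 0:
                return deserializeV0(serialized);
            default:
                throw new IOException("Unrecognized version or corrupt state: " + version);
        }
    }

    private Split deserializeV0(byte[] serialized) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            return new Split(in.readUTF(), in.readInt(), in.readLong(), in.readLong());
        }
    }
}
```

The round trip succeeds for the current version, while an unknown version is rejected with an IOException rather than producing a garbage split.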



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30935) Add KafkaSerializer deserialize check when using SimpleVersionedSerializer

2023-09-02 Thread Flink Jira Bot (Jira)



Flink Jira Bot updated FLINK-30935:
---
Labels: pull-request-available stale-major  (was: pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Major but is unassigned, and neither it nor its Sub-Tasks have been updated 
for 60 days. I have gone ahead and added a "stale-major" label to the issue. If 
this ticket is Major, please either assign yourself or give an update. Afterwards, 
please remove the label, or in 7 days the issue will be deprioritized.


> Add KafkaSerializer deserialize check when using SimpleVersionedSerializer
> --
>
> Key: FLINK-30935
> URL: https://issues.apache.org/jira/browse/FLINK-30935
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Ran Tao
>Priority: Major
>  Labels: pull-request-available, stale-major
>
> Many of the current Kafka serializer implementations do not perform a version 
> check, while other implementations of SimpleVersionedSerializer support it.
> We can add one, as many other connectors' implementations do, to guard against 
> incompatible or corrupt state when restoring from a checkpoint.
>  
> {code:java}
> @Override
> public int getVersion() {
>     return CURRENT_VERSION;
> }
>
> @Override
> public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
>     try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
>             DataInputStream in = new DataInputStream(bais)) {
>         String topic = in.readUTF();
>         int partition = in.readInt();
>         long offset = in.readLong();
>         long stoppingOffset = in.readLong();
>         return new KafkaPartitionSplit(
>                 new TopicPartition(topic, partition), offset, stoppingOffset);
>     }
> }
> {code}
>  
>  





[jira] [Updated] (FLINK-30935) Add KafkaSerializer deserialize check when using SimpleVersionedSerializer

2023-03-22 Thread Ran Tao (Jira)



Ran Tao updated FLINK-30935:

Description: 
Many of the current Kafka serializer implementations do not perform a version 
check, while other implementations of SimpleVersionedSerializer support it.

We can add one, as many other connectors' implementations do, to guard against 
incompatible or corrupt state when restoring from a checkpoint.

 
{code:java}
@Override
public int getVersion() {
    return CURRENT_VERSION;
}

@Override
public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
    try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
            DataInputStream in = new DataInputStream(bais)) {
        String topic = in.readUTF();
        int partition = in.readInt();
        long offset = in.readLong();
        long stoppingOffset = in.readLong();
        return new KafkaPartitionSplit(
                new TopicPartition(topic, partition), offset, stoppingOffset);
    }
}
{code}
 

 

  was:
{code:java}
@Override
public int getVersion() {
    return CURRENT_VERSION;
}

@Override
public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
    try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
            DataInputStream in = new DataInputStream(bais)) {
        String topic = in.readUTF();
        int partition = in.readInt();
        long offset = in.readLong();
        long stoppingOffset = in.readLong();
        return new KafkaPartitionSplit(
                new TopicPartition(topic, partition), offset, stoppingOffset);
    }
}
{code}
Many of the current Kafka serializer implementations do not perform a version 
check. I think we can add one, as many other connectors' implementations do, to 
guard against incompatible or corrupt state when restoring from a checkpoint.


> Add KafkaSerializer deserialize check when using SimpleVersionedSerializer
> --
>
> Key: FLINK-30935
> URL: https://issues.apache.org/jira/browse/FLINK-30935
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Ran Tao
>Priority: Major
>  Labels: pull-request-available
>
> Many of the current Kafka serializer implementations do not perform a version 
> check, while other implementations of SimpleVersionedSerializer support it.
> We can add one, as many other connectors' implementations do, to guard against 
> incompatible or corrupt state when restoring from a checkpoint.
>  
> {code:java}
> @Override
> public int getVersion() {
>     return CURRENT_VERSION;
> }
>
> @Override
> public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
>     try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
>             DataInputStream in = new DataInputStream(bais)) {
>         String topic = in.readUTF();
>         int partition = in.readInt();
>         long offset = in.readLong();
>         long stoppingOffset = in.readLong();
>         return new KafkaPartitionSplit(
>                 new TopicPartition(topic, partition), offset, stoppingOffset);
>     }
> }
> {code}
>  
>  





[jira] [Updated] (FLINK-30935) Add KafkaSerializer deserialize check when using SimpleVersionedSerializer

2023-02-09 Thread ASF GitHub Bot (Jira)



ASF GitHub Bot updated FLINK-30935:
---
Labels: pull-request-available  (was: )

> Add KafkaSerializer deserialize check when using SimpleVersionedSerializer
> --
>
> Key: FLINK-30935
> URL: https://issues.apache.org/jira/browse/FLINK-30935
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Ran Tao
>Priority: Major
>  Labels: pull-request-available
>
> {code:java}
> @Override
> public int getVersion() {
>     return CURRENT_VERSION;
> }
>
> @Override
> public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
>     try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
>             DataInputStream in = new DataInputStream(bais)) {
>         String topic = in.readUTF();
>         int partition = in.readInt();
>         long offset = in.readLong();
>         long stoppingOffset = in.readLong();
>         return new KafkaPartitionSplit(
>                 new TopicPartition(topic, partition), offset, stoppingOffset);
>     }
> }
> {code}
> Many of the current Kafka serializer implementations do not perform a version 
> check. I think we can add one, as many other connectors' implementations do, to 
> guard against incompatible or corrupt state when restoring from a checkpoint.





[jira] [Updated] (FLINK-30935) Add KafkaSerializer deserialize check when using SimpleVersionedSerializer

2023-02-09 Thread Ran Tao (Jira)



Ran Tao updated FLINK-30935:

Description: 
{code:java}
@Override
public int getVersion() {
    return CURRENT_VERSION;
}

@Override
public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
    try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
            DataInputStream in = new DataInputStream(bais)) {
        String topic = in.readUTF();
        int partition = in.readInt();
        long offset = in.readLong();
        long stoppingOffset = in.readLong();
        return new KafkaPartitionSplit(
                new TopicPartition(topic, partition), offset, stoppingOffset);
    }
}
{code}
Many of the current Kafka serializer implementations do not perform a version 
check. I think we can add one, as many other connectors' implementations do, to 
guard against incompatible or corrupt state when restoring from a checkpoint.

  was:
{code:java}
@Override
public int getVersion() {
    return CURRENT_VERSION;
}

@Override
public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
    try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
            DataInputStream in = new DataInputStream(bais)) {
        String topic = in.readUTF();
        int partition = in.readInt();
        long offset = in.readLong();
        long stoppingOffset = in.readLong();
        return new KafkaPartitionSplit(
                new TopicPartition(topic, partition), offset, stoppingOffset);
    }
}
{code}
Many of the current Kafka serializer implementations do not perform a version 
check. I think we can add one, as many other connectors' implementations do, to 
guard against incompatible or corrupt state when restoring from a checkpoint.

e.g.
{code:java}
@Override
public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
    switch (version) {
        case 0:
            return deserializeV0(serialized);
        default:
            throw new IOException("Unrecognized version or corrupt state: " + version);
    }
}

private KafkaPartitionSplit deserializeV0(byte[] serialized) throws IOException {
    try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
            DataInputStream in = new DataInputStream(bais)) {
        String topic = in.readUTF();
        int partition = in.readInt();
        long offset = in.readLong();
        long stoppingOffset = in.readLong();
        return new KafkaPartitionSplit(
                new TopicPartition(topic, partition), offset, stoppingOffset);
    }
}
{code}


> Add KafkaSerializer deserialize check when using SimpleVersionedSerializer
> --
>
> Key: FLINK-30935
> URL: https://issues.apache.org/jira/browse/FLINK-30935
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Ran Tao
>Priority: Major
>
> {code:java}
> @Override
> public int getVersion() {
>     return CURRENT_VERSION;
> }
>
> @Override
> public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
>     try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
>             DataInputStream in = new DataInputStream(bais)) {
>         String topic = in.readUTF();
>         int partition = in.readInt();
>         long offset = in.readLong();
>         long stoppingOffset = in.readLong();
>         return new KafkaPartitionSplit(
>                 new TopicPartition(topic, partition), offset, stoppingOffset);
>     }
> }
> {code}
> Many of the current Kafka serializer implementations do not perform a version 
> check. I think we can add one, as many other connectors' implementations do, to 
> guard against incompatible or corrupt state when restoring from a checkpoint.





[jira] [Updated] (FLINK-30935) Add KafkaSerializer deserialize check when using SimpleVersionedSerializer

2023-02-08 Thread Ran Tao (Jira)



Ran Tao updated FLINK-30935:

Description: 
{code:java}
@Override
public int getVersion() {
    return CURRENT_VERSION;
}

@Override
public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
    try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
            DataInputStream in = new DataInputStream(bais)) {
        String topic = in.readUTF();
        int partition = in.readInt();
        long offset = in.readLong();
        long stoppingOffset = in.readLong();
        return new KafkaPartitionSplit(
                new TopicPartition(topic, partition), offset, stoppingOffset);
    }
}
{code}
Many of the current Kafka serializer implementations do not perform a version 
check. I think we can add one, as many other connectors' implementations do, to 
guard against incompatible or corrupt state when restoring from a checkpoint.

e.g.
{code:java}
@Override
public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
    switch (version) {
        case 0:
            return deserializeV0(serialized);
        default:
            throw new IOException("Unrecognized version or corrupt state: " + version);
    }
}

private KafkaPartitionSplit deserializeV0(byte[] serialized) throws IOException {
    try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
            DataInputStream in = new DataInputStream(bais)) {
        String topic = in.readUTF();
        int partition = in.readInt();
        long offset = in.readLong();
        long stoppingOffset = in.readLong();
        return new KafkaPartitionSplit(
                new TopicPartition(topic, partition), offset, stoppingOffset);
    }
}
{code}

  was:
{code:java}
@Override
public int getVersion() {
    return CURRENT_VERSION;
}

@Override
public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
    try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
            DataInputStream in = new DataInputStream(bais)) {
        String topic = in.readUTF();
        int partition = in.readInt();
        long offset = in.readLong();
        long stoppingOffset = in.readLong();
        return new KafkaPartitionSplit(
                new TopicPartition(topic, partition), offset, stoppingOffset);
    }
}
{code}
Many of the current Kafka serializer implementations do not perform a version 
check. I think we can add one, as many other connectors do, to guard against 
incompatible or corrupt state when restoring from a checkpoint.

 

 


> Add KafkaSerializer deserialize check when using SimpleVersionedSerializer
> --
>
> Key: FLINK-30935
> URL: https://issues.apache.org/jira/browse/FLINK-30935
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Ran Tao
>Priority: Major
>
> {code:java}
> @Override
> public int getVersion() {
>     return CURRENT_VERSION;
> }
>
> @Override
> public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
>     try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
>             DataInputStream in = new DataInputStream(bais)) {
>         String topic = in.readUTF();
>         int partition = in.readInt();
>         long offset = in.readLong();
>         long stoppingOffset = in.readLong();
>         return new KafkaPartitionSplit(
>                 new TopicPartition(topic, partition), offset, stoppingOffset);
>     }
> }
> {code}
> Many of the current Kafka serializer implementations do not perform a version 
> check. I think we can add one, as many other connectors' implementations do, to 
> guard against incompatible or corrupt state when restoring from a checkpoint.
> e.g.
> {code:java}
> @Override
> public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
>     switch (version) {
>         case 0:
>             return deserializeV0(serialized);
>         default:
>             throw new IOException("Unrecognized version or corrupt state: " + version);
>     }
> }
>
> private KafkaPartitionSplit deserializeV0(byte[] serialized) throws IOException {
>     try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
>             DataInputStream in = new DataInputStream(bais)) {
>         String topic = in.readUTF();
>         int partition = in.readInt();
>         long offset = in.readLong();
>         long stoppingOffset = in.readLong();
>         return new KafkaPartitionSplit(
>                 new TopicPartition(topic, partition), offset, stoppingOffset);
>     }
> }
> {code}





[jira] [Updated] (FLINK-30935) Add KafkaSerializer deserialize check when using SimpleVersionedSerializer

2023-02-08 Thread Ran Tao (Jira)



Ran Tao updated FLINK-30935:

Description: 
{code:java}
@Override
public int getVersion() {
    return CURRENT_VERSION;
}

@Override
public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
    try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
            DataInputStream in = new DataInputStream(bais)) {
        String topic = in.readUTF();
        int partition = in.readInt();
        long offset = in.readLong();
        long stoppingOffset = in.readLong();
        return new KafkaPartitionSplit(
                new TopicPartition(topic, partition), offset, stoppingOffset);
    }
}
{code}
Many of the current Kafka serializer implementations do not perform a version 
check. I think we can add one, as many other connectors do, to guard against 
incompatible or corrupt state when restoring from a checkpoint.

 

 

  was:
{code:java}
@Override
public int getVersion() {
    return CURRENT_VERSION;
}

@Override
public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
    try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
            DataInputStream in = new DataInputStream(bais)) {
        String topic = in.readUTF();
        int partition = in.readInt();
        long offset = in.readLong();
        long stoppingOffset = in.readLong();
        return new KafkaPartitionSplit(
                new TopicPartition(topic, partition), offset, stoppingOffset);
    }
}
{code}
Many of the current Kafka serializer implementations do not perform a version 
check. I think we can add one, as many other connectors do, to guard against 
incompatible or corrupt state when restoring from a checkpoint.


> Add KafkaSerializer deserialize check when using SimpleVersionedSerializer
> --
>
> Key: FLINK-30935
> URL: https://issues.apache.org/jira/browse/FLINK-30935
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Ran Tao
>Priority: Major
>
> {code:java}
> @Override
> public int getVersion() {
>     return CURRENT_VERSION;
> }
>
> @Override
> public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
>     try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
>             DataInputStream in = new DataInputStream(bais)) {
>         String topic = in.readUTF();
>         int partition = in.readInt();
>         long offset = in.readLong();
>         long stoppingOffset = in.readLong();
>         return new KafkaPartitionSplit(
>                 new TopicPartition(topic, partition), offset, stoppingOffset);
>     }
> }
> {code}
> Many of the current Kafka serializer implementations do not perform a version 
> check. I think we can add one, as many other connectors do, to guard against 
> incompatible or corrupt state when restoring from a checkpoint.
>  
>  





[jira] [Updated] (FLINK-30935) Add KafkaSerializer deserialize check when using SimpleVersionedSerializer

2023-02-08 Thread Martijn Visser (Jira)



Martijn Visser updated FLINK-30935:
---
Affects Version/s: (was: 1.16.1)

> Add KafkaSerializer deserialize check when using SimpleVersionedSerializer
> --
>
> Key: FLINK-30935
> URL: https://issues.apache.org/jira/browse/FLINK-30935
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Ran Tao
>Priority: Major
>
> {code:java}
> @Override
> public int getVersion() {
>     return CURRENT_VERSION;
> }
>
> @Override
> public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
>     try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
>             DataInputStream in = new DataInputStream(bais)) {
>         String topic = in.readUTF();
>         int partition = in.readInt();
>         long offset = in.readLong();
>         long stoppingOffset = in.readLong();
>         return new KafkaPartitionSplit(
>                 new TopicPartition(topic, partition), offset, stoppingOffset);
>     }
> }
> {code}
> Many of the current Kafka serializer implementations do not perform a version 
> check. I think we can add one, as many other connectors do, to guard against 
> incompatible or corrupt state when restoring from a checkpoint.





[jira] [Updated] (FLINK-30935) Add KafkaSerializer deserialize check when using SimpleVersionedSerializer

2023-02-07 Thread Ran Tao (Jira)



Ran Tao updated FLINK-30935:

Description: 
{code:java}
@Override
public int getVersion() {
    return CURRENT_VERSION;
}

@Override
public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
    try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
            DataInputStream in = new DataInputStream(bais)) {
        String topic = in.readUTF();
        int partition = in.readInt();
        long offset = in.readLong();
        long stoppingOffset = in.readLong();
        return new KafkaPartitionSplit(
                new TopicPartition(topic, partition), offset, stoppingOffset);
    }
}
{code}
Many of the current Kafka serializer implementations do not perform a version 
check. I think we can add one, as many other connectors do, to guard against 
incompatible or corrupt state.

  was:
{code:java}
@Override
public int getVersion() {
    return CURRENT_VERSION;
}

@Override
public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
    try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
            DataInputStream in = new DataInputStream(bais)) {
        String topic = in.readUTF();
        int partition = in.readInt();
        long offset = in.readLong();
        long stoppingOffset = in.readLong();
        return new KafkaPartitionSplit(
                new TopicPartition(topic, partition), offset, stoppingOffset);
    }
}
{code}
The current Kafka serializers do not perform a version check. I think we can add 
one, as many other connectors do, to guard against incompatible or corrupt state.


> Add KafkaSerializer deserialize check when using SimpleVersionedSerializer
> --
>
> Key: FLINK-30935
> URL: https://issues.apache.org/jira/browse/FLINK-30935
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.16.1
>Reporter: Ran Tao
>Priority: Major
>
> {code:java}
> @Override
> public int getVersion() {
>     return CURRENT_VERSION;
> }
>
> @Override
> public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
>     try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
>             DataInputStream in = new DataInputStream(bais)) {
>         String topic = in.readUTF();
>         int partition = in.readInt();
>         long offset = in.readLong();
>         long stoppingOffset = in.readLong();
>         return new KafkaPartitionSplit(
>                 new TopicPartition(topic, partition), offset, stoppingOffset);
>     }
> }
> {code}
> Many of the current Kafka serializer implementations do not perform a version 
> check. I think we can add one, as many other connectors do, to guard against 
> incompatible or corrupt state.





[jira] [Updated] (FLINK-30935) Add KafkaSerializer deserialize check when using SimpleVersionedSerializer

2023-02-07 Thread Ran Tao (Jira)



Ran Tao updated FLINK-30935:

Description: 
{code:java}
@Override
public int getVersion() {
    return CURRENT_VERSION;
}

@Override
public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
    try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
            DataInputStream in = new DataInputStream(bais)) {
        String topic = in.readUTF();
        int partition = in.readInt();
        long offset = in.readLong();
        long stoppingOffset = in.readLong();
        return new KafkaPartitionSplit(
                new TopicPartition(topic, partition), offset, stoppingOffset);
    }
}
{code}
Many of the current Kafka serializer implementations do not perform a version 
check. I think we can add one, as many other connectors do, to guard against 
incompatible or corrupt state when restoring from a checkpoint.

  was:
{code:java}
@Override
public int getVersion() {
    return CURRENT_VERSION;
}

@Override
public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
    try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
            DataInputStream in = new DataInputStream(bais)) {
        String topic = in.readUTF();
        int partition = in.readInt();
        long offset = in.readLong();
        long stoppingOffset = in.readLong();
        return new KafkaPartitionSplit(
                new TopicPartition(topic, partition), offset, stoppingOffset);
    }
}
{code}
Many of the current Kafka serializer implementations do not perform a version 
check. I think we can add one, as many other connectors do, to guard against 
incompatible or corrupt state.


> Add KafkaSerializer deserialize check when using SimpleVersionedSerializer
> --
>
> Key: FLINK-30935
> URL: https://issues.apache.org/jira/browse/FLINK-30935
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.16.1
>Reporter: Ran Tao
>Priority: Major
>
> {code:java}
> @Override
> public int getVersion() {
>     return CURRENT_VERSION;
> }
>
> @Override
> public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
>     try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
>             DataInputStream in = new DataInputStream(bais)) {
>         String topic = in.readUTF();
>         int partition = in.readInt();
>         long offset = in.readLong();
>         long stoppingOffset = in.readLong();
>         return new KafkaPartitionSplit(
>                 new TopicPartition(topic, partition), offset, stoppingOffset);
>     }
> }
> {code}
> Many of the current Kafka serializer implementations do not perform a version 
> check. I think we can add one, as many other connectors do, to guard against 
> incompatible or corrupt state when restoring from a checkpoint.


