Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
Guosmilesmile commented on PR #11662: URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2533696010

> Thanks @Guosmilesmile for the PR, and @mxm and @stevenzwu for the review!
>
> @Guosmilesmile please prepare the backport commits to the other Flink versions.
>
> Thanks, Peter

Thank you all for your reviews, @mxm @stevenzwu @pvary. @pvary, the related backport has been submitted; see #11745. Thank you!

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
pvary commented on PR #11662: URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2532940822 Thanks @Guosmilesmile for the PR, and @mxm and @stevenzwu for the review! @Guosmilesmile please prepare the backport commits to the other Flink versions. Thanks, Peter
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
pvary merged PR #11662: URL: https://github.com/apache/iceberg/pull/11662
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
pvary commented on PR #11662: URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2530496119 @stevenzwu: any last minute comments?
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
Guosmilesmile commented on code in PR #11662: URL: https://github.com/apache/iceberg/pull/11662#discussion_r1876608916

## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java:
```
@@ -52,8 +52,32 @@ class SortKeySerializer extends TypeSerializer<SortKey> {
   private final int size;
   private final Types.NestedField[] transformedFields;
 
+  private int version = SortKeySerializerSnapshot.CURRENT_VERSION;
+
   private transient SortKey sortKey;
 
+  SortKeySerializer(Schema schema, SortOrder sortOrder, int version) {
+    this.version = version;
+    this.schema = schema;
+    this.sortOrder = sortOrder;
+    this.size = sortOrder.fields().size();
+
+    this.transformedFields = new Types.NestedField[size];
+    for (int i = 0; i < size; ++i) {
+      SortField sortField = sortOrder.fields().get(i);
+      Types.NestedField sourceField = schema.findField(sortField.sourceId());
+      Type resultType = sortField.transform().getResultType(sourceField.type());
+      Types.NestedField transformedField =
+          Types.NestedField.of(
+              sourceField.fieldId(),
+              sourceField.isOptional(),
+              sourceField.name(),
+              resultType,
+              sourceField.doc());
+      transformedFields[i] = transformedField;
+    }
```
Review Comment:
@pvary Yes, it's feasible. I've made the adjustments. Could you please take another look to see if there are any other issues? Thanks!
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
pvary commented on code in PR #11662: URL: https://github.com/apache/iceberg/pull/11662#discussion_r1876362946

## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java:
```
@@ -52,8 +52,32 @@ class SortKeySerializer extends TypeSerializer<SortKey> {
   private final int size;
   private final Types.NestedField[] transformedFields;
 
+  private int version = SortKeySerializerSnapshot.CURRENT_VERSION;
+
   private transient SortKey sortKey;
 
+  SortKeySerializer(Schema schema, SortOrder sortOrder, int version) {
+    this.version = version;
+    this.schema = schema;
+    this.sortOrder = sortOrder;
+    this.size = sortOrder.fields().size();
+
+    this.transformedFields = new Types.NestedField[size];
+    for (int i = 0; i < size; ++i) {
+      SortField sortField = sortOrder.fields().get(i);
+      Types.NestedField sourceField = schema.findField(sortField.sourceId());
+      Type resultType = sortField.transform().getResultType(sourceField.type());
+      Types.NestedField transformedField =
+          Types.NestedField.of(
+              sourceField.fieldId(),
+              sourceField.isOptional(),
+              sourceField.name(),
+              resultType,
+              sourceField.doc());
+      transformedFields[i] = transformedField;
+    }
```
Review Comment:
Could this work?
```
SortKeySerializer(Schema schema, SortOrder sortOrder) {
  this(schema, sortOrder, SortKeySerializerSnapshot.CURRENT_VERSION);
}
```
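pvary's suggestion relies on Java constructor chaining: a constructor can call another constructor of the same class with `this(...)` as its first statement. Because the delegating constructor assigns nothing itself, every `final` field is still written exactly once, in the full constructor, so final fields do not rule out a single shared initialization path. A minimal sketch of the pattern, using a hypothetical `VersionedSerializer` with string fields in place of the real `SortKeySerializer`, `Schema`, and `SortOrder` (the latest version being 2 is also an assumption here):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for SortKeySerializer: only the constructor shape is modeled.
final class VersionedSerializer {
  // Assumption: 2 is the latest format version.
  static final int CURRENT_VERSION = 2;

  private final int version;
  private final int size;
  private final List<String> transformedFields;

  // Default path: delegate to the full constructor with the latest version.
  VersionedSerializer(List<String> sourceFields) {
    this(sourceFields, CURRENT_VERSION);
  }

  // Restore path: the caller passes the version recorded in the snapshot.
  // All final fields are assigned here, so the derivation logic exists only once.
  VersionedSerializer(List<String> sourceFields, int version) {
    this.version = version;
    this.size = sourceFields.size();
    List<String> derived = new ArrayList<>(size);
    for (String field : sourceFields) {
      // Placeholder for the per-field transform computed in the real serializer.
      derived.add("transformed_" + field);
    }
    this.transformedFields = Collections.unmodifiableList(derived);
  }

  int version() {
    return version;
  }

  int size() {
    return size;
  }

  List<String> transformedFields() {
    return transformedFields;
  }
}
```

Only the restore path has to pass an explicit version; every other caller keeps using the shorter constructor.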
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
Guosmilesmile commented on code in PR #11662: URL: https://github.com/apache/iceberg/pull/11662#discussion_r1876059320

## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java:
```
@@ -52,8 +52,32 @@ class SortKeySerializer extends TypeSerializer<SortKey> {
   private final int size;
   private final Types.NestedField[] transformedFields;
 
+  private int version = SortKeySerializerSnapshot.CURRENT_VERSION;
+
   private transient SortKey sortKey;
 
+  SortKeySerializer(Schema schema, SortOrder sortOrder, int version) {
+    this.version = version;
+    this.schema = schema;
+    this.sortOrder = sortOrder;
+    this.size = sortOrder.fields().size();
+
+    this.transformedFields = new Types.NestedField[size];
+    for (int i = 0; i < size; ++i) {
+      SortField sortField = sortOrder.fields().get(i);
+      Types.NestedField sourceField = schema.findField(sortField.sourceId());
+      Type resultType = sortField.transform().getResultType(sourceField.type());
+      Types.NestedField transformedField =
+          Types.NestedField.of(
+              sourceField.fieldId(),
+              sourceField.isOptional(),
+              sourceField.name(),
+              resultType,
+              sourceField.doc());
+      transformedFields[i] = transformedField;
+    }
```
Review Comment:
@pvary Thank you very much for your suggestion. Since the member variables are final, they can only be initialized in the constructor. Most SortKeySerializer instances use the latest version; only when restoring the serializer do we need to pass in a version number. Because the fields are final, I cannot extract a common initialization method, so I added a new constructor. Another option is to remove the new constructor and inject the version through a setVersion method, since the version number is only needed in one place. Which approach do you think is more appropriate?
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
pvary commented on code in PR #11662: URL: https://github.com/apache/iceberg/pull/11662#discussion_r1876033356

## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java:
```
@@ -52,8 +52,32 @@ class SortKeySerializer extends TypeSerializer<SortKey> {
   private final int size;
   private final Types.NestedField[] transformedFields;
 
+  private int version = SortKeySerializerSnapshot.CURRENT_VERSION;
+
   private transient SortKey sortKey;
 
+  SortKeySerializer(Schema schema, SortOrder sortOrder, int version) {
+    this.version = version;
+    this.schema = schema;
+    this.sortOrder = sortOrder;
+    this.size = sortOrder.fields().size();
+
+    this.transformedFields = new Types.NestedField[size];
+    for (int i = 0; i < size; ++i) {
+      SortField sortField = sortOrder.fields().get(i);
+      Types.NestedField sourceField = schema.findField(sortField.sourceId());
+      Type resultType = sortField.transform().getResultType(sourceField.type());
+      Types.NestedField transformedField =
+          Types.NestedField.of(
+              sourceField.fieldId(),
+              sourceField.isOptional(),
+              sourceField.name(),
+              resultType,
+              sourceField.doc());
+      transformedFields[i] = transformedField;
+    }
```
Review Comment:
This part of the code looks like a duplicate to me. What is the reason for not using the same underlying method for the two constructors?
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
Guosmilesmile commented on PR #11662: URL: https://github.com/apache/iceberg/pull/11662#issuecomment-252050

> I think this looks good, but I'll defer to @stevenzwu and @pvary for the final approval.

@mxm Thank you very much for taking the time to review my code. The relevant comments have been added. @pvary @stevenzwu I would appreciate it if both of you could take a look for the final approval.
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
mxm commented on code in PR #11662: URL: https://github.com/apache/iceberg/pull/11662#discussion_r1871472107

## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/StatisticsUtil.java:
```
@@ -73,12 +73,25 @@ static byte[] serializeCompletedStatistics(
   }
 
   static CompletedStatistics deserializeCompletedStatistics(
-      byte[] bytes, TypeSerializer<CompletedStatistics> statisticsSerializer) {
+      byte[] bytes, CompletedStatisticsSerializer statisticsSerializer) {
     try {
       DataInputDeserializer input = new DataInputDeserializer(bytes);
-      return statisticsSerializer.deserialize(input);
-    } catch (IOException e) {
-      throw new UncheckedIOException("Fail to deserialize aggregated statistics", e);
+      CompletedStatistics completedStatistics = statisticsSerializer.deserialize(input);
+      if (!completedStatistics.isValid()) {
+        throw new RuntimeException("Fail to deserialize aggregated statistics,change to v1");
+      }
+
+      return completedStatistics;
+    } catch (Exception e) {
+      try {
+        statisticsSerializer.changeSortKeySerializerVersion(1);
+        DataInputDeserializer input = new DataInputDeserializer(bytes);
+        CompletedStatistics deserialize = statisticsSerializer.deserialize(input);
+        statisticsSerializer.changeSortKeySerializerVersionLatest();
+        return deserialize;
+      } catch (IOException ioException) {
+        throw new UncheckedIOException("Fail to deserialize aggregated statistics", ioException);
+      }
```
Review Comment:
We could add a comment here, explaining why switching versions was necessary.
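The reason for the switch, as discussed elsewhere in this thread, is that statistics written before this change do not record the serializer version, so the code first tries the latest format and falls back to v1 when that throws or yields an invalid result. A minimal sketch of that try-then-fall-back pattern, using hypothetical `VersionedDecoder`, `FallbackDeserializer`, and `DemoDecoder` types rather than the Iceberg classes (the toy v2 format with a leading byte of 2 is purely illustrative):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical decoder whose format version can be switched at runtime.
interface VersionedDecoder<T> {
  void setVersion(int version);

  int latestVersion();

  T decode(byte[] bytes) throws IOException;

  boolean isValid(T value);
}

final class FallbackDeserializer {
  // Try the latest format first; if that fails or yields an invalid result,
  // retry once with the v1 format, then always switch back to the latest version.
  static <T> T deserializeWithFallback(byte[] bytes, VersionedDecoder<T> decoder) {
    try {
      T value = decoder.decode(bytes);
      if (decoder.isValid(value)) {
        return value;
      }
    } catch (Exception e) {
      // Fall through: the bytes may have been written in the v1 format.
    }

    decoder.setVersion(1);
    try {
      return decoder.decode(bytes);
    } catch (IOException e) {
      throw new UncheckedIOException("Fail to deserialize aggregated statistics", e);
    } finally {
      decoder.setVersion(decoder.latestVersion());
    }
  }
}

// Toy decoder for the usage example: v2 payloads start with a format byte of 2.
final class DemoDecoder implements VersionedDecoder<String> {
  private int version = 2;

  @Override
  public void setVersion(int version) {
    this.version = version;
  }

  @Override
  public int latestVersion() {
    return 2;
  }

  int currentVersion() {
    return version;
  }

  @Override
  public String decode(byte[] bytes) throws IOException {
    if (version == 1) {
      return new String(bytes, StandardCharsets.UTF_8);
    }
    if (bytes.length == 0 || bytes[0] != 2) {
      throw new IOException("not a v2 payload");
    }
    return new String(bytes, 1, bytes.length - 1, StandardCharsets.UTF_8);
  }

  @Override
  public boolean isValid(String value) {
    return !value.isEmpty();
  }
}
```

One choice differs from the diff: the switch back to the latest version sits in a `finally` block, so the decoder is reset even if the v1 retry also fails, whereas the diff calls `changeSortKeySerializerVersionLatest()` only after a successful retry.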
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
Guosmilesmile commented on code in PR #11662: URL: https://github.com/apache/iceberg/pull/11662#discussion_r1870045831

## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/StatisticsUtil.java:
```
@@ -73,12 +73,24 @@ static byte[] serializeCompletedStatistics(
   }
 
   static CompletedStatistics deserializeCompletedStatistics(
-      byte[] bytes, TypeSerializer<CompletedStatistics> statisticsSerializer) {
+      byte[] bytes, CompletedStatisticsSerializer statisticsSerializer) {
     try {
       DataInputDeserializer input = new DataInputDeserializer(bytes);
-      return statisticsSerializer.deserialize(input);
-    } catch (IOException e) {
-      throw new UncheckedIOException("Fail to deserialize aggregated statistics", e);
+      CompletedStatistics completedStatistics = statisticsSerializer.deserialize(input);
+      if (!completedStatistics.isValid()) {
+        throw new RuntimeException("Fail to deserialize aggregated statistics,change to v1");
+      }
+      return completedStatistics;
```
Review Comment:
@pvary Thank you very much for your suggestions. I have made the corresponding changes based on your advice.
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
pvary commented on code in PR #11662: URL: https://github.com/apache/iceberg/pull/11662#discussion_r1869958746

## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/StatisticsUtil.java:
```
@@ -73,12 +73,24 @@ static byte[] serializeCompletedStatistics(
   }
 
   static CompletedStatistics deserializeCompletedStatistics(
-      byte[] bytes, TypeSerializer<CompletedStatistics> statisticsSerializer) {
+      byte[] bytes, CompletedStatisticsSerializer statisticsSerializer) {
     try {
       DataInputDeserializer input = new DataInputDeserializer(bytes);
-      return statisticsSerializer.deserialize(input);
-    } catch (IOException e) {
-      throw new UncheckedIOException("Fail to deserialize aggregated statistics", e);
+      CompletedStatistics completedStatistics = statisticsSerializer.deserialize(input);
+      if (!completedStatistics.isValid()) {
+        throw new RuntimeException("Fail to deserialize aggregated statistics,change to v1");
+      }
+      return completedStatistics;
```
Review Comment:
nit: newline before this line
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
Guosmilesmile commented on PR #11662: URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2517084700 @mxm @pvary @stevenzwu Thank you all very much for your guidance. I would appreciate it if you could take another look and let me know what needs to be done next to keep things moving forward.
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
Guosmilesmile commented on PR #11662: URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2511169797 @mxm Thank you very much for your suggestions. I have made the necessary modifications, and I appreciate you taking the time out of your busy schedule to review it again. I am very grateful for your guidance.
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
mxm commented on code in PR #11662: URL: https://github.com/apache/iceberg/pull/11662#discussion_r1865523100

## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java:
```
@@ -310,10 +366,16 @@ public void writeSnapshot(DataOutputView out) throws IOException {
     @Override
     public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
         throws IOException {
-      if (readVersion == 1) {
-        readV1(in);
-      } else {
-        throw new IllegalArgumentException("Unknown read version: " + readVersion);
+      switch (readVersion) {
+        case 1:
+          readV1(in);
+          this.version = 1;
+          break;
+        case 2:
+          readV1(in);
```
Review Comment:
Can we remove the version suffix in the method name? It handles both versions.

## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/CompletedStatisticsSerializer.java:
```
@@ -93,15 +126,35 @@ public void serialize(CompletedStatistics record, DataOutputView target) throws
   @Override
   public CompletedStatistics deserialize(DataInputView source) throws IOException {
+    long checkpointId = source.readLong();
+    changeSortKeySerializerVersion(source.readInt());
```
Review Comment:
AFAIK, we were not writing the serializer version before this change, so I'm not sure this works when restoring the serializer to read old data.

## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/StatisticsUtil.java:
```
@@ -73,12 +73,17 @@ static byte[] serializeCompletedStatistics(
   }
 
   static CompletedStatistics deserializeCompletedStatistics(
-      byte[] bytes, TypeSerializer<CompletedStatistics> statisticsSerializer) {
+      byte[] bytes, CompletedStatisticsSerializer statisticsSerializer) {
     try {
       DataInputDeserializer input = new DataInputDeserializer(bytes);
       return statisticsSerializer.deserialize(input);
-    } catch (IOException e) {
-      throw new UncheckedIOException("Fail to deserialize aggregated statistics", e);
+    } catch (Exception e) {
+      try {
+        DataInputDeserializer input = new DataInputDeserializer(bytes);
+        return statisticsSerializer.deserializeV1(input);
+      } catch (IOException ioException) {
+        throw new UncheckedIOException("Fail to deserialize aggregated statistics", ioException);
+      }
```
Review Comment:
I see, we're retrying here in case the restore fails.
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
mxm commented on PR #11662: URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2511041771 Thanks for the update @Guosmilesmile! Unfortunately, we don't have a way to encode the serializer version for all serializers, so a best-effort approach to retry with a different serializer version on restore failure may be the best we can do.
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
Guosmilesmile commented on PR #11662: URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2507769750 Hi @mxm! Thank you very much for your suggestions. Following your advice, I added a version number to SortKeySerializer through restoreSerializer, implementing different serialization methods for v1 and v2. In the test job, the TaskManager achieves state compatibility. However, the JobManager fails to start because the DataStatisticsCoordinator cannot recover: the SortKeySerializer used in CompletedStatisticsSerializer and GlobalStatisticsSerializer is instantiated directly, so parsing the v1 state with the v2 format fails. Additionally, these two serializers do not change after the Coordinator is initialized. Given this scenario, do you have any good suggestions?
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
mxm commented on code in PR #11662: URL: https://github.com/apache/iceberg/pull/11662#discussion_r1863289530

## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java:
```
@@ -310,10 +324,16 @@ public void writeSnapshot(DataOutputView out) throws IOException {
     @Override
     public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
         throws IOException {
-      if (readVersion == 1) {
-        readV1(in);
-      } else {
-        throw new IllegalArgumentException("Unknown read version: " + readVersion);
+      switch (readVersion) {
+        case 1:
+          throw new UnsupportedOperationException(
+              String.format(
+                  "No longer supported version [%s]. Please upgrade first . ", readVersion));
+        case 2:
+          readV2(in);
+          break;
```
Review Comment:
Reading the snapshot isn't any different for V1/V2. The `readV1()` and `readV2()` methods are identical, so we can just keep the current one. However, we need to save the serializer version as an `int` field which we can read in `restoreSerializer()` to pass it into the serializer, just like the schema and the sort order. We can then serialize/deserialize depending on the version used. For example, in the serializer we would do:
```
if (version == 1) {
  // Don't read/write null indicator
} else {
  // Read/write null indicator
}
```
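mxm's pseudocode comes down to branching on the format version each time a field is written or read. A minimal sketch for a single nullable `long` field, using a hypothetical `NullableLongCodec` over `java.io.DataOutputStream`/`DataInputStream` rather than Flink's `DataOutputView`/`DataInputView`; the one-byte boolean indicator is an assumed v2 layout, not necessarily what the PR writes:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical codec for one nullable long sort-key field, switching on the format version.
final class NullableLongCodec {
  private final int version;

  NullableLongCodec(int version) {
    this.version = version;
  }

  void write(Long value, DataOutputStream out) throws IOException {
    if (version == 1) {
      // v1: no null indicator; unboxing a null value throws NullPointerException.
      out.writeLong(value);
    } else {
      // v2 (assumed layout): a boolean null indicator precedes the value.
      out.writeBoolean(value == null);
      if (value != null) {
        out.writeLong(value);
      }
    }
  }

  Long read(DataInputStream in) throws IOException {
    if (version == 1) {
      return in.readLong();
    }
    if (in.readBoolean()) {
      return null;
    }
    return in.readLong();
  }

  // Convenience helpers that wrap the checked IOException for in-memory buffers.
  byte[] writeAll(Long... values) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      for (Long value : values) {
        write(value, out);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  Long[] readAll(byte[] data, int count) {
    Long[] result = new Long[count];
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      for (int i = 0; i < count; i++) {
        result[i] = read(in);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return result;
  }
}
```

In this sketch a v1 payload carries no marker that distinguishes it from a v2 payload, so the reader cannot detect the format from the bytes alone; the version has to come from outside, which is why the version is stored with the snapshot and passed through `restoreSerializer()`.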
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
mxm commented on code in PR #11662: URL: https://github.com/apache/iceberg/pull/11662#discussion_r1863289530

## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java:
```
@@ -310,10 +324,16 @@ public void writeSnapshot(DataOutputView out) throws IOException {
     @Override
     public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
         throws IOException {
-      if (readVersion == 1) {
-        readV1(in);
-      } else {
-        throw new IllegalArgumentException("Unknown read version: " + readVersion);
+      switch (readVersion) {
+        case 1:
+          throw new UnsupportedOperationException(
+              String.format(
+                  "No longer supported version [%s]. Please upgrade first . ", readVersion));
+        case 2:
+          readV2(in);
+          break;
```
Review Comment:
Reading the snapshot isn't any different for V1/V2. The `readV1()` and `readV2()` methods are identical, so we can just keep the current one. However, we need to save the serializer version as an `int` field which we can read in `restoreSerializer()`, just like the schema and the sort order. We can then serialize/deserialize depending on the version used. For example, in the serializer we would do:
```
if (version == 1) {
  // Don't read/write null indicator
} else {
  // Read/write null indicator
}
```

## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java:
```
@@ -324,6 +344,10 @@ public TypeSerializerSchemaCompatibility<SortKey> resolveSchemaCompatibility(
         return TypeSerializerSchemaCompatibility.incompatible();
       }
 
+      if (oldSerializerSnapshot.getCurrentVersion() != this.getCurrentVersion()) {
+        return TypeSerializerSchemaCompatibility.incompatible();
+      }
```
Review Comment:
```suggestion
      if (oldSerializerSnapshot.getCurrentVersion() == 1 && this.getCurrentVersion() == 2) {
        return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
      }
```
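mxm's suggestion returns `compatibleAfterMigration()` instead of `incompatible()` when the old snapshot is v1. As we understand Flink's state-migration contract, `incompatible()` makes the restore fail, while `compatibleAfterMigration()` asks Flink to read the existing state with the serializer restored from the old snapshot and rewrite it with the new one, which matches mxm's later point that both the old and new serializers must be instantiable. A minimal sketch of that decision table, using a hypothetical `Compatibility` enum and `SortKeyCompatibility.resolve` in place of Flink's `TypeSerializerSchemaCompatibility`; the `sameSchemaAndSortOrder` flag stands in for the existing checks, which are not reproduced here:

```java
// Hypothetical mirror of the three compatibility outcomes discussed in this thread;
// the real code returns TypeSerializerSchemaCompatibility<SortKey>.
enum Compatibility {
  COMPATIBLE_AS_IS,
  COMPATIBLE_AFTER_MIGRATION,
  INCOMPATIBLE
}

final class SortKeyCompatibility {
  // Assumption: version 2 adds the null indicator; version 1 is the old format.
  static final int CURRENT_VERSION = 2;

  static Compatibility resolve(int oldSnapshotVersion, boolean sameSchemaAndSortOrder) {
    if (!sameSchemaAndSortOrder) {
      // Stand-in for the pre-existing checks that already return incompatible().
      return Compatibility.INCOMPATIBLE;
    }
    if (oldSnapshotVersion == CURRENT_VERSION) {
      return Compatibility.COMPATIBLE_AS_IS;
    }
    if (oldSnapshotVersion == 1) {
      // Old state is read with the restored v1 serializer and rewritten with v2.
      return Compatibility.COMPATIBLE_AFTER_MIGRATION;
    }
    // Unknown (for example, newer) snapshot versions are rejected.
    return Compatibility.INCOMPATIBLE;
  }
}
```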
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
Guosmilesmile commented on PR #11662: URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2507255659 @mxm I'm sorry, I'm a bit confused and would like to ask for your advice. Which part do I need to modify? I see two options: (1) keep the current modifications and simply change TypeSerializerSchemaCompatibility.incompatible() to compatibleAfterMigration(); or (2) create a new NullAbleSortKeySerializer, keep the SortKeySerializer, replace all calls to SortKeySerializer, and in resolveSchemaCompatibility, return compatibleAfterMigration() after checking the type of oldSerializerSnapshot. Which option should I choose? Detailed advice would be much appreciated. Thank you very much.
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
mxm commented on PR #11662: URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2507208103 @Guosmilesmile Ideally, we want to return compatibleAfterMigration() and make sure the old and new serializers can be instantiated.
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
Guosmilesmile commented on PR #11662: URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2506978587 @mxm @stevenzwu @pvary Thank you all for your suggestions. I have submitted a version that mainly modifies SortKeySerializerSnapshot to implement version detection. If the version does not match, it returns the corresponding result, which avoids parsing exceptions and data errors when restoring from an old state into the new one. I would appreciate it if you could take a look and see if this modification is feasible.
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
Guosmilesmile commented on PR #11662: URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2505917957 @mxm Thank you very much for your suggestions. So I need to add a version check in SortKeySerializerSnapshot, and if the state is restored from an old version, directly return TypeSerializerSchemaCompatibility.incompatible(). Is this approach feasible?
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
mxm commented on code in PR #11662: URL: https://github.com/apache/iceberg/pull/11662#discussion_r1861944660 ## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java: ## @@ -124,6 +124,14 @@ public void serialize(SortKey record, DataOutputView target) throws IOException for (int i = 0; i < size; ++i) { int fieldId = transformedFields[i].fieldId(); Type.TypeID typeId = transformedFields[i].type().typeId(); + Object value = record.get(i, Object.class); Review Comment: Nulls are not allowed everywhere in Flink. Primitive types generally don't support null. In other places, Flink uses boolean flags to encode nulls, e.g. in PojoSerializer: https://github.com/apache/flink/blob/b9c92371dba07b6bfe4368d7b6d7f7c575b4c603/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L392 KeyBy operations are not allowed on null values. +1 to Steven's suggestion to version the serializer. We already have [SortKeySerializerSnapshot](https://github.com/apache/iceberg/blob/163e2068f96f139632488f36928bf443c9be326f/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java#L278). We just need to add a version check for the new boolean serializer because this is an incompatible change which requires serializer migration.
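Below is a minimal model of the version check mxm describes. It assumes the snapshot version is bumped from a hypothetical 1 (no null flags) to 2 (null flags). The three results mirror Flink's `TypeSerializerSchemaCompatibility.compatibleAsIs()`, `compatibleAfterMigration()`, and `incompatible()`. The enum and constants are stand-ins so the logic runs without Flink on the classpath; in the real code the decision would live in the snapshot's `resolveSchemaCompatibility`.

```java
/**
 * Hypothetical model of a versioned snapshot's compatibility check.
 * Result values mirror Flink's TypeSerializerSchemaCompatibility outcomes;
 * the version numbers are assumptions for illustration.
 */
final class SortKeySnapshotCompat {
  enum Result { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

  static final int LEGACY_VERSION = 1;  // keys written without null flags
  static final int CURRENT_VERSION = 2; // keys written with one null flag per field

  static Result resolve(int restoredSnapshotVersion) {
    if (restoredSnapshotVersion == CURRENT_VERSION) {
      return Result.COMPATIBLE_AS_IS;
    }
    if (restoredSnapshotVersion == LEGACY_VERSION) {
      // Old state can still be read with the legacy layout and rewritten in the new one.
      return Result.COMPATIBLE_AFTER_MIGRATION;
    }
    // Unknown version: fail the restore instead of misreading bytes.
    return Result.INCOMPATIBLE;
  }
}
```

Returning incompatible() for the legacy version would also avoid misreading bytes, but it would force users to discard the old state; migration keeps it usable.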
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
Guosmilesmile commented on PR #11662: URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2505426989 @pvary Because a new flag has been added, the testSerializationSize test case needed to be updated so that the expected size value includes 1 extra byte for the flag. It has been adjusted. I ran the tests locally and all unit tests passed; I would appreciate it if you could start the tests again.
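The per-field flag costs exactly one byte per sort-key field, so an expected serialized size grows by the number of fields in the key. The sketch below measures this with `DataOutputStream.size()` on plain `String` fields, which stand in for the real key. It is a model, not the Iceberg test. A single-field key grows by the one byte mentioned above.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical model: serialized size of non-null string fields with and without per-field null flags. */
final class FlagOverhead {
  static int serializedSize(String[] fields, boolean withNullFlags) {
    DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());
    try {
      for (String field : fields) {
        if (withNullFlags) {
          out.writeBoolean(false); // 1 byte: "not null"
        }
        out.writeUTF(field); // 2-byte length prefix + modified UTF-8 bytes
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return out.size(); // number of bytes written so far
  }
}
```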
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
pvary commented on code in PR #11662: URL: https://github.com/apache/iceberg/pull/11662#discussion_r1861570865 ## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java: ## @@ -124,6 +124,14 @@ public void serialize(SortKey record, DataOutputView target) throws IOException for (int i = 0; i < size; ++i) { int fieldId = transformedFields[i].fieldId(); Type.TypeID typeId = transformedFields[i].type().typeId(); + Object value = record.get(i, Object.class); Review Comment: @stevenzwu: What do you think about a serializer version, something like what SimpleVersionedSerializer uses? Maybe we don't store the version for every key, but inherit it from the statistics object?
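Here is a sketch of pvary's suggestion under a hypothetical payload layout. The format version is written once at the head of the statistics payload, and every key inherits it on read, so the per-key cost is only the null flag and there is no per-key version. Flink's `SimpleVersionedSerializer` similarly pairs serialized bytes with a single version number. This model uses only `java.io`, with a `List<String>` in place of the statistics object.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model: one version header per statistics payload, inherited by every key. */
final class VersionedStatisticsCodec {
  static final int V1 = 1; // keys without null flags
  static final int V2 = 2; // keys with one null flag each

  static byte[] serialize(List<String> keys, int version) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(version); // written once per payload, not per key
      out.writeInt(keys.size());
      for (String key : keys) {
        if (version >= V2) {
          out.writeBoolean(key == null);
          if (key == null) {
            continue;
          }
        }
        out.writeUTF(key); // throws NullPointerException for a null key under V1
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  static List<String> deserialize(byte[] data) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      int version = in.readInt(); // every key below is read with this version
      int count = in.readInt();
      List<String> keys = new ArrayList<>(count);
      for (int i = 0; i < count; ++i) {
        boolean isNull = version >= V2 && in.readBoolean();
        keys.add(isNull ? null : in.readUTF());
      }
      return keys;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```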
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
Guosmilesmile commented on code in PR #11662: URL: https://github.com/apache/iceberg/pull/11662#discussion_r1861359801 ## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java: ## @@ -124,6 +124,14 @@ public void serialize(SortKey record, DataOutputView target) throws IOException for (int i = 0; i < size; ++i) { int fieldId = transformedFields[i].fieldId(); Type.TypeID typeId = transformedFields[i].type().typeId(); + Object value = record.get(i, Object.class); Review Comment: @stevenzwu Can we accept this method to fix the bug for now?
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
Guosmilesmile commented on code in PR #11662: URL: https://github.com/apache/iceberg/pull/11662#discussion_r1861436171 ## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java: ## @@ -124,6 +124,14 @@ public void serialize(SortKey record, DataOutputView target) throws IOException for (int i = 0; i < size; ++i) { int fieldId = transformedFields[i].fieldId(); Type.TypeID typeId = transformedFields[i].type().typeId(); + Object value = record.get(i, Object.class); Review Comment: @stevenzwu I looked at how Flink currently serializes null values: it is handled by NullableSerializer, which also writes a boolean flag before the field to mark whether it is null. That is quite similar to the approach in this PR. Can we use the current method to fix this issue for now, or is there a better way to handle it?
```java
// Excerpt from Flink's NullableSerializer (other members omitted)
public class NullableSerializer<T> extends TypeSerializer<T> {

  @Override
  public void serialize(T record, DataOutputView target) throws IOException {
    if (record == null) {
      // Null marker, followed by padding
      target.writeBoolean(true);
      target.write(padding);
    } else {
      // Non-null marker, followed by the wrapped serializer's bytes
      target.writeBoolean(false);
      originalSerializer.serialize(record, target);
    }
  }

  @Override
  public T deserialize(DataInputView source) throws IOException {
    // Reads the boolean marker first
    boolean isNull = deserializeNull(source);
    return isNull ? null : originalSerializer.deserialize(source);
  }
}
```
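The `target.write(padding)` call in the excerpt keeps a null record the same width as a non-null one, which matters for fixed-length types. The following stdlib model covers an 8-byte `long` field and assumes zero-byte padding sized to the value. In Flink, whether padding is used depends on how the `NullableSerializer` was constructed. The class is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical model: padded null encoding for a fixed-length (8-byte) long field. */
final class PaddedNullableLong {
  private static final byte[] PADDING = new byte[Long.BYTES]; // zero bytes, same width as a long

  static byte[] serialize(Long value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      if (value == null) {
        out.writeBoolean(true);
        out.write(PADDING); // keeps null records the same length as non-null ones
      } else {
        out.writeBoolean(false);
        out.writeLong(value);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  static Long deserialize(byte[] data) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      if (in.readBoolean()) {
        in.readFully(new byte[PADDING.length]); // consume and discard the padding
        return null;
      }
      return in.readLong();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

The PR's `SortKeySerializer` change, by contrast, writes only the flag for a null field, because sort keys are not fixed-length records.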
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
Guosmilesmile commented on code in PR #11662: URL: https://github.com/apache/iceberg/pull/11662#discussion_r1861359801 ## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java: ## @@ -124,6 +124,14 @@ public void serialize(SortKey record, DataOutputView target) throws IOException for (int i = 0; i < size; ++i) { int fieldId = transformedFields[i].fieldId(); Type.TypeID typeId = transformedFields[i].type().typeId(); + Object value = record.get(i, Object.class); Review Comment: Can we accept this method to fix the bug for now?
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
stevenzwu commented on code in PR #11662: URL: https://github.com/apache/iceberg/pull/11662#discussion_r1861329534 ## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java: ## @@ -124,6 +124,14 @@ public void serialize(SortKey record, DataOutputView target) throws IOException for (int i = 0; i < size; ++i) { int fieldId = transformedFields[i].fieldId(); Type.TypeID typeId = transformedFields[i].type().typeId(); + Object value = record.get(i, Object.class); Review Comment: This will add a boolean flag overhead for every field, but I can't think of any better alternative, so this is fine with me. However, ideally we need to figure out how to evolve it in a compatible way, probably by leveraging `TypeSerializerSnapshot`.
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
Guosmilesmile commented on PR #11662: URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2504787892 @pvary Sorry, I just re-uploaded the missing parts. Could you please trigger the tests again? I apologize for the inconvenience.
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
Guosmilesmile commented on code in PR #11662: URL: https://github.com/apache/iceberg/pull/11662#discussion_r1860984047 ## flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsCoordinator.java: ## @@ -152,6 +152,80 @@ public void testDataStatisticsEventHandling(StatisticsType type) throws Exceptio } } + @ParameterizedTest + @EnumSource(StatisticsType.class) + public void testDataStatisticsEventHandlingWithNullValue(StatisticsType type) throws Exception { +try (DataStatisticsCoordinator dataStatisticsCoordinator = createCoordinator(type)) { + dataStatisticsCoordinator.start(); + tasksReady(dataStatisticsCoordinator); + + SortKey NullSortKey = Fixtures.SORT_KEY.copy(); Review Comment: @pvary I'm sorry, there was a little mistake just now. Please try again. Thank you very much.
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
Guosmilesmile commented on code in PR #11662: URL: https://github.com/apache/iceberg/pull/11662#discussion_r1860974486 ## flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsCoordinator.java: ## @@ -152,6 +152,80 @@ public void testDataStatisticsEventHandling(StatisticsType type) throws Exceptio } } + @ParameterizedTest + @EnumSource(StatisticsType.class) + public void testDataStatisticsEventHandlingWithNullValue(StatisticsType type) throws Exception { +try (DataStatisticsCoordinator dataStatisticsCoordinator = createCoordinator(type)) { + dataStatisticsCoordinator.start(); + tasksReady(dataStatisticsCoordinator); + + SortKey NullSortKey = Fixtures.SORT_KEY.copy(); Review Comment: @pvary I have changed it; please check. Thank you.
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
pvary commented on code in PR #11662: URL: https://github.com/apache/iceberg/pull/11662#discussion_r1860967334 ## flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsCoordinator.java: ## @@ -152,6 +152,80 @@ public void testDataStatisticsEventHandling(StatisticsType type) throws Exceptio } } + @ParameterizedTest + @EnumSource(StatisticsType.class) + public void testDataStatisticsEventHandlingWithNullValue(StatisticsType type) throws Exception { +try (DataStatisticsCoordinator dataStatisticsCoordinator = createCoordinator(type)) { + dataStatisticsCoordinator.start(); + tasksReady(dataStatisticsCoordinator); + + SortKey NullSortKey = Fixtures.SORT_KEY.copy(); Review Comment: nit: The variable name should start with a lower-case letter
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
Guosmilesmile commented on PR #11662: URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2504108856 @pvary I have also added a unit test for the scenario of building from the table to the sink, which includes data containing null values. Could you please review it as well? Thank you very much!
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
Guosmilesmile commented on PR #11662: URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2503954677 @pvary I have added test cases for DataStatisticsOperator and DataStatisticsCoordinator specifically for the null scenario. They verify the processElement and event-handling paths, simulating the operator processing records and the coordinator handling statistics that contain null values. Could you please take a look and see if this is feasible? Thank you!
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
Guosmilesmile commented on PR #11662: URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2503806486 @pvary Thank you very much for your guidance. I have made the changes according to your suggestions. Please take a look and see if they are feasible.
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
pvary commented on PR #11662: URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2503849860 Thanks for the changes @Guosmilesmile! One question remains from my side: > It would be nice to have an end2end test for null values too. > Currently we only test that the statistics are collected correctly, but there might be some issues when applying the stats. It would be nice to have a test for this case too.
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
pvary commented on PR #11662: URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2503747694 @Guosmilesmile: Thanks for the changes. I left some comments and started the tests to see if this change causes any other test failures. Please remove the 1.19 and 1.18 changes for now: we usually cherry-pick the changes to the other versions after the original PR for the main version has been merged. This is better for the reviewer (fewer files) and better for the contributor (if there is a change request during the review, they don't have to keep the different versions in sync). It would be nice to have an end2end test for null values too. Currently we only test that the statistics are collected correctly, but there might be some issues when applying the stats. It would be nice to have a test for this case too.
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
pvary commented on code in PR #11662: URL: https://github.com/apache/iceberg/pull/11662#discussion_r1860565686 ## flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java: ## @@ -136,6 +137,27 @@ public void testProcessElement(StatisticsType type) throws Exception { } } + @ParameterizedTest + @EnumSource(StatisticsType.class) + public void testProcessElementWithNull(StatisticsType type) throws Exception { + DataStatisticsOperator operator = createOperator(type, Fixtures.NUM_SUBTASKS); + try (OneInputStreamOperatorTestHarness testHarness = + createHarness(operator)) { + StateInitializationContext stateContext = getStateContext(); + operator.initializeState(stateContext); + operator.processElement(new StreamRecord<>(GenericRowData.of(null, 5))); + operator.processElement(new StreamRecord<>(GenericRowData.of(StringData.fromString("a"), 3))); + + DataStatistics localStatistics = operator.localStatistics(); + SortKeySerializer sortKeySerializer = new SortKeySerializer(Fixtures.SCHEMA, Fixtures.SORT_ORDER); + DataStatisticsSerializer taskStatisticsSerializer = new DataStatisticsSerializer(sortKeySerializer); + DataOutputSerializer outputView = new DataOutputSerializer(1024); + + taskStatisticsSerializer.serialize(localStatistics, outputView); + testHarness.endInput(); Review Comment: We should check that the resulting statistics are correct.
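One possible shape for such a check is shown below as a hypothetical stdlib model rather than Iceberg's `DataStatistics` API. It counts keys with a `HashMap`, which permits a `null` key, so a test can assert that the null key lands in its own bucket next to the non-null keys. In the real test, the actual values would come from `operator.localStatistics()`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model: key-frequency statistics that keep null keys as their own bucket. */
final class NullAwareKeyCounts {
  static Map<String, Long> count(List<String> keys) {
    Map<String, Long> counts = new HashMap<>(); // HashMap permits a single null key
    for (String key : keys) {
      counts.merge(key, 1L, Long::sum); // null keys are counted like any other key
    }
    return counts;
  }
}
```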
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
pvary commented on code in PR #11662: URL: https://github.com/apache/iceberg/pull/11662#discussion_r1860563456 ## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java: ## @@ -124,6 +124,13 @@ public void serialize(SortKey record, DataOutputView target) throws IOException for (int i = 0; i < size; ++i) { int fieldId = transformedFields[i].fieldId(); Type.TypeID typeId = transformedFields[i].type().typeId(); + Object value = record.get(i, Object.class); + if (value == null) { +target.writeBoolean(true); +continue; + } else { +target.writeBoolean(false); + } Review Comment: nit: newline after closing a block and starting another statement ## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java: ## @@ -192,6 +199,11 @@ public SortKey deserialize(SortKey reuse, DataInputView source) throws IOExcepti reuse.size(), size); for (int i = 0; i < size; ++i) { + boolean isNull = source.readBoolean(); + if (isNull) { +reuse.set(i, null); +continue; + } Review Comment: nit: newline after closing a block and starting another statement
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
Guosmilesmile commented on PR #11662: URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2503066336 @pvary Yes, you are right. We changed our approach to handle null values instead of filtering them out. During serialization we now write a boolean flag before each field, where true indicates that the field is null; during deserialization we read the flag first, which lets us correctly identify null values. The related code has been updated; please take a look to see if it is feasible.
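The flag scheme described above can be sketched with `java.io` streams. This is a model, not Iceberg's `SortKeySerializer`: an `Object[]` stands in for `SortKey`, and only two hypothetical field types are supported. The write and read orders must match, with the flag first and the value present only when the flag is `false`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical model of a per-field null flag; Object[] stands in for Iceberg's SortKey. */
final class NullFlagCodec {
  enum FieldType { STRING, LONG }

  static byte[] serialize(Object[] key, FieldType[] types) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      for (int i = 0; i < types.length; ++i) {
        Object value = key[i];
        // Flag first: true marks a null field, and nothing else is written for it.
        out.writeBoolean(value == null);
        if (value == null) {
          continue;
        }

        switch (types[i]) {
          case STRING:
            out.writeUTF((String) value);
            break;
          case LONG:
            out.writeLong((Long) value);
            break;
          default:
            throw new IllegalArgumentException("Unsupported type: " + types[i]);
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  static Object[] deserialize(byte[] data, FieldType[] types) {
    Object[] key = new Object[types.length];
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      for (int i = 0; i < types.length; ++i) {
        // Read the flag before the value, mirroring the write order.
        boolean isNull = in.readBoolean();
        if (isNull) {
          key[i] = null;
          continue;
        }

        switch (types[i]) {
          case STRING:
            key[i] = in.readUTF();
            break;
          case LONG:
            key[i] = in.readLong(); // boxed to Long
            break;
          default:
            throw new IllegalArgumentException("Unsupported type: " + types[i]);
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return key;
  }
}
```

Writing nothing after a `true` flag keeps null fields to a single byte, at the cost of one extra byte for every non-null field.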
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
Guosmilesmile commented on PR #11662: URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2502780003 @ConeyLiu Yes, I have added unit tests to cover the relevant changes.
Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]
ConeyLiu commented on PR #11662: URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2502609743 Thanks for the contributions. Would it be better to handle the null value instead of skipping it? Could you also add unit tests to cover these changes? cc @stevenzwu @pvary