Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-12-10 Thread via GitHub


Guosmilesmile commented on PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2533696010

   > Thanks @Guosmilesmile for the PR, and @mxm and @stevenzwu for the review!
   > 
   > @Guosmilesmile please prepare the backport commits to the other Flink 
versions.
   > 
   > Thanks, Peter
   
   Thank you all for your reviews, @mxm @stevenzwu @pvary.
   
   @pvary The related backport has been submitted, see #11745. Thank you!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-12-10 Thread via GitHub


pvary commented on PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2532940822

   Thanks @Guosmilesmile for the PR, and @mxm and @stevenzwu for the review!
   
   @Guosmilesmile please prepare the backport commits to the other Flink 
versions.
   
   Thanks, Peter 





Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-12-10 Thread via GitHub


pvary merged PR #11662:
URL: https://github.com/apache/iceberg/pull/11662





Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-12-09 Thread via GitHub


pvary commented on PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2530496119

   @stevenzwu: any last minute comments?





Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-12-09 Thread via GitHub


Guosmilesmile commented on code in PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#discussion_r1876608916


##
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java:
##
@@ -52,8 +52,32 @@ class SortKeySerializer extends TypeSerializer<SortKey> {
   private final int size;
   private final Types.NestedField[] transformedFields;
 
+  private int version = SortKeySerializerSnapshot.CURRENT_VERSION;
+
   private transient SortKey sortKey;
 
+  SortKeySerializer(Schema schema, SortOrder sortOrder, int version) {
+    this.version = version;
+    this.schema = schema;
+    this.sortOrder = sortOrder;
+    this.size = sortOrder.fields().size();
+
+    this.transformedFields = new Types.NestedField[size];
+    for (int i = 0; i < size; ++i) {
+      SortField sortField = sortOrder.fields().get(i);
+      Types.NestedField sourceField = schema.findField(sortField.sourceId());
+      Type resultType = sortField.transform().getResultType(sourceField.type());
+      Types.NestedField transformedField =
+          Types.NestedField.of(
+              sourceField.fieldId(),
+              sourceField.isOptional(),
+              sourceField.name(),
+              resultType,
+              sourceField.doc());
+      transformedFields[i] = transformedField;
+    }

Review Comment:
   @pvary Yes, it's feasible. I've made the adjustments. Could you please take 
another look to see if there are any other issues? Thanks!






Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-12-09 Thread via GitHub


pvary commented on code in PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#discussion_r1876362946


##
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java:
##
@@ -52,8 +52,32 @@ class SortKeySerializer extends TypeSerializer<SortKey> {
   private final int size;
   private final Types.NestedField[] transformedFields;
 
+  private int version = SortKeySerializerSnapshot.CURRENT_VERSION;
+
   private transient SortKey sortKey;
 
+  SortKeySerializer(Schema schema, SortOrder sortOrder, int version) {
+    this.version = version;
+    this.schema = schema;
+    this.sortOrder = sortOrder;
+    this.size = sortOrder.fields().size();
+
+    this.transformedFields = new Types.NestedField[size];
+    for (int i = 0; i < size; ++i) {
+      SortField sortField = sortOrder.fields().get(i);
+      Types.NestedField sourceField = schema.findField(sortField.sourceId());
+      Type resultType = sortField.transform().getResultType(sourceField.type());
+      Types.NestedField transformedField =
+          Types.NestedField.of(
+              sourceField.fieldId(),
+              sourceField.isOptional(),
+              sourceField.name(),
+              resultType,
+              sourceField.doc());
+      transformedFields[i] = transformedField;
+    }

Review Comment:
   Could this work?
   ```
 SortKeySerializer(Schema schema, SortOrder sortOrder) {
   this(schema, sortOrder, SortKeySerializerSnapshot.CURRENT_VERSION);
 }
   ```






Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-12-09 Thread via GitHub


Guosmilesmile commented on code in PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#discussion_r1876059320


##
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java:
##
@@ -52,8 +52,32 @@ class SortKeySerializer extends TypeSerializer<SortKey> {
   private final int size;
   private final Types.NestedField[] transformedFields;
 
+  private int version = SortKeySerializerSnapshot.CURRENT_VERSION;
+
   private transient SortKey sortKey;
 
+  SortKeySerializer(Schema schema, SortOrder sortOrder, int version) {
+    this.version = version;
+    this.schema = schema;
+    this.sortOrder = sortOrder;
+    this.size = sortOrder.fields().size();
+
+    this.transformedFields = new Types.NestedField[size];
+    for (int i = 0; i < size; ++i) {
+      SortField sortField = sortOrder.fields().get(i);
+      Types.NestedField sourceField = schema.findField(sortField.sourceId());
+      Type resultType = sortField.transform().getResultType(sourceField.type());
+      Types.NestedField transformedField =
+          Types.NestedField.of(
+              sourceField.fieldId(),
+              sourceField.isOptional(),
+              sourceField.name(),
+              resultType,
+              sourceField.doc());
+      transformedFields[i] = transformedField;
+    }

Review Comment:
   @pvary Thank you very much for your suggestion. Since all member variables 
are final, they can only be initialized in a constructor. Most SortKeySerializer 
instances use the latest version; only when restoring the serializer do we need 
to pass in the version number. Since the variables are final, I cannot extract a 
common initialization method, so I added a new constructor.
   
   Another option is to remove the new constructor and inject the version through 
a setVersion method, since the version number is only needed during 
initialization in one place.
   
   Which approach do you think is more appropriate?
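   For reference, a Java constructor can delegate to another constructor with `this(...)` as its first statement. Final fields assigned in the target constructor then count as initialized, and that is what the delegating-constructor suggestion relies on. Below is a minimal sketch. `VersionedSerializer` is a hypothetical class, not Iceberg's `SortKeySerializer`, and a `String` stands in for the schema:

```java
// Hypothetical, simplified serializer; NOT Iceberg's SortKeySerializer.
// It only shows that final fields can be set by chaining constructors with this(...).
final class VersionedSerializer {
  static final int CURRENT_VERSION = 2;

  private final String schema;
  private final int size;
  private final int version;

  // Common path: newly created serializers use the latest version.
  VersionedSerializer(String schema) {
    this(schema, CURRENT_VERSION);
  }

  // Restore path: the version comes from the serializer snapshot.
  // Every final field is assigned exactly once, here, so no shared init method is needed.
  VersionedSerializer(String schema, int version) {
    this.schema = schema;
    this.size = schema.length();
    this.version = version;
  }

  String schema() {
    return schema;
  }

  int size() {
    return size;
  }

  int version() {
    return version;
  }
}
```

   Compared with a `setVersion` method, delegation keeps `version` final in this sketch. As a result, an instance cannot change its wire format after construction.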






Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-12-09 Thread via GitHub


Guosmilesmile commented on code in PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#discussion_r1876059320


##
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java:
##
@@ -52,8 +52,32 @@ class SortKeySerializer extends TypeSerializer<SortKey> {
   private final int size;
   private final Types.NestedField[] transformedFields;
 
+  private int version = SortKeySerializerSnapshot.CURRENT_VERSION;
+
   private transient SortKey sortKey;
 
+  SortKeySerializer(Schema schema, SortOrder sortOrder, int version) {
+    this.version = version;
+    this.schema = schema;
+    this.sortOrder = sortOrder;
+    this.size = sortOrder.fields().size();
+
+    this.transformedFields = new Types.NestedField[size];
+    for (int i = 0; i < size; ++i) {
+      SortField sortField = sortOrder.fields().get(i);
+      Types.NestedField sourceField = schema.findField(sortField.sourceId());
+      Type resultType = sortField.transform().getResultType(sourceField.type());
+      Types.NestedField transformedField =
+          Types.NestedField.of(
+              sourceField.fieldId(),
+              sourceField.isOptional(),
+              sourceField.name(),
+              resultType,
+              sourceField.doc());
+      transformedFields[i] = transformedField;
+    }

Review Comment:
   @pvary Thank you very much for your suggestion. Since all member variables 
are final, they can only be initialized in a constructor. Most SortKeySerializer 
instances use the latest version; only when restoring the serializer do we need 
to pass in the version number. Therefore, I added a new constructor.
   
   Another option is to remove the new constructor and inject the version through 
a setVersion method, since the version number is only needed during 
initialization in one place.
   
   Which approach do you think is more appropriate?






Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-12-09 Thread via GitHub


pvary commented on code in PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#discussion_r1876033356


##
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java:
##
@@ -52,8 +52,32 @@ class SortKeySerializer extends TypeSerializer<SortKey> {
   private final int size;
   private final Types.NestedField[] transformedFields;
 
+  private int version = SortKeySerializerSnapshot.CURRENT_VERSION;
+
   private transient SortKey sortKey;
 
+  SortKeySerializer(Schema schema, SortOrder sortOrder, int version) {
+    this.version = version;
+    this.schema = schema;
+    this.sortOrder = sortOrder;
+    this.size = sortOrder.fields().size();
+
+    this.transformedFields = new Types.NestedField[size];
+    for (int i = 0; i < size; ++i) {
+      SortField sortField = sortOrder.fields().get(i);
+      Types.NestedField sourceField = schema.findField(sortField.sourceId());
+      Type resultType = sortField.transform().getResultType(sourceField.type());
+      Types.NestedField transformedField =
+          Types.NestedField.of(
+              sourceField.fieldId(),
+              sourceField.isOptional(),
+              sourceField.name(),
+              resultType,
+              sourceField.doc());
+      transformedFields[i] = transformedField;
+    }

Review Comment:
   This part of the code looks like a duplicate to me.
   What is the reason for not using the same underlying method for the two 
constructors?






Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-12-05 Thread via GitHub


Guosmilesmile commented on PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#issuecomment-252050

   > I think this looks good, but I'll defer to @stevenzwu and @pvary for the 
final approval.
   
   @mxm Thank you very much for taking the time to review my code. The relevant 
comments have been added. 
   
   @pvary @stevenzwu I would appreciate it if both of you could take a look for 
the final approval.





Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-12-05 Thread via GitHub


mxm commented on code in PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#discussion_r1871472107


##
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/StatisticsUtil.java:
##
@@ -73,12 +73,25 @@ static byte[] serializeCompletedStatistics(
   }
 
   static CompletedStatistics deserializeCompletedStatistics(
-      byte[] bytes, TypeSerializer<CompletedStatistics> statisticsSerializer) {
+      byte[] bytes, CompletedStatisticsSerializer statisticsSerializer) {
     try {
       DataInputDeserializer input = new DataInputDeserializer(bytes);
-      return statisticsSerializer.deserialize(input);
-    } catch (IOException e) {
-      throw new UncheckedIOException("Fail to deserialize aggregated statistics", e);
+      CompletedStatistics completedStatistics = statisticsSerializer.deserialize(input);
+      if (!completedStatistics.isValid()) {
+        throw new RuntimeException("Fail to deserialize aggregated statistics,change to v1");
+      }
+
+      return completedStatistics;
+    } catch (Exception e) {
+      try {
+        statisticsSerializer.changeSortKeySerializerVersion(1);
+        DataInputDeserializer input = new DataInputDeserializer(bytes);
+        CompletedStatistics deserialize = statisticsSerializer.deserialize(input);
+        statisticsSerializer.changeSortKeySerializerVersionLatest();
+        return deserialize;
+      } catch (IOException ioException) {
+        throw new UncheckedIOException("Fail to deserialize aggregated statistics", ioException);
+      }

Review Comment:
   We could add a comment here, explaining why switching versions was necessary.
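   The fallback can be illustrated outside Flink. The serialized bytes carry no version header, so the only option is to try the newer layout, check the result, and re-read the same bytes with the older layout if that fails. Below is a minimal sketch that assumes a hypothetical list-of-integers format, which is not Iceberg's actual layout. In this format, v2 adds a boolean null indicator before each value. The check that every byte was consumed stands in for `isValid()`:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical formats (not Iceberg's layout) for a list of nullable Integers:
//   v1: count, then each value as an int (no null indicator)
//   v2: count, then per value a boolean "is null" flag and, if not null, the int
final class FallbackDeserializer {

  static byte[] write(List<Integer> values, int version) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(values.size());
      for (Integer value : values) {
        if (version >= 2) {
          out.writeBoolean(value == null);
          if (value == null) {
            continue;
          }
        }
        out.writeInt(value); // in v1, a null value throws NullPointerException when unboxed
      }
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  static List<Integer> read(DataInputStream in, int version) throws IOException {
    int count = in.readInt();
    List<Integer> values = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      if (version >= 2 && in.readBoolean()) {
        values.add(null);
      } else {
        values.add(in.readInt());
      }
    }
    return values;
  }

  // Try the current (v2) layout first; on any failure, re-read the same bytes as v1.
  static List<Integer> deserialize(byte[] bytes) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
      List<Integer> values = read(in, 2);
      // Stand-in for isValid(): a correct v2 read should consume every byte.
      if (in.available() != 0) {
        throw new IllegalStateException("Leftover bytes, retrying as v1");
      }
      return values;
    } catch (IOException | RuntimeException e) {
      try {
        return read(new DataInputStream(new ByteArrayInputStream(bytes)), 1);
      } catch (IOException ioException) {
        throw new UncheckedIOException("Fail to deserialize", ioException);
      }
    }
  }
}
```

   The retry is best effort. Old bytes can occasionally parse as plausible new-format data, which is why the PR pairs the `catch` with an explicit `isValid()` check instead of relying on exceptions alone.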






Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-12-04 Thread via GitHub


Guosmilesmile commented on code in PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#discussion_r1870045831


##
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/StatisticsUtil.java:
##
@@ -73,12 +73,24 @@ static byte[] serializeCompletedStatistics(
   }
 
   static CompletedStatistics deserializeCompletedStatistics(
-      byte[] bytes, TypeSerializer<CompletedStatistics> statisticsSerializer) {
+      byte[] bytes, CompletedStatisticsSerializer statisticsSerializer) {
     try {
       DataInputDeserializer input = new DataInputDeserializer(bytes);
-      return statisticsSerializer.deserialize(input);
-    } catch (IOException e) {
-      throw new UncheckedIOException("Fail to deserialize aggregated statistics", e);
+      CompletedStatistics completedStatistics = statisticsSerializer.deserialize(input);
+      if (!completedStatistics.isValid()) {
+        throw new RuntimeException("Fail to deserialize aggregated statistics,change to v1");
+      }
+      return completedStatistics;

Review Comment:
   @pvary Thank you very much for your suggestions. I have made the 
corresponding changes based on your advice.






Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-12-04 Thread via GitHub


pvary commented on code in PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#discussion_r1869958746


##
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/StatisticsUtil.java:
##
@@ -73,12 +73,24 @@ static byte[] serializeCompletedStatistics(
   }
 
   static CompletedStatistics deserializeCompletedStatistics(
-      byte[] bytes, TypeSerializer<CompletedStatistics> statisticsSerializer) {
+      byte[] bytes, CompletedStatisticsSerializer statisticsSerializer) {
     try {
       DataInputDeserializer input = new DataInputDeserializer(bytes);
-      return statisticsSerializer.deserialize(input);
-    } catch (IOException e) {
-      throw new UncheckedIOException("Fail to deserialize aggregated statistics", e);
+      CompletedStatistics completedStatistics = statisticsSerializer.deserialize(input);
+      if (!completedStatistics.isValid()) {
+        throw new RuntimeException("Fail to deserialize aggregated statistics,change to v1");
+      }
+      return completedStatistics;

Review Comment:
   nit: newline before this line






Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-12-04 Thread via GitHub


Guosmilesmile commented on PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2517084700

   @mxm @pvary @stevenzwu Thank you all very much for your guidance. I would 
appreciate it if you could take another look and let me know what needs to be 
done next to keep things moving forward.





Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-12-02 Thread via GitHub


Guosmilesmile commented on PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2511169797

   @mxm Thank you very much for your suggestions. I have made the necessary 
modifications, and I appreciate you taking the time out of your busy schedule 
to review it again. I am very grateful for your guidance.
   
   





Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-12-02 Thread via GitHub


Guosmilesmile commented on PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2511130152

   @mxm Thank you very much for your suggestions. I have made the necessary 
modifications, and I appreciate you taking the time out of your busy schedule 
to review it again. I am very grateful for your guidance.





Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-12-02 Thread via GitHub


mxm commented on code in PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#discussion_r1865523100


##
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java:
##
@@ -310,10 +366,16 @@ public void writeSnapshot(DataOutputView out) throws IOException {
     @Override
     public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
         throws IOException {
-      if (readVersion == 1) {
-        readV1(in);
-      } else {
-        throw new IllegalArgumentException("Unknown read version: " + readVersion);
+      switch (readVersion) {
+        case 1:
+          readV1(in);
+          this.version = 1;
+          break;
+        case 2:
+          readV1(in);

Review Comment:
   Can we remove the version suffix in the method name? It handles both 
versions.



##
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/CompletedStatisticsSerializer.java:
##
@@ -93,15 +126,35 @@ public void serialize(CompletedStatistics record, DataOutputView target) throws
 
   @Override
   public CompletedStatistics deserialize(DataInputView source) throws IOException {
+    long checkpointId = source.readLong();
+    changeSortKeySerializerVersion(source.readInt());

Review Comment:
   AFAIK, we were not writing the serializer version before this change, so I'm 
not sure this works when restoring the serializer to read old data.



##
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/StatisticsUtil.java:
##
@@ -73,12 +73,17 @@ static byte[] serializeCompletedStatistics(
   }
 
   static CompletedStatistics deserializeCompletedStatistics(
-      byte[] bytes, TypeSerializer<CompletedStatistics> statisticsSerializer) {
+      byte[] bytes, CompletedStatisticsSerializer statisticsSerializer) {
     try {
       DataInputDeserializer input = new DataInputDeserializer(bytes);
       return statisticsSerializer.deserialize(input);
-    } catch (IOException e) {
-      throw new UncheckedIOException("Fail to deserialize aggregated statistics", e);
+    } catch (Exception e) {
+      try {
+        DataInputDeserializer input = new DataInputDeserializer(bytes);
+        return statisticsSerializer.deserializeV1(input);
+      } catch (IOException ioException) {
+        throw new UncheckedIOException("Fail to deserialize aggregated statistics", ioException);
+      }

Review Comment:
   I see, we're retrying here in case the restore fails.






Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-12-02 Thread via GitHub


mxm commented on PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2511041771

   Thanks for the update @Guosmilesmile! Unfortunately, we don't have a way to 
encode the serializer version for all serializers, so a best-effort approach to 
retry with a different serializer version on restore failure may be the best we 
can do. 





Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-11-29 Thread via GitHub


Guosmilesmile commented on PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2507769750

   Hi @mxm! Thank you very much for your suggestions. Following your advice, I 
added a version number to SortKeySerializer through restoreSerializer, 
implementing different serialization methods for v1 and v2. In the test job, the 
TaskManager achieves state compatibility. However, the JobManager fails to 
recover because of the DataStatisticsCoordinator: the SortKeySerializer used in 
CompletedStatisticsSerializer and GlobalStatisticsSerializer is instantiated 
directly, so parsing the v1 state with the v2 serializer fails and the job cannot 
start. Additionally, these two serializers do not change after the Coordinator 
is initialized. Given this scenario, do you have any suggestions?





Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-11-29 Thread via GitHub


mxm commented on code in PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#discussion_r1863289530


##
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java:
##
@@ -310,10 +324,16 @@ public void writeSnapshot(DataOutputView out) throws IOException {
     @Override
     public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
         throws IOException {
-      if (readVersion == 1) {
-        readV1(in);
-      } else {
-        throw new IllegalArgumentException("Unknown read version: " + readVersion);
+      switch (readVersion) {
+        case 1:
+          throw new UnsupportedOperationException(
+              String.format(
+                  "No longer supported version [%s]. Please upgrade first . ", readVersion));
+        case 2:
+          readV2(in);
+          break;

Review Comment:
   Reading the snapshot isn't any different for V1/V2: the `readV1()` and 
`readV2()` methods are identical, so we can just keep the current one. However, we 
need to save the serializer version as an `int` field, which we can read in 
`restoreSerializer()` to pass into the serializer, just like the schema and 
the sort order. We can then serialize/deserialize depending on the version used.
   
   For example, in the serializer we would do:
   ```
   if (version == 1) {
     // Don't read/write null indicator
   } else {
     // Read/write null indicator
   }
   ```
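   The version-switching idea can be sketched end to end in plain Java. The snapshot records which version wrote the state, `restoreSerializer()` passes that version into the serializer, and the serializer decides whether to read and write the null indicator. This is a minimal sketch with hypothetical class names and `String` values. It does not use Flink's real `TypeSerializerSnapshot` interface or Iceberg's `SortKey`:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical serializer for nullable Strings; not Iceberg's SortKeySerializer.
final class NullableStringSerializer {
  private final int version;

  NullableStringSerializer(int version) {
    this.version = version;
  }

  void serialize(String value, DataOutputStream out) throws IOException {
    if (version == 1) {
      // v1: no null indicator; writeUTF(null) throws NullPointerException
      out.writeUTF(value);
    } else {
      // v2: a boolean null indicator precedes the value
      out.writeBoolean(value == null);
      if (value != null) {
        out.writeUTF(value);
      }
    }
  }

  String deserialize(DataInputStream in) throws IOException {
    if (version == 1) {
      return in.readUTF();
    }
    return in.readBoolean() ? null : in.readUTF();
  }

  // Convenience for the example: serialize and deserialize one value in memory.
  String roundTrip(String value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      serialize(value, out);
      out.flush();
      return deserialize(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}

// Hypothetical snapshot; it only records which version wrote the state.
final class NullableStringSerializerSnapshot {
  static final int CURRENT_VERSION = 2;
  private int version = CURRENT_VERSION;

  // Called on restore; the snapshot payload itself is the same for v1 and v2.
  void readSnapshot(int readVersion) {
    if (readVersion != 1 && readVersion != 2) {
      throw new IllegalArgumentException("Unknown read version: " + readVersion);
    }
    this.version = readVersion;
  }

  // Like the schema and sort order, the version is handed to the restored serializer.
  NullableStringSerializer restoreSerializer() {
    return new NullableStringSerializer(version);
  }
}
```

   In this sketch, v1 bytes contain no null indicator, so old state must be read by the restored v1 serializer. A v2 serializer would misinterpret the first byte of each value as the flag.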






Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-11-29 Thread via GitHub


mxm commented on code in PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#discussion_r1863289530


##
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java:
##
@@ -310,10 +324,16 @@ public void writeSnapshot(DataOutputView out) throws IOException {
     @Override
     public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
         throws IOException {
-      if (readVersion == 1) {
-        readV1(in);
-      } else {
-        throw new IllegalArgumentException("Unknown read version: " + readVersion);
+      switch (readVersion) {
+        case 1:
+          throw new UnsupportedOperationException(
+              String.format(
+                  "No longer supported version [%s]. Please upgrade first . ", readVersion));
+        case 2:
+          readV2(in);
+          break;

Review Comment:
   Reading the snapshot isn't any different for V1/V2: the `readV1()` and 
`readV2()` methods are identical, so we can just keep the current one. However, we 
need to save the serializer version as an `int` field, which we can read in 
`restoreSerializer()`, just like the schema and the sort order. We can then 
serialize/deserialize depending on the version used.
   
   For example, in the serializer we would do:
   ```
   if (version == 1) {
     // Don't read/write null indicator
   } else {
     // Read/write null indicator
   }
   ```



##
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java:
##
@@ -324,6 +344,10 @@ public TypeSerializerSchemaCompatibility<SortKey> resolveSchemaCompatibility(
         return TypeSerializerSchemaCompatibility.incompatible();
       }
 
+      if (oldSerializerSnapshot.getCurrentVersion() != this.getCurrentVersion()) {
+        return TypeSerializerSchemaCompatibility.incompatible();
+      }

Review Comment:
   ```suggestion
         if (oldSerializerSnapshot.getCurrentVersion() == 1 && this.getCurrentVersion() == 2) {
           return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
         }
   ```
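   In Flink, `compatibleAfterMigration()` means the existing state is read with the serializer restored from the snapshot and then rewritten with the new serializer. That is why both serializer versions must remain instantiable. Below is a minimal sketch of the suggested decision logic. A hypothetical `Compatibility` enum stands in for `TypeSerializerSchemaCompatibility`, and a boolean stands in for the existing schema and sort-order checks:

```java
// Hypothetical stand-in for the results of Flink's TypeSerializerSchemaCompatibility.
enum Compatibility {
  COMPATIBLE_AS_IS,
  COMPATIBLE_AFTER_MIGRATION,
  INCOMPATIBLE
}

final class SortKeyCompatibilityRule {
  // oldVersion: version recorded in the restored snapshot; newVersion: current version.
  // sameSchemaAndSortOrder stands in for the checks that already return incompatible().
  static Compatibility resolve(int oldVersion, int newVersion, boolean sameSchemaAndSortOrder) {
    if (!sameSchemaAndSortOrder) {
      return Compatibility.INCOMPATIBLE;
    }
    if (oldVersion == newVersion) {
      return Compatibility.COMPATIBLE_AS_IS;
    }
    if (oldVersion == 1 && newVersion == 2) {
      // Read old state with the v1 serializer, then rewrite it with v2.
      return Compatibility.COMPATIBLE_AFTER_MIGRATION;
    }
    return Compatibility.INCOMPATIBLE;
  }
}
```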






Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-11-28 Thread via GitHub


Guosmilesmile commented on PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2507255659

   @mxm I'm sorry, I'm a bit confused and would like to ask for your advice. 
Which part do I need to modify? I see two options:
   
   1. Based on the current modifications, simply change 
TypeSerializerSchemaCompatibility.incompatible() to compatibleAfterMigration().
   2. Create a new NullAbleSortKeySerializer, keep the existing SortKeySerializer, replace 
all calls to SortKeySerializer, and in resolveSchemaCompatibility, return 
compatibleAfterMigration() after checking the type of oldSerializerSnapshot.
   
   Which option should I choose? Please provide detailed advice. Thank you very 
much.





Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-11-28 Thread via GitHub


mxm commented on PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2507208103

   @Guosmilesmile Ideally, we want to return compatibleAfterMigration() and 
make sure the old and new serializer can be instantiated. 
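A sketch of that decision, assuming version 1 means the old layout without null flags and version 2 the new one. The `Compatibility` enum is a hypothetical stand-in for Flink's `TypeSerializerSchemaCompatibility` results (`compatibleAsIs()`, `compatibleAfterMigration()`, `incompatible()`), and the schema and sort-order checks the real snapshot also performs are omitted.

```java
// Hypothetical stand-in for the outcomes of Flink's TypeSerializerSchemaCompatibility.
enum Compatibility {
  COMPATIBLE_AS_IS,
  COMPATIBLE_AFTER_MIGRATION,
  INCOMPATIBLE
}

final class SnapshotVersionCheck {
  // Assumed current layout: version 2 writes a null flag before each field.
  static final int CURRENT_VERSION = 2;

  private SnapshotVersionCheck() {}

  // Decides how state written with oldVersion can be restored by the current serializer.
  static Compatibility resolve(int oldVersion) {
    if (oldVersion == CURRENT_VERSION) {
      return Compatibility.COMPATIBLE_AS_IS;
    }

    if (oldVersion == 1) {
      // Read with the restored V1 serializer, rewrite with the V2 serializer.
      return Compatibility.COMPATIBLE_AFTER_MIGRATION;
    }

    // Unknown versions (for example, state from a newer release) cannot be read safely.
    return Compatibility.INCOMPATIBLE;
  }
}
```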





Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-11-28 Thread via GitHub


Guosmilesmile commented on PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2506978587

   @mxm @stevenzwu @pvary Thank you all for your suggestions. I have submitted 
a version that mainly modifies the SortKeySerializerSnapshot and implements 
version detection. If the version does not match, it will return the 
corresponding message to avoid parsing exceptions and data errors when 
restoring from an old state to a new state. I would appreciate it if you could 
take a look and see if this modification is feasible.





Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-11-28 Thread via GitHub


Guosmilesmile commented on PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2505917957

   @mxm Thank you very much for your suggestions. I need to add a version check 
in SortKeySerializerSnapshot. If the state is restored from an old version, I 
will directly return TypeSerializerSchemaCompatibility.incompatible(). Is this 
approach feasible?





Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-11-28 Thread via GitHub


mxm commented on code in PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#discussion_r1861944660


##
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java:
##
@@ -124,6 +124,14 @@ public void serialize(SortKey record, DataOutputView target) throws IOException
 for (int i = 0; i < size; ++i) {
   int fieldId = transformedFields[i].fieldId();
   Type.TypeID typeId = transformedFields[i].type().typeId();
+  Object value = record.get(i, Object.class);

Review Comment:
   Nulls are not supported everywhere in Flink. Primitive types generally don't 
support null. In other places, Flink uses boolean flags to encode nulls, e.g. 
in PojoSerializer: 
https://github.com/apache/flink/blob/b9c92371dba07b6bfe4368d7b6d7f7c575b4c603/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L392
   
   KeyBy operations are not allowed on null values.
   
   +1 to Steven's suggestion to version the serializer. We already have 
[SortKeySerializerSnapshot](https://github.com/apache/iceberg/blob/163e2068f96f139632488f36928bf443c9be326f/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java#L278).
 We just need to add a version check for the new boolean serializer because 
this is an incompatible change which requires serializer migration.






Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-11-27 Thread via GitHub


Guosmilesmile commented on PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2505426989

   @pvary Because a new flag has been added, the testSerializationSize test 
case needs to be modified to reflect the corresponding size value plus 1 byte for 
the flag. It has been adjusted. I have run the tests locally myself, and all 
unit tests have passed. I would appreciate it if you could start the tests 
again.
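The size change follows directly from the encoding: `DataOutput.writeBoolean` always writes exactly one byte, so flagging every field adds one byte per field, for example +1 byte for a one-field key. A small sketch with `int` fields (the `FlagOverhead` helper is hypothetical and not part of the Iceberg test):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class FlagOverhead {
  private FlagOverhead() {}

  // Serialized size of non-null int fields, with or without a null flag before each field.
  static int serializedSize(int[] fields, boolean withNullFlags) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      for (int field : fields) {
        if (withNullFlags) {
          out.writeBoolean(false); // one byte: "not null"
        }

        out.writeInt(field); // four bytes
      }

      out.flush();
      return bytes.size();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```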





Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-11-27 Thread via GitHub


pvary commented on code in PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#discussion_r1861570865


##
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java:
##
@@ -124,6 +124,14 @@ public void serialize(SortKey record, DataOutputView target) throws IOException
 for (int i = 0; i < size; ++i) {
   int fieldId = transformedFields[i].fieldId();
   Type.TypeID typeId = transformedFields[i].type().typeId();
+  Object value = record.get(i, Object.class);

Review Comment:
   @stevenzwu: What do you think about a serializer version, similar to what 
SimpleVersionedSerializer uses? Maybe we don't store the version for every key, 
but inherit the version from the statistics object?
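A minimal sketch of that idea, assuming the statistics blob begins with a single version `int` that every key in it inherits, so the version costs four bytes per blob instead of some overhead per key. `StatsBlob` is hypothetical; Flink's `SimpleVersionedSerializer` similarly keeps the version out of the payload and passes it to `deserialize(int version, byte[] serialized)`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical statistics blob layout: [version][key count][key 1]...[key n].
final class StatsBlob {
  private StatsBlob() {}

  static byte[] write(int version, List<String> keys) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(version); // written once for the whole blob
      out.writeInt(keys.size());
      for (String key : keys) {
        if (version >= 2) {
          out.writeBoolean(key == null);
          if (key == null) {
            continue;
          }
        }

        // V1 has no flag: a null key reaches writeUTF, which throws NullPointerException.
        out.writeUTF(key);
      }

      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  static List<String> read(byte[] data) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
      int version = in.readInt(); // every key below is decoded with this version
      int count = in.readInt();
      List<String> keys = new ArrayList<>(count);
      for (int i = 0; i < count; ++i) {
        if (version >= 2 && in.readBoolean()) {
          keys.add(null);
          continue;
        }

        keys.add(in.readUTF());
      }

      return keys;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```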






Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-11-27 Thread via GitHub


Guosmilesmile commented on code in PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#discussion_r1861359801


##
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java:
##
@@ -124,6 +124,14 @@ public void serialize(SortKey record, DataOutputView target) throws IOException
 for (int i = 0; i < size; ++i) {
   int fieldId = transformedFields[i].fieldId();
   Type.TypeID typeId = transformedFields[i].type().typeId();
+  Object value = record.get(i, Object.class);

Review Comment:
   @stevenzwu Can we accept this method to fix the bug for now?






Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-11-27 Thread via GitHub


Guosmilesmile commented on code in PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#discussion_r1861436171


##
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java:
##
@@ -124,6 +124,14 @@ public void serialize(SortKey record, DataOutputView target) throws IOException
 for (int i = 0; i < size; ++i) {
   int fieldId = transformedFields[i].fieldId();
   Type.TypeID typeId = transformedFields[i].type().typeId();
+  Object value = record.get(i, Object.class);

Review Comment:
   @stevenzwu I have looked at the current implementation in Flink: null values 
are serialized by NullableSerializer, whose underlying approach is also to write a 
boolean flag before the field to distinguish null from non-null values. This is 
quite similar to the approach in this PR. Can we use the current method to fix this 
issue for now, or is there a better way to handle it?
   
   
   ```java
   // Excerpt from Flink's NullableSerializer<T> extends TypeSerializer<T>
   
   @Override
   public void serialize(T record, DataOutputView target) throws IOException {
       if (record == null) {
           target.writeBoolean(true);
           target.write(padding);
       } else {
           target.writeBoolean(false);
           originalSerializer.serialize(record, target);
       }
   }
   
   @Override
   public T deserialize(DataInputView source) throws IOException {
       boolean isNull = deserializeNull(source);
       return isNull ? null : originalSerializer.deserialize(source);
   }
   ```
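The same pattern as a reusable decorator, loosely modeled on the excerpt above: a hypothetical `NullableCodec` wraps a codec that cannot handle null and prepends the boolean flag. It uses `java.io.DataOutput`/`DataInput` in place of Flink's views and omits the `padding` bytes the excerpt writes for null records.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical minimal codec interface; implementations need not accept null.
interface Codec<T> {
  void write(T value, DataOutput out) throws IOException;

  T read(DataInput in) throws IOException;
}

// Decorator that adds a one-byte null flag in front of the wrapped encoding.
final class NullableCodec<T> implements Codec<T> {
  private final Codec<T> delegate;

  NullableCodec(Codec<T> delegate) {
    this.delegate = delegate;
  }

  @Override
  public void write(T value, DataOutput out) throws IOException {
    out.writeBoolean(value == null);
    if (value != null) {
      delegate.write(value, out);
    }
  }

  @Override
  public T read(DataInput in) throws IOException {
    boolean isNull = in.readBoolean();
    return isNull ? null : delegate.read(in);
  }
}

// Example delegate that would throw NullPointerException on a null String.
final class Utf8Codec implements Codec<String> {
  @Override
  public void write(String value, DataOutput out) throws IOException {
    out.writeUTF(value);
  }

  @Override
  public String read(DataInput in) throws IOException {
    return in.readUTF();
  }
}

final class CodecRoundTrip {
  private CodecRoundTrip() {}

  // Serializes one value and immediately reads it back.
  static <T> T roundTrip(Codec<T> codec, T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      codec.write(value, out);
      out.flush();
      return codec.read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```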






Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-11-27 Thread via GitHub


stevenzwu commented on code in PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#discussion_r1861329534


##
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java:
##
@@ -124,6 +124,14 @@ public void serialize(SortKey record, DataOutputView target) throws IOException
 for (int i = 0; i < size; ++i) {
   int fieldId = transformedFields[i].fieldId();
   Type.TypeID typeId = transformedFields[i].type().typeId();
+  Object value = record.get(i, Object.class);

Review Comment:
   This will add a boolean flag overhead for every field, but I can't think of 
any better alternative, so this is fine with me.
   
   However, ideally we need to figure out how to evolve it in a compatible way, 
probably by leveraging `TypeSerializerSnapshot`.






Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-11-27 Thread via GitHub


Guosmilesmile commented on PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2504787892

   @pvary I'm sorry, I just re-uploaded the missing parts. Please help me 
trigger the test again. I apologize for the inconvenience.





Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-11-27 Thread via GitHub


Guosmilesmile commented on code in PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#discussion_r1860984047


##
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsCoordinator.java:
##
@@ -152,6 +152,80 @@ public void testDataStatisticsEventHandling(StatisticsType type) throws Exceptio
 }
   }
 
+  @ParameterizedTest
+  @EnumSource(StatisticsType.class)
+  public void testDataStatisticsEventHandlingWithNullValue(StatisticsType type) throws Exception {
+try (DataStatisticsCoordinator dataStatisticsCoordinator = createCoordinator(type)) {
+  dataStatisticsCoordinator.start();
+  tasksReady(dataStatisticsCoordinator);
+
+  SortKey NullSortKey = Fixtures.SORT_KEY.copy();

Review Comment:
   @pvary I'm sorry, there was a little mistake just now. Please try again. 
Thank you very much.






Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-11-27 Thread via GitHub


Guosmilesmile commented on code in PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#discussion_r1860974486


##
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsCoordinator.java:
##
@@ -152,6 +152,80 @@ public void testDataStatisticsEventHandling(StatisticsType type) throws Exceptio
 }
   }
 
+  @ParameterizedTest
+  @EnumSource(StatisticsType.class)
+  public void testDataStatisticsEventHandlingWithNullValue(StatisticsType type) throws Exception {
+try (DataStatisticsCoordinator dataStatisticsCoordinator = createCoordinator(type)) {
+  dataStatisticsCoordinator.start();
+  tasksReady(dataStatisticsCoordinator);
+
+  SortKey NullSortKey = Fixtures.SORT_KEY.copy();

Review Comment:
   @pvary I have changed it. Please check it, thank you.






Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-11-27 Thread via GitHub


pvary commented on code in PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#discussion_r1860967334


##
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsCoordinator.java:
##
@@ -152,6 +152,80 @@ public void testDataStatisticsEventHandling(StatisticsType type) throws Exceptio
 }
   }
 
+  @ParameterizedTest
+  @EnumSource(StatisticsType.class)
+  public void testDataStatisticsEventHandlingWithNullValue(StatisticsType type) throws Exception {
+try (DataStatisticsCoordinator dataStatisticsCoordinator = createCoordinator(type)) {
+  dataStatisticsCoordinator.start();
+  tasksReady(dataStatisticsCoordinator);
+
+  SortKey NullSortKey = Fixtures.SORT_KEY.copy();

Review Comment:
   nit: The variable name should start with a lower-case letter









Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-11-27 Thread via GitHub


Guosmilesmile commented on PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2504108856

   @pvary I have also added a unit test for the scenario of building from the 
table to the sink, which includes data containing null values. Could you please 
review it as well? Thank you very much!





Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-11-27 Thread via GitHub


Guosmilesmile commented on PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2503954677

   @pvary I have added test cases for DataStatisticsOperator and 
DataStatisticsCoordinator, specifically for the null scenario. They verify the 
processElement and event-handling scenarios, simulating data being processed 
by the operator and statistics containing null values being received and 
handled. Could you please take a look and see if this is feasible? Thank you!





Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-11-27 Thread via GitHub


Guosmilesmile commented on PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2503806486

   @pvary  Thank you very much for your guidance. I have made the changes 
according to your suggestions. Please take a look and see if they are feasible.





Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-11-27 Thread via GitHub


pvary commented on PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2503849860

   Thanks for the changes @Guosmilesmile!
   One question remains from my side:
   
   > It would be nice to have an end2end test for null values too.
   > Currently we only test that the statistics are collected correctly, but 
there might be some issues when applying the stats. It would be nice to have a 
test for this case too.





Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-11-27 Thread via GitHub


pvary commented on PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2503747694

   @Guosmilesmile: Thanks for the changes. Left some comments, but started the 
tests to see if this change causes any other test failures.
   
   Please remove the 1.19, 1.18 changes for now - we usually cherry pick the 
changes to the other versions after the original PR for the main versions has 
been merged. This is better for the reviewer (smaller number of files), and 
better for the contributor (if there is a change request during the review, 
they don't have to keep the different versions in sync)
   
   It would be nice to have an end2end test for null values too.
   Currently we only test that the statistics are collected correctly, but 
there might be some issues when applying the stats. It would be nice to have a 
test for this case too.





Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-11-27 Thread via GitHub


pvary commented on code in PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#discussion_r1860565686


##
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java:
##
@@ -136,6 +137,27 @@ public void testProcessElement(StatisticsType type) throws Exception {
 }
   }
 
+  @ParameterizedTest
+  @EnumSource(StatisticsType.class)
+  public void testProcessElementWithNull(StatisticsType type) throws Exception {
+  DataStatisticsOperator operator = createOperator(type, Fixtures.NUM_SUBTASKS);
+  try (OneInputStreamOperatorTestHarness testHarness =
+ createHarness(operator)) {
+  StateInitializationContext stateContext = getStateContext();
+  operator.initializeState(stateContext);
+  operator.processElement(new StreamRecord<>(GenericRowData.of(null, 5)));
+  operator.processElement(new StreamRecord<>(GenericRowData.of(StringData.fromString("a"), 3)));
+
+  DataStatistics localStatistics = operator.localStatistics();
+  SortKeySerializer sortKeySerializer = new SortKeySerializer(Fixtures.SCHEMA, Fixtures.SORT_ORDER);
+  DataStatisticsSerializer taskStatisticsSerializer = new DataStatisticsSerializer(sortKeySerializer);
+  DataOutputSerializer outputView = new DataOutputSerializer(1024);
+
+  taskStatisticsSerializer.serialize(localStatistics, outputView);
+  testHarness.endInput();

Review Comment:
   We should check if the resulting statistics is correct
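One way to make that check concrete, sketched with plain Java collections instead of Iceberg's `DataStatistics` classes: count records per key for the two rows the test feeds in and assert that the null key is counted next to `"a"`. `KeyCounts` is hypothetical, and it assumes the sort key is the first column of each row, which the excerpt does not show.

```java
import java.util.HashMap;
import java.util.Map;

final class KeyCounts {
  private KeyCounts() {}

  // Hypothetical stand-in for map-style statistics: sort key value -> record count.
  // Assumes the sort key is the first column of each row.
  static Map<Object, Long> countKeys(Object[]... rows) {
    Map<Object, Long> counts = new HashMap<>();
    for (Object[] row : rows) {
      // HashMap permits a null key, so null sort values get their own bucket.
      counts.merge(row[0], 1L, Long::sum);
    }

    return counts;
  }
}
```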






Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-11-27 Thread via GitHub


pvary commented on code in PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#discussion_r1860563456


##
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java:
##
@@ -124,6 +124,13 @@ public void serialize(SortKey record, DataOutputView target) throws IOException
 for (int i = 0; i < size; ++i) {
   int fieldId = transformedFields[i].fieldId();
   Type.TypeID typeId = transformedFields[i].type().typeId();
+  Object value = record.get(i, Object.class);
+  if (value == null) {
+target.writeBoolean(true);
+continue;
+  } else {
+target.writeBoolean(false);
+  }

Review Comment:
   nit: newline after closing a block and starting another statement



##
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java:
##
@@ -192,6 +199,11 @@ public SortKey deserialize(SortKey reuse, DataInputView source) throws IOExcepti
 reuse.size(),
 size);
 for (int i = 0; i < size; ++i) {
+  boolean isNull = source.readBoolean();
+  if (isNull) {
+reuse.set(i, null);
+continue;
+  }

Review Comment:
   nit: newline after closing a block and starting another statement
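Putting the two hunks together with the blank line the reviewer asks for after each `if` block gives roughly the following sketch of the per-field flag inside a type-dispatching loop. `FieldType` and `KeyRowCodec` are hypothetical simplifications of the per-field `typeId` handling in `SortKeySerializer`, handle only two types, and use `java.io` streams in place of Flink's views.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical, reduced set of field types.
enum FieldType {
  INT,
  STRING
}

final class KeyRowCodec {
  private final FieldType[] types;

  KeyRowCodec(FieldType... types) {
    this.types = types;
  }

  byte[] serialize(Object[] record) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream target = new DataOutputStream(bytes);
      for (int i = 0; i < types.length; ++i) {
        Object value = record[i];
        target.writeBoolean(value == null); // null indicator
        if (value == null) {
          continue;
        }

        switch (types[i]) {
          case INT:
            target.writeInt((Integer) value);
            break;
          case STRING:
            target.writeUTF((String) value);
            break;
          default:
            throw new UnsupportedOperationException("Unsupported type: " + types[i]);
        }
      }

      target.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  Object[] deserialize(byte[] data) {
    try {
      DataInputStream source = new DataInputStream(new ByteArrayInputStream(data));
      Object[] reuse = new Object[types.length];
      for (int i = 0; i < types.length; ++i) {
        boolean isNull = source.readBoolean();
        if (isNull) {
          reuse[i] = null;
          continue;
        }

        switch (types[i]) {
          case INT:
            reuse[i] = source.readInt();
            break;
          case STRING:
            reuse[i] = source.readUTF();
            break;
          default:
            throw new UnsupportedOperationException("Unsupported type: " + types[i]);
        }
      }

      return reuse;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Writing `value == null` directly as the flag folds the hunk's `if`/`else` into a single call without changing the bytes written.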






Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-11-26 Thread via GitHub


Guosmilesmile commented on PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2503066336

   @pvary Yes, you are right. We changed our approach to handle null values 
instead of filtering them out. We now write a boolean flag before each field, 
where true indicates that the field is null. During serialization the flag is 
written first, and during deserialization it is read first, which allows us to 
correctly identify null values. The related code has been updated; please take a 
look to see if it is feasible.





Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-11-26 Thread via GitHub


Guosmilesmile commented on PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2502780003

   @ConeyLiu Yes, I have added unit tests to cover the relevant changes.





Re: [PR] Flink: Fix range distribution npe when value is null [iceberg]

2024-11-26 Thread via GitHub


ConeyLiu commented on PR #11662:
URL: https://github.com/apache/iceberg/pull/11662#issuecomment-2502609743

   Thanks for the contributions. Shouldn't we handle the null value properly 
instead of skipping it? Could you also add a UT to cover these changes?
   cc @stevenzwu @pvary 

