[GitHub] [hudi] danny0405 commented on a change in pull request #3285: [HUDI-1771] Propagate CDC format for hoodie

2021-07-28 Thread GitBox


danny0405 commented on a change in pull request #3285:
URL: https://github.com/apache/hudi/pull/3285#discussion_r672894012



##
File path: hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
##
@@ -175,13 +176,32 @@ void testReadWithDeletes() throws Exception {
 
    List<RowData> result = readData(inputFormat);
 
-final String actual = TestData.rowDataToString(result);
+final String actual = TestData.rowDataToString(result, true);
 final String expected = "["
-+ "id1,Danny,24,1970-01-01T00:00:00.001,par1, "
-+ "id2,Stephen,34,1970-01-01T00:00:00.002,par1, "
-+ "id3,null,null,null,null, "
-+ "id5,null,null,null,null, "
-+ "id9,null,null,null,null]";
++ "+I(id1,Danny,24,1970-01-01T00:00:00.001,par1), "
++ "+I(id2,Stephen,34,1970-01-01T00:00:00.002,par1), "
++ "-D(id3,Julian,53,1970-01-01T00:00:00.003,par2), "
++ "-D(id5,Sophia,18,1970-01-01T00:00:00.005,par3), "
++ "-D(id9,Jane,19,1970-01-01T00:00:00.006,par3)]";
+assertThat(actual, is(expected));
+  }
+
+  @Test
+  void testReadWithDeletesCOW() throws Exception {
+    beforeEach(HoodieTableType.COPY_ON_WRITE);
+
+    // write another commit to read again
+    TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);
+
+    InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
+    assertThat(inputFormat, instanceOf(CopyOnWriteInputFormat.class));
+
+    List<RowData> result = readData(inputFormat);
+
+    final String actual = TestData.rowDataToString(result, true);
+    final String expected = "["

Review comment:
   I think Spark may need to set up the change flag column correctly when 
writing into Hoodie, but Spark's `InternalRow` does not support a builtin 
change flag, so the user may need to specify a column explicitly.
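
   A minimal sketch of what an explicit change-flag column might look like on the Spark write path (hypothetical: the column name reuses `HoodieRecord.OPERATION_METADATA_FIELD` from this PR, and "D" mirrors the `HoodieOperation` delete code; how the Spark writer would consume such a column is exactly the open question here):

   ```java
   import static org.apache.spark.sql.functions.lit;

   import org.apache.spark.sql.Dataset;
   import org.apache.spark.sql.Row;

   public class ChangeFlagColumnSketch {
     // Tag a batch of rows as deletes by adding an explicit change-flag column,
     // since Spark's InternalRow has no builtin RowKind-like flag.
     static Dataset<Row> tagAsDeletes(Dataset<Row> df) {
       return df.withColumn("_hoodie_operation", lit("D"));
     }
   }
   ```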








[GitHub] [hudi] danny0405 commented on a change in pull request #3285: [HUDI-1771] Propagate CDC format for hoodie

2021-07-21 Thread GitBox


danny0405 commented on a change in pull request #3285:
URL: https://github.com/apache/hudi/pull/3285#discussion_r674454632



##
File path: hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
##
@@ -615,24 +621,25 @@ public boolean reachedEnd() throws IOException {
   while (logKeysIterator.hasNext()) {
 final String curKey = logKeysIterator.next();
 if (!keyToSkip.contains(curKey)) {
-  Option<IndexedRecord> insertAvroRecord = getInsetValue(curKey);
+  final HoodieRecord<?> record = logRecords.get(curKey);
+  Option<IndexedRecord> insertAvroRecord = getInsetValue(record);
   if (insertAvroRecord.isPresent()) {
 // the record is a DELETE if insertAvroRecord not present, skipping
-GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
+GenericRecord avroRecord = buildAvroRecordBySchema(
 insertAvroRecord.get(),
 requiredSchema,
 requiredPos,
 recordBuilder);
-    this.currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
+    this.currentRecord = (RowData) avroToRowDataConverter.convert(avroRecord);
+    this.currentRecord.setRowKind(FormatUtils.getRowKind(insertAvroRecord.get(), this.operationPos));
 return false;
   }
 }
   }
   return true;
 }
 
-private Option<IndexedRecord> getInsetValue(String curKey) throws IOException {
-  final HoodieRecord<?> record = logRecords.get(curKey);
+private Option<IndexedRecord> getInsetValue(HoodieRecord<?> record) throws IOException {

Review comment:
   This change seems unnecessary.








[GitHub] [hudi] danny0405 commented on a change in pull request #3285: [HUDI-1771] Propagate CDC format for hoodie

2021-07-20 Thread GitBox


danny0405 commented on a change in pull request #3285:
URL: https://github.com/apache/hudi/pull/3285#discussion_r672172626



##
File path: hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
##
@@ -624,6 +634,14 @@ public boolean reachedEnd() throws IOException {
   return true;
 }
 
+private Option<IndexedRecord> getInsetValue(String curKey) throws IOException {
+  final HoodieRecord<?> record = logRecords.get(curKey);
+  if (HoodieOperation.isDelete(record.getOperation())) {
+    return Option.empty();

Review comment:
   Put the `!emitDelete` check in front.
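
   A sketch of the suggested ordering, so the cheap flag check short-circuits before the per-record operation lookup (the `getInsertValue` call stands in for the method's existing tail, which the hunk truncates, and `tableSchema` is assumed to be the reader schema in scope):

   ```java
   private Option<IndexedRecord> getInsetValue(String curKey) throws IOException {
     final HoodieRecord<?> record = logRecords.get(curKey);
     // Check the boolean flag first; only skip DELETEs when not emitting them.
     if (!emitDelete && HoodieOperation.isDelete(record.getOperation())) {
       return Option.empty();
     }
     return record.getData().getInsertValue(tableSchema);
   }
   ```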













[GitHub] [hudi] danny0405 commented on a change in pull request #3285: [HUDI-1771] Propagate CDC format for hoodie

2021-07-19 Thread GitBox


danny0405 commented on a change in pull request #3285:
URL: https://github.com/apache/hudi/pull/3285#discussion_r672027021



##
File path: hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
##
@@ -44,6 +46,28 @@
   private FormatUtils() {
   }
 
+  public static RowKind getRowKind(IndexedRecord record, int index) {
+    if (index == -1) {
+      return RowKind.INSERT;

Review comment:
   Add a comment for the method: returns the `RowKind` of the given record, 
never null; returns `RowKind.INSERT` when the given field index is not found.
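
   For illustration, a sketch of the method with the requested comment in place. The flag-to-`RowKind` mapping is an assumption about the part of the method the hunk truncates, and `fromName`, `isUpdateBefore`, and `isUpdateAfter` are assumed `HoodieOperation` helpers:

   ```java
   /**
    * Returns the {@link RowKind} of the given Avro record, never null.
    *
    * <p>Returns {@code RowKind.INSERT} when the given field index is not found,
    * i.e. the schema carries no change-flag field.
    */
   public static RowKind getRowKind(IndexedRecord record, int index) {
     if (index == -1) {
       return RowKind.INSERT;
     }
     final Object flagVal = record.get(index);
     if (flagVal == null) {
       // No flag on this record: treat it as a plain insert.
       return RowKind.INSERT;
     }
     // Map the persisted HoodieOperation code onto Flink's RowKind.
     final HoodieOperation operation = HoodieOperation.fromName(flagVal.toString());
     if (HoodieOperation.isDelete(operation)) {
       return RowKind.DELETE;
     } else if (HoodieOperation.isUpdateBefore(operation)) {
       return RowKind.UPDATE_BEFORE;
     } else if (HoodieOperation.isUpdateAfter(operation)) {
       return RowKind.UPDATE_AFTER;
     }
     return RowKind.INSERT;
   }
   ```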

##
File path: hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
##
@@ -312,14 +318,18 @@ private DataType getProducedDataType() {
 TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
 final Schema tableAvroSchema;
 try {
-  tableAvroSchema = schemaUtil.getTableAvroSchema();
+  // always includes the change flag
+  tableAvroSchema = schemaUtil.getTableAvroSchema(true);

Review comment:
   This change seems unnecessary.

##
File path: hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
##
@@ -132,14 +133,20 @@
*/
   private boolean emitDelete;
 
+  /**
+   * Position of the hoodie cdc operation.
+   */
+  private int operationPos;
+

Review comment:
   Move to `MergeOnReadTableState`.

##
File path: 
hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
##
@@ -639,8 +657,12 @@ public void close() throws IOException {
 private Option<RowData> mergeRowWithLog(
     RowData curRow,
     String curKey) throws IOException {
+  final HoodieRecord<?> record = logRecords.get(curKey);
+  if (HoodieOperation.isDelete(record.getOperation())) {
+    return Option.empty();
+  }

Review comment:
   We need to emit the deletes when the `emitDelete` flag is true.
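
   A sketch of the shape being asked for, with `doMerge` as a hypothetical stand-in for the method's existing merge logic:

   ```java
   private Option<RowData> mergeRowWithLog(RowData curRow, String curKey) throws IOException {
     final HoodieRecord<?> record = logRecords.get(curKey);
     if (!emitDelete && HoodieOperation.isDelete(record.getOperation())) {
       // Swallow the delete only when the format is not emitting deletes.
       return Option.empty();
     }
     Option<RowData> merged = doMerge(curRow, record); // hypothetical: existing merge logic
     if (merged.isPresent() && HoodieOperation.isDelete(record.getOperation())) {
       // emitDelete must be true if we reach here with a DELETE,
       // so surface the merged row as a retraction.
       merged.get().setRowKind(RowKind.DELETE);
     }
     return merged;
   }
   ```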

##
File path: hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
##
@@ -312,14 +318,18 @@ private DataType getProducedDataType() {
 TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
 final Schema tableAvroSchema;
 try {
-  tableAvroSchema = schemaUtil.getTableAvroSchema();
+  // always includes the change flag
+  tableAvroSchema = schemaUtil.getTableAvroSchema(true);
 } catch (Exception e) {
   throw new HoodieException("Get table avro schema error", e);
 }
 final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
 final RowType rowType = (RowType) rowDataType.getLogicalType();
 final RowType requiredRowType = (RowType) getProducedDataType().notNull().getLogicalType();
 
+final Schema.Field operationField = tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD);
+final int operationPos = operationField == null ? -1 : operationField.pos();

Review comment:
   We can move the `operationPos` into the `MergeOnReadTableState` to 
simplify the logic.
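
   A sketch of that refactoring, assuming `MergeOnReadTableState` already travels with the input format (constructor shown with only the relevant argument; everything beyond `getOperationPos` is elided):

   ```java
   import java.io.Serializable;

   import org.apache.avro.Schema;
   import org.apache.hudi.common.model.HoodieRecord;

   public class MergeOnReadTableState implements Serializable {

     private final int operationPos;

     public MergeOnReadTableState(Schema tableAvroSchema) {
       // Resolve the change-flag position once; -1 means the schema has no flag.
       final Schema.Field operationField =
           tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD);
       this.operationPos = operationField == null ? -1 : operationField.pos();
     }

     public int getOperationPos() {
       return operationPos;
     }
   }
   ```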

##
File path: hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
##
@@ -624,6 +634,14 @@ public boolean reachedEnd() throws IOException {
   return true;
 }
 
+private Option<IndexedRecord> getInsetValue(String curKey) throws IOException {
+  final HoodieRecord<?> record = logRecords.get(curKey);
+  if (HoodieOperation.isDelete(record.getOperation())) {
+    return Option.empty();

Review comment:
   We need to emit the deletes when the `emitDelete` flag is true.

##
File path: hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
##
@@ -196,7 +198,11 @@ public boolean isBounded() {
 
   @Override
   public ChangelogMode getChangelogMode() {
-return ChangelogMode.insertOnly();
+if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
+  return ChangelogModes.FULL;
+} else {
+  return ChangelogMode.insertOnly();
+}

Review comment:
   ```java
   return conf.getBoolean(FlinkOptions.READ_AS_STREAMING)
       ? ChangelogModes.FULL
       : ChangelogMode.insertOnly();
   ```
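
   For reference, a plausible definition of such a `ChangelogModes.FULL` constant using Flink's builder API (a sketch; the actual utility class in the PR may differ):

   ```java
   import org.apache.flink.table.connector.ChangelogMode;
   import org.apache.flink.types.RowKind;

   public final class ChangelogModes {

     /** A changelog mode containing every change kind, for CDC-style reads. */
     public static final ChangelogMode FULL = ChangelogMode.newBuilder()
         .addContainedKind(RowKind.INSERT)
         .addContainedKind(RowKind.UPDATE_BEFORE)
         .addContainedKind(RowKind.UPDATE_AFTER)
         .addContainedKind(RowKind.DELETE)
         .build();

     private ChangelogModes() {
     }
   }
   ```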




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org