[GitHub] [hudi] Sugamber edited a comment on issue #2637: [SUPPORT] - Partial Update : update few columns of a table
Sugamber edited a comment on issue #2637: URL: https://github.com/apache/hudi/issues/2637#issuecomment-805645355

There is an open pull request for partial update on CoW tables: https://github.com/apache/hudi/pull/1929. It looks like my use case is similar to this one.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] Sugamber edited a comment on issue #2637: [SUPPORT] - Partial Update : update few columns of a table
Sugamber edited a comment on issue #2637: URL: https://github.com/apache/hudi/issues/2637#issuecomment-804748373

Can this use case be achieved with Hudi, given that the target schema and the incremental schema are not the same?
[GitHub] [hudi] Sugamber edited a comment on issue #2637: [SUPPORT] - Partial Update : update few columns of a table
Sugamber edited a comment on issue #2637: URL: https://github.com/apache/hudi/issues/2637#issuecomment-804636424

I have created a class implementing HoodieRecordPayload. There are three methods in which we have to write our logic:
1. preCombine
2. combineAndGetUpdateValue
3. getInsertValue

@n3nash As per your explanation above, preCombine receives the current record coming in with the incremental load, and combineAndGetUpdateValue provides the latest record from the Hudi table. Please correct me if my understanding is incorrect.

In my use case, I receive only a few of the 20 columns in the incremental data, and the preCombine method does not have any schema details. For example: the Hudi table is built with 20 columns, but the requirement is to update only 3 of them, and only those columns arrive in the incremental feed, along with the RECORDKEY_FIELD_OPT_KEY, PARTITIONPATH_FIELD_OPT_KEY, and PRECOMBINE_FIELD_OPT_KEY columns. I have implemented the class as below. Please let me know in which method I will get the full schema of the table. I also found that getInsertValue is invoked even when it is not called from combineAndGetUpdateValue. I would like to understand more about these three methods and their flow.

```java
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.log4j.Logger;

public class PartialColumnUpdate implements HoodieRecordPayload<PartialColumnUpdate> {

  private static final Logger logger = Logger.getLogger(PartialColumnUpdate.class);

  private byte[] recordBytes;
  private Schema schema;
  private Comparable orderingVal;

  public PartialColumnUpdate(GenericRecord genericRecord, Comparable orderingVal) {
    logger.info("Inside two-parameter constructor");
    try {
      if (genericRecord != null) {
        this.recordBytes = HoodieAvroUtils.avroToBytes(genericRecord);
        this.schema = genericRecord.getSchema();
        this.orderingVal = orderingVal;
      } else {
        this.recordBytes = new byte[0];
      }
    } catch (Exception io) {
      throw new RuntimeException("Cannot convert record to bytes", io);
    }
  }

  public PartialColumnUpdate(Option<GenericRecord> record) {
    this(record.isPresent() ? record.get() : null, 0);
  }

  @Override
  public PartialColumnUpdate preCombine(PartialColumnUpdate anotherRecord) {
    logger.info("Inside preCombine");
    logger.info("preCombine => " + anotherRecord);
    logger.info("another ordering value: " + anotherRecord.orderingVal);
    logger.info("another schema value: " + anotherRecord.schema);
    logger.info("another record bytes value: " + anotherRecord.recordBytes);
    if (anotherRecord.orderingVal.compareTo(orderingVal) > 0) {
      return anotherRecord;
    } else {
      return this;
    }
  }

  @Override
  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord indexedRecord, Schema currentSchema)
      throws IOException {
    logger.info("Inside combineAndGetUpdateValue");
    logger.info("current schema: " + currentSchema);
    logger.info("combineUpdate => " + Option.of(indexedRecord));
    getInsertValue(currentSchema);
    return Option.empty();
  }

  @Override
  public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
    logger.info("Inside getInsertValue");
    if (recordBytes.length == 0) {
      return Option.empty();
    }
    IndexedRecord indexedRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema);
    if (isDeleteRecord((GenericRecord) indexedRecord)) {
      return Option.empty();
    } else {
      return Option.of(indexedRecord);
    }
  }

  protected boolean isDeleteRecord(GenericRecord genericRecord) {
    final String isDeleteKey = "_hoodie_is_deleted";
    if (genericRecord.getSchema().getField(isDeleteKey) == null) {
      return false;
    }
    Object deleteMarker = genericRecord.get(isDeleteKey);
    return (deleteMarker instanceof Boolean && (boolean) deleteMarker);
  }
}
```
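On the schema question: combineAndGetUpdateValue is the method that receives the table schema as its second argument, so the field-by-field merge belongs there. The merge idea itself can be sketched independently of Hudi; the snippet below uses plain maps in place of Avro GenericRecords so it stays self-contained (the class and field names are hypothetical, and with real records the same loop would iterate `schema.getFields()` inside combineAndGetUpdateValue). This is an illustrative sketch, not the Hudi API.

```java
import java.util.HashMap;
import java.util.Map;

public class PartialMergeSketch {

    // Merge an incoming partial record into the current stored record:
    // fields present (non-null) in the update win; all other fields are
    // carried over unchanged from the stored record.
    static Map<String, Object> merge(Map<String, Object> stored,
                                     Map<String, Object> update) {
        Map<String, Object> merged = new HashMap<>(stored);
        for (Map.Entry<String, Object> e : update.entrySet()) {
            if (e.getValue() != null) {
                merged.put(e.getKey(), e.getValue());
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // Stored row has all columns; the update carries only the key
        // and one changed column, mirroring the 3-of-20 use case above.
        Map<String, Object> stored = new HashMap<>();
        stored.put("id", 1);
        stored.put("name", "a");
        stored.put("city", "x");

        Map<String, Object> update = new HashMap<>();
        update.put("id", 1);
        update.put("city", "y");

        System.out.println(merge(stored, update));
    }
}
```

The same pattern, applied per schema field inside combineAndGetUpdateValue, would return `Option.of(mergedRecord)` instead of `Option.empty()`.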