[GitHub] [hudi] Sugamber commented on issue #2637: [SUPPORT] - Partial Update : update few columns of a table

2021-04-03 Thread GitBox


Sugamber commented on issue #2637:
URL: https://github.com/apache/hudi/issues/2637#issuecomment-812973626


   @n3nash  I'm waiting for the pull request below to be merged. Please let me know if we can close this issue once it is merged.
   https://github.com/apache/hudi/pull/2666


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] Sugamber commented on issue #2637: [SUPPORT] - Partial Update : update few columns of a table

2021-03-31 Thread GitBox


Sugamber commented on issue #2637:
URL: https://github.com/apache/hudi/issues/2637#issuecomment-810995008


   Thank you!!!






[GitHub] [hudi] Sugamber commented on issue #2637: [SUPPORT] - Partial Update : update few columns of a table

2021-03-24 Thread GitBox


Sugamber commented on issue #2637:
URL: https://github.com/apache/hudi/issues/2637#issuecomment-805713856


   @nsivabalan  Do we have any timeline for these pull requests?
   Pull request 1- https://github.com/apache/hudi/pull/1929/
   Pull request 2- https://github.com/apache/hudi/pull/2666
   






[GitHub] [hudi] Sugamber commented on issue #2637: [SUPPORT] - Partial Update : update few columns of a table

2021-03-24 Thread GitBox


Sugamber commented on issue #2637:
URL: https://github.com/apache/hudi/issues/2637#issuecomment-805645355


   There is an open pull request for partial updates on CoW tables:
   https://github.com/apache/hudi/pull/1929
   
   It looks like it covers my use case.






[GitHub] [hudi] Sugamber commented on issue #2637: [SUPPORT] - Partial Update : update few columns of a table

2021-03-23 Thread GitBox


Sugamber commented on issue #2637:
URL: https://github.com/apache/hudi/issues/2637#issuecomment-805503994


   @n3nash  Please confirm whether this use case can be achieved. If yes, please provide a few inputs.






[GitHub] [hudi] Sugamber commented on issue #2637: [SUPPORT] - Partial Update : update few columns of a table

2021-03-23 Thread GitBox


Sugamber commented on issue #2637:
URL: https://github.com/apache/hudi/issues/2637#issuecomment-804748373


   Can this use case be achieved using Hudi?
   






[GitHub] [hudi] Sugamber commented on issue #2637: [SUPPORT] - Partial Update : update few columns of a table

2021-03-22 Thread GitBox


Sugamber commented on issue #2637:
URL: https://github.com/apache/hudi/issues/2637#issuecomment-804639531


   public class PartialColumnUpdate implements HoodieRecordPayload<PartialColumnUpdate> {
       private static final Logger logger = Logger.getLogger(PartialColumnUpdate.class);
       private byte[] recordBytes;
       private Schema schema;
       private Comparable orderingVal;
   
       public PartialColumnUpdate(GenericRecord genericRecord, Comparable orderingVal) {
           logger.info("Inside two-parameter constructor");
           try {
               if (genericRecord != null) {
                   this.recordBytes = HoodieAvroUtils.avroToBytes(genericRecord);
                   this.schema = genericRecord.getSchema();
                   this.orderingVal = orderingVal;
               } else {
                   this.recordBytes = new byte[0];
               }
           } catch (Exception io) {
               throw new RuntimeException("Cannot convert record to bytes", io);
           }
       }
   
       public PartialColumnUpdate(Option<GenericRecord> record) {
           this(record.isPresent() ? record.get() : null, 0);
       }
   
       @Override
       public PartialColumnUpdate preCombine(PartialColumnUpdate anotherRecord) {
           logger.info("Inside preCombine");
           logger.info("preCombine => " + anotherRecord);
           logger.info("another ordering value: " + anotherRecord.orderingVal);
           logger.info("another schema: " + anotherRecord.schema);
           logger.info("another record bytes: " + anotherRecord.recordBytes);
           if (anotherRecord.orderingVal.compareTo(orderingVal) > 0) {
               return anotherRecord;
           } else {
               return this;
           }
       }
   
       @Override
       public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord indexedRecord, Schema currentSchema) throws IOException {
           logger.info("Inside combineAndGetUpdateValue");
           logger.info("current schema: " + currentSchema);
           logger.info("combineUpdate -> " + Option.of(indexedRecord));
           getInsertValue(currentSchema);
           return Option.empty();
       }
   
       @Override
       public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
           logger.info("Inside getInsertValue");
           if (recordBytes.length == 0) {
               return Option.empty();
           }
           IndexedRecord indexedRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema);
           if (isDeleteRecord((GenericRecord) indexedRecord)) {
               return Option.empty();
           } else {
               return Option.of(indexedRecord);
           }
       }
   
       protected boolean isDeleteRecord(GenericRecord genericRecord) {
           final String isDeleteKey = "_hoodie_is_deleted";
           if (genericRecord.getSchema().getField(isDeleteKey) == null) {
               return false;
           }
           Object deleteMarker = genericRecord.get(isDeleteKey);
           return (deleteMarker instanceof Boolean && (boolean) deleteMarker);
       }
   }






[GitHub] [hudi] Sugamber commented on issue #2637: [SUPPORT] - Partial Update : update few columns of a table

2021-03-22 Thread GitBox


Sugamber commented on issue #2637:
URL: https://github.com/apache/hudi/issues/2637#issuecomment-804636424


   I have created a class implementing HoodieRecordPayload. There are three methods in which we have to write our logic:
   1. preCombine
   2. combineAndGetUpdateValue
   3. getInsertValue
   @n3nash  As per your explanation above, preCombine provides the incoming record from the incremental load, and combineAndGetUpdateValue provides the latest record from the Hudi table.
   Please correct me if my understanding is incorrect.
   
   In my use case, I'm only getting a few columns out of 20 in the incremental data, and the preCombine method does not have any schema details.
   For example: the Hudi table is built with 20 columns. The requirement is to update only 3 columns, and only those columns arrive in the incremental data feeds, along with the RECORDKEY_FIELD_OPT_KEY, PARTITIONPATH_FIELD_OPT_KEY, and PRECOMBINE_FIELD_OPT_KEY columns.
   I have implemented the class as below. Please let me know in which method I'll be getting the full schema of the table.
   
   `public class PartialColumnUpdate implements HoodieRecordPayload<PartialColumnUpdate> {
       private static final Logger logger = Logger.getLogger(PartialColumnUpdate.class);
       private byte[] recordBytes;
       private Schema schema;
       private Comparable orderingVal;
   
       public PartialColumnUpdate(GenericRecord genericRecord, Comparable orderingVal) {
           logger.info("Inside two-parameter constructor");
           try {
               if (genericRecord != null) {
                   this.recordBytes = HoodieAvroUtils.avroToBytes(genericRecord);
                   this.schema = genericRecord.getSchema();
                   this.orderingVal = orderingVal;
               } else {
                   this.recordBytes = new byte[0];
               }
           } catch (Exception io) {
               throw new RuntimeException("Cannot convert record to bytes", io);
           }
       }
   
       public PartialColumnUpdate(Option<GenericRecord> record) {
           this(record.isPresent() ? record.get() : null, 0);
       }
   
       @Override
       public PartialColumnUpdate preCombine(PartialColumnUpdate anotherRecord) {
           logger.info("Inside preCombine");
           logger.info("preCombine => " + anotherRecord);
           logger.info("another ordering value: " + anotherRecord.orderingVal);
           logger.info("another schema: " + anotherRecord.schema);
           logger.info("another record bytes: " + anotherRecord.recordBytes);
           if (anotherRecord.orderingVal.compareTo(orderingVal) > 0) {
               return anotherRecord;
           } else {
               return this;
           }
       }
   
       @Override
       public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord indexedRecord, Schema currentSchema) throws IOException {
           logger.info("Inside combineAndGetUpdateValue");
           logger.info("current schema: " + currentSchema);
           logger.info("combineUpdate -> " + Option.of(indexedRecord));
           getInsertValue(currentSchema);
           return Option.empty();
       }
   
       @Override
       public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
           logger.info("Inside getInsertValue");
           if (recordBytes.length == 0) {
               return Option.empty();
           }
           IndexedRecord indexedRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema);
           if (isDeleteRecord((GenericRecord) indexedRecord)) {
               return Option.empty();
           } else {
               return Option.of(indexedRecord);
           }
       }
   
       protected boolean isDeleteRecord(GenericRecord genericRecord) {
           final String isDeleteKey = "_hoodie_is_deleted";
           if (genericRecord.getSchema().getField(isDeleteKey) == null) {
               return false;
           }
           Object deleteMarker = genericRecord.get(isDeleteKey);
           return (deleteMarker instanceof Boolean && (boolean) deleteMarker);
       }
   }`
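   As I understand it, the Schema argument handed to combineAndGetUpdateValue is the full table (writer) schema, so that is where a partial merge would take place. Setting Hudi's Avro types aside, the overlay step itself can be sketched with plain maps standing in for GenericRecords. This is only an illustration, not Hudi's API; the field names are made up:

```java
import java.util.HashMap;
import java.util.Map;

class PartialMergeSketch {
    // Overlay the fields present in the incoming partial record onto the
    // current stored record. currentRecord carries the full row (all 20
    // columns); partialUpdate carries only the changed columns plus the
    // key/precombine fields. Columns the feed omits (null here) keep their
    // stored values.
    static Map<String, Object> merge(Map<String, Object> currentRecord,
                                     Map<String, Object> partialUpdate) {
        Map<String, Object> merged = new HashMap<>(currentRecord);
        for (Map.Entry<String, Object> e : partialUpdate.entrySet()) {
            if (e.getValue() != null) {
                merged.put(e.getKey(), e.getValue());
            }
        }
        return merged;
    }
}
```

   In a real payload this logic would run over two IndexedRecords decoded with the table schema, and the merged record would be returned as Option.of(...) instead of Option.empty().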
   






[GitHub] [hudi] Sugamber commented on issue #2637: [SUPPORT] - Partial Update : update few columns of a table

2021-03-22 Thread GitBox


Sugamber commented on issue #2637:
URL: https://github.com/apache/hudi/issues/2637#issuecomment-804608999


   @nsivabalan  I had created a shaded jar, and it was causing issues because a few dependency versions were conflicting.






[GitHub] [hudi] Sugamber commented on issue #2637: [SUPPORT] - Partial Update : update few columns of a table

2021-03-22 Thread GitBox


Sugamber commented on issue #2637:
URL: https://github.com/apache/hudi/issues/2637#issuecomment-804002453


   I was able to resolve the ClassNotFoundException.
   






[GitHub] [hudi] Sugamber commented on issue #2637: [SUPPORT] - Partial Update : update few columns of a table

2021-03-15 Thread GitBox


Sugamber commented on issue #2637:
URL: https://github.com/apache/hudi/issues/2637#issuecomment-799211234


   @nsivabalan  Yes,
   I have added the jar file to both the driver and executor classpaths.
   `spark-submit \
 --jars /path/lib/orders-poc-1.0.41-SNAPSHOT-shaded.jar,/path/hudi-support-jars/org.apache.avro_avro-1.8.2.jar,/path/hudi-support-jars/spark-avro_2.11-2.4.4.jar,/path/hudi-support-jars/hudi-spark-bundle_2.11-0.7.0.jar \
 --master yarn --deploy-mode cluster --num-executors 2 --executor-cores 4 \
 --executor-memory 8g --driver-memory=8g --queue=default \
 --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
 --conf spark.driver.extraClassPath=org.apache.avro_avro-1.8.2.jar:spark-avro_2.11-2.4.4.jar:hudi-spark-bundle_2.11-0.7.0.jar:/path/lib/orders-poc-1.0.41-SNAPSHOT-shaded.jar \
 --conf spark.executor.extraClassPath=org.apache.avro_avro-1.8.2.jar:spark-avro_2.11-2.4.4.jar:hudi-spark-bundle_2.11-0.7.0.jar:/path/lib/orders-poc-1.0.41-SNAPSHOT-shaded.jar \
 --files /path/hive-site.xml,/path/resources/hudiConf.conf \
 --class com.app.workflows.RecordPartialUpdate \
 lib/orders-poc-1.0.41-SNAPSHOT-shaded.jar`
   
   I'm able to find the class name in the jar using a Linux command:
   `find /path/orders-poc-1.0.41-SNAPSHOT-shaded.jar | xargs grep CustomRecordUpdate
   Binary file /path/orders-poc-1.0.41-SNAPSHOT-shaded.jar matches`
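   Grepping the jar binary only shows that the string occurs somewhere in the archive; a more precise check is to list the jar's entries and look for the actual .class file. A small sketch using only the JDK (path and class name are placeholders):

```java
import java.io.IOException;
import java.util.jar.JarFile;

class JarClassCheck {
    // True when the jar contains an entry whose path ends with the given
    // class file name, e.g. "CustomRecordUpdate.class". Unlike grepping the
    // binary, this cannot be fooled by the name appearing in an unrelated
    // string constant or index file.
    static boolean containsClass(String jarPath, String classFileName) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.stream().anyMatch(entry -> entry.getName().endsWith(classFileName));
        }
    }
}
```

   The equivalent one-liner is `jar tf the.jar | grep CustomRecordUpdate.class`, which also reveals the package path the class ended up under after shading.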







[GitHub] [hudi] Sugamber commented on issue #2637: [SUPPORT] - Partial Update : update few columns of a table

2021-03-11 Thread GitBox


Sugamber commented on issue #2637:
URL: https://github.com/apache/hudi/issues/2637#issuecomment-797307517


   @n3nash  How can we pass a custom class name? I copied the same class as CustomRecordUpdate and tried to set it during save. It throws a ClassNotFoundException.
   `writeDF.write.format("org.apache.hudi")
 .options(hudiConfMap)
 .option("hoodie.datasource.write.payload.class", classOf[CustomRecordUpdate].getName)
 .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "col1")
 .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "col2")
 .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "col3")
 .option(HoodieWriteConfig.TABLE_NAME, tableName.toString)
 .mode(SaveMode.Append)
 .save(tablePath.toString)`
   
   I tried all three ways of the Scala API, but none of them worked:
   `classOf[CustomRecordUpdate].getCanonicalName
   classOf[CustomRecordUpdate].getName
   classOf[CustomRecordUpdate].getSimpleName`
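   The packaging issue aside, the form of the configured name matters on its own: Hudi instantiates the payload class reflectively from the string, so only the JVM binary name (getName) is guaranteed to resolve. A standalone sketch, with no Hudi involved, showing why the other two variants fail for a nested class:

```java
class ClassNameDemo {
    static class Inner {}

    // Reflective loading accepts only the binary name (getName), which uses
    // '$' for nested classes. getSimpleName drops the package entirely, and
    // getCanonicalName uses '.' where the class loader expects '$'.
    static boolean resolvable(String name) {
        try {
            Class.forName(name);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }
}
```

   For a top-level class, getName and getCanonicalName coincide, so the remaining failure here points back at the class simply not being on the executor classpath at runtime.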
   
   
   


