[jira] [Updated] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-25 Thread Vinoth Chandar (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vinoth Chandar updated HUDI-625:

Status: Open  (was: New)

> Address performance concerns on DiskBasedMap.get() during upsert of thin 
> records
> 
>
> Key: HUDI-625
> URL: https://issues.apache.org/jira/browse/HUDI-625
> Project: Apache Hudi (incubating)
>  Issue Type: Improvement
>  Components: Performance, Writer Core
>Reporter: Vinoth Chandar
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
> Fix For: 0.5.2
>
> Attachments: image-2020-02-20-23-34-24-155.png, 
> image-2020-02-20-23-34-27-466.png, image-2020-02-21-15-35-56-637.png, 
> image-2020-02-24-08-15-48-615.png, image-2020-02-24-08-17-33-739.png
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> [https://github.com/apache/incubator-hudi/issues/1328]
>  
>  What's going on here is that each entry (a single data field) is estimated 
> to be around 500-750 bytes in memory, so most of the records spill to disk... 
> {code:java}
> 20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 
> for 3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 
> partitionPath=default}, currentLocation='HoodieRecordLocation 
> {instantTime=20200220225748, fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', 
> newLocation='HoodieRecordLocation {instantTime=20200220225921, 
> fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'} {code}
>  
> {code:java}
> INFO HoodieMergeHandle: Number of entries in MemoryBasedMap => 150875
> Total size in bytes of MemoryBasedMap => 83886580
> Number of entries in DiskBasedMap => 2849125
> Size of file spilled to disk => 1067101739 {code}
> h2. Reproduce steps
>  
> {code:java}
> export SPARK_HOME=/home/dockeradmin/hudi/spark-2.4.4-bin-hadoop2.7
> ${SPARK_HOME}/bin/spark-shell \
> --executor-memory 6G \
> --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
> --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
> {code}
>  
> {code:java}
> val HUDI_FORMAT = "org.apache.hudi"
> val TABLE_NAME = "hoodie.table.name"
> val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
> val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
> val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
> val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
> val UPSERT_OPERATION_OPT_VAL = "upsert"
> val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
> val UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
> val config = Map(
> "table_name" -> "example_table",
> "target" -> "file:///tmp/example_table/",
> "primary_key" ->  "id",
> "sort_key" -> "id"
> )
> val readPath = config("target") + "/*"
> val json_data = (1 to 400).map(i => "{\"id\":" + i + "}")
> val jsonRDD = spark.sparkContext.parallelize(json_data, 2)
> val df1 = spark.read.json(jsonRDD)
> println(s"${df1.count()} records in source 1")
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL).
>   option(BULK_INSERT_PARALLELISM, 1).
>   mode("Overwrite").
>   save(config("target"))
> println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")
> // Runs very slow
> df1.limit(300).write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   save(config("target"))
> // Runs very slow
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   save(config("target"))
> println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")
> {code}
>  
>  
>  
> h2. *Analysis*
> h3. *Upsert (400 entries)*
> {code:java}
> WARN HoodieMergeHandle: 
> Number of entries in MemoryBasedMap => 150875 
> Total size in bytes of MemoryBasedMap => 83886580 
> Number of entries in DiskBasedMap => 3849125 
> Size of file spilled to disk => 1443046132
> {code}
> h3. Hang stack trace (DiskBasedMap#get)
>  
> {code:java}
> "pool-21-thread-2" Id=696 cpuUsage=98% RUNNABLE
> at java.util.zip.ZipFile.getEntry(Native 

[jira] [Updated] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-25 Thread Vinoth Chandar (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vinoth Chandar updated HUDI-625:

Fix Version/s: 0.5.2  (was: 0.6.0)

> Address performance concerns on DiskBasedMap.get() during upsert of thin 
> records
> 
>
> Key: HUDI-625
> URL: https://issues.apache.org/jira/browse/HUDI-625
> Project: Apache Hudi (incubating)
>  Issue Type: Improvement
>  Components: Performance, Writer Core
>Reporter: Vinoth Chandar
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
> Fix For: 0.5.2
>
> Attachments: image-2020-02-20-23-34-24-155.png, 
> image-2020-02-20-23-34-27-466.png, image-2020-02-21-15-35-56-637.png, 
> image-2020-02-24-08-15-48-615.png, image-2020-02-24-08-17-33-739.png
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> [https://github.com/apache/incubator-hudi/issues/1328]
>  
>  What's going on here is that each entry (a single data field) is estimated 
> to be around 500-750 bytes in memory, so most of the records spill to disk... 
> {code:java}
> 20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 
> for 3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 
> partitionPath=default}, currentLocation='HoodieRecordLocation 
> {instantTime=20200220225748, fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', 
> newLocation='HoodieRecordLocation {instantTime=20200220225921, 
> fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'} {code}
>  
> {code:java}
> INFO HoodieMergeHandle: Number of entries in MemoryBasedMap => 150875
> Total size in bytes of MemoryBasedMap => 83886580
> Number of entries in DiskBasedMap => 2849125
> Size of file spilled to disk => 1067101739 {code}
> h2. Reproduce steps
>  
> {code:java}
> export SPARK_HOME=/home/dockeradmin/hudi/spark-2.4.4-bin-hadoop2.7
> ${SPARK_HOME}/bin/spark-shell \
> --executor-memory 6G \
> --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
> --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
> {code}
>  
> {code:java}
> val HUDI_FORMAT = "org.apache.hudi"
> val TABLE_NAME = "hoodie.table.name"
> val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
> val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
> val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
> val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
> val UPSERT_OPERATION_OPT_VAL = "upsert"
> val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
> val UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
> val config = Map(
> "table_name" -> "example_table",
> "target" -> "file:///tmp/example_table/",
> "primary_key" ->  "id",
> "sort_key" -> "id"
> )
> val readPath = config("target") + "/*"
> val json_data = (1 to 400).map(i => "{\"id\":" + i + "}")
> val jsonRDD = spark.sparkContext.parallelize(json_data, 2)
> val df1 = spark.read.json(jsonRDD)
> println(s"${df1.count()} records in source 1")
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL).
>   option(BULK_INSERT_PARALLELISM, 1).
>   mode("Overwrite").
>   save(config("target"))
> println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")
> // Runs very slow
> df1.limit(300).write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   save(config("target"))
> // Runs very slow
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   save(config("target"))
> println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")
> {code}
>  
>  
>  
> h2. *Analysis*
> h3. *Upsert (400 entries)*
> {code:java}
> WARN HoodieMergeHandle: 
> Number of entries in MemoryBasedMap => 150875 
> Total size in bytes of MemoryBasedMap => 83886580 
> Number of entries in DiskBasedMap => 3849125 
> Size of file spilled to disk => 1443046132
> {code}
> h3. Hang stack trace (DiskBasedMap#get)
>  
> {code:java}
> "pool-21-thread-2" Id=696 cpuUsage=98% RUNNABLE
> at 

[jira] [Updated] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-23 Thread lamber-ken (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lamber-ken updated HUDI-625:

Attachment: image-2020-02-24-08-15-48-615.png

> Address performance concerns on DiskBasedMap.get() during upsert of thin 
> records
> 
>
> Key: HUDI-625
> URL: https://issues.apache.org/jira/browse/HUDI-625
> Project: Apache Hudi (incubating)
>  Issue Type: Improvement
>  Components: Performance, Writer Core
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
>  Labels: pull-request-available
> Fix For: 0.6.0
>
> Attachments: image-2020-02-20-23-34-24-155.png, 
> image-2020-02-20-23-34-27-466.png, image-2020-02-21-15-35-56-637.png, 
> image-2020-02-24-08-15-48-615.png
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> [https://github.com/apache/incubator-hudi/issues/1328]
>  
>  What's going on here is that each entry (a single data field) is estimated 
> to be around 500-750 bytes in memory, so most of the records spill to disk... 
> {code:java}
> 20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 
> for 3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 
> partitionPath=default}, currentLocation='HoodieRecordLocation 
> {instantTime=20200220225748, fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', 
> newLocation='HoodieRecordLocation {instantTime=20200220225921, 
> fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'} {code}
>  
> {code:java}
> INFO HoodieMergeHandle: Number of entries in MemoryBasedMap => 150875
> Total size in bytes of MemoryBasedMap => 83886580
> Number of entries in DiskBasedMap => 2849125
> Size of file spilled to disk => 1067101739 {code}
> h2. Reproduce steps
>  
> {code:java}
> export SPARK_HOME=/home/dockeradmin/hudi/spark-2.4.4-bin-hadoop2.7
> ${SPARK_HOME}/bin/spark-shell \
> --executor-memory 6G \
> --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
> --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
> {code}
>  
> {code:java}
> val HUDI_FORMAT = "org.apache.hudi"
> val TABLE_NAME = "hoodie.table.name"
> val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
> val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
> val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
> val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
> val UPSERT_OPERATION_OPT_VAL = "upsert"
> val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
> val UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
> val config = Map(
> "table_name" -> "example_table",
> "target" -> "file:///tmp/example_table/",
> "primary_key" ->  "id",
> "sort_key" -> "id"
> )
> val readPath = config("target") + "/*"
> val json_data = (1 to 400).map(i => "{\"id\":" + i + "}")
> val jsonRDD = spark.sparkContext.parallelize(json_data, 2)
> val df1 = spark.read.json(jsonRDD)
> println(s"${df1.count()} records in source 1")
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL).
>   option(BULK_INSERT_PARALLELISM, 1).
>   mode("Overwrite").
>   save(config("target"))
> println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")
> // Runs very slow
> df1.limit(300).write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   save(config("target"))
> // Runs very slow
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   save(config("target"))
> println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")
> {code}
>  
>  
>  
> h2. *Analysis*
> h3. *Upsert (400 entries)*
> {code:java}
> WARN HoodieMergeHandle: 
> Number of entries in MemoryBasedMap => 150875 
> Total size in bytes of MemoryBasedMap => 83886580 
> Number of entries in DiskBasedMap => 3849125 
> Size of file spilled to disk => 1443046132
> {code}
> h3. Hang stack trace (DiskBasedMap#get)
>  
> {code:java}
> "pool-21-thread-2" Id=696 cpuUsage=98% RUNNABLE
> at java.util.zip.ZipFile.getEntry(Native Method)
> at 

[jira] [Updated] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated HUDI-625:

Labels: pull-request-available  (was: )

> Address performance concerns on DiskBasedMap.get() during upsert of thin 
> records
> 
>
> Key: HUDI-625
> URL: https://issues.apache.org/jira/browse/HUDI-625
> Project: Apache Hudi (incubating)
>  Issue Type: Improvement
>  Components: Performance, Writer Core
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
>  Labels: pull-request-available
> Fix For: 0.6.0
>
> Attachments: image-2020-02-20-23-34-24-155.png, 
> image-2020-02-20-23-34-27-466.png, image-2020-02-21-15-35-56-637.png
>
>
> [https://github.com/apache/incubator-hudi/issues/1328]
>  
>  What's going on here is that each entry (a single data field) is estimated 
> to be around 500-750 bytes in memory, so most of the records spill to disk... 
> {code:java}
> 20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 
> for 3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 
> partitionPath=default}, currentLocation='HoodieRecordLocation 
> {instantTime=20200220225748, fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', 
> newLocation='HoodieRecordLocation {instantTime=20200220225921, 
> fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'} {code}
>  
> {code:java}
> INFO HoodieMergeHandle: Number of entries in MemoryBasedMap => 150875
> Total size in bytes of MemoryBasedMap => 83886580
> Number of entries in DiskBasedMap => 2849125
> Size of file spilled to disk => 1067101739 {code}
> h2. Reproduce steps
>  
> {code:java}
> export SPARK_HOME=/home/dockeradmin/hudi/spark-2.4.4-bin-hadoop2.7
> ${SPARK_HOME}/bin/spark-shell \
> --executor-memory 6G \
> --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
> --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
> {code}
>  
> {code:java}
> val HUDI_FORMAT = "org.apache.hudi"
> val TABLE_NAME = "hoodie.table.name"
> val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
> val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
> val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
> val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
> val UPSERT_OPERATION_OPT_VAL = "upsert"
> val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
> val UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
> val config = Map(
> "table_name" -> "example_table",
> "target" -> "file:///tmp/example_table/",
> "primary_key" ->  "id",
> "sort_key" -> "id"
> )
> val readPath = config("target") + "/*"
> val json_data = (1 to 400).map(i => "{\"id\":" + i + "}")
> val jsonRDD = spark.sparkContext.parallelize(json_data, 2)
> val df1 = spark.read.json(jsonRDD)
> println(s"${df1.count()} records in source 1")
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL).
>   option(BULK_INSERT_PARALLELISM, 1).
>   mode("Overwrite").
>   save(config("target"))
> println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")
> // Runs very slow
> df1.limit(300).write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   save(config("target"))
> // Runs very slow
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   save(config("target"))
> println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")
> {code}
>  
>  
>  
> h2. *Analysis*
> h3. *Upsert (400 entries)*
> {code:java}
> WARN HoodieMergeHandle: 
> Number of entries in MemoryBasedMap => 150875 
> Total size in bytes of MemoryBasedMap => 83886580 
> Number of entries in DiskBasedMap => 3849125 
> Size of file spilled to disk => 1443046132
> {code}
> h3. Hang stack trace (DiskBasedMap#get)
>  
> {code:java}
> "pool-21-thread-2" Id=696 cpuUsage=98% RUNNABLE
> at java.util.zip.ZipFile.getEntry(Native Method)
> at java.util.zip.ZipFile.getEntry(ZipFile.java:310)
> -  locked 

[jira] [Updated] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-21 Thread Vinoth Chandar (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vinoth Chandar updated HUDI-625:

Description: 
[https://github.com/apache/incubator-hudi/issues/1328]

 

 What's going on here is that each entry (a single data field) is estimated to 
be around 500-750 bytes in memory, so most of the records spill to disk... 
{code:java}
20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 for 
3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 partitionPath=default}, 
currentLocation='HoodieRecordLocation {instantTime=20200220225748, 
fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', 
newLocation='HoodieRecordLocation {instantTime=20200220225921, 
fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'} {code}
 
{code:java}
INFO HoodieMergeHandle: Number of entries in MemoryBasedMap => 150875
Total size in bytes of MemoryBasedMap => 83886580
Number of entries in DiskBasedMap => 2849125
Size of file spilled to disk => 1067101739 {code}
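To put those numbers in perspective, here is the split implied by the log above (values copied from the log; only the arithmetic is new): of the ~3 million records being merged, roughly 5% stay in the in-memory map, so every lookup for the remaining ~2.85 million goes through DiskBasedMap.get().
{code:java}
// Values copied from the HoodieMergeHandle log above; only the division is new.
val inMemoryEntries = 150875L
val onDiskEntries   = 2849125L
val totalEntries    = inMemoryEntries + onDiskEntries                 // 3,000,000 records in the merge

println(f"${inMemoryEntries * 100.0 / totalEntries}%.1f%% of records stay in the MemoryBasedMap")  // ~5.0%
println(s"$onDiskEntries records are served by DiskBasedMap.get() during the merge")
{code}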
h2. Reproduce steps

 
{code:java}
export SPARK_HOME=/home/dockeradmin/hudi/spark-2.4.4-bin-hadoop2.7
${SPARK_HOME}/bin/spark-shell \
--executor-memory 6G \
--packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
{code}
 
{code:java}
val HUDI_FORMAT = "org.apache.hudi"
val TABLE_NAME = "hoodie.table.name"
val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
val UPSERT_OPERATION_OPT_VAL = "upsert"
val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
val UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
val config = Map(
"table_name" -> "example_table",
"target" -> "file:///tmp/example_table/",
"primary_key" ->  "id",
"sort_key" -> "id"
)
val readPath = config("target") + "/*"
val json_data = (1 to 400).map(i => "{\"id\":" + i + "}")
val jsonRDD = spark.sparkContext.parallelize(json_data, 2)
val df1 = spark.read.json(jsonRDD)

println(s"${df1.count()} records in source 1")

df1.write.format(HUDI_FORMAT).
  option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
  option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
  option(TABLE_NAME, config("table_name")).
  option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL).
  option(BULK_INSERT_PARALLELISM, 1).
  mode("Overwrite").
  save(config("target"))

println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")

// Runs very slow
df1.limit(300).write.format(HUDI_FORMAT).
  option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
  option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
  option(TABLE_NAME, config("table_name")).
  option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
  option(UPSERT_PARALLELISM, 20).
  mode("Append").
  save(config("target"))

// Runs very slow
df1.write.format(HUDI_FORMAT).
  option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
  option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
  option(TABLE_NAME, config("table_name")).
  option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
  option(UPSERT_PARALLELISM, 20).
  mode("Append").
  save(config("target"))

println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")
{code}
 

 

 
h2. *Analysis*
h3. *Upsert (400 entries)*
{code:java}
WARN HoodieMergeHandle: 
Number of entries in MemoryBasedMap => 150875 
Total size in bytes of MemoryBasedMap => 83886580 
Number of entries in DiskBasedMap => 3849125 
Size of file spilled to disk => 1443046132
{code}
h3. Hang stack trace (DiskBasedMap#get)

 
{code:java}
"pool-21-thread-2" Id=696 cpuUsage=98% RUNNABLE
at java.util.zip.ZipFile.getEntry(Native Method)
at java.util.zip.ZipFile.getEntry(ZipFile.java:310)
-  locked java.util.jar.JarFile@1fc27ed4
at java.util.jar.JarFile.getEntry(JarFile.java:240)
at java.util.jar.JarFile.getJarEntry(JarFile.java:223)
at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:1005)
at sun.misc.URLClassPath.getResource(URLClassPath.java:212)
at java.net.URLClassLoader$1.run(URLClassLoader.java:365)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
-  locked java.lang.Object@28f65251
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
-  locked scala.reflect.internal.util.ScalaClassLoader$URLClassLoader@a353dff
at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
-  locked 

[jira] [Updated] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-21 Thread Vinoth Chandar (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vinoth Chandar updated HUDI-625:

Description: 
[https://github.com/apache/incubator-hudi/issues/1328]

 

 What's going on here is that each entry (a single data field) is estimated to 
be around 500-750 bytes in memory, so most of the records spill to disk... 
{code:java}
20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 for 
3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 partitionPath=default}, 
currentLocation='HoodieRecordLocation {instantTime=20200220225748, 
fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', 
newLocation='HoodieRecordLocation {instantTime=20200220225921, 
fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'} {code}
 
{code:java}
INFO HoodieMergeHandle: Number of entries in MemoryBasedMap => 150875
Total size in bytes of MemoryBasedMap => 83886580
Number of entries in DiskBasedMap => 2849125
Size of file spilled to disk => 1067101739 {code}
h2. Reproduce steps

 
{code:java}
export SPARK_HOME=/home/dockeradmin/hudi/spark-2.4.4-bin-hadoop2.7
${SPARK_HOME}/bin/spark-shell \
--executor-memory 6G \
--packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
{code}
 
{code:java}
val HUDI_FORMAT = "org.apache.hudi"
val TABLE_NAME = "hoodie.table.name"
val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
val UPSERT_OPERATION_OPT_VAL = "upsert"
val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
val UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
val config = Map(
"table_name" -> "example_table",
"target" -> "file:///tmp/example_table/",
"primary_key" ->  "id",
"sort_key" -> "id"
)
val readPath = config("target") + "/*"
val json_data = (1 to 400).map(i => "{\"id\":" + i + "}")
val jsonRDD = spark.sparkContext.parallelize(json_data, 2)
val df1 = spark.read.json(jsonRDD)

println(s"${df1.count()} records in source 1")

df1.write.format(HUDI_FORMAT).
  option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
  option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
  option(TABLE_NAME, config("table_name")).
  option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL).
  option(BULK_INSERT_PARALLELISM, 1).
  mode("Overwrite").
  save(config("target"))

println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")

// Runs very slow
df1.limit(300).write.format(HUDI_FORMAT).
  option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
  option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
  option(TABLE_NAME, config("table_name")).
  option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
  option(UPSERT_PARALLELISM, 20).
  mode("Append").
  save(config("target"))

// Runs very slow
df1.write.format(HUDI_FORMAT).
  option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
  option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
  option(TABLE_NAME, config("table_name")).
  option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
  option(UPSERT_PARALLELISM, 20).
  mode("Append").
  save(config("target"))

println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")
{code}
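
A possible mitigation to test alongside any fix, sketched from the same repro script: raise the in-memory budget of the merge-time spillable map so that fewer entries land in the DiskBasedMap at all. The config key below is assumed from Hudi's memory settings and should be verified against the release in use before relying on it.
{code:java}
// Sketch only -- same upsert as above, but with a larger (assumed) merge memory budget.
df1.write.format(HUDI_FORMAT).
  option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
  option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
  option(TABLE_NAME, config("table_name")).
  option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
  option(UPSERT_PARALLELISM, 20).
  option("hoodie.memory.merge.max.size", (2L * 1024 * 1024 * 1024).toString).  // assumed key name, ~2 GB
  mode("Append").
  save(config("target"))
{code}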
 

 

 
h2. *Analysis*
h3. *Upsert (400 entries)*
{code:java}
WARN HoodieMergeHandle: 
Number of entries in MemoryBasedMap => 150875 
Total size in bytes of MemoryBasedMap => 83886580 
Number of entries in DiskBasedMap => 3849125 
Size of file spilled to disk => 1443046132
{code}
h3. Hang stack trace (DiskBasedMap#get)

 
{code:java}
"pool-21-thread-2" Id=696 cpuUsage=98% RUNNABLE
at java.util.zip.ZipFile.getEntry(Native Method)
at java.util.zip.ZipFile.getEntry(ZipFile.java:310)
-  locked java.util.jar.JarFile@1fc27ed4
at java.util.jar.JarFile.getEntry(JarFile.java:240)
at java.util.jar.JarFile.getJarEntry(JarFile.java:223)
at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:1005)
at sun.misc.URLClassPath.getResource(URLClassPath.java:212)
at java.net.URLClassLoader$1.run(URLClassLoader.java:365)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
-  locked java.lang.Object@28f65251
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
-  locked scala.reflect.internal.util.ScalaClassLoader$URLClassLoader@a353dff
at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
-  locked 

[jira] [Updated] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-20 Thread lamber-ken (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lamber-ken updated HUDI-625:

Description: 
[https://github.com/apache/incubator-hudi/issues/1328]

 

 What's going on here is that each entry (a single data field) is estimated to 
be around 500-750 bytes in memory, so most of the records spill to disk... 
{code:java}
20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 for 
3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 partitionPath=default}, 
currentLocation='HoodieRecordLocation {instantTime=20200220225748, 
fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', 
newLocation='HoodieRecordLocation {instantTime=20200220225921, 
fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'} {code}
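For a record whose payload is a single "id" field, most of that 500-750 byte estimate is the wrapper metadata visible in the log line above rather than the data itself. A purely illustrative sketch of the per-entry object graph (the -Ish names are made up; only the field values come from the log):
{code:java}
// Not Hudi classes -- just the shape of what the spillable map holds per entry.
case class KeyIsh(recordKey: String, partitionPath: String)    // "3675605", "default"
case class LocationIsh(instantTime: String, fileId: String)    // commit time + file id
case class EntryIsh(
  key: KeyIsh,
  currentLocation: LocationIsh,                                // where the record currently lives
  newLocation: LocationIsh,                                    // where the merge will write it
  payload: Map[String, Any]                                    // Map("id" -> 3675605) -- the only real data
)
{code}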
 
h2. Reproduce steps

 
{code:java}
export SPARK_HOME=/home/dockeradmin/hudi/spark-2.4.4-bin-hadoop2.7
${SPARK_HOME}/bin/spark-shell \
--executor-memory 6G \
--packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
{code}
 
{code:java}
val HUDI_FORMAT = "org.apache.hudi"
val TABLE_NAME = "hoodie.table.name"
val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
val UPSERT_OPERATION_OPT_VAL = "upsert"
val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
val UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
val config = Map(
"table_name" -> "example_table",
"target" -> "file:///tmp/example_table/",
"primary_key" ->  "id",
"sort_key" -> "id"
)
val readPath = config("target") + "/*"
val json_data = (1 to 400).map(i => "{\"id\":" + i + "}")
val jsonRDD = spark.sparkContext.parallelize(json_data, 2)
val df1 = spark.read.json(jsonRDD)

println(s"${df1.count()} records in source 1")

df1.write.format(HUDI_FORMAT).
  option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
  option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
  option(TABLE_NAME, config("table_name")).
  option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL).
  option(BULK_INSERT_PARALLELISM, 1).
  mode("Overwrite").
  save(config("target"))

println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")

// Runs very slow
df1.limit(300).write.format(HUDI_FORMAT).
  option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
  option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
  option(TABLE_NAME, config("table_name")).
  option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
  option(UPSERT_PARALLELISM, 20).
  mode("Append").
  save(config("target"))

// Runs very slow
df1.write.format(HUDI_FORMAT).
  option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
  option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
  option(TABLE_NAME, config("table_name")).
  option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
  option(UPSERT_PARALLELISM, 20).
  mode("Append").
  save(config("target"))

println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")
{code}
 

 

 
h2. *Analysis*
h3. *Upsert (400 entries)*
{code:java}
WARN HoodieMergeHandle: 
Number of entries in MemoryBasedMap => 150875 
Total size in bytes of MemoryBasedMap => 83886580 
Number of entries in DiskBasedMap => 3849125 
Size of file spilled to disk => 1443046132
{code}
h3. Hang stack trace (DiskBasedMap#get)

 
{code:java}
"pool-21-thread-2" Id=696 cpuUsage=98% RUNNABLE
at java.util.zip.ZipFile.getEntry(Native Method)
at java.util.zip.ZipFile.getEntry(ZipFile.java:310)
-  locked java.util.jar.JarFile@1fc27ed4
at java.util.jar.JarFile.getEntry(JarFile.java:240)
at java.util.jar.JarFile.getJarEntry(JarFile.java:223)
at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:1005)
at sun.misc.URLClassPath.getResource(URLClassPath.java:212)
at java.net.URLClassLoader$1.run(URLClassLoader.java:365)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
-  locked java.lang.Object@28f65251
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
-  locked scala.reflect.internal.util.ScalaClassLoader$URLClassLoader@a353dff
at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
-  locked com.esotericsoftware.reflectasm.AccessClassLoader@2c7122e2
at com.esotericsoftware.reflectasm.AccessClassLoader.loadClass(AccessClassLoader.java:92)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at 
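
The (truncated) trace above is the telling part: a get() on the spillable map ends up in URLClassLoader/ZipFile jar scans, and the reflectasm AccessClassLoader frames point at class resolution happening inside reflective (de)serialization on the lookup path. As a general illustration only -- not the Hudi code path -- the usual way to keep that cost out of per-lookup work is to build and register a serializer once per thread and reuse it for every read:
{code:java}
// Illustration of the general pattern, not Hudi code: one pre-built Kryo per thread,
// reused for every (de)serialization instead of re-resolving classes on each get().
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import java.io.ByteArrayOutputStream

object KryoPerThread {
  private val kryos = new ThreadLocal[Kryo] {
    override def initialValue(): Kryo = {
      val k = new Kryo()
      k.register(classOf[Array[Byte]])   // register the real payload classes here (placeholder)
      k
    }
  }

  def serialize(obj: AnyRef): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val out = new Output(bos)
    kryos.get().writeClassAndObject(out, obj)
    out.close()
    bos.toByteArray
  }

  def deserialize(bytes: Array[Byte]): AnyRef =
    kryos.get().readClassAndObject(new Input(bytes))
}
{code}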

[jira] [Updated] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-20 Thread lamber-ken (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lamber-ken updated HUDI-625:

Attachment: image-2020-02-21-15-35-56-637.png

> Address performance concerns on DiskBasedMap.get() during upsert of thin 
> records
> 
>
> Key: HUDI-625
> URL: https://issues.apache.org/jira/browse/HUDI-625
> Project: Apache Hudi (incubating)
>  Issue Type: Improvement
>  Components: Performance, Writer Core
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
> Fix For: 0.6.0
>
> Attachments: image-2020-02-20-23-34-24-155.png, 
> image-2020-02-20-23-34-27-466.png, image-2020-02-21-15-35-56-637.png
>
>
> [https://github.com/apache/incubator-hudi/issues/1328]
>  
>  What's going on here is that each entry (a single data field) is estimated 
> to be around 500-750 bytes in memory, so most of the records spill to disk... 
> {code:java}
> 20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 
> for 3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 
> partitionPath=default}, currentLocation='HoodieRecordLocation 
> {instantTime=20200220225748, fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', 
> newLocation='HoodieRecordLocation {instantTime=20200220225921, 
> fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'} {code}
>  
> This is not too far from reality 
> !image-2020-02-20-23-34-27-466.png|width=952,height=58!
> !image-2020-02-20-23-34-24-155.png|width=975,height=19!
>  
>  





[jira] [Updated] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-20 Thread lamber-ken (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lamber-ken updated HUDI-625:

Description: 
[https://github.com/apache/incubator-hudi/issues/1328]

 

 What's going on here is that each entry (a single data field) is estimated to 
be around 500-750 bytes in memory, so most of the records spill to disk... 
{code:java}
20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 for 
3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 partitionPath=default}, 
currentLocation='HoodieRecordLocation {instantTime=20200220225748, 
fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', 
newLocation='HoodieRecordLocation {instantTime=20200220225921, 
fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'} {code}
 
h2. Reproduce steps

 
{code:java}
export SPARK_HOME=/home/dockeradmin/hudi/spark-2.4.4-bin-hadoop2.7
${SPARK_HOME}/bin/spark-shell \
--executor-memory 6G \
--packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
{code}
 
{code:java}
val HUDI_FORMAT = "org.apache.hudi"
val TABLE_NAME = "hoodie.table.name"
val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
val UPSERT_OPERATION_OPT_VAL = "upsert"
val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
val UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
val config = Map(
"table_name" -> "example_table",
"target" -> "file:///tmp/example_table/",
"primary_key" ->  "id",
"sort_key" -> "id"
)
val readPath = config("target") + "/*"
val json_data = (1 to 400).map(i => "{\"id\":" + i + "}")
val jsonRDD = spark.sparkContext.parallelize(json_data, 2)
val df1 = spark.read.json(jsonRDD)

println(s"${df1.count()} records in source 1")

df1.write.format(HUDI_FORMAT).
  option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
  option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
  option(TABLE_NAME, config("table_name")).
  option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL).
  option(BULK_INSERT_PARALLELISM, 1).
  mode("Overwrite").
  save(config("target"))

println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")

// Runs very slow
df1.limit(300).write.format(HUDI_FORMAT).
  option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
  option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
  option(TABLE_NAME, config("table_name")).
  option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
  option(UPSERT_PARALLELISM, 20).
  mode("Append").
  save(config("target"))

// Runs very slow
df1.write.format(HUDI_FORMAT).
  option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
  option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
  option(TABLE_NAME, config("table_name")).
  option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
  option(UPSERT_PARALLELISM, 20).
  mode("Append").
  save(config("target"))

println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")
{code}
 

 

 
h2. *Analysis*
h3. *Upsert (400 entries)*
{code:java}
WARN HoodieMergeHandle: 
Number of entries in MemoryBasedMap => 150875 
Total size in bytes of MemoryBasedMap => 83886580 
Number of entries in DiskBasedMap => 3849125 
Size of file spilled to disk => 1443046132
{code}
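Dividing the spilled-file size by the spilled-entry count (both straight from the log above) also gives a rough on-disk footprint per entry, which is the same order of magnitude as the in-memory estimate -- i.e. the per-record overhead, not the single data field, dominates:
{code:java}
// Values copied from the WARN log above; the division is only illustrative.
val spilledBytes   = 1443046132L
val spilledEntries = 3849125L
println(s"~${spilledBytes / spilledEntries} bytes per spilled entry on disk")  // ~374 bytes
{code}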
h3. Hang stack trace (DiskBasedMap#get)

 
{code:java}
"pool-21-thread-2" Id=696 cpuUsage=98% RUNNABLE
at java.util.zip.ZipFile.getEntry(Native Method)
at java.util.zip.ZipFile.getEntry(ZipFile.java:310)
-  locked java.util.jar.JarFile@1fc27ed4
at java.util.jar.JarFile.getEntry(JarFile.java:240)
at java.util.jar.JarFile.getJarEntry(JarFile.java:223)
at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:1005)
at sun.misc.URLClassPath.getResource(URLClassPath.java:212)
at java.net.URLClassLoader$1.run(URLClassLoader.java:365)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
-  locked java.lang.Object@28f65251
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
-  locked scala.reflect.internal.util.ScalaClassLoader$URLClassLoader@a353dff
at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
-  locked com.esotericsoftware.reflectasm.AccessClassLoader@2c7122e2
at com.esotericsoftware.reflectasm.AccessClassLoader.loadClass(AccessClassLoader.java:92)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at 

[jira] [Updated] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

2020-02-20 Thread Vinoth Chandar (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vinoth Chandar updated HUDI-625:

Summary: Address performance concerns on DiskBasedMap.get() during upsert 
of thin records  (was: Address performance concerns on DiskBasedMap.get() 
during upsert of small workload )

> Address performance concerns on DiskBasedMap.get() during upsert of thin 
> records
> 
>
> Key: HUDI-625
> URL: https://issues.apache.org/jira/browse/HUDI-625
> Project: Apache Hudi (incubating)
>  Issue Type: Improvement
>  Components: Performance, Writer Core
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
> Fix For: 0.6.0
>
> Attachments: image-2020-02-20-23-34-24-155.png, 
> image-2020-02-20-23-34-27-466.png
>
>
> [https://github.com/apache/incubator-hudi/issues/1328]
>  
>  What's going on here is that each entry (a single data field) is estimated 
> to be around 500-750 bytes in memory, so most of the records spill to disk... 
> {code:java}
> 20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 
> for 3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 
> partitionPath=default}, currentLocation='HoodieRecordLocation 
> {instantTime=20200220225748, fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', 
> newLocation='HoodieRecordLocation {instantTime=20200220225921, 
> fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'} {code}
>  
> This is not too far from reality 
> !image-2020-02-20-23-34-27-466.png|width=952,height=58!
> !image-2020-02-20-23-34-24-155.png|width=975,height=19!
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)