[GitHub] [incubator-hudi] adamjoneill edited a comment on issue #1325: presto - querying nested object in parquet file created by hudi

2020-02-14 Thread GitBox
adamjoneill edited a comment on issue #1325: presto - querying nested object in parquet file created by hudi
URL: https://github.com/apache/incubator-hudi/issues/1325#issuecomment-586284798
 
 
   @vinothchandar I've managed to reproduce this with a simple spark.parallelize() example.
   
   test.scala
   ```
   import org.apache.spark._
   import org.apache.spark.sql._
   import org.apache.spark.sql.functions.{month, year, col, dayofmonth}
   import org.apache.spark.storage.StorageLevel
   import org.apache.spark.streaming.{Milliseconds, StreamingContext}
   import org.apache.spark.streaming.Duration
   import org.apache.spark.streaming.kinesis._
   import org.apache.spark.streaming.kinesis.KinesisInputDStream
   import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
   import org.apache.hudi.DataSourceWriteOptions
   import org.apache.hudi.config.HoodieWriteConfig
   import org.apache.hudi.hive.MultiPartKeysValueExtractor

   import org.apache.spark.sql.types._
   import org.apache.spark.sql.DataFrame

   case class Bar(id: Int, name: String)

   // choose one of the following (must match the dataFrame construction below)
   case class Foo(id: Int, bar: Bar) // WITH the simple property
   // case class Foo(bar: Bar)       // withOUT the simple property

   case class Root(id: Int, foos: Array[Foo])

   object HudiScalaStreamHelloWorld {

     def main(args: Array[String]): Unit = {
       val appName = "ScalaStreamExample"
       val batchInterval = Milliseconds(2000)
       val spark = SparkSession
         .builder()
         .appName(appName)
         .getOrCreate()

       val sparkContext = spark.sparkContext
       val streamingContext = new StreamingContext(sparkContext, batchInterval)

       import spark.implicits._
       val sc = sparkContext

       // choose one of the following (must match the Foo definition above)
       val dataFrame = sc.parallelize(Seq(Root(1, Array(Foo(1, Bar(1, "OneBar")))))).toDF() // WITH the simple property
       // val dataFrame = sc.parallelize(Seq(Root(1, Array(Foo(Bar(1, "OneBar")))))).toDF() // withOUT the simple property

       dataFrame.printSchema()

       val hudiTableName = "order"
       val hudiTablePath = "s3://xxx/path/" + hudiTableName

       // Set up our Hudi Data Source Options
       val hudiOptions = Map[String, String](
         DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
         HoodieWriteConfig.TABLE_NAME -> hudiTableName,
         DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
         DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "id")

       dataFrame.write
         .format("org.apache.hudi")
         .options(hudiOptions)
         .mode(SaveMode.Overwrite)
         .save(hudiTablePath)
     }
   }
   ```
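
   As a sanity check (a sketch of mine, not part of the original report; it reuses `spark` and `hudiTablePath` from above), the table can be read back through Spark itself. If this round-trips while Presto fails, the problem sits in Presto's parquet reader rather than in the written data:
   ```
   // Sketch: verify the written files through Spark's own Hudi reader
   val readBack = spark.read
     .format("org.apache.hudi")
     .load(hudiTablePath + "/*") // glob over the single "default" partition directory
   readBack.printSchema()
   readBack.select("foos").show(false)
   ```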
   
   deploy.sh
   ```
   sbt clean
   sbt package

   aws s3 cp ./target/scala-2.11/simple-project_2.11-1.0.jar s3://./simple-project_2.11-1.0.jar

   aws emr add-steps --cluster-id j-AZQBZK81NAFT --steps Type=spark,Name=SimpleHudiTest,Args=[\
   --deploy-mode,cluster,\
   --master,yarn,\
   --packages,\'org.apache.hudi:hudi-spark-bundle:0.5.0-incubating,org.apache.spark:spark-avro_2.11:2.4.4\',\
   --conf,spark.yarn.submit.waitAppCompletion=false,\
   --conf,yarn.log-aggregation-enable=true,\
   --conf,spark.dynamicAllocation.enabled=true,\
   --conf,spark.cores.max=4,\
   --conf,spark.network.timeout=300,\
   --conf,spark.serializer=org.apache.spark.serializer.KryoSerializer,\
   --conf,spark.sql.hive.convertMetastoreParquet=false,\
   --class,HudiScalaStreamHelloWorld,\
   s3://.xxx/simple-project_2.11-1.0.jar\
   ],ActionOnFailure=CONTINUE
   ```
   
   build.sbt
   ```
   name := "Simple Project"

   version := "1.0"

   scalaVersion := "2.11.12"

   libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4"
   libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.4"
   libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.4.4"
   libraryDependencies += "org.apache.hudi" % "hudi-spark-bundle" % "0.5.0-incubating"

   scalacOptions := Seq("-unchecked", "-deprecation")
   ```
   
   An AWS Glue crawler then runs over the output S3 directory to create the table.
   
   From the Presto EMR instance, the result when the simple object is included on the array item:
   ```
   presto:schema> select * from default;
    _hoodie_commit_time | _hoodie_commit_seqno | _hoodie_record_key | _hoodie_partition_path | _hoodie_file_name | id | foos
   ---------------------+----------------------+--------------------+------------------------+-------------------+----+------
    20200214112552      | 20200214112552_0_1   | 1                  | default                |
   ```
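
   For the nested access itself, the equivalent drill-down through Spark SQL would look like the sketch below (my addition, not from the report; it assumes the `Root`/`Foo`/`Bar` layout and the `dataFrame` from test.scala, and uses `LATERAL VIEW explode` to unroll the `foos` array):
   ```
   // Sketch: the same nested query, run through Spark SQL instead of Presto
   dataFrame.createOrReplaceTempView("root_tbl")
   spark.sql(
     "SELECT id, foo.id AS foo_id, foo.bar.name FROM root_tbl " +
     "LATERAL VIEW explode(foos) t AS foo"
   ).show()
   ```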

[GitHub] [incubator-hudi] adamjoneill edited a comment on issue #1325: presto - querying nested object in parquet file created by hudi

2020-02-13 Thread GitBox
adamjoneill edited a comment on issue #1325: presto - querying nested object in parquet file created by hudi
URL: https://github.com/apache/incubator-hudi/issues/1325#issuecomment-585777129
 
 
   Had some success: if I add a property of simple type (i.e. not a nested object) to the array item, Hudi saves the parquet and the Presto queries (both `select *` and the nested query) appear to work.
   
   ```
   const valueToUse = {
     ...order,
     tickets: {
       items: [
         {
           dummyId: 2, // <-- added this property
           ticketType: {
             id: 258815,
           },
         },
       ],
     },
   };
   ```
   
   
   Just to be sure, I saved the JSON to plain parquet using
   
   ```
   dataFrame.withColumn("year", year(col("eventTimestamp")))
     .withColumn("month", month(col("eventTimestamp")))
     .withColumn("day", dayofmonth(col("eventTimestamp")))
     .write
     .mode(SaveMode.Append)
     .partitionBy(typeOfEventColumnName, "year", "month", "day")
     .parquet(rawOutPath)
   ```
   
   had AWS Glue crawl over it to create a table, then ran a query in Presto against that table. `select * from table` worked fine, whereas on the Hudi-written parquet it failed.
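
   A follow-up idea (mine, not from the comment): read both outputs back with plain Spark and diff the printed schemas, since Hudi's writer goes through an Avro conversion that can change how the array element struct is recorded. A sketch; the paths are placeholders:
   ```
   // Sketch: compare the parquet schema written by the plain Spark job
   // with the one written by Hudi (both paths are hypothetical)
   val plain = spark.read.parquet("s3://bucket/raw-out/")
   val hudi  = spark.read.parquet("s3://bucket/hudi-out/order/")
   plain.schema.printTreeString()
   hudi.schema.printTreeString()
   ```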


[GitHub] [incubator-hudi] adamjoneill edited a comment on issue #1325: presto - querying nested object in parquet file created by hudi

2020-02-13 Thread GitBox
adamjoneill edited a comment on issue #1325: presto - querying nested object in parquet file created by hudi
URL: https://github.com/apache/incubator-hudi/issues/1325#issuecomment-585748045
 
 
   Further update: I've narrowed it down to the array on the nested object.
   
   The following works when I place this object on the Kinesis stream:
   
   ```
   const valueToUse = {
     ...order,
     tickets: {
       items: [],
     },
   };
   ```
   
   but when it is placed on the Kinesis stream with an object in the array, it fails:
   
   ```
   const valueToUse = {
     ...order,
     tickets: {
       items: [
         {
           ticketType: {
             id: 258815,
           },
         },
       ],
     },
   };
   ```
   
   Error message and stack trace when running `select * from table`:
   
   ```
   Query 20200213_131029_00032_hej8h failed: No value present
   java.util.NoSuchElementException: No value present
       at java.util.Optional.get(Optional.java:135)
       at com.facebook.presto.parquet.reader.ParquetReader.readArray(ParquetReader.java:156)
       at com.facebook.presto.parquet.reader.ParquetReader.readColumnChunk(ParquetReader.java:282)
       at com.facebook.presto.parquet.reader.ParquetReader.readStruct(ParquetReader.java:193)
       at com.facebook.presto.parquet.reader.ParquetReader.readColumnChunk(ParquetReader.java:276)
       at com.facebook.presto.parquet.reader.ParquetReader.readStruct(ParquetReader.java:193)
       at com.facebook.presto.parquet.reader.ParquetReader.readColumnChunk(ParquetReader.java:276)
       at com.facebook.presto.parquet.reader.ParquetReader.readStruct(ParquetReader.java:193)
       at com.facebook.presto.parquet.reader.ParquetReader.readColumnChunk(ParquetReader.java:276)
       at com.facebook.presto.parquet.reader.ParquetReader.readBlock(ParquetReader.java:268)
       at com.facebook.presto.hive.parquet.ParquetPageSource$ParquetBlockLoader.load(ParquetPageSource.java:247)
       at com.facebook.presto.hive.parquet.ParquetPageSource$ParquetBlockLoader.load(ParquetPageSource.java:225)
       at com.facebook.presto.spi.block.LazyBlock.assureLoaded(LazyBlock.java:283)
       at com.facebook.presto.spi.block.LazyBlock.getLoadedBlock(LazyBlock.java:274)
       at com.facebook.presto.spi.Page.getLoadedPage(Page.java:261)
       at com.facebook.presto.operator.TableScanOperator.getOutput(TableScanOperator.java:254)
       at com.facebook.presto.operator.Driver.processInternal(Driver.java:379)
       at com.facebook.presto.operator.Driver.lambda$processFor$8(Driver.java:283)
       at com.facebook.presto.operator.Driver.tryWithLock(Driver.java:675)
       at com.facebook.presto.operator.Driver.processFor(Driver.java:276)
       at com.facebook.presto.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1077)
       at com.facebook.presto.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:162)
       at com.facebook.presto.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:483)
       at com.facebook.presto.$gen.Presto_0_22720200211_134743_1.run(Unknown Source)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
       at java.lang.Thread.run(Thread.java:748)
   ```
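
   One plausible thing to check here (an assumption on my part, not something from the comment): Spark's JSON schema inference treats an empty `items` array differently from a non-empty one, so the two payloads above can end up producing different parquet schemas. A minimal, self-contained sketch:
   ```
   import org.apache.spark.sql.SparkSession

   object ItemsSchemaCheck {
     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder()
         .appName("ItemsSchemaCheck")
         .master("local[*]")
         .getOrCreate()
       import spark.implicits._

       // Same two payload shapes as the Kinesis messages above
       val empty    = spark.read.json(Seq("""{"id": 1, "tickets": {"items": []}}""").toDS())
       val nonEmpty = spark.read.json(Seq("""{"id": 1, "tickets": {"items": [{"ticketType": {"id": 258815}}]}}""").toDS())

       // On Spark 2.4 the empty array typically infers as array<string>
       // (there is no element to infer a type from), while the non-empty
       // one infers a struct element type, so downstream parquet differs.
       empty.printSchema()
       nonEmpty.printSchema()

       spark.stop()
     }
   }
   ```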


[GitHub] [incubator-hudi] adamjoneill edited a comment on issue #1325: presto - querying nested object in parquet file created by hudi

2020-02-12 Thread GitBox
adamjoneill edited a comment on issue #1325: presto - querying nested object in parquet file created by hudi
URL: https://github.com/apache/incubator-hudi/issues/1325#issuecomment-585338502
 
 
   I've managed to narrow down the issue to the data coming off the Kinesis stream, by replacing the stream data with some test data.
   
   With the following code:
   
   ```
   if (!rdd.isEmpty()) {
     val json = rdd.map(record => new String(record))
     val dataFrame = spark.read.json(json)
     dataFrame.printSchema()
     dataFrame.show()

     val hudiTableName = "order"
     val hudiTablePath = path + hudiTableName

     val hudiOptions = Map[String, String](
       DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
       HoodieWriteConfig.TABLE_NAME -> hudiTableName,
       DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
       DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "id")

     // Write data into the Hudi dataset
     dataFrame.write
       .format("org.apache.hudi")
       .options(hudiOptions)
       .mode(SaveMode.Overwrite)
       .save(hudiTablePath)
   }
   ```
   
   I replaced
   
   ```
   val dataFrame = spark.read.json(json)
   ```
   
   with
   
   ```
   val dataFrame = sparkContext.parallelize(Seq(Foo(1, Bar(1, "first")), Foo(2, Bar(2, "second")))).toDF()
   ```
   
   and `select * from table` worked, as did the nested query `select id, bar.id, bar.name from table`.
   
   So at this stage it looks like there's an issue with the data and how it's coming off the Kinesis stream.
   
   Update:
   I've pasted the output of `dataFrame.show()` for the RDD off the stream here:
   ```
   +--------+-------+--------------------+--------+--------------------+-------------+
   |clientId|eventId|      eventTimestamp|      id|               order|  typeOfEvent|
   +--------+-------+--------------------+--------+--------------------+-------------+
   |     369| 115423|2020-02-12T15:54:...|34551840|[External, [[Aust...|order-created|
   +--------+-------+--------------------+--------+--------------------+-------------+
   ```
   Taking the data off the stream but ignoring the nested column `order`, the `select * from table` query works:
   ```
   val jsonFrame = spark.read.json(json)
   val dataFrame = jsonFrame.select("clientId", "eventId", "id", "typeOfEvent")
   ```
   
   Other than being nested, is there something about the `order` column that would make this happen?
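
   One way to take inference out of the picture (again my assumption, not from the original comment) would be to give `spark.read.json` an explicit schema, so the nested `order` column keeps the same shape from batch to batch. A sketch; the nested field names below are purely illustrative:
   ```
   import org.apache.spark.sql.types._

   // Hypothetical schema for the payload: only the top-level names come
   // from the show() output above; the nested `order` fields are
   // placeholders for the real ones.
   val payloadSchema = StructType(Seq(
     StructField("clientId", LongType),
     StructField("eventId", LongType),
     StructField("eventTimestamp", StringType),
     StructField("id", LongType),
     StructField("typeOfEvent", StringType),
     StructField("order", StructType(Seq(
       StructField("channel", StringType) // placeholder nested field
     )))
   ))

   val dataFrame = spark.read.schema(payloadSchema).json(json)
   ```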
   


[GitHub] [incubator-hudi] adamjoneill edited a comment on issue #1325: presto - querying nested object in parquet file created by hudi

2020-02-12 Thread GitBox
adamjoneill edited a comment on issue #1325: presto - querying nested object in parquet file created by hudi
URL: https://github.com/apache/incubator-hudi/issues/1325#issuecomment-585128793
 
 
   Thanks @lamber-ken
   
   I've updated my deploy step to remove all references to org.apache.spark:spark-avro_2.11:2.4.4
   ```
   aws emr add-steps --cluster-id j-xx --steps Type=spark,Name=ScalaStream,Args=[\
   --deploy-mode,cluster,\
   --master,yarn,\
   --jars,\'/usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-streaming-kinesis-asl-assembly.jar\',\
   --conf,spark.yarn.submit.waitAppCompletion=false,\
   --conf,yarn.log-aggregation-enable=true,\
   --conf,spark.dynamicAllocation.enabled=true,\
   --conf,spark.cores.max=4,\
   --conf,spark.network.timeout=300,\
   --conf,spark.serializer=org.apache.spark.serializer.KryoSerializer,\
   --conf,spark.sql.hive.convertMetastoreParquet=false,\
   --class,ScalaStream,\
   s3://./simple-project_2.11-1.0.jar\
   ],ActionOnFailure=CONTINUE
   ```
   
   and I receive the following error, and no Hudi file is created for the streaming record:
   
   ```
   20/02/12 09:56:45 INFO BlockManagerInfo: Removed broadcast_20_piece0 on ip-10-10-10-212.ap-southeast-2.compute.internal:33453 in memory (size: 7.9 KB, free: 2.6 GB)
   Exception in thread "streaming-job-executor-0" java.lang.NoClassDefFoundError: org/apache/spark/sql/avro/SchemaConverters$
       at org.apache.hudi.AvroConversionUtils$.convertStructTypeToAvroSchema(AvroConversionUtils.scala:80)
       at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:81)
       at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:91)
       at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
       at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
       at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
       at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
       at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
       at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
       at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
       at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
       at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
       at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
       at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
       at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
       at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
       at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
       at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
       at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
       at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
       at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
       at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
       at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
       at ScalaStream$.handleOrderCreated(stream.scala:39)
       at ScalaStream$$anonfun$main$1.apply(stream.scala:110)
       at ScalaStream$$anonfun$main$1.apply(stream.scala:82)
       at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
       at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
       at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
       at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
       at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
       at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
       at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
       at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
       at