All,

The code below throws the following error:

However, if I replace JavaHiveContext with HiveContext (see the commented code
below) and replace the Java bean class with a Scala case class, the same code
works fine. Any reason why this could be happening? (A sketch of the working
variant is at the end of this mail.)


java.lang.ArithmeticException: / by zero
        at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:99)
        at parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:92)
        at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
        at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
        at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
        at org.apache.spark.sql.parquet.InsertIntoParquetTable.org$apache$spark$sql$parquet$InsertIntoParquetTable$$writeShard$1(ParquetTableOperations.scala:300)
        at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:318)
        at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:318)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        at org.apache.spark.scheduler.Task.run(Task.scala:54)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.api.java.JavaHiveContext
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.{Logging, SparkConf}

================================================================

object SparkStreamingToParquet extends Logging {

  /**
   * Entry point. The args are currently unused; the hardcoded values below
   * stand in for the parameters described in the commented-out usage message.
   */
  def main(args: Array[String]) {
//    if (args.length < 6) {
//      logInfo("Please provide valid parameters: <hdfsFilesLocation: hdfs://ip:8020/user/hdfs/--/> <IMPALAtableloc hdfs://ip:8020/user/hive/--/> "
//        + "<tablename> <ex: com.philips.BeanClassName> <appname> <no of cores> <checkpoint-dir>")
//      logInfo("make sure you give the full folder path with '/' at the end, i.e. /user/hdfs/abc/")
//      System.exit(1)
//    }


    val CHECKPOINT_DIR = "hdfs://127.0.0.1:5555/user/checkpoint/" //args(6)


    val jssc: StreamingContext = StreamingContext.getOrCreate(CHECKPOINT_DIR,
      () => createContext(args))

    jssc.start()
    jssc.awaitTermination()
  }


  def createContext(args:Array[String]): StreamingContext = {

    val CHECKPOINT_DIR = "hdfs://127.0.0.1:5555/user/checkpoint/" //args(6)

    val sparkConf: SparkConf = new SparkConf()

    val HDFS_URI = "hdfs://127.0.0.1:5555" //sparkConf.get("spark.philips.hdfsuri")

    val HDFS_FILE_LOC = HDFS_URI + "/user/logs/" //args(0); for streaming
    val IMPALA_TABLE_LOC = HDFS_URI + "/user/impala/" //args(1); impala table location
    val TEMP_TABLE_NAME = "temp_json" //args(2); temp table name for hive context
    val BEAN_CLASS_NAME = "Person" //args(3)
    val SPARK_APP_NAME = "Monitor" //args(4)

    sparkConf.setAppName(SPARK_APP_NAME).setMaster("local[2]")

    var noOfCores = "3"

//    if (args.length >= 6) {
//      noOfCores = args(5)
//    }

    sparkConf.set("spark.cores.max", noOfCores)

    val jssc: StreamingContext = new StreamingContext(sparkConf, new Duration(30000))

    val stream = jssc.textFileStream(HDFS_FILE_LOC)

    stream.foreachRDD(rdd => {
      if (rdd != null && rdd.count() > 0) {

        val hcontext = new JavaHiveContext(rdd.sparkContext)

        hcontext.createParquetFile(Class.forName(BEAN_CLASS_NAME), IMPALA_TABLE_LOC, true,
          org.apache.spark.deploy.SparkHadoopUtil.get.conf).registerTempTable(TEMP_TABLE_NAME)

        // This HiveContext + case class variant works fine instead:
        //hcontext.createParquetFile[Person](IMPALA_TABLE_LOC, true,
        //  org.apache.spark.deploy.SparkHadoopUtil.get.conf).registerTempTable(TEMP_TABLE_NAME)

        val schRdd = hcontext.jsonRDD(rdd)
        schRdd.insertInto(TEMP_TABLE_NAME)
      }
    })

    jssc.checkpoint(CHECKPOINT_DIR)
    jssc
  }
}
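
================================================================

For reference, below is a minimal sketch of the HiveContext + case class
variant that works for me. The Person fields here are just placeholders for my
real schema, and this assumes the same stream and constants as in the listing
above plus the experimental createParquetFile[T] API:

import org.apache.spark.sql.hive.HiveContext

// Placeholder case class; the real class has more columns.
case class Person(name: String, age: Int)

stream.foreachRDD(rdd => {
  if (rdd != null && rdd.count() > 0) {
    // Scala HiveContext instead of JavaHiveContext
    val hcontext = new HiveContext(rdd.sparkContext)

    // The case class drives the Parquet schema via the type parameter
    hcontext.createParquetFile[Person](IMPALA_TABLE_LOC, true,
      org.apache.spark.deploy.SparkHadoopUtil.get.conf).registerTempTable(TEMP_TABLE_NAME)

    // Infer a schema from the JSON lines and append into the temp table
    val schRdd = hcontext.jsonRDD(rdd)
    schRdd.insertInto(TEMP_TABLE_NAME)
  }
})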


