Hi guys,

I have a question: my Spark Streaming app cannot load data into a Spark SQL
table. Here is my code:
    val conf = new SparkConf()
      .setAppName("KafkaStreaming for " + topics)
      .setMaster("spark://master60:7077")
    val storageLevel = StorageLevel.DISK_ONLY
    val ssc = new StreamingContext(conf, Seconds(batchInterval.toInt))
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    //Receiver-based 
    val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, storageLevel)

    kafkaStream.foreachRDD { rdd =>
      val x = rdd.count()
      println(s"================processing $x records=================")
      rdd.collect().foreach(println)
      val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
      import sqlContext.implicits._
      val logRDD = sqlContext.read.json(rdd.values).select("payload").map(_.mkString)
      val logRDD2 = logRDD.map(_.split(',')).map { x =>
        NginxLog(x(0).trim().toFloat.toInt,
          x(1).trim(),
          x(2).trim(),
          x(3).trim(),
          x(4).trim(),
          x(5).trim(),
          x(6).trim(),
          x(7).trim(),
          x(8).trim(),
          x(9).trim(),
          x(10).trim())
      }
      val recDF = logRDD2.toDF
      recDF.printSchema()

      val hc = new org.apache.spark.sql.hive.HiveContext(rdd.sparkContext)
      val index = rdd.id
      recDF.write.parquet(s"/etl/tables/nginxlog/${topicNO}/${index}")
      hc.sql("CREATE TABLE IF NOT EXISTS nginxlog(msec Int, remote_addr String, " +
        "u_domain String, u_url String, u_title String, u_referrer String, " +
        "u_sh String, u_sw String, u_cd String, u_lang String, u_utrace String) " +
        "STORED AS PARQUET")
      hc.sql(s"LOAD DATA INPATH '/etl/tables/nginxlog/${topicNO}/${index}' INTO TABLE nginxlog")
    }

No exception is thrown while the app runs. However, only the data from the
first batch is loaded into the table nginxlog; none of the later batches are
loaded successfully. I cannot understand the reason for this behavior. Is it
an issue with my HiveContext (hc)?

PS: my Spark cluster version is 1.6.1.




--------------------------------

 

Thanks & best regards!
San.Luo
