Re: Spark-submit Problems

2016-10-15 Thread Mekal Zheng
Show me your code

On Oct 16, 2016, 08:24 +0800, hxfeng <980548...@qq.com> wrote:
> Can you show your pi.py-style code and the exception message?
>
>
> -- Original Message --
> From: "Tobi Bosede";
> Sent: Sunday, October 16, 2016, 8:04 AM
> To: "user";
>
> Subject: Spark-submit Problems
>
>
> Hi everyone,
>
> I am having problems submitting an app through spark-submit when the master
> is not "local". However, the pi.py example that ships with Spark works with
> any master. I believe my script has the same structure as pi.py, but for some
> reason my script is not as flexible. Specifically, the failure occurs when
> count() is called, and count() is the first action in the script. Spark also
> complains that it is losing executors. Yet interactively in Jupyter,
> everything works perfectly with any master passed to the Spark conf.
>
> Does anyone know what might be happening? Is there anywhere I can look up the 
> requirements for spark-submit scripts?
>
> Thanks,
> Tobi
>
>
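For reference, the usual invocation shape for submitting a Python app to a non-local master is sketched below; the master URL, memory setting, and file names are placeholders, not values from this thread:

```shell
# Illustrative spark-submit invocation against a standalone master.
# All names and the master URL are placeholders.
spark-submit \
  --master spark://master-host:7077 \
  --deploy-mode client \
  --executor-memory 2g \
  --py-files deps.zip \
  my_app.py arg1 arg2
```

Executor-lost errors that appear only with a non-local master often mean the executors are dying (for example, out of memory) or that a dependency exists on the driver but not on the worker nodes; `--py-files` is the standard way to ship Python dependencies to the executors.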



Re: scala.MatchError on stand-alone cluster mode

2016-07-17 Thread Mekal Zheng
Hi, Rishabh Bhardwaj, Saisai Shao,

Thanks for your help. I have found that the key reason is that I forgot to
upload the jar package to all of the nodes in the cluster, so after the master
distributed the job and selected one node as the driver, the driver could not
find the jar package and threw an exception.

-- 
Mekal Zheng
Sent with Airmail
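For reference, in standalone cluster mode the application jar must be visible from whichever worker is chosen as the driver. Two common ways to arrange that are sketched below; hosts and paths are illustrative:

```shell
# Option 1: put the jar on globally visible storage (e.g. HDFS) so any
# worker selected as driver can fetch it. Path is a placeholder.
spark-submit --master spark://master-host:7077 --deploy-mode cluster \
  --class com.jd.deeplog.LogAggregator \
  hdfs:///apps/log-aggregator.jar

# Option 2: copy the jar to the same local path on every node beforehand.
for host in node1 node2 node3; do
  scp target/log-aggregator.jar "$host":/opt/spark-apps/
done
```

The Spark docs describe this requirement for the application-jar argument: it must be a URL that is globally visible inside the cluster, such as an `hdfs://` path or a `file://` path present on all nodes.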

From: Rishabh Bhardwaj <rbnex...@gmail.com>
Reply-To: Rishabh Bhardwaj <rbnex...@gmail.com>
Date: July 15, 2016 at 17:28:43
To: Saisai Shao <sai.sai.s...@gmail.com>
Cc: Mekal Zheng <mekal.zh...@gmail.com>, spark users <user@spark.apache.org>
Subject: Re: scala.MatchError on stand-alone cluster mode

Hi Mekal,
It may be a Scala version mismatch error. Kindly check whether you are
running both (your streaming app and the Spark cluster) on Scala 2.10 or 2.11.

Thanks,
Rishabh.
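For reference, one way to check both sides of that mismatch (the exact banner text varies by Spark release):

```shell
# Scala version of the cluster's Spark build: the version banner includes
# a line like "Using Scala version 2.11.x".
spark-submit --version

# Scala version the app is compiled against: check the build definition
# (scalaVersion in build.sbt), or the _2.10 / _2.11 suffix on the packaged
# artifact name, e.g. myapp_2.11-1.0.jar.
grep scalaVersion build.sbt
```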

On Fri, Jul 15, 2016 at 1:38 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

> The error stack is throwing from your code:
>
> Caused by: scala.MatchError: [Ljava.lang.String;@68d279ec (of class
> [Ljava.lang.String;)
> at com.jd.deeplog.LogAggregator$.main(LogAggregator.scala:29)
> at com.jd.deeplog.LogAggregator.main(LogAggregator.scala)
>
> I think you should debug the code yourself; it may not be a problem with
> Spark.
>
> On Fri, Jul 15, 2016 at 3:17 PM, Mekal Zheng <mekal.zh...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I have a Spark Streaming job written in Scala. It runs well in local and
>> client mode, but when I submit it in cluster mode, the driver reports the
>> error shown below.
>> Does anyone know what is wrong here?
>> Please help me!
>>
>> The job code follows the log output.
>>
>> 16/07/14 17:28:21 DEBUG ByteBufUtil:
>> -Dio.netty.threadLocalDirectBufferSize: 65536
>> 16/07/14 17:28:21 DEBUG NetUtil: Loopback interface: lo (lo,
>> 0:0:0:0:0:0:0:1%lo)
>> 16/07/14 17:28:21 DEBUG NetUtil: /proc/sys/net/core/somaxconn: 32768
>> 16/07/14 17:28:21 DEBUG TransportServer: Shuffle server started on port
>> :43492
>> 16/07/14 17:28:21 INFO Utils: Successfully started service 'Driver' on
>> port 43492.
>> 16/07/14 17:28:21 INFO WorkerWatcher: Connecting to worker spark://
>> Worker@172.20.130.98:23933
>> 16/07/14 17:28:21 DEBUG TransportClientFactory: Creating new connection
>> to /172.20.130.98:23933
>> Exception in thread "main" java.lang.reflect.InvocationTargetException
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:497)
>> at
>> org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
>> at
>> org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
>> Caused by: scala.MatchError: [Ljava.lang.String;@68d279ec (of class
>> [Ljava.lang.String;)
>> at com.jd.deeplog.LogAggregator$.main(LogAggregator.scala:29)
>> at com.jd.deeplog.LogAggregator.main(LogAggregator.scala)
>> ... 6 more
>>
>> ==
>> Job CODE:
>>
>> object LogAggregator {
>>
>>   val batchDuration = Seconds(5)
>>
>>   def main(args:Array[String]) {
>>
>> val usage =
>>   """Usage: LogAggregator 
>> 
>> |  logFormat: fieldName:fieldRole[,fieldName:fieldRole] each field 
>> must have both name and role
>> |  logFormat.role: can be key|avg|enum|sum|ignore
>>   """.stripMargin
>>
>> if (args.length < 9) {
>>   System.err.println(usage)
>>   System.exit(1)
>> }
>>
>> val Array(zkQuorum, group, topics, numThreads, logFormat, logSeparator, 
>> batchDuration, destType, destPath) = args
>>
>> println("Start streaming calculation...")
>>
>> val conf = new SparkConf().setAppName("LBHaproxy-LogAggregator")
>> val ssc = new StreamingContext(conf, Seconds(batchDuration.toInt))
>>
>> val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>>
>> val lines = KafkaUtils.createStream(ssc, zkQuorum, group, 
>> topicMap).map(_._2)
>>
>> val logFields = logFormat.split(",").map(field => {
>>   val fld = field.split(":")
>>   if (fld.s
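An aside on the stack trace above: LogAggregator.scala:29 plausibly corresponds to the `val Array(zkQuorum, group, ...) = args` line, and that bare destructuring is one common source of exactly this `scala.MatchError` on an `[Ljava.lang.String;` value. The pattern matches only an array of exactly nine elements, and the `args.length < 9` guard does not reject extra arguments. A standalone sketch (illustrative values, not the author's exact code):

```scala
// Sketch of why `val Array(...) = args` can throw scala.MatchError.
object ArgDemo {
  def main(args: Array[String]): Unit = {
    val demo: Array[String] = Array("a", "b") // stand-in for submitted args

    try {
      // A bare destructuring pattern matches ONLY an array of exactly
      // three elements; any other length throws scala.MatchError at runtime.
      val Array(x, y, z) = demo
      println(s"$x $y $z")
    } catch {
      case e: scala.MatchError =>
        println("scala.MatchError: " + e.getMessage)
    }

    // Safer: match with an explicit fallback instead of bare destructuring.
    demo match {
      case Array(x, y, z) => println(s"ok: $x $y $z")
      case other          => println(s"expected 3 args, got ${other.length}")
    }
  }
}
```

Checking `args.length == 9` (rather than `< 9`) before destructuring, or matching with a fallback case as above, avoids the crash.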

scala.MatchError on stand-alone cluster mode

2016-07-15 Thread Mekal Zheng
  case "enum" => {
val enumList = logSegment.value.asInstanceOf[List[(String, Int)]]
val enumJson = enumList.groupBy(_._1).map(el =>
el._2.reduce((e1, e2) => (e1._1, e1._2.toString.toInt +
e2._2.toString.toInt)))
JSONObject(enumJson)
  }
  case _ => logSegment.value
}
(logFieldName, fieldValue)
  })

  logContent + ("count" -> count)
})

if (destType == "hbase") {

  val hbaseQuorum = "localhost"
  val hbaseClientPort = "2181"
  val hbaseStore = new HBaseStore(hbaseQuorum, hbaseClientPort,
keys.toList, "tb_", true)

  val jobConf = hbaseStore.jobConf()

  aggResults.foreachRDD((rdd, time) => {
rdd.map(record => {
  val logPut = hbaseStore.convert(record, time)
  (new ImmutableBytesWritable, logPut)
}).saveAsHadoopDataset(jobConf)
  })
    } else if (destType == "file") {
  aggResults.foreachRDD((rdd, time) => {
rdd.foreach(record => {
  val res = record + ("timestamp" -> time.milliseconds)
  io.File(destPath).appendAll(res.toString() + "\n")
})
  })
}

ssc.start()
ssc.awaitTermination()
  }


-- 
Mekal Zheng
Sent with Airmail