Re: Spark on YARN, HowTo kill executor or individual task?

2019-02-12 Thread Serega Sheypak
I tried a similar approach; it works well for user functions. But I need to
crash tasks or an executor while the Spark application runs "repartition". I
didn't find any way to inject a "poison pill" into the repartition call :(
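
A minimal sketch of one way to reach the stage that reads the repartition
shuffle. It assumes (this is not confirmed in the thread) that a narrow
transformation placed directly after repartition is pipelined into that same
stage. The helper is plain Scala so it runs without Spark; FaultInjection,
failFirstAttempt and the partition ids are hypothetical names. Failing only
when the attempt number is 0 lets the retry succeed, so the application keeps
running.

```scala
object FaultInjection {
  // Returns the partition's records unchanged, or throws when the task is the
  // first attempt (attemptNumber == 0) of one of the targeted partitions.
  // partitionId and attemptNumber are expected to come from TaskContext.get().
  def failFirstAttempt[T](partitionId: Int, attemptNumber: Int, targets: Set[Int])(
      records: Iterator[T]): Iterator[T] = {
    if (targets.contains(partitionId) && attemptNumber == 0) {
      throw new RuntimeException(
        s"Injected failure in partition $partitionId, attempt $attemptNumber")
    }
    records
  }
}

// Usage inside a Spark job (kept as a comment so the block compiles without Spark):
//
//   import org.apache.spark.TaskContext
//   rdd.repartition(10).mapPartitions { records =>
//     val ctx = TaskContext.get()
//     FaultInjection.failFirstAttempt(ctx.partitionId(), ctx.attemptNumber(), Set(0, 3))(records)
//   }
```

To fail the side that writes the shuffle instead, the same call could be
placed in a mapPartitions just before repartition.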

Mon, Feb 11, 2019 at 21:19, Vadim Semenov:

> something like this
>
> import org.apache.spark.TaskContext
> ds.map(r => {
>   val taskContext = TaskContext.get()
>   if (taskContext.partitionId == 1000) {
>     throw new RuntimeException
>   }
>   r
> })
>
> On Mon, Feb 11, 2019 at 8:41 AM Serega Sheypak 
> wrote:
> >
> > I need to crash task which does repartition.
> >
> > Mon, Feb 11, 2019 at 10:37, Gabor Somogyi:
> >>
> >> What prevents you from putting if conditions inside the mentioned map function?
> >>
> >> On Mon, Feb 11, 2019 at 10:31 AM Serega Sheypak <
> serega.shey...@gmail.com> wrote:
> >>>
> >>> Yeah, but I don't need to crash entire app, I want to fail several
> tasks or executors and then wait for completion.
> >>>
> >>> Sun, Feb 10, 2019 at 21:49, Gabor Somogyi:
> >>>>
> >>>> Another approach is adding artificial exception into the
> application's source code like this:
> >>>>
> >>>> val query = input.toDS.map(_ /
> 0).writeStream.format("console").start()
> >>>>
> >>>> G
> >>>>
> >>>>
> >>>> On Sun, Feb 10, 2019 at 9:36 PM Serega Sheypak <
> serega.shey...@gmail.com> wrote:
> >>>>>
> >>>>> Hi BR,
> >>>>> thanks for your reply. I want to mimic the issue and kill tasks at a
> certain stage. Killing executor is also an option for me.
> >>>>> I'm curious how do core spark contributors test spark fault
> tolerance?
> >>>>>
> >>>>>
> >>>>> Sun, Feb 10, 2019 at 16:57, Gabor Somogyi <
> gabor.g.somo...@gmail.com>:
> >>>>>>
> >>>>>> Hi Serega,
> >>>>>>
> >>>>>> If I understand your problem correctly you would like to kill one
> executor only and the rest of the app has to be untouched.
> >>>>>> If that's true yarn -kill is not what you want because it stops the
> whole application.
> >>>>>>
> >>>>>> I've done similar thing when tested/testing Spark's HA features.
> >>>>>> - jps -vlm | grep
> "org.apache.spark.executor.CoarseGrainedExecutorBackend.*applicationid"
> >>>>>> - kill -9 pidofoneexecutor
> >>>>>>
> >>>>>> Be aware if it's a multi-node cluster check whether at least one
> process runs on a specific node(it's not required).
> >>>>>> Happy killing...
> >>>>>>
> >>>>>> BR,
> >>>>>> G
> >>>>>>
> >>>>>>
> >>>>>> On Sun, Feb 10, 2019 at 4:19 PM Jörn Franke 
> wrote:
> >>>>>>>
> >>>>>>> yarn application -kill applicationid ?
> >>>>>>>
> >>>>>>> > On 10.02.2019 at 13:30, Serega Sheypak <
> serega.shey...@gmail.com> wrote:
> >>>>>>> >
> >>>>>>> > Hi there!
> >>>>>>> > I have weird issue that appears only when tasks fail at specific
> stage. I would like to imitate failure on my own.
> >>>>>>> > The plan is to run problematic app and then kill entire executor
> or some tasks when execution reaches certain stage.
> >>>>>>> >
> >>>>>>> > Is it do-able?
> >>>>>>>
> >>>>>>>
> -
> >>>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >>>>>>>
>
>
> --
> Sent from my iPhone
>


Re: Spark on YARN, HowTo kill executor or individual task?

2019-02-11 Thread Serega Sheypak
I need to crash the task that does the repartition.

Mon, Feb 11, 2019 at 10:37, Gabor Somogyi:

> What prevents you from putting if conditions inside the mentioned map function?
>
> On Mon, Feb 11, 2019 at 10:31 AM Serega Sheypak 
> wrote:
>
>> Yeah, but I don't need to crash entire app, I want to fail several tasks
>> or executors and then wait for completion.
>>
>> Sun, Feb 10, 2019 at 21:49, Gabor Somogyi:
>>
>>> Another approach is adding artificial exception into the application's
>>> source code like this:
>>>
>>> val query = input.toDS.map(_ / 0).writeStream.format("console").start()
>>>
>>> G
>>>
>>>
>>> On Sun, Feb 10, 2019 at 9:36 PM Serega Sheypak 
>>> wrote:
>>>
>>>> Hi BR,
>>>> thanks for your reply. I want to mimic the issue and kill tasks at a
>>>> certain stage. Killing executor is also an option for me.
>>>> I'm curious how do core spark contributors test spark fault tolerance?
>>>>
>>>>
>>>> Sun, Feb 10, 2019 at 16:57, Gabor Somogyi:
>>>>
>>>>> Hi Serega,
>>>>>
>>>>> If I understand your problem correctly you would like to kill one
>>>>> executor only and the rest of the app has to be untouched.
>>>>> If that's true yarn -kill is not what you want because it stops the
>>>>> whole application.
>>>>>
>>>>> I've done similar thing when tested/testing Spark's HA features.
>>>>> - jps -vlm | grep
>>>>> "org.apache.spark.executor.CoarseGrainedExecutorBackend.*applicationid"
>>>>> - kill -9 pidofoneexecutor
>>>>>
>>>>> Be aware if it's a multi-node cluster check whether at least one
>>>>> process runs on a specific node(it's not required).
>>>>> Happy killing...
>>>>>
>>>>> BR,
>>>>> G
>>>>>
>>>>>
>>>>> On Sun, Feb 10, 2019 at 4:19 PM Jörn Franke 
>>>>> wrote:
>>>>>
>>>>>> yarn application -kill applicationid ?
>>>>>>
>>>>>> > On 10.02.2019 at 13:30, Serega Sheypak <
>>>>>> serega.shey...@gmail.com> wrote:
>>>>>> >
>>>>>> > Hi there!
>>>>>> > I have weird issue that appears only when tasks fail at specific
>>>>>> stage. I would like to imitate failure on my own.
>>>>>> > The plan is to run problematic app and then kill entire executor or
>>>>>> some tasks when execution reaches certain stage.
>>>>>> >
>>>>>> > Is it do-able?
>>>>>>
>>>>>>
>>>>>>


Re: Spark on YARN, HowTo kill executor or individual task?

2019-02-11 Thread Serega Sheypak
Yeah, but I don't need to crash the entire app; I want to fail several tasks or
executors and then wait for completion.

Sun, Feb 10, 2019 at 21:49, Gabor Somogyi:

> Another approach is adding artificial exception into the application's
> source code like this:
>
> val query = input.toDS.map(_ / 0).writeStream.format("console").start()
>
> G
>
>
> On Sun, Feb 10, 2019 at 9:36 PM Serega Sheypak 
> wrote:
>
>> Hi BR,
>> thanks for your reply. I want to mimic the issue and kill tasks at a
>> certain stage. Killing executor is also an option for me.
>> I'm curious how do core spark contributors test spark fault tolerance?
>>
>>
>> Sun, Feb 10, 2019 at 16:57, Gabor Somogyi:
>>
>>> Hi Serega,
>>>
>>> If I understand your problem correctly you would like to kill one
>>> executor only and the rest of the app has to be untouched.
>>> If that's true yarn -kill is not what you want because it stops the
>>> whole application.
>>>
>>> I've done similar thing when tested/testing Spark's HA features.
>>> - jps -vlm | grep
>>> "org.apache.spark.executor.CoarseGrainedExecutorBackend.*applicationid"
>>> - kill -9 pidofoneexecutor
>>>
>>> Be aware if it's a multi-node cluster check whether at least one process
>>> runs on a specific node(it's not required).
>>> Happy killing...
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Sun, Feb 10, 2019 at 4:19 PM Jörn Franke 
>>> wrote:
>>>
>>>> yarn application -kill applicationid ?
>>>>
>>>> > On 10.02.2019 at 13:30, Serega Sheypak <
>>>> serega.shey...@gmail.com> wrote:
>>>> >
>>>> > Hi there!
>>>> > I have weird issue that appears only when tasks fail at specific
>>>> stage. I would like to imitate failure on my own.
>>>> > The plan is to run problematic app and then kill entire executor or
>>>> some tasks when execution reaches certain stage.
>>>> >
>>>> > Is it do-able?
>>>>
>>>>
>>>>


Re: Spark on YARN, HowTo kill executor or individual task?

2019-02-10 Thread Serega Sheypak
Hi BR,
thanks for your reply. I want to mimic the issue and kill tasks at a
certain stage. Killing an executor is also an option for me.
I'm curious: how do core Spark contributors test Spark's fault tolerance?


Sun, Feb 10, 2019 at 16:57, Gabor Somogyi:

> Hi Serega,
>
> If I understand your problem correctly you would like to kill one executor
> only and the rest of the app has to be untouched.
> If that's true yarn -kill is not what you want because it stops the whole
> application.
>
> I've done similar thing when tested/testing Spark's HA features.
> - jps -vlm | grep
> "org.apache.spark.executor.CoarseGrainedExecutorBackend.*applicationid"
> - kill -9 pidofoneexecutor
>
> Be aware if it's a multi-node cluster check whether at least one process
> runs on a specific node(it's not required).
> Happy killing...
>
> BR,
> G
>
>
> On Sun, Feb 10, 2019 at 4:19 PM Jörn Franke  wrote:
>
>> yarn application -kill applicationid ?
>>
>> > On 10.02.2019 at 13:30, Serega Sheypak wrote:
>> >
>> > Hi there!
>> > I have weird issue that appears only when tasks fail at specific stage.
>> I would like to imitate failure on my own.
>> > The plan is to run problematic app and then kill entire executor or
>> some tasks when execution reaches certain stage.
>> >
>> > Is it do-able?
>>
>>
>>


Spark on YARN, HowTo kill executor or individual task?

2019-02-10 Thread Serega Sheypak
Hi there!
I have a weird issue that appears only when tasks fail at a specific stage. I
would like to reproduce the failure on my own.
The plan is to run the problematic app and then kill an entire executor or some
tasks when execution reaches a certain stage.

Is it doable?


Spark 2.x duplicates output when task fails at "repartition" stage. Checkpointing is enabled before repartition.

2019-02-05 Thread Serega Sheypak
Hi, I have a Spark job that produces duplicates when one or more tasks from
the repartition stage fail.
Here is the simplified code.

sparkContext.setCheckpointDir("hdfs://path-to-checkpoint-dir")

val inputRDDs: List[RDD[String]] = List.empty // an RDD per input dir

val updatedRDDs = inputRDDs.map { inputRDD => // some stuff happens here
  inputRDD
    .filter(???)
    .map(???)
}

val unionOfUpdatedRDDs = sparkContext.union(updatedRDDs)
unionOfUpdatedRDDs.checkpoint() // it didn't help

unionOfUpdatedRDDs
  .repartition(42) // task failed here,
  .saveAsNewAPIHadoopFile("/path") // task failed here too.

// what really causes duplicates in output?
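
The thread does not settle the cause. One candidate, stated here as an
assumption, is that repartition deals records out round-robin, so the target
partition of a record depends on the order in which the task sees its input.
If a retried task reads the same records in a different order, records can
land in different output partitions than on the first attempt. The
plain-Scala model below (hypothetical names, not Spark's implementation)
contrasts this with a key-based assignment that depends only on the record.

```scala
object PartitionAssignment {
  // Round-robin model: the i-th record seen goes to partition (start + i) % n,
  // so the result depends on the order of the input.
  def roundRobin[T](records: Seq[T], numPartitions: Int, start: Int = 0): Map[T, Int] =
    records.zipWithIndex.map { case (r, i) => r -> ((start + i) % numPartitions) }.toMap

  // Key-based model: the partition is a function of the record alone,
  // so reordering the input does not change the assignment.
  def byHash[T](records: Seq[T], numPartitions: Int): Map[T, Int] =
    records.map(r => r -> Math.floorMod(r.hashCode, numPartitions)).toMap
}

object PartitionAssignmentDemo {
  val firstAttempt = Seq("a", "b", "c")
  val retry = Seq("c", "a", "b") // same records, different order

  // true when the two attempts disagree on where at least one record goes
  val roundRobinDiffers: Boolean =
    PartitionAssignment.roundRobin(firstAttempt, 2) != PartitionAssignment.roundRobin(retry, 2)

  // true when both attempts place every record identically
  val hashAgrees: Boolean =
    PartitionAssignment.byHash(firstAttempt, 2) == PartitionAssignment.byHash(retry, 2)
}
```

In Spark, a key-based shuffle could be written along the lines of
rdd.map(r => (r, ())).partitionBy(new HashPartitioner(42)).keys; whether that
removes the duplicates in this particular job is untested.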


Re: Spark on Yarn, is it possible to manually blacklist nodes before running spark job?

2019-01-23 Thread Serega Sheypak
Hi Imran,
here is my use case.
There is a 1K-node cluster, and jobs suffer performance degradation because of
a single node. It's rather hard to convince Cluster Ops to decommission a
node because of "performance degradation". Imagine 10 dev teams chasing a
single ops team, either for a valid reason (the node has problems) or because
the code has a bug, or the data is skewed, or there are spots on the sun. We
can't just decommission a node because a random dev complains.

Simple solution:
- rerun the failed / delayed job and blacklist the "problematic" node in advance.
- Report the problem if the job then works w/o anomalies.
- ops collect complaints about the node and start to decommission it when a
"complaints threshold" is reached. The probability that many loosely coupled
teams with loosely coupled jobs complain about a single node by chance is
rather low.


Results
- Ops are not spammed with random requests from devs.
- Devs are not blocked because of a really bad node.
- It's very cheap for everyone to "blacklist" a node during job submission
w/o doing anything to the node.
- It's very easy to automate such behavior. Many teams use 100500 kinds of
workflow runners and the strategy is dead simple (depends on the SLA of
course).
  - Just re-run the failed job excluding nodes with failed tasks (if the number of
nodes is reasonable)
  - Kill a stuck job if it runs longer than XXX minutes and re-start it
excluding nodes with long-running tasks.
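
The re-run strategy above can be sketched as a small selection function. This
is only an illustration: TaskAttempt, hostsToExclude and the thresholds are
hypothetical, and how the resulting list would be handed to Spark is left
open, since the thread says no such option existed at the time.

```scala
object NodeExclusion {
  // One task attempt as a workflow runner might record it (hypothetical shape).
  final case class TaskAttempt(host: String, failed: Boolean, runtimeMinutes: Long)

  // Hosts with at least one failed attempt or an attempt running longer than
  // maxRuntimeMinutes. Returns an empty list when more than maxHosts qualify,
  // because the problem is then unlikely to be a single bad node.
  def hostsToExclude(
      attempts: Seq[TaskAttempt],
      maxRuntimeMinutes: Long,
      maxHosts: Int): Seq[String] = {
    val suspects = attempts
      .filter(a => a.failed || a.runtimeMinutes > maxRuntimeMinutes)
      .map(_.host)
      .distinct
      .sorted
    if (suspects.size <= maxHosts) suspects else Seq.empty
  }
}
```

The maxHosts guard reflects the "if the number of nodes is reasonable"
condition from the list above.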



Wed, Jan 23, 2019 at 23:09, Imran Rashid:

> Serega, can you explain a bit more why you want this ability?
> If the node is really bad, wouldn't you want to decommission the NM
> entirely?
> If you've got heterogeneous resources, then node labels seem like they would
> be more appropriate -- and I don't feel great about adding workarounds for
> the node-label limitations into blacklisting.
>
> I don't want to be stuck supporting a configuration with too limited a use
> case.
>
> (may be better to move discussion to
> https://issues.apache.org/jira/browse/SPARK-26688 so it's better archived,
> I'm responding here in case you aren't watching that issue)
>
> On Tue, Jan 22, 2019 at 6:09 AM Jörn Franke  wrote:
>
>> You can try with Yarn node labels:
>>
>> https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/NodeLabel.html
>>
>> Then you can whitelist nodes.
>>
>> On 19.01.2019 at 00:20, Serega Sheypak wrote:
>>
>> Hi, is there any possibility to tell Scheduler to blacklist specific
>> nodes in advance?
>>
>>


Re: Spark on Yarn, is it possible to manually blacklist nodes before running spark job?

2019-01-21 Thread Serega Sheypak
Hi Apiros, thanks for your reply.

Is it this one: https://github.com/apache/spark/pull/23223 ?
Can I try to reach you through Cloudera Support portal?

Mon, Jan 21, 2019 at 20:06, attilapiros:

> Hello, I was working on this area last year (I have developed the
> YarnAllocatorBlacklistTracker) and if you haven't found any solution for
> your problem I can introduce a new config which would contain a sequence of
> always blacklisted nodes. This way blacklisting would improve a bit again
> :)
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
>
>


Re: Spark on Yarn, is it possible to manually blacklist nodes before running spark job?

2019-01-20 Thread Serega Sheypak
Thanks, so I'll check YARN.
Does anyone know if Spark-on-Yarn plans to expose such functionality?

Sat, Jan 19, 2019 at 18:04, Felix Cheung:

> To clarify, yarn actually supports excluding node right when requesting
> resources. It’s spark that doesn’t provide a way to populate such a
> blacklist.
>
> If you can change yarn config, the equivalent is node label:
> https://hadoop.apache.org/docs/r2.7.4/hadoop-yarn/hadoop-yarn-site/NodeLabel.html
>
>
>
> --
> *From:* Li Gao 
> *Sent:* Saturday, January 19, 2019 8:43 AM
> *To:* Felix Cheung
> *Cc:* Serega Sheypak; user
> *Subject:* Re: Spark on Yarn, is it possible to manually blacklist nodes
> before running spark job?
>
> on yarn it is impossible afaik. on kubernetes you can use taints to keep
> certain nodes outside of spark
>
> On Fri, Jan 18, 2019 at 9:35 PM Felix Cheung 
> wrote:
>
>> Not as far as I recall...
>>
>>
>> --
>> *From:* Serega Sheypak 
>> *Sent:* Friday, January 18, 2019 3:21 PM
>> *To:* user
>> *Subject:* Spark on Yarn, is it possible to manually blacklist nodes
>> before running spark job?
>>
>> Hi, is there any possibility to tell Scheduler to blacklist specific
>> nodes in advance?
>>
>


Spark on Yarn, is it possible to manually blacklist nodes before running spark job?

2019-01-18 Thread Serega Sheypak
Hi, is there any possibility to tell Scheduler to blacklist specific nodes
in advance?


Kill spark executor when spark runs specific stage

2018-07-04 Thread Serega Sheypak
Hi, I'm running Spark on YARN. My code is very simple. I want to kill one
executor when "data.repartition(10)" is executed. How can I do it in an easy
way?


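import java.nio.ByteBuffer
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyOutputFormat
import org.apache.hadoop.io.{BytesWritable, NullWritable}

// Data is the application's own record class
val data = sc.sequenceFile[NullWritable, BytesWritable](inputPath)
  .map { case (key, value) =>
    Data.fromBytes(value)
  }

val process = data.repartition(10) // kill one executor here
process.map { d =>
    val bytes = d.toByteArray
    (new AvroKey(ByteBuffer.wrap(bytes)), NullWritable.get())
  }
  .saveAsNewAPIHadoopFile[AvroKeyOutputFormat[ByteBuffer]](outputPath)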
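
A sketch of one possible approach, not something proposed in this thread: a
SparkListener that asks the driver to kill the executor running the first task
of a chosen stage. It assumes SparkContext.killExecutor (a developer API that,
as far as I know, only works with coarse-grained cluster managers such as
YARN) is acceptable for the test, and that the stage id is known in advance,
for example from the Spark UI of an earlier run. OneShotStageTrigger and
KillExecutorAtStage are hypothetical names.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}

// Decides, at most once, that an executor running a task of the target stage
// should be killed. Tasks running on the driver (local mode) are ignored.
class OneShotStageTrigger(targetStageId: Int) {
  private val fired = new AtomicBoolean(false)

  def shouldKill(stageId: Int, executorId: String): Boolean =
    stageId == targetStageId && executorId != "driver" && fired.compareAndSet(false, true)
}

// Calls `kill` with the id of the executor that starts the first task of the
// target stage. `kill` is injected so it can be wired to sc.killExecutor.
class KillExecutorAtStage(targetStageId: Int, kill: String => Boolean) extends SparkListener {
  private val trigger = new OneShotStageTrigger(targetStageId)

  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    val executorId = taskStart.taskInfo.executorId
    if (trigger.shouldKill(taskStart.stageId, executorId)) {
      kill(executorId)
      ()
    }
  }
}

// Registration, assuming `sc` is the application's SparkContext and 2 is a
// placeholder for the id of the stage of interest:
//
//   sc.addSparkListener(new KillExecutorAtStage(2, id => sc.killExecutor(id)))
```

As I understand it, killExecutor also tells the cluster manager that the
application wants fewer executors, so the killed executor may not be replaced.
Killing the executor process from outside with kill -9 avoids that side
effect.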


Re: how "hour" function in Spark SQL is supposed to work?

2018-03-20 Thread Serega Sheypak
Ok, this one works:

.withColumn("hour", hour(from_unixtime(typedDataset.col("ts") / 1000)))
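
The arithmetic behind the working expression can be checked outside Spark. The
sketch below uses java.time only; HourFromMillis and its method names are
hypothetical. It shows that a millisecond timestamp has to be reduced to
seconds (or treated as milliseconds explicitly) before the hour is extracted,
and that the result depends on the time zone. As far as I know,
from_unixtime renders the time in the session or JVM default zone, so the
Spark result may differ from the UTC value computed here.

```scala
import java.time.{Instant, ZoneId, ZoneOffset}

object HourFromMillis {
  // Hour of day (0-23) for an epoch timestamp in milliseconds, in the given zone.
  def hourOfDay(epochMillis: Long, zone: ZoneId = ZoneOffset.UTC): Int =
    Instant.ofEpochMilli(epochMillis).atZone(zone).getHour

  // The same value computed from whole seconds, mirroring the "/ 1000" in the thread.
  def hourOfDayFromSeconds(epochSeconds: Long, zone: ZoneId = ZoneOffset.UTC): Int =
    Instant.ofEpochSecond(epochSeconds).atZone(zone).getHour
}

object HourFromMillisDemo {
  // 1970-01-02T13:07:00.123Z expressed in milliseconds since the epoch
  val ts: Long = (86400L + 13 * 3600L + 7 * 60L) * 1000L + 123L

  val hourUtc: Int = HourFromMillis.hourOfDay(ts)
  val hourFromSeconds: Int = HourFromMillis.hourOfDayFromSeconds(ts / 1000)
  val hourPlusTwo: Int = HourFromMillis.hourOfDay(ts, ZoneOffset.ofHours(2))
}
```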



2018-03-20 22:43 GMT+01:00 Serega Sheypak <serega.shey...@gmail.com>:

> Hi, any updates? Looks like some API inconsistency or bug..?
>
> 2018-03-17 13:09 GMT+01:00 Serega Sheypak <serega.shey...@gmail.com>:
>
>> > Not sure why you are dividing by 1000. from_unixtime expects a long type
>> It expects seconds, I have milliseconds.
>>
>>
>>
>> 2018-03-12 6:16 GMT+01:00 vermanurag <anurag.ve...@fnmathlogic.com>:
>>
>>> Not sure why you are dividing by 1000. from_unixtime expects a long type
>>> which is time in milliseconds from reference date.
>>>
>>> The following should work:
>>>
>>> val ds = dataset.withColumn("hour", hour(from_unixtime(dataset.col("ts"))))
>>>
>>>
>>>
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>>
>>>
>>>
>>
>


Re: how "hour" function in Spark SQL is supposed to work?

2018-03-20 Thread Serega Sheypak
Hi, any updates? Looks like some API inconsistency or bug..?

2018-03-17 13:09 GMT+01:00 Serega Sheypak <serega.shey...@gmail.com>:

> > Not sure why you are dividing by 1000. from_unixtime expects a long type
> It expects seconds, I have milliseconds.
>
>
>
> 2018-03-12 6:16 GMT+01:00 vermanurag <anurag.ve...@fnmathlogic.com>:
>
>> Not sure why you are dividing by 1000. from_unixtime expects a long type
>> which is time in milliseconds from reference date.
>>
>> The following should work:
>>
>> val ds = dataset.withColumn("hour", hour(from_unixtime(dataset.col("ts"))))
>>
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>>
>>
>


Re: Run spark 2.2 on yarn as usual java application

2018-03-19 Thread Serega Sheypak
Hi Jörn, thanks for your reply
Oozie starts the Oozie java action as a single "long running" MapReduce mapper.
This mapper is responsible for calling the main class. The main class belongs
to the user, and this main class starts the Spark job.
yarn-cluster is not an option for me: I would have to do something special to
manage a "runaway" driver. Imagine I want to kill the Spark job. I can
just kill the Oozie workflow; it will kill the spawned mapper with the main
class and the driver inside it.
That won't happen in yarn-cluster mode, since the driver is not running in the
process "managed" by Oozie.


2018-03-19 13:41 GMT+01:00 Jörn Franke <jornfra...@gmail.com>:

> Maybe you should better run it in yarn cluster mode. Yarn client would
> start the driver on the oozie server.
>
> On 19. Mar 2018, at 12:58, Serega Sheypak <serega.shey...@gmail.com>
> wrote:
>
> I'm trying to run it as Oozie java action and reduce env dependency. The
> only thing I need is Hadoop Configuration to talk to hdfs and yarn.
> Spark submit is a shell thing. Trying to do all from jvm.
> Oozie java action starts main class which instantiates SparkConf and
> session. It works well in local mode but throws exception when I try to run
> spark as yarn-client
>
> Mon, Mar 19, 2018 at 7:16, Jacek Laskowski <ja...@japila.pl>:
>
>> Hi,
>>
>> What's the deployment process then (if not using spark-submit)? How is
>> the AM deployed? Why would you want to skip spark-submit?
>>
>> Jacek
>>
>> On 19 Mar 2018 00:20, "Serega Sheypak" <serega.shey...@gmail.com> wrote:
>>
>>> Hi, Is it even possible to run spark on yarn as usual java application?
>>> I've built jar using maven with spark-yarn dependency and I manually
>>> populate SparkConf with all hadoop properties.
>>> SparkContext fails to start with exception:
>>>
>>>1. Caused by: java.lang.IllegalStateException: Library directory
>>>'/hadoop/yarn/local/usercache/root/appcache/application_
>>>1521375636129_0022/container_e06_1521375636129_0022_01_
>>>02/assembly/target/scala-2.11/jars' does not exist; make sure
>>>Spark is built.
>>>2. at org.apache.spark.launcher.CommandBuilderUtils.checkState(Com
>>>mandBuilderUtils.java:260)
>>>3. at org.apache.spark.launcher.CommandBuilderUtils.findJarsDir(Co
>>>mmandBuilderUtils.java:359)
>>>4. at org.apache.spark.launcher.YarnCommandBuilderUtils$.findJarsDir(
>>>YarnCommandBuilderUtils.scala:38)
>>>
>>>
>>> I took a look at the code and it has some hardcodes and checks for
>>> specific files layout. I don't follow why :)
>>> Is it possible to bypass such checks?
>>>
>>


Re: Run spark 2.2 on yarn as usual java application

2018-03-19 Thread Serega Sheypak
I'm trying to run it as an Oozie java action and reduce the env dependency. The
only thing I need is a Hadoop Configuration to talk to HDFS and YARN.
spark-submit is a shell thing; I'm trying to do everything from the JVM.
The Oozie java action starts the main class, which instantiates SparkConf and
the session. It works well in local mode but throws an exception when I try to
run Spark as yarn-client.

Mon, Mar 19, 2018 at 7:16, Jacek Laskowski <ja...@japila.pl>:

> Hi,
>
> What's the deployment process then (if not using spark-submit)? How is the
> AM deployed? Why would you want to skip spark-submit?
>
> Jacek
>
> On 19 Mar 2018 00:20, "Serega Sheypak" <serega.shey...@gmail.com> wrote:
>
>> Hi, Is it even possible to run spark on yarn as usual java application?
>> I've built jar using maven with spark-yarn dependency and I manually
>> populate SparkConf with all hadoop properties.
>> SparkContext fails to start with exception:
>>
>>1. Caused by: java.lang.IllegalStateException: Library directory
>>
>> '/hadoop/yarn/local/usercache/root/appcache/application_1521375636129_0022/container_e06_1521375636129_0022_01_02/assembly/target/scala-2.11/jars'
>>does not exist; make sure Spark is built.
>>2. at org.apache.spark.launcher.CommandBuilderUtils.checkState(
>>CommandBuilderUtils.java:260)
>>3. at org.apache.spark.launcher.CommandBuilderUtils.findJarsDir(
>>CommandBuilderUtils.java:359)
>>4. at org.apache.spark.launcher.YarnCommandBuilderUtils$.findJarsDir(
>>YarnCommandBuilderUtils.scala:38)
>>
>>
>> I took a look at the code and it has some hardcodes and checks for
>> specific files layout. I don't follow why :)
>> Is it possible to bypass such checks?
>>
>


Run spark 2.2 on yarn as usual java application

2018-03-18 Thread Serega Sheypak
Hi, is it even possible to run Spark on YARN as a usual Java application?
I've built a jar using Maven with the spark-yarn dependency, and I manually
populate SparkConf with all Hadoop properties.
SparkContext fails to start with an exception:

Caused by: java.lang.IllegalStateException: Library directory '/hadoop/yarn/local/usercache/root/appcache/application_1521375636129_0022/container_e06_1521375636129_0022_01_02/assembly/target/scala-2.11/jars' does not exist; make sure Spark is built.
    at org.apache.spark.launcher.CommandBuilderUtils.checkState(CommandBuilderUtils.java:260)
    at org.apache.spark.launcher.CommandBuilderUtils.findJarsDir(CommandBuilderUtils.java:359)
    at org.apache.spark.launcher.YarnCommandBuilderUtils$.findJarsDir(YarnCommandBuilderUtils.scala:38)


I took a look at the code and it has some hard-coded paths and checks for a
specific file layout. I don't follow why :)
Is it possible to bypass such checks?
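
A hedged sketch of one way the check might be avoided. The stack trace goes
through findJarsDir, which, as far as I can tell from the Spark 2.x YARN
client, is only reached when neither spark.yarn.jars nor spark.yarn.archive is
set and Spark falls back to looking for jars under SPARK_HOME. Pointing
spark.yarn.jars at jars already uploaded to HDFS should skip that lookup. The
object name, the app name and the HDFS path are placeholders, and the Hadoop
and YARN configuration files are assumed to be on the classpath.

```scala
import org.apache.spark.SparkConf

object YarnClientConf {
  // Builds a SparkConf for a driver started from a plain JVM main class in
  // YARN client mode. sparkJars is a placeholder such as
  // "hdfs:///apps/spark/jars/*.jar" pointing at the Spark jars on HDFS.
  def build(appName: String, sparkJars: String): SparkConf =
    new SparkConf()
      .setAppName(appName)
      .setMaster("yarn")
      .set("spark.submit.deployMode", "client")
      .set("spark.yarn.jars", sparkJars)
}
```

A session could then be created with
SparkSession.builder().config(YarnClientConf.build(...)).getOrCreate().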


Re: Append more files to existing partitioned data

2018-03-18 Thread Serega Sheypak
Thanks a lot!

2018-03-18 9:30 GMT+01:00 Denis Bolshakov <bolshakov.de...@gmail.com>:

> Please checkout.
>
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
>
>
> and
>
> org.apache.spark.sql.execution.datasources.WriteRelation
>
>
> I guess it's managed by
>
> job.getConfiguration.set(DATASOURCE_WRITEJOBUUID, uniqueWriteJobId.toString)
>
>
> On 17 March 2018 at 20:46, Serega Sheypak <serega.shey...@gmail.com>
> wrote:
>
>> Hi Denis, great to see you here :)
>> It works, thanks!
>>
>> Do you know how spark generates datafile names?  names look like
>> part- with uuid appended after
>>
>> part-0-124a8c43-83b9-44e1-a9c4-dcc8676cdb99.c000.snappy.parquet
>>
>>
>>
>>
>> 2018-03-17 14:15 GMT+01:00 Denis Bolshakov <bolshakov.de...@gmail.com>:
>>
>>> Hello Serega,
>>>
>>> https://spark.apache.org/docs/latest/sql-programming-guide.html
>>>
>>> Please try SaveMode.Append option. Does it work for you?
>>>
>>>
>>> Sat, Mar 17, 2018, 15:19 Serega Sheypak <serega.shey...@gmail.com>:
>>>
>>>> Hi, I'm using spark-sql to process my data and store result as parquet
>>>> partitioned by several columns
>>>>
>>>> ds.write
>>>>   .partitionBy("year", "month", "day", "hour", "workflowId")
>>>>   .parquet("/here/is/my/dir")
>>>>
>>>>
>>>> I want to run more jobs that will produce new partitions or add more
>>>> files to existing partitions.
>>>> What is the right way to do it?
>>>>
>>>
>>
>
>
> --
> //with Best Regards
> --Denis Bolshakov
> e-mail: bolshakov.de...@gmail.com
>


Re: Append more files to existing partitioned data

2018-03-17 Thread Serega Sheypak
Hi Denis, great to see you here :)
It works, thanks!

Do you know how Spark generates data file names? The names look like part-
with a UUID appended:

part-0-124a8c43-83b9-44e1-a9c4-dcc8676cdb99.c000.snappy.parquet




2018-03-17 14:15 GMT+01:00 Denis Bolshakov <bolshakov.de...@gmail.com>:

> Hello Serega,
>
> https://spark.apache.org/docs/latest/sql-programming-guide.html
>
> Please try SaveMode.Append option. Does it work for you?
>
>
> Sat, Mar 17, 2018, 15:19 Serega Sheypak <serega.shey...@gmail.com>:
>
>> Hi, I'm using spark-sql to process my data and store result as parquet
>> partitioned by several columns
>>
>> ds.write
>>   .partitionBy("year", "month", "day", "hour", "workflowId")
>>   .parquet("/here/is/my/dir")
>>
>>
>> I want to run more jobs that will produce new partitions or add more
>> files to existing partitions.
>> What is the right way to do it?
>>
>


Append more files to existing partitioned data

2018-03-17 Thread Serega Sheypak
Hi, I'm using spark-sql to process my data and store the result as parquet
partitioned by several columns

ds.write
  .partitionBy("year", "month", "day", "hour", "workflowId")
  .parquet("/here/is/my/dir")


I want to run more jobs that will produce new partitions or add more files
to existing partitions.
What is the right way to do it?
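
The reply in this thread points to SaveMode.Append. A minimal sketch of what
that could look like for the write above; AppendExample and appendPartitioned
are hypothetical names, and the partition columns are the ones from the
question.

```scala
import org.apache.spark.sql.{Dataset, SaveMode}

object AppendExample {
  // Writes ds under outputDir, adding new partition directories and adding
  // files to existing ones instead of failing because the path exists.
  def appendPartitioned(ds: Dataset[_], outputDir: String): Unit =
    ds.write
      .mode(SaveMode.Append)
      .partitionBy("year", "month", "day", "hour", "workflowId")
      .parquet(outputDir)
}
```

Append does not deduplicate: running the same job twice on the same input
writes the rows twice.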


Re: how "hour" function in Spark SQL is supposed to work?

2018-03-17 Thread Serega Sheypak
> Not sure why you are dividing by 1000. from_unixtime expects a long type
It expects seconds, I have milliseconds.



2018-03-12 6:16 GMT+01:00 vermanurag :

> Not sure why you are dividing by 1000. from_unixtime expects a long type
> which is time in milliseconds from reference date.
>
> The following should work:
>
> val ds = dataset.withColumn("hour", hour(from_unixtime(dataset.col("ts"))))
>
>
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
>
>


how "hour" function in Spark SQL is supposed to work?

2018-03-11 Thread Serega Sheypak
Hi, I'm desperately trying to extract the hour from Unix seconds.

The year, month and dayofmonth functions work as expected.
The hour function always returns 0.

val ds = dataset
  .withColumn("year", year(to_date(from_unixtime(dataset.col("ts") / 1000))))
  .withColumn("month", month(to_date(from_unixtime(dataset.col("ts") / 1000))))
  .withColumn("day", dayofmonth(to_date(from_unixtime(dataset.col("ts") / 1000))))
  .withColumn("hour", hour(from_utc_timestamp(dataset.col("ts") / 1000, "UTC")))

  //.withColumn("hour", hour(dataset.col("ts") / 1000))
  //.withColumn("hour1", hour(dataset.col("ts")))
  //.withColumn("hour", hour(dataset.col("ts")))
  //.withColumn("hour", hour("2009-07-30 12:58:59"))

I took a look at source code

year, month, dayofmonth expect to get

override def inputTypes: Seq[AbstractDataType] = Seq(DateType)

hour function expects something different

override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType)

from_utc_timestamp returns Timestamp

override def dataType: DataType = TimestampType

but it didn't help.

What am I doing wrong? How can I get the hour from Unix seconds?
Thanks!


Implement Dataset reader from SEQ file with protobuf to Dataset

2017-10-08 Thread Serega Sheypak
Hi, did anyone try to implement a Spark SQL Dataset reader for SEQ files with
protobuf inside?

Imagine I have a protobuf definition
Person
 - name: String
 - lastName: String
 - phones: List[String]

and a generated Scala case class:
case class Person(name: String, lastName: String, phones: List[String])

I want to write a component that gives me a Dataset with a typed schema.

val personsDataset = spark.read
  .option("inferSchema", "true")[Person]

Where can I take a look at references?


Re: Managed memory leak detected.SPARK-11293 ?

2016-05-18 Thread Serega Sheypak
Ok, it happens only in YARN cluster mode. It works with snappy in
YARN client mode.
I started to hit this problem when I switched to cluster mode.

2016-05-18 16:31 GMT+02:00 Ted Yu <yuzhih...@gmail.com>:

> According to:
>
> http://blog.erdemagaoglu.com/post/4605524309/lzo-vs-snappy-vs-lzf-vs-zlib-a-comparison-of
>
> performance of snappy and lzf were on-par to each other.
>
> Maybe lzf has lower memory requirement.
>
> On Wed, May 18, 2016 at 7:22 AM, Serega Sheypak <serega.shey...@gmail.com>
> wrote:
>
>> Switching from snappy to lzf helped me:
>>
>> spark.io.compression.codec=lzf
>>
>> Do you know why? :) I can't find exact explanation...
>>
>>
>>
>> 2016-05-18 15:41 GMT+02:00 Ted Yu <yuzhih...@gmail.com>:
>>
>>> Please increase the number of partitions.
>>>
>>> Cheers
>>>
>>> On Wed, May 18, 2016 at 4:17 AM, Serega Sheypak <
>>> serega.shey...@gmail.com> wrote:
>>>
>>>> Hi, please have a look at log snippet:
>>>> 16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Doing the fetch;
>>>> tracker endpoint =
>>>> NettyRpcEndpointRef(spark://mapoutputtrac...@xxx.xxx.xxx.xxx:38128)
>>>> 16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Got the output
>>>> locations
>>>> 16/05/18 03:27:16 INFO storage.ShuffleBlockFetcherIterator: Getting 30
>>>> non-empty blocks out of 30 blocks
>>>> 16/05/18 03:27:16 INFO storage.ShuffleBlockFetcherIterator: Started 30
>>>> remote fetches in 3 ms
>>>> 16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Don't have map
>>>> outputs for shuffle 1, fetching them
>>>> 16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Doing the fetch;
>>>> tracker endpoint =
>>>> NettyRpcEndpointRef(spark://mapoutputtrac...@xxx.xxx.xxx.xxx:38128)
>>>> 16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Got the output
>>>> locations
>>>> 16/05/18 03:27:16 INFO storage.ShuffleBlockFetcherIterator: Getting 1
>>>> non-empty blocks out of 1500 blocks
>>>> 16/05/18 03:27:16 INFO storage.ShuffleBlockFetcherIterator: Started 1
>>>> remote fetches in 1 ms
>>>> 16/05/18 03:27:17 ERROR executor.Executor: Managed memory leak
>>>> detected; size = 6685476 bytes, TID = 3405
>>>> 16/05/18 03:27:17 ERROR executor.Executor: Exception in task 285.0 in
>>>> stage 6.0 (TID 3405)
>>>>
>>>> Is it related to https://issues.apache.org/jira/browse/SPARK-11293
>>>>
>>>> Is there any recommended workaround?
>>>>
>>>
>>>
>>
>


Re: Managed memory leak detected.SPARK-11293 ?

2016-05-18 Thread Serega Sheypak
Switching from snappy to lzf helped me:

spark.io.compression.codec=lzf

Do you know why? :) I can't find an exact explanation...



2016-05-18 15:41 GMT+02:00 Ted Yu <yuzhih...@gmail.com>:

> Please increase the number of partitions.
>
> Cheers
>
> On Wed, May 18, 2016 at 4:17 AM, Serega Sheypak <serega.shey...@gmail.com>
> wrote:
>
>> Hi, please have a look at log snippet:
>> 16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Doing the fetch;
>> tracker endpoint =
>> NettyRpcEndpointRef(spark://mapoutputtrac...@xxx.xxx.xxx.xxx:38128)
>> 16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Got the output
>> locations
>> 16/05/18 03:27:16 INFO storage.ShuffleBlockFetcherIterator: Getting 30
>> non-empty blocks out of 30 blocks
>> 16/05/18 03:27:16 INFO storage.ShuffleBlockFetcherIterator: Started 30
>> remote fetches in 3 ms
>> 16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Don't have map
>> outputs for shuffle 1, fetching them
>> 16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Doing the fetch;
>> tracker endpoint =
>> NettyRpcEndpointRef(spark://mapoutputtrac...@xxx.xxx.xxx.xxx:38128)
>> 16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Got the output
>> locations
>> 16/05/18 03:27:16 INFO storage.ShuffleBlockFetcherIterator: Getting 1
>> non-empty blocks out of 1500 blocks
>> 16/05/18 03:27:16 INFO storage.ShuffleBlockFetcherIterator: Started 1
>> remote fetches in 1 ms
>> 16/05/18 03:27:17 ERROR executor.Executor: Managed memory leak detected;
>> size = 6685476 bytes, TID = 3405
>> 16/05/18 03:27:17 ERROR executor.Executor: Exception in task 285.0 in
>> stage 6.0 (TID 3405)
>>
>> Is it related to https://issues.apache.org/jira/browse/SPARK-11293
>>
>> Is there any recommended workaround?
>>
>
>


Managed memory leak detected.SPARK-11293 ?

2016-05-18 Thread Serega Sheypak
Hi, please have a look at log snippet:
16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Doing the fetch;
tracker endpoint =
NettyRpcEndpointRef(spark://mapoutputtrac...@xxx.xxx.xxx.xxx:38128)
16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Got the output
locations
16/05/18 03:27:16 INFO storage.ShuffleBlockFetcherIterator: Getting 30
non-empty blocks out of 30 blocks
16/05/18 03:27:16 INFO storage.ShuffleBlockFetcherIterator: Started 30
remote fetches in 3 ms
16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Don't have map outputs
for shuffle 1, fetching them
16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Doing the fetch;
tracker endpoint =
NettyRpcEndpointRef(spark://mapoutputtrac...@xxx.xxx.xxx.xxx:38128)
16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Got the output
locations
16/05/18 03:27:16 INFO storage.ShuffleBlockFetcherIterator: Getting 1
non-empty blocks out of 1500 blocks
16/05/18 03:27:16 INFO storage.ShuffleBlockFetcherIterator: Started 1
remote fetches in 1 ms
16/05/18 03:27:17 ERROR executor.Executor: Managed memory leak detected;
size = 6685476 bytes, TID = 3405
16/05/18 03:27:17 ERROR executor.Executor: Exception in task 285.0 in stage
6.0 (TID 3405)

Is it related to https://issues.apache.org/jira/browse/SPARK-11293

Is there any recommended workaround?


Re: Why does spark 1.6.0 can't use jar files stored on HDFS

2016-05-17 Thread Serega Sheypak
Hi, I know about that approach.
I don't want to run a mess of classes from a single jar. I want to use the
distributed cache and ship the application jar and its dependent jars
explicitly.
Unfortunately, --deploy-mode client copies and distributes all jars
again for every Spark job started from the driver class...

2016-05-17 15:41 GMT+02:00 <spark@yahoo.com>:

> Hi Serega,
>
> Create a jar including all the dependencies and execute it as below
> through a shell script:
>
> # spark-submit path, main class name, then the application jar
> /usr/local/spark/bin/spark-submit \
> --class classname \
> --master yarn \
> --deploy-mode cluster \
> /home/hadoop/SparkSampleProgram.jar
>
> Thanks
> Raj
>
>
> On Tuesday, May 17, 2016 6:03 PM, Serega Sheypak <serega.shey...@gmail.com>
> wrote:
>
>
> hi, I'm trying to:
> 1. upload my app jar files to HDFS
> 2. run spark-submit with:
> 2.1. --master yarn --deploy-mode cluster
> or
> 2.2. --master yarn --deploy-mode client
>
> specifying --jars hdfs:///my/home/commons.jar,hdfs:///my/home/super.jar
>
> When spark job is submitted, SparkSubmit client outputs:
> Warning: Skip remote jar hdfs:///user/baba/lib/akka-slf4j_2.11-2.3.11.jar
> ...
>
> and then spark application main class fails with class not found exception.
> Is there any workaround?
>
>
>


Re: Why does spark 1.6.0 can't use jar files stored on HDFS

2016-05-17 Thread Serega Sheypak
spark-submit --conf "spark.driver.userClassPathFirst=true" --class
com.MyClass --master yarn --deploy-mode client --jars
hdfs:///my-lib.jar,hdfs:///my-second-lib.jar jar-with-com-MyClass.jar
job_params



2016-05-17 15:41 GMT+02:00 Serega Sheypak <serega.shey...@gmail.com>:

> https://issues.apache.org/jira/browse/SPARK-10643
>
> Looks like it's the reason...
>
> 2016-05-17 15:31 GMT+02:00 Serega Sheypak <serega.shey...@gmail.com>:
>
>> No, and it looks like a problem.
>>
>> 2.2. --master yarn --deploy-mode client
>> means:
>> 1. submit spark as yarn app, but spark-driver is started on local
>> machine.
>> 2. I upload all dependent jars to HDFS and specify jar HDFS paths in
>> --jars arg.
>> 3. Driver runs my Spark Application main class named "MySuperSparkJob"
>> and MySuperSparkJob fails because it doesn't get jars, they are all in
>> HDFS and not accessible from local machine...
>>
>>
>> 2016-05-17 15:18 GMT+02:00 Jeff Zhang <zjf...@gmail.com>:
>>
>>> Do you put your app jar on hdfs ? The app jar must be on your local
>>> machine.
>>>
>>> On Tue, May 17, 2016 at 8:33 PM, Serega Sheypak <
>>> serega.shey...@gmail.com> wrote:
>>>
>>>> hi, I'm trying to:
>>>> 1. upload my app jar files to HDFS
>>>> 2. run spark-submit with:
>>>> 2.1. --master yarn --deploy-mode cluster
>>>> or
>>>> 2.2. --master yarn --deploy-mode client
>>>>
>>>> specifying --jars hdfs:///my/home/commons.jar,hdfs:///my/home/super.jar
>>>>
>>>> When spark job is submitted, SparkSubmit client outputs:
>>>> Warning: Skip remote jar
>>>> hdfs:///user/baba/lib/akka-slf4j_2.11-2.3.11.jar ...
>>>>
>>>> and then spark application main class fails with class not found
>>>> exception.
>>>> Is there any workaround?
>>>>
>>>
>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>
>>
>


Re: Why does spark 1.6.0 can't use jar files stored on HDFS

2016-05-17 Thread Serega Sheypak
https://issues.apache.org/jira/browse/SPARK-10643

Looks like it's the reason...

2016-05-17 15:31 GMT+02:00 Serega Sheypak <serega.shey...@gmail.com>:

> No, and it looks like a problem.
>
> 2.2. --master yarn --deploy-mode client
> means:
> 1. submit spark as yarn app, but spark-driver is started on local machine.
> 2. I upload all dependent jars to HDFS and specify jar HDFS paths in
> --jars arg.
> 3. Driver runs my Spark Application main class named "MySuperSparkJob" and
> MySuperSparkJob fails because it doesn't get jars, they are all in HDFS and
> not accessible from local machine...
>
>
> 2016-05-17 15:18 GMT+02:00 Jeff Zhang <zjf...@gmail.com>:
>
>> Do you put your app jar on hdfs ? The app jar must be on your local
>> machine.
>>
>> On Tue, May 17, 2016 at 8:33 PM, Serega Sheypak <serega.shey...@gmail.com
>> > wrote:
>>
>>> hi, I'm trying to:
>>> 1. upload my app jar files to HDFS
>>> 2. run spark-submit with:
>>> 2.1. --master yarn --deploy-mode cluster
>>> or
>>> 2.2. --master yarn --deploy-mode client
>>>
>>> specifying --jars hdfs:///my/home/commons.jar,hdfs:///my/home/super.jar
>>>
>>> When spark job is submitted, SparkSubmit client outputs:
>>> Warning: Skip remote jar
>>> hdfs:///user/baba/lib/akka-slf4j_2.11-2.3.11.jar ...
>>>
>>> and then spark application main class fails with class not found
>>> exception.
>>> Is there any workaround?
>>>
>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>


Re: Why does spark 1.6.0 can't use jar files stored on HDFS

2016-05-17 Thread Serega Sheypak
No, and it looks like a problem.

2.2. --master yarn --deploy-mode client
means:
1. submit spark as yarn app, but spark-driver is started on local machine.
2. I upload all dependent jars to HDFS and specify jar HDFS paths in --jars
arg.
3. Driver runs my Spark Application main class named "MySuperSparkJob"
and MySuperSparkJob fails because it doesn't get jars, they are all in HDFS
and not accessible from local machine...


2016-05-17 15:18 GMT+02:00 Jeff Zhang <zjf...@gmail.com>:

> Do you put your app jar on hdfs ? The app jar must be on your local
> machine.
>
> On Tue, May 17, 2016 at 8:33 PM, Serega Sheypak <serega.shey...@gmail.com>
> wrote:
>
>> hi, I'm trying to:
>> 1. upload my app jar files to HDFS
>> 2. run spark-submit with:
>> 2.1. --master yarn --deploy-mode cluster
>> or
>> 2.2. --master yarn --deploy-mode client
>>
>> specifying --jars hdfs:///my/home/commons.jar,hdfs:///my/home/super.jar
>>
>> When spark job is submitted, SparkSubmit client outputs:
>> Warning: Skip remote jar hdfs:///user/baba/lib/akka-slf4j_2.11-2.3.11.jar
>> ...
>>
>> and then spark application main class fails with class not found
>> exception.
>> Is there any workaround?
>>
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Why does spark 1.6.0 can't use jar files stored on HDFS

2016-05-17 Thread Serega Sheypak
hi, I'm trying to:
1. upload my app jar files to HDFS
2. run spark-submit with:
2.1. --master yarn --deploy-mode cluster
or
2.2. --master yarn --deploy-mode client

specifying --jars hdfs:///my/home/commons.jar,hdfs:///my/home/super.jar

When spark job is submitted, SparkSubmit client outputs:
Warning: Skip remote jar hdfs:///user/baba/lib/akka-slf4j_2.11-2.3.11.jar
...

and then spark application main class fails with class not found exception.
Is there any workaround?


Re: spark sql and cassandra. spark generate 769 tasks to read 3 lines from cassandra table

2015-06-17 Thread Serega Sheypak
version
We are on DSE 4.7. (Cassandra 2.1) and spark 1.2.1

cqlsh
select * from site_users
returns fast, subsecond, only 3 rows

Can you show some code how you're doing the reads?
dse beeline
!connect ...
select * from site_users
--table has 3 rows, several columns in each row. Spark runs 769 tasks and
estimates input as 80 TB

0: jdbc:hive2://dsenode01:1 select count(*) from site_users;
+------+
| _c0  |
+------+
| 3    |
+------+
1 row selected (41.635 seconds)


Spark and Cassandra-connector

/usr/share/dse/spark/lib/spark-cassandra-connector-java_2.10-1.2.1.jar

/usr/share/dse/spark/lib/spark-cassandra-connector_2.10-1.2.1.jar

2015-06-17 13:52 GMT+02:00 Yana Kadiyska yana.kadiy...@gmail.com:

 Can you show some code how you're doing the reads? Have you successfully
 read other stuff from Cassandra (i.e. do you have a lot of experience with
 this path and this particular table is causing issues or are you trying to
 figure out the right way to do a read).

 What version of Spark and Cassandra-connector are you using?
 Also, what do you get for select count(*) from foo -- is that just as
 bad?

 On Wed, Jun 17, 2015 at 4:37 AM, Serega Sheypak serega.shey...@gmail.com
 wrote:

 Hi, can somebody suggest me the way to reduce quantity of task?

 2015-06-15 18:26 GMT+02:00 Serega Sheypak serega.shey...@gmail.com:

 Hi, I'm running Spark SQL against a Cassandra table. I have 3 C* nodes,
 each of them runs a Spark worker.
 The problem is that Spark runs 769 tasks to read 3 lines: select bar from
 foo.
 I've tried these properties:

 # try to avoid 769 tasks for a dummy "select bar from foo" query
 spark.cassandra.input.split.size_in_mb=32mb
 spark.cassandra.input.fetch.size_in_rows=1000
 spark.cassandra.input.split.size=1

 but it doesn't help.

 Here are the mean metrics for the job:
 input1 = 8388608.0 TB
 input2 = -320 B
 input3 = -400 B

 I'm confused by the input size: there are only 3 rows in the C* table.
 Definitely, I don't have 8388608.0 TB of data :)


Re: spark sql and cassandra. spark generate 769 tasks to read 3 lines from cassandra table

2015-06-17 Thread Serega Sheypak
Hi, can somebody suggest me the way to reduce quantity of task?

2015-06-15 18:26 GMT+02:00 Serega Sheypak serega.shey...@gmail.com:

 Hi, I'm running Spark SQL against a Cassandra table. I have 3 C* nodes, each
 of them runs a Spark worker.
 The problem is that Spark runs 769 tasks to read 3 lines: select bar from
 foo.
 I've tried these properties:

 # try to avoid 769 tasks for a dummy "select bar from foo" query
 spark.cassandra.input.split.size_in_mb=32mb
 spark.cassandra.input.fetch.size_in_rows=1000
 spark.cassandra.input.split.size=1

 but it doesn't help.

 Here are the mean metrics for the job:
 input1 = 8388608.0 TB
 input2 = -320 B
 input3 = -400 B

 I'm confused by the input size: there are only 3 rows in the C* table.
 Definitely, I don't have 8388608.0 TB of data :)


spark-sql estimates Cassandra table with 3 rows as 8 TB of data

2015-06-17 Thread Serega Sheypak
Hi, spark-sql estimated the input for a Cassandra table with 3 rows as 8 TB.
Sometimes it's estimated as -167 B.
I run it on a laptop; I don't have 8 TB of space for the data.


Re: spark sql and cassandra. spark generate 769 tasks to read 3 lines from cassandra table

2015-06-17 Thread Serega Sheypak
So, here is some more input:

The problem could be in spark-sql-thriftserver.
When I use the Spark console to submit the SQL query, it takes 10 seconds and
a reasonable number of tasks.

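import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra.CassandraSQLContext

// sc is the SparkContext provided by the Spark shell
val cc = new CassandraSQLContext(sc)

cc.sql("select su.user_id from appdata.site_users su join " +
  "appdata.user_orders uo on uo.user_id=su.user_id").count()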

res8: Long = 2

If the same query is submitted through beeline, it takes minutes and Spark
creates up to 2000 tasks to read 3 lines of data.

We think spark-sql-thriftserver has bugs in it.

2015-06-17 14:14 GMT+02:00 Serega Sheypak serega.shey...@gmail.com:

 version
 We are on DSE 4.7. (Cassandra 2.1) and spark 1.2.1

 cqlsh
 select * from site_users
 returns fast, subsecond, only 3 rows

 Can you show some code how you're doing the reads?
 dse beeline
 !connect ...
 select * from site_users
 --table has 3 rows, several columns in each row. Spark runs 769 tasks and
 estimates input as 80 TB

 0: jdbc:hive2://dsenode01:1 select count(*) from site_users;
 +------+
 | _c0  |
 +------+
 | 3    |
 +------+
 1 row selected (41.635 seconds)


 Spark and Cassandra-connector

 /usr/share/dse/spark/lib/spark-cassandra-connector-java_2.10-1.2.1.jar

 /usr/share/dse/spark/lib/spark-cassandra-connector_2.10-1.2.1.jar

 2015-06-17 13:52 GMT+02:00 Yana Kadiyska yana.kadiy...@gmail.com:

 Can you show some code how you're doing the reads? Have you successfully
 read other stuff from Cassandra (i.e. do you have a lot of experience with
 this path and this particular table is causing issues or are you trying to
 figure out the right way to do a read).

 What version of Spark and Cassandra-connector are you using?
 Also, what do you get for select count(*) from foo -- is that just as
 bad?

 On Wed, Jun 17, 2015 at 4:37 AM, Serega Sheypak serega.shey...@gmail.com
  wrote:

 Hi, can somebody suggest me the way to reduce quantity of task?

 2015-06-15 18:26 GMT+02:00 Serega Sheypak serega.shey...@gmail.com:

 Hi, I'm running Spark SQL against a Cassandra table. I have 3 C* nodes,
 each of them runs a Spark worker.
 The problem is that Spark runs 769 tasks to read 3 lines: select bar
 from foo.
 I've tried these properties:

 # try to avoid 769 tasks for a dummy "select bar from foo" query
 spark.cassandra.input.split.size_in_mb=32mb
 spark.cassandra.input.fetch.size_in_rows=1000
 spark.cassandra.input.split.size=1

 but it doesn't help.

 Here are the mean metrics for the job:
 input1 = 8388608.0 TB
 input2 = -320 B
 input3 = -400 B

 I'm confused by the input size: there are only 3 rows in the C* table.
 Definitely, I don't have 8388608.0 TB of data :)


spark sql and cassandra. spark generate 769 tasks to read 3 lines from cassandra table

2015-06-15 Thread Serega Sheypak
Hi, I'm running Spark SQL against a Cassandra table. I have 3 C* nodes, each
of them runs a Spark worker.
The problem is that Spark runs 769 tasks to read 3 lines: select bar from
foo.
I've tried these properties:

# try to avoid 769 tasks for a dummy "select bar from foo" query
spark.cassandra.input.split.size_in_mb=32mb
spark.cassandra.input.fetch.size_in_rows=1000
spark.cassandra.input.split.size=1

but it doesn't help.

Here are the mean metrics for the job:
input1 = 8388608.0 TB
input2 = -320 B
input3 = -400 B

I'm confused by the input size: there are only 3 rows in the C* table.
Definitely, I don't have 8388608.0 TB of data :)


Re: Driver memory leak?

2015-04-29 Thread Serega Sheypak
The memory leak could be related to this
https://issues.apache.org/jira/browse/SPARK-5967 defect that was resolved
in Spark 1.2.2 and 1.3.0.
@Sean
Will it be backported to CDH? I didn't find that bug in the CDH 5.4 release
notes.

2015-04-29 14:51 GMT+02:00 Conor Fennell conor.fenn...@altocloud.com:

 The memory leak could be related to this
 https://issues.apache.org/jira/browse/SPARK-5967 defect that was
 resolved in Spark 1.2.2 and 1.3.0.

 It also was a HashMap causing the issue.

 -Conor



 On Wed, Apr 29, 2015 at 12:01 PM, Sean Owen so...@cloudera.com wrote:

 Please use user@, not dev@

 This message does not appear to be from your driver. It also doesn't say
 you ran out of memory. It says you didn't tell YARN to let it use the
 memory you want. Look at the memory overhead param and please search first
 for related discussions.
 On Apr 29, 2015 11:43 AM, wyphao.2007 wyphao.2...@163.com wrote:

 Hi, dear developers, I am using Spark Streaming to read data from Kafka.
 The program had already run for about 120 hours, but today it failed
 because of an OOM in the driver, as follows:

 Container [pid=49133,containerID=container_1429773909253_0050_02_01]
 is running beyond physical memory limits. Current usage: 2.5 GB of 2.5 GB
 physical memory used; 3.2 GB of 50 GB virtual memory used. Killing
 container.

 I set --driver-memory to 2g. As I understand it, the driver is responsible for
 job scheduling and job monitoring (please correct me if I'm wrong). Why is it
 using so much memory?
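One way to read the 2.5 GB limit in the container message, as a rough sketch: on YARN the container is sized as the driver heap plus an off-heap overhead (spark.yarn.driver.memoryOverhead), rounded up to the scheduler's allocation step. The 7% factor and 384 MB floor below are assumed to be the Spark 1.2/1.3 defaults, and the 512 MB step is a guess about this cluster; none of the three is stated in the thread.

```shell
# Estimate the YARN container size (in MB) requested for a given heap size.
# Arguments: heap_mb [overhead_percent] [min_overhead_mb] [allocation_step_mb]
# The defaults (7, 384, 512) are assumptions, not values taken from the thread.
container_mb() {
  heap_mb=$1
  factor_pct=${2:-7}
  min_overhead_mb=${3:-384}
  step_mb=${4:-512}

  # Overhead is a percentage of the heap, but never below the floor.
  overhead_mb=$(( heap_mb * factor_pct / 100 ))
  if [ "$overhead_mb" -lt "$min_overhead_mb" ]; then
    overhead_mb=$min_overhead_mb
  fi

  # YARN rounds the request up to a multiple of its allocation step.
  requested_mb=$(( heap_mb + overhead_mb ))
  echo $(( (requested_mb + step_mb - 1) / step_mb * step_mb ))
}

container_mb 2048   # prints 2560
```

Under these assumptions a 2g driver gets a 2560 MB (2.5 GB) container, which matches the limit in the message. Off-heap usage beyond roughly 384 MB would then get the container killed even when the heap itself is not full, which is why raising the memory overhead setting is the usual first step.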

 So I used jmap to monitor another program (which had already run for about 48
 hours): sudo /home/q/java7/jdk1.7.0_45/bin/jmap -histo:live 31256, and the
 result was as follows:
 the java.util.HashMap$Entry and java.lang.Long objects were using about 600 MB
 of memory!

 I also used jmap to monitor another program (which had run for about 1 hour),
 and the result was as follows:
 the java.util.HashMap$Entry and java.lang.Long objects were not using that
 much memory. But I found that, as time goes by, the java.util.HashMap$Entry
 and java.lang.Long objects occupy more and more memory.
 Is this a memory leak in the driver, or is there another reason?

 Thanks
 Best Regards













Re: history-server does't read logs which are on FS

2015-04-20 Thread Serega Sheypak
Thanks, it helped.
We can't use Spark 1.3 because Cassandra DSE doesn't support it.

2015-04-17 21:48 GMT+02:00 Imran Rashid iras...@cloudera.com:

 are you calling sc.stop() at the end of your applications?

 The history server only displays completed applications, but if you don't
 call sc.stop(), it doesn't know that those applications have been stopped.

 Note that in Spark 1.3, the history server can also display running
 applications (including applications that have completed but that it still
 thinks are running), which improves things a little bit.

 On Fri, Apr 17, 2015 at 10:13 AM, Serega Sheypak serega.shey...@gmail.com
  wrote:

 Hi, started history-server
 Here is UI output:


- *Event log directory:* file:/var/log/spark/applicationHistory/

 No completed applications found!

 Did you specify the correct logging directory? Please verify your setting
 of spark.history.fs.logDirectory and whether you have the permissions to
 access it.
 It is also possible that your application did not run to completion or
 did not stop the SparkContext.

 Spark 1.2.0

 I go to the node where the server runs and:

 ls -la /var/log/spark/applicationHistory/
 total 44
 drwxrwxrwx 11 root      root    4096 Apr 17 14:50 .
 drwxrwxrwx  3 cassandra root    4096 Apr 16 15:31 ..
 drwxrwxrwx  2 vagrant   vagrant 4096 Apr 17 10:06 app-20150417100630-
 drwxrwxrwx  2 vagrant   vagrant 4096 Apr 17 11:01 app-20150417110140-0001
 drwxrwxrwx  2 vagrant   vagrant 4096 Apr 17 11:12 app-20150417111216-0002
 drwxrwxrwx  2 vagrant   vagrant 4096 Apr 17 11:14 app-20150417111441-0003
 drwxrwx---  2 vagrant   vagrant 4096 Apr 17 11:20 *app-20150417112028-0004*
 drwxrwx---  2 vagrant   vagrant 4096 Apr 17 14:17 *app-20150417141733-0005*
 drwxrwx---  2 vagrant   vagrant 4096 Apr 17 14:32 *app-20150417143237-0006*
 drwxrwx---  2 vagrant   vagrant 4096 Apr 17 14:49 *app-20150417144902-0007*
 drwxrwx---  2 vagrant   vagrant 4096 Apr 17 14:50 *app-20150417145025-0008*


 So there are logs, but history-server doesn't want to display them.

 I've checked the workers: they point to that dir as well. When I run an app,
 I see a new log.


 Here is history-server log output:

 vagrant@dsenode01:/usr/lib/spark/logs$ cat
 spark-root-org.apache.spark.deploy.history.HistoryServer-1-dsenode01.out

 Spark assembly has been built with Hive, including Datanucleus jars on
 classpath

 Spark Command: java -cp
 ::/usr/lib/spark/sbin/../conf:/usr/lib/spark/lib/spark-assembly-1.2.0-hadoop2.4.0.jar:/usr/lib/spark/lib/datanucleus-api-jdo-3.2.6.jar:/usr/lib/spark/lib/datanucleus-rdbms-3.2.9.jar:/usr/lib/spark/lib/datanucleus-core-3.2.10.jar
 -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true
 -Dspark.history.fs.logDirectory=/var/log/spark/applicationHistory
 -Dspark.eventLog.enabled=true -Xms512m -Xmx512m
 org.apache.spark.deploy.history.HistoryServer

 


 Using Spark's default log4j profile:
 org/apache/spark/log4j-defaults.properties

 15/04/17 09:55:21 INFO HistoryServer: Registered signal handlers for
 [TERM, HUP, INT]

 15/04/17 09:55:21 INFO SecurityManager: Changing view acls to: root

 15/04/17 09:55:21 INFO SecurityManager: Changing modify acls to: root

 15/04/17 09:55:21 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view permissions: Set(root); users
 with modify permissions: Set(root)

 15/04/17 09:55:22 WARN NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable

 15/04/17 09:55:24 INFO Utils: Successfully started service on port 18080.

 15/04/17 09:55:24 INFO HistoryServer: Started HistoryServer at
 http://dsenode01:18080


 What could be wrong with it?





Spark 1.2, trying to run spark-history as a service, spark-defaults.conf are ignored

2015-04-14 Thread Serega Sheypak
Here is a related problem:
http://apache-spark-user-list.1001560.n3.nabble.com/Launching-history-server-problem-td12574.html

but it got no answer.
What I'm trying to do: wrap spark-history with an /etc/init.d script.
The problem I have: I can't make it read spark-defaults.conf.
I've put this file here:
/etc/spark/conf
/usr/lib/spark/conf (where /usr/lib/spark is the Spark install location)
No luck.

spark-history tries to use the default value for the application log location;
it doesn't read the value specified in spark-defaults.conf.
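
A possible workaround, offered as an untested sketch: the Spark monitoring docs describe a SPARK_HISTORY_OPTS environment variable for passing spark.history.* options to the history server JVM, so the init.d wrapper (or spark-env.sh) could export it instead of relying on spark-defaults.conf. The log directory below is a placeholder, and whether SPARK_CONF_DIR is honored by this particular packaging is an assumption.

```shell
# Placeholder path: replace with the directory the applications write event logs to.
EVENT_LOG_DIR="file:/path/to/event/logs"

# Assumption: the launch scripts use SPARK_CONF_DIR when locating spark-env.sh.
export SPARK_CONF_DIR=/etc/spark/conf

# Documented variable for history server JVM options (spark.history.* settings).
export SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=${EVENT_LOG_DIR}"

# The init.d script would then start the server, for example:
# /usr/lib/spark/sbin/start-history-server.sh
```

Passing the directory as a JVM system property avoids depending on which configuration files the history server reads at startup.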