Re: LIVY VS Spark Job Server

2016-09-15 Thread Vadim Semenov
I have experience with both Livy & spark-jobserver.

spark-jobserver gives you better API, particularly, if you want to work
within a single spark context.

Livy supports submitting python & R code while spark-jobserver doesn't
support it.

spark-jobserver code is more complex, it actively uses Akka, and for me it
was difficult to debug and add additional features to it. Livy's
architecture is simpler in this sense.
I tried to add `yarn-cluster` support to spark-jobserver but got lost
halfway-through while Livy supports yarn-cluster mode out of the box and it
was easy for me to add missing features to Livy.

I'm not going into more details, but I'm ready to answer some specific
questions about usage of both Livy & spark-jobserver.

On Wed, Sep 14, 2016 at 3:32 PM, SamyaMaiti 
wrote:

> Hi Team,
>
> I am evaluating different ways to submit & monitor spark Jobs using REST
> Interfaces.
>
> When to use Livy vs Spark Job Server?
>
> Regards,
> Sam
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/LIVY-VS-Spark-Job-Server-tp27722.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: using SparkILoop.run

2016-09-26 Thread Vadim Semenov
Add "-Dspark.master=local[*]" to the VM properties of your test run.

On Mon, Sep 26, 2016 at 2:25 PM, Mohit Jaggi  wrote:

> I want to use the following API  SparkILoop.run(...). I am writing a test
> case as that passes some scala code to spark interpreter and receives
> result as string.
>
> I couldn't figure out how to pass the right settings into the run()
> method. I get an error about "master' not being set.
>
> object SparkILoop {
>
>   /**
>* Creates an interpreter loop with default settings and feeds
>* the given code to it as input.
>*/
>   def run(code: String, sets: Settings = new Settings): String = {
> import java.io.{ BufferedReader, StringReader, OutputStreamWriter }
>
> stringFromStream { ostream =>
>   Console.withOut(ostream) {
> val input = new BufferedReader(new StringReader(code))
> val output = new JPrintWriter(new OutputStreamWriter(ostream), true)
> val repl = new SparkILoop(input, output)
>
> if (sets.classpath.isDefault) {
>   sets.classpath.value = sys.props("java.class.path")
> }
> repl process sets
>   }
> }
>   }
>   def run(lines: List[String]): String = run(lines.map(_ + "\n").mkString)
> }
>
>


Dynamically change executors settings

2016-08-26 Thread Vadim Semenov
Hi spark users,

I wonder if it's possible to change executors settings on-the-fly.
I have the following use-case: I have a lot of non-splittable skewed files
in a custom format that I read using a custom Hadoop RecordReader. These
files can be small & huge and I'd like to use only one-two cores per
executor while they get processed (to use the whole heap). But once they
got processed I'd like to enable all cores.
I know that I can achieve this by splitting it into two separate jobs but I
wonder if it's possible to somehow achieve the behavior I described.

Thanks!


Re: Restful WS for Spark

2016-09-30 Thread Vadim Semenov
There're two REST job servers that work with spark:

https://github.com/spark-jobserver/spark-jobserver

https://github.com/cloudera/livy


On Fri, Sep 30, 2016 at 2:07 PM, ABHISHEK  wrote:

> Hello all,
> Have you tried accessing Spark application using Restful  web-services?
>
> I have requirement where remote user submit the  request with some data,
> it should be sent to Spark and job should run in Hadoop cluster mode.
> Output should be sent back to user.
>
> Please share your  expertise.
> Thanks,
> Abhishek
>


Re: DataFrame Sort gives Cannot allocate a page with more than 17179869176 bytes

2016-09-30 Thread Vadim Semenov
Can you post the whole exception stack trace?
What are your executor memory settings?

Right now I assume that it happens in UnsafeExternalRowSorter ->
UnsafeExternalSorter:insertRecord

Running more executors with lower `spark.executor.memory` should help.


On Fri, Sep 30, 2016 at 12:57 PM, Babak Alipour 
wrote:

> Greetings everyone,
>
> I'm trying to read a single field of a Hive table stored as Parquet in
> Spark (~140GB for the entire table, this single field should be just a few
> GB) and look at the sorted output using the following:
>
> sql("SELECT " + field + " FROM MY_TABLE ORDER BY " + field + " DESC")
>
> ​But this simple line of code gives:
>
> Caused by: java.lang.IllegalArgumentException: Cannot allocate a page
> with more than 17179869176 bytes
>
> Same error for:
>
> sql("SELECT " + field + " FROM MY_TABLE).sort(field)
>
> and:
>
> sql("SELECT " + field + " FROM MY_TABLE).orderBy(field)
>
>
> I'm running this on a machine with more than 200GB of RAM, running in
> local mode with spark.driver.memory set to 64g.
>
> I do not know why it cannot allocate a big enough page, and why is it
> trying to allocate such a big page in the first place?
>
> I hope someone with more knowledge of Spark can shed some light on this.
> Thank you!
>
>
> *​Best regards,​*
> *Babak Alipour ,*
> *University of Florida*
>


Re: DataFrame Sort gives Cannot allocate a page with more than 17179869176 bytes

2016-09-30 Thread Vadim Semenov
Run more smaller executors: change `spark.executor.memory` to 32g and
`spark.executor.cores` to 2-4, for example.

Changing driver's memory won't help because it doesn't participate in
execution.

On Fri, Sep 30, 2016 at 2:58 PM, Babak Alipour <babak.alip...@gmail.com>
wrote:

> Thank you for your replies.
>
> @Mich, using LIMIT 100 in the query prevents the exception but given the
> fact that there's enough memory, I don't think this should happen even
> without LIMIT.
>
> @Vadim, here's the full stack trace:
>
> Caused by: java.lang.IllegalArgumentException: Cannot allocate a page
> with more than 17179869176 bytes
> at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskM
> emoryManager.java:241)
> at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryCo
> nsumer.java:121)
> at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalS
> orter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:374)
> at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalS
> orter.insertRecord(UnsafeExternalSorter.java:396)
> at org.apache.spark.sql.execution.UnsafeExternalRowSorter.inser
> tRow(UnsafeExternalRowSorter.java:94)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$Gen
> eratedIterator.sort_addToSorter$(Unknown Source)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$Gen
> eratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$Gen
> eratedIterator.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(
> BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfu
> n$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.w
> rite(BypassMergeSortShuffleWriter.java:125)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMap
> Task.scala:79)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMap
> Task.scala:47)
> at org.apache.spark.scheduler.Task.run(Task.scala:85)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.
> scala:274)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
> Executor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
> lExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> I'm running spark in local mode so there is only one executor, the driver
> and spark.driver.memory is set to 64g. Changing the driver's memory doesn't
> help.
>
> *Babak Alipour ,*
> *University of Florida*
>
> On Fri, Sep 30, 2016 at 2:05 PM, Vadim Semenov <
> vadim.seme...@datadoghq.com> wrote:
>
>> Can you post the whole exception stack trace?
>> What are your executor memory settings?
>>
>> Right now I assume that it happens in UnsafeExternalRowSorter ->
>> UnsafeExternalSorter:insertRecord
>>
>> Running more executors with lower `spark.executor.memory` should help.
>>
>>
>> On Fri, Sep 30, 2016 at 12:57 PM, Babak Alipour <babak.alip...@gmail.com>
>> wrote:
>>
>>> Greetings everyone,
>>>
>>> I'm trying to read a single field of a Hive table stored as Parquet in
>>> Spark (~140GB for the entire table, this single field should be just a few
>>> GB) and look at the sorted output using the following:
>>>
>>> sql("SELECT " + field + " FROM MY_TABLE ORDER BY " + field + " DESC")
>>>
>>> ​But this simple line of code gives:
>>>
>>> Caused by: java.lang.IllegalArgumentException: Cannot allocate a page
>>> with more than 17179869176 bytes
>>>
>>> Same error for:
>>>
>>> sql("SELECT " + field + " FROM MY_TABLE).sort(field)
>>>
>>> and:
>>>
>>> sql("SELECT " + field + " FROM MY_TABLE).orderBy(field)
>>>
>>>
>>> I'm running this on a machine with more than 200GB of RAM, running in
>>> local mode with spark.driver.memory set to 64g.
>>>
>>> I do not know why it cannot allocate a big enough page, and why is it
>>> trying to allocate such a big page in the first place?
>>>
>>> I hope someone with more knowledge of Spark can shed some light on this.
>>> Thank you!
>>>
>>>
>>> *​Best regards,​*
>>> *Babak Alipour ,*
>>> *University of Florida*
>>>
>>
>>
>


Re: get different results when debugging and running scala program

2016-10-01 Thread Vadim Semenov
The question has no connection to spark.

In future, if you use apache mailing lists, use external services to add
screenshots and make sure that your code is formatted so other members'd be
able to read it.

On Fri, Sep 30, 2016 at 11:25 AM, chen yong  wrote:

> Hello All,
>
>
>
> I am using IDEA 15.0.4 to debug a scala program. It is strange to me that
> the results were different when I debug or run the program. The differences
> can be seen in the attached filed run.jpg and debug.jpg. The code lines
> of the scala program are shown below.
>
>
> Thank you all
>
>
> ---
>
> import scala.collection.mutable.ArrayBuffer
>
> object TestCase1{
> def func(test:Iterator[(Int,Long)]): Iterator[(Int,Long)]={
> println("in")
> val test1=test.flatmap{
> case(item,count)=>
> val newPrefix=item
> println(count)
> val a=Iterator.single((newPrefix,count))
> func(a)
> val c = a
> c
> }
> test1
> }
> def main(args: Array[String]){
> val freqItems = ArrayBuffer((2,3L),(3,2L),(4,1L))
> val test = freqItems.toIterator
> val result = func(test)
> val reer = result.toArray
> }
> }
>
>
>
>


Re: Spark on yarn enviroment var

2016-10-01 Thread Vadim Semenov
The question should be addressed to the oozie community.

As far as I remember, a spark action doesn't have support of env variables.

On Fri, Sep 30, 2016 at 8:11 PM, Saurabh Malviya (samalviy) <
samal...@cisco.com> wrote:

> Hi,
>
>
>
> I am running spark on yarn using oozie.
>
>
>
> When submit through command line using spark-submit spark is able to read
> env variable.  But while submit through oozie its not able toget env
> variable and don’t see driver log.
>
>
>
> Is there any way we specify env variable in oozie spark action.
>
>
>
> Saurabh
>


Re: DataFrame Sort gives Cannot allocate a page with more than 17179869176 bytes

2016-10-01 Thread Vadim Semenov
oh, and try to run even smaller executors, i.e. with
`spark.executor.memory` <= 16GiB. I wonder what result you're going to get.

On Sun, Oct 2, 2016 at 1:24 AM, Vadim Semenov <vadim.seme...@datadoghq.com>
wrote:

> > Do you mean running a multi-JVM 'cluster' on the single machine?
> Yes, that's what I suggested.
>
> You can get some information here:
> http://blog.cloudera.com/blog/2015/03/how-to-tune-your-
> apache-spark-jobs-part-2/
>
> > How would that affect performance/memory-consumption? If a multi-JVM
> setup can handle such a large input, then why can't a single-JVM break down
> the job into smaller tasks?
> I don't have an answer to these questions, it requires understanding of
> Spark, JVM, and your setup internal.
>
> I ran into the same issue only once when I tried to read a gzipped file
> which size was >16GiB. That's the only time I had to meet this
> https://github.com/apache/spark/blob/5d84c7fd83502aeb551d46a740502d
> b4862508fe/core/src/main/java/org/apache/spark/memory/
> TaskMemoryManager.java#L238-L243
> In the end I had to recompress my file into bzip2 that is splittable to be
> able to read it with spark.
>
>
> I'd look into size of your files and if they're huge I'd try to connect
> the error you got to the size of the files (but it's strange to me as a
> block size of a Parquet file is 128MiB). I don't have any other
> suggestions, I'm sorry.
>
>
> On Sat, Oct 1, 2016 at 11:35 PM, Babak Alipour <babak.alip...@gmail.com>
> wrote:
>
>> Do you mean running a multi-JVM 'cluster' on the single machine? How
>> would that affect performance/memory-consumption? If a multi-JVM setup
>> can handle such a large input, then why can't a single-JVM break down the
>> job into smaller tasks?
>>
>> I also found that SPARK-9411 mentions making the page_size configurable
>> but it's hard-limited to ((1L << 31) - 1) * 8L [1]
>>
>> [1] https://github.com/apache/spark/blob/master/core/src/main/
>> java/org/apache/spark/memory/TaskMemoryManager.java
>>
>> ​Spark-9452 also talks about larger page sizes but I don't know how that
>> affects my use case.​ [2]
>>
>> [2] https://github.com/apache/spark/pull/7891
>>
>>
>> ​The reason provided here is that the on-heap allocator's maximum page
>> size is limited by the maximum amount of data that can be stored in a
>> long[]​.
>> Is it possible to force this specific operation to go off-heap so that it
>> can possibly use a bigger page size?
>>
>>
>>
>> ​>Babak​
>>
>>
>> *Babak Alipour ,*
>> *University of Florida*
>>
>> On Fri, Sep 30, 2016 at 3:03 PM, Vadim Semenov <
>> vadim.seme...@datadoghq.com> wrote:
>>
>>> Run more smaller executors: change `spark.executor.memory` to 32g and
>>> `spark.executor.cores` to 2-4, for example.
>>>
>>> Changing driver's memory won't help because it doesn't participate in
>>> execution.
>>>
>>> On Fri, Sep 30, 2016 at 2:58 PM, Babak Alipour <babak.alip...@gmail.com>
>>> wrote:
>>>
>>>> Thank you for your replies.
>>>>
>>>> @Mich, using LIMIT 100 in the query prevents the exception but given
>>>> the fact that there's enough memory, I don't think this should happen even
>>>> without LIMIT.
>>>>
>>>> @Vadim, here's the full stack trace:
>>>>
>>>> Caused by: java.lang.IllegalArgumentException: Cannot allocate a page
>>>> with more than 17179869176 bytes
>>>> at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskM
>>>> emoryManager.java:241)
>>>> at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryCo
>>>> nsumer.java:121)
>>>> at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalS
>>>> orter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:374)
>>>> at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalS
>>>> orter.insertRecord(UnsafeExternalSorter.java:396)
>>>> at org.apache.spark.sql.execution.UnsafeExternalRowSorter.inser
>>>> tRow(UnsafeExternalRowSorter.java:94)
>>>> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$Gen
>>>> eratedIterator.sort_addToSorter$(Unknown Source)
>>>> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$Gen
>>>> eratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
>>>> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$Gen
>>>> eratedIterator.proce

Re: Restful WS for Spark

2016-10-01 Thread Vadim Semenov
I worked with both, so I'll give you some insight from my perspective.

spark-jobserver has stable API and overall mature but doesn't work with
yarn-cluster mode and python support is in-development right now.

Livy has stable API (but I'm not sure if I can speak for it since it has
appeared recently but considering that Cloudera is behind of it, I'd say
it's mature), supports all deployment modes and has support for python & R.

spark-jobserver has nicer UI and better features overall for logging and
richer API.

I had hard time adding more features to spark-jobserver than into Livy, so
if it's something you would need to do you should consider that.

In some cases, spark-jobserver runs into issues that are difficult to debug
because of Akka.

Spark-jobserver is better if you use Scala and you need to work with shared
spark contexts as it has built-in API for shared RDDs and objects.

Both don't support High-Availability, but Livy has few open active PRs.

I'd start with spark-jobserver as it's easier.

On Sat, Oct 1, 2016 at 8:46 AM, ABHISHEK <abhi...@gmail.com> wrote:

> Thanks Vadim.
> I looked on Spark job server but not sure about session management, will
> job run in  Hadoop cluster ?
> How stable is this API as we will need to implement it in production env.
> Livy looks more promising but still need not matured.
> Have you tested any of them ?
>
> Thanks,
> Abhishek
> Abhishek
>
>
> On Fri, Sep 30, 2016 at 11:39 PM, Vadim Semenov <
> vadim.seme...@datadoghq.com> wrote:
>
>> There're two REST job servers that work with spark:
>>
>> https://github.com/spark-jobserver/spark-jobserver
>>
>> https://github.com/cloudera/livy
>>
>>
>> On Fri, Sep 30, 2016 at 2:07 PM, ABHISHEK <abhi...@gmail.com> wrote:
>>
>>> Hello all,
>>> Have you tried accessing Spark application using Restful  web-services?
>>>
>>> I have requirement where remote user submit the  request with some data,
>>> it should be sent to Spark and job should run in Hadoop cluster mode.
>>> Output should be sent back to user.
>>>
>>> Please share your  expertise.
>>> Thanks,
>>> Abhishek
>>>
>>
>>
>


Re: DataFrame Sort gives Cannot allocate a page with more than 17179869176 bytes

2016-10-01 Thread Vadim Semenov
> Do you mean running a multi-JVM 'cluster' on the single machine?
Yes, that's what I suggested.

You can get some information here:
http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

> How would that affect performance/memory-consumption? If a multi-JVM
setup can handle such a large input, then why can't a single-JVM break down
the job into smaller tasks?
I don't have an answer to these questions, it requires understanding of
Spark, JVM, and your setup internal.

I ran into the same issue only once when I tried to read a gzipped file
which size was >16GiB. That's the only time I had to meet this
https://github.com/apache/spark/blob/5d84c7fd83502aeb551d46a740502db4862508fe/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java#L238-L243
In the end I had to recompress my file into bzip2 that is splittable to be
able to read it with spark.


I'd look into size of your files and if they're huge I'd try to connect the
error you got to the size of the files (but it's strange to me as a block
size of a Parquet file is 128MiB). I don't have any other suggestions, I'm
sorry.


On Sat, Oct 1, 2016 at 11:35 PM, Babak Alipour <babak.alip...@gmail.com>
wrote:

> Do you mean running a multi-JVM 'cluster' on the single machine? How would
> that affect performance/memory-consumption? If a multi-JVM setup can
> handle such a large input, then why can't a single-JVM break down the job
> into smaller tasks?
>
> I also found that SPARK-9411 mentions making the page_size configurable
> but it's hard-limited to ((1L << 31) - 1) * 8L [1]
>
> [1] https://github.com/apache/spark/blob/master/core/src/
> main/java/org/apache/spark/memory/TaskMemoryManager.java
>
> ​Spark-9452 also talks about larger page sizes but I don't know how that
> affects my use case.​ [2]
>
> [2] https://github.com/apache/spark/pull/7891
>
>
> ​The reason provided here is that the on-heap allocator's maximum page
> size is limited by the maximum amount of data that can be stored in a
> long[]​.
> Is it possible to force this specific operation to go off-heap so that it
> can possibly use a bigger page size?
>
>
>
> ​>Babak​
>
>
> *Babak Alipour ,*
> *University of Florida*
>
> On Fri, Sep 30, 2016 at 3:03 PM, Vadim Semenov <
> vadim.seme...@datadoghq.com> wrote:
>
>> Run more smaller executors: change `spark.executor.memory` to 32g and
>> `spark.executor.cores` to 2-4, for example.
>>
>> Changing driver's memory won't help because it doesn't participate in
>> execution.
>>
>> On Fri, Sep 30, 2016 at 2:58 PM, Babak Alipour <babak.alip...@gmail.com>
>> wrote:
>>
>>> Thank you for your replies.
>>>
>>> @Mich, using LIMIT 100 in the query prevents the exception but given the
>>> fact that there's enough memory, I don't think this should happen even
>>> without LIMIT.
>>>
>>> @Vadim, here's the full stack trace:
>>>
>>> Caused by: java.lang.IllegalArgumentException: Cannot allocate a page
>>> with more than 17179869176 bytes
>>> at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskM
>>> emoryManager.java:241)
>>> at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryCo
>>> nsumer.java:121)
>>> at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalS
>>> orter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:374)
>>> at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalS
>>> orter.insertRecord(UnsafeExternalSorter.java:396)
>>> at org.apache.spark.sql.execution.UnsafeExternalRowSorter.inser
>>> tRow(UnsafeExternalRowSorter.java:94)
>>> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$Gen
>>> eratedIterator.sort_addToSorter$(Unknown Source)
>>> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$Gen
>>> eratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
>>> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$Gen
>>> eratedIterator.processNext(Unknown Source)
>>> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(B
>>> ufferedRowIterator.java:43)
>>> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfu
>>> n$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:40
>>> 8)
>>> at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.w
>>> rite(BypassMergeSortShuffleWriter.java:125)
>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMap
>>> T

Re: Live data visualisations with Spark

2016-11-08 Thread Vadim Semenov
Take a look at https://zeppelin.apache.org

On Tue, Nov 8, 2016 at 11:13 AM, Andrew Holway <
andrew.hol...@otternetworks.de> wrote:

> Hello,
>
> A colleague and I are trying to work out the best way to provide live data
> visualisations based on Spark. Is it possible to explore a dataset in spark
> from a web browser? Set up pre defined functions that the user can click on
> which return datsets.
>
> We are using a lot of R here. Is this something that could be accomplished
> with shiny server for instance?
>
> Thanks,
>
> Andrew Holway
>


Re: java.lang.OutOfMemoryError: unable to create new native thread

2016-10-31 Thread Vadim Semenov
Have you tried to get number of threads in a running process using `cat
/proc//status` ?

On Sun, Oct 30, 2016 at 11:04 PM, kant kodali  wrote:

> yes I did run ps -ef | grep "app_name" and it is root.
>
>
>
> On Sun, Oct 30, 2016 at 8:00 PM, Chan Chor Pang 
> wrote:
>
>> sorry, the UID
>>
>> On 10/31/16 11:59 AM, Chan Chor Pang wrote:
>>
>> actually if the max user processes is not the problem, i have no idea
>>
>> but i still suspecting the user,
>> as the user who run spark-submit is not necessary the pid for the JVM
>> process
>>
>> can u make sure when you "ps -ef | grep {your app id} " the PID is root?
>> On 10/31/16 11:21 AM, kant kodali wrote:
>>
>> The java process is run by the root and it has the same config
>>
>> sudo -i
>>
>> ulimit -a
>>
>> core file size  (blocks, -c) 0
>> data seg size   (kbytes, -d) unlimited
>> scheduling priority (-e) 0
>> file size   (blocks, -f) unlimited
>> pending signals (-i) 120242
>> max locked memory   (kbytes, -l) 64
>> max memory size (kbytes, -m) unlimited
>> open files  (-n) 1024
>> pipe size(512 bytes, -p) 8
>> POSIX message queues (bytes, -q) 819200
>> real-time priority  (-r) 0
>> stack size  (kbytes, -s) 8192
>> cpu time   (seconds, -t) unlimited
>> max user processes  (-u) 120242
>> virtual memory  (kbytes, -v) unlimited
>> file locks  (-x) unlimited
>>
>>
>>
>> On Sun, Oct 30, 2016 at 7:01 PM, Chan Chor Pang 
>> wrote:
>>
>>> I have the same Exception before and the problem fix after i change the
>>> nproc conf.
>>>
>>> > max user processes  (-u) 120242
>>> ↑this config does looks good.
>>> are u sure the user who run ulimit -a is the same user who run the Java
>>> process?
>>> depend on how u submit the job and your setting, spark job may execute
>>> by other user.
>>>
>>>
>>> On 10/31/16 10:38 AM, kant kodali wrote:
>>>
>>> when I did this
>>>
>>> cat /proc/sys/kernel/pid_max
>>>
>>> I got 32768
>>>
>>> On Sun, Oct 30, 2016 at 6:36 PM, kant kodali  wrote:
>>>
 I believe for ubuntu it is unlimited but I am not 100% sure (I just
 read somewhere online). I ran ulimit -a and this is what I get

 core file size  (blocks, -c) 0
 data seg size   (kbytes, -d) unlimited
 scheduling priority (-e) 0
 file size   (blocks, -f) unlimited
 pending signals (-i) 120242
 max locked memory   (kbytes, -l) 64
 max memory size (kbytes, -m) unlimited
 open files  (-n) 1024
 pipe size(512 bytes, -p) 8
 POSIX message queues (bytes, -q) 819200
 real-time priority  (-r) 0
 stack size  (kbytes, -s) 8192
 cpu time   (seconds, -t) unlimited
 max user processes  (-u) 120242
 virtual memory  (kbytes, -v) unlimited
 file locks  (-x) unlimited

 On Sun, Oct 30, 2016 at 6:15 PM, Chan Chor Pang  wrote:

> not sure for ubuntu, but i think you can just create the file by
> yourself
> the syntax will be the same as /etc/security/limits.conf
>
> nproc.conf not only limit java process but all process by the same user
> so even the jvm process does nothing,  if the corresponding user is
> busy in other way
> the jvm process will still not able to create new thread.
>
> btw the default limit for centos is 1024
>
>
> On 10/31/16 9:51 AM, kant kodali wrote:
>
>
> On Sun, Oct 30, 2016 at 5:22 PM, Chan Chor Pang <
> chin...@indetail.co.jp> wrote:
>
>> /etc/security/limits.d/90-nproc.conf
>>
>
> Hi,
>
> I am using Ubuntu 16.04 LTS. I have this directory
> /etc/security/limits.d/ but I don't have any files underneath it. This
> error happens after running for 4 to 5 hours. I wonder if this is a GC
> issue? And I am thinking if I should use CMS. I have also posted this on 
> SO
> since I havent got much response for this question
> http://stackoverflow.com/questions/40315589/dag-sch
> eduler-event-loop-java-lang-outofmemoryerror-unable-to-creat
> e-new-native
>
>
> Thanks,
> kant
>
>
> --
> ---**---*---*---*---
> 株式会社INDETAIL
> ニアショア総合サービス事業本部
> ゲームサービス事業部
> 陳 楚鵬
> E-mail :chin...@indetail.co.jp
> URL : http://www.indetail.co.jp
>
> 【札幌本社/LABO/LABO2】
> 〒060-0042
> 札幌市中央区大通西9丁目3番地33
> キタコーセンタービルディング
> (札幌本社/LABO2:2階、LABO:9階)
> TEL:011-206-9235 FAX:011-206-9236
>
> 【東京支店】
> 〒108-0014
> 東京都港区芝5丁目29番20号 クロスオフィス三田
> TEL:03-6809-6502 FAX:03-6809-6504
>
> 

Re: How to avoid unnecessary spark starkups on every request?

2016-11-02 Thread Vadim Semenov
Take a look at https://github.com/spark-jobserver/spark-jobserver or
https://github.com/cloudera/livy

you can launch a persistent spark context and then submit your jobs using a
already running context

On Wed, Nov 2, 2016 at 3:34 AM, Fanjin Zeng 
wrote:

>  Hi,
>
>  I am working on a project that takes requests from HTTP server and
> computes accordingly on spark. But the problem is when I receive many
> request at the same time, users have to waste a lot of time on the
> unnecessary startups that occur on each request. Does Spark have built-in
> job scheduler function to solve this problem or is there any trick can be
> used to avoid these unnecessary startups?
>
>  Thanks,
>  Fanjin
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Any Dynamic Compilation of Scala Query

2016-10-26 Thread Vadim Semenov
You can use Cloudera Livy for that https://github.com/cloudera/livy
take a look at this example https://github.com/cloudera/livy#spark-example

On Wed, Oct 26, 2016 at 4:35 AM, Mahender Sarangam <
mahender.bigd...@outlook.com> wrote:

> Hi,
>
> Is there any way to dynamically execute a string  which has scala code
> against spark engine. We are dynamically creating scala file, we would
> like to submit this scala file to Spark, but currently spark accepts
> only JAR file has input from Remote Job submission. Is there any other
> way to submit .SCALA instead of .JAR to REST API of Spark ?
>
> /MS
>
>


Re: Livy with Spark

2016-12-05 Thread Vadim Semenov
You mean share a single spark context across multiple jobs?

https://github.com/spark-jobserver/spark-jobserver does the same

On Mon, Dec 5, 2016 at 9:33 AM, Mich Talebzadeh 
wrote:

> Hi,
>
> Has there been any experience using Livy with Spark to share multiple
> Spark contexts?
>
> thanks
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: Spark kryo serialization register Datatype[]

2016-12-21 Thread Vadim Semenov
to enable kryo serializer you just need to pass
`spark.serializer=org.apache.spark.serializer.KryoSerializer`

the `spark.kryo.registrationRequired` controls the following behavior:

Whether to require registration with Kryo. If set to 'true', Kryo will
> throw an exception if an unregistered class is serialized. If set to false
> (the default), Kryo will write unregistered class names along with each
> object. Writing class names can cause significant performance overhead, so
> enabling this option can enforce strictly that a user has not omitted
> classes from registration.


as described here http://spark.apache.org/docs/latest/configuration.html

if it's set to `true` you need to manually register classes as described
here: http://spark.apache.org/docs/latest/tuning.html#data-serialization


On Wed, Dec 21, 2016 at 8:49 AM, geoHeil  wrote:

> To force spark to use kryo serialization I set
> spark.kryo.registrationRequired to true.
>
> Now spark complains that: Class is not registered:
> org.apache.spark.sql.types.DataType[] is not registered.
> How can I fix this? So far I could not successfully register this class.
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Spark-kryo-serialization-register-
> Datatype-tp28243.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Launching multiple spark jobs within a main spark job.

2016-12-21 Thread Vadim Semenov
Check the source code for SparkLauncher:
https://github.com/apache/spark/blob/master/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java#L541

a separate process will be started using `spark-submit` and if it uses
`yarn-cluster` mode, a driver may be launched on another NodeManager or may
be launched on the same NodeManager, so you would need to work around it if
you want to avoid hot spots.

On Wed, Dec 21, 2016 at 8:19 AM, Naveen  wrote:

> Thanks Liang!
> I get your point. It would mean that when launching spark jobs, mode needs
> to be specified as client for all spark jobs.
> However, my concern is to know if driver's memory(which is launching spark
> jobs) will be used completely by the Future's(sparkcontext's) or these
> spawned sparkcontexts will get different nodes / executors from resource
> manager?
>
> On Wed, Dec 21, 2016 at 6:43 PM, Naveen  wrote:
>
>> Hi Sebastian,
>>
>> Yes, for fetching the details from Hive and HBase, I would want to use
>> Spark's HiveContext etc.
>> However, based on your point, I might have to check if JDBC based driver
>> connection could be used to do the same.
>>
>> Main reason for this is to avoid a client-server architecture design.
>>
>> If we go by a normal scala app without creating a sparkcontext as per
>> your suggestion, then
>> 1. it turns out to be a client program on cluster on a single node, and
>> for any multiple invocation through xyz scheduler , it will be invoked
>> always from that same node
>> 2. Having client program on a single data node might create a hotspot for
>> that data node which might create a bottleneck as all invocations might
>> create JVMs on that node itself.
>> 3. With above, we will loose the Spark on YARN's feature of dynamically
>> allocating a driver on any available data node through RM and NM
>> co-ordination. With YARN and Cluster mode of invoking a spark-job, it will
>> help distribute multiple application(main one) in cluster uniformly.
>>
>> Thanks and please let me know your views.
>>
>>
>> On Wed, Dec 21, 2016 at 5:43 PM, Sebastian Piu 
>> wrote:
>>
>>> Is there any reason you need a context on the application launching the
>>> jobs?
>>> You can use SparkLauncher in a normal app and just listen for state
>>> transitions
>>>
>>> On Wed, 21 Dec 2016, 11:44 Naveen,  wrote:
>>>
 Hi Team,

 Thanks for your responses.
 Let me give more details in a picture of how I am trying to launch jobs.

 Main spark job will launch other spark-job similar to calling multiple
 spark-submit within a Spark driver program.
 These spawned threads for new jobs will be totally different
 components, so these cannot be implemented using spark actions.

 sample code:

 -

 Object Mainsparkjob {

 main(...){

 val sc=new SparkContext(..)

 Fetch from hive..using hivecontext
 Fetch from hbase

 //spawning multiple Futures..
 Val future1=Future{
 Val sparkjob= SparkLauncher(...).launch; spark.waitFor
 }

 Similarly, future2 to futureN.

 future1.onComplete{...}
 }

 }// end of mainsparkjob
 --


 [image: Inline image 1]

 On Wed, Dec 21, 2016 at 3:13 PM, David Hodeffi <
 david.hode...@niceactimize.com> wrote:

 I am not familiar of any problem with that.

 Anyway, If you run spark applicaction you would have multiple jobs,
 which makes sense that it is not a problem.



 Thanks David.



 *From:* Naveen [mailto:hadoopst...@gmail.com]
 *Sent:* Wednesday, December 21, 2016 9:18 AM
 *To:* d...@spark.apache.org; user@spark.apache.org
 *Subject:* Launching multiple spark jobs within a main spark job.



 Hi Team,



 Is it ok to spawn multiple spark jobs within a main spark job, my main
 spark job's driver which was launched on yarn cluster, will do some
 preprocessing and based on it, it needs to launch multilple spark jobs on
 yarn cluster. Not sure if this right pattern.



 Please share your thoughts.

 Sample code i ve is as below for better understanding..

 -



 Object Mainsparkjob {



 main(...){



 val sc=new SparkContext(..)



 Fetch from hive..using hivecontext

 Fetch from hbase



 //spawning multiple Futures..

 Val future1=Future{

 Val sparkjob= SparkLauncher(...).launch; spark.waitFor

 }



 Similarly, future2 to futureN.



 future1.onComplete{...}

 }



 }// end of mainsparkjob

 --


 Confidentiality: This communication and any attachments are intended
 

Re: NoClassDefFoundError

2016-12-21 Thread Vadim Semenov
You better ask folks in the spark-jobserver gitter channel:
https://github.com/spark-jobserver/spark-jobserver

On Wed, Dec 21, 2016 at 8:02 AM, Reza zade  wrote:

> Hello
>
> I've extended the JavaSparkJob (job-server-0.6.2) and created an object
> of SQLContext class. my maven project doesn't have any problem during
> compile and packaging phase. but when I send .jar of project to sjs and run
> it "NoClassDefFoundError" will be issued. the trace of exception is :
>
>
> job-server[ERROR] Exception in thread "pool-20-thread-1"
> java.lang.NoClassDefFoundError: org/apache/spark/sql/SQLContext
> job-server[ERROR]  at sparkdesk.SparkSQLJob2.runJob(SparkSQLJob2.java:61)
> job-server[ERROR]  at sparkdesk.SparkSQLJob2.runJob(SparkSQLJob2.java:45)
> job-server[ERROR]  at spark.jobserver.JavaSparkJob.r
> unJob(JavaSparkJob.scala:17)
> job-server[ERROR]  at spark.jobserver.JavaSparkJob.r
> unJob(JavaSparkJob.scala:14)
> job-server[ERROR]  at spark.jobserver.JobManagerActo
> r$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.
> apply(JobManagerActor.scala:301)
> job-server[ERROR]  at scala.concurrent.impl.Future$P
> romiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> job-server[ERROR]  at scala.concurrent.impl.Future$P
> romiseCompletingRunnable.run(Future.scala:24)
> job-server[ERROR]  at java.util.concurrent.ThreadPoo
> lExecutor.runWorker(ThreadPoolExecutor.java:1145)
> job-server[ERROR]  at java.util.concurrent.ThreadPoo
> lExecutor$Worker.run(ThreadPoolExecutor.java:615)
> job-server[ERROR]  at java.lang.Thread.run(Thread.java:745)
> job-server[ERROR] Caused by: java.lang.ClassNotFoundException:
> org.apache.spark.sql.SQLContext
> job-server[ERROR]  at java.net.URLClassLoader$1.run(
> URLClassLoader.java:366)
> job-server[ERROR]  at java.net.URLClassLoader$1.run(
> URLClassLoader.java:355)
> job-server[ERROR]  at java.security.AccessController.doPrivileged(Native
> Method)
> job-server[ERROR]  at java.net.URLClassLoader.findCl
> ass(URLClassLoader.java:354)
> job-server[ERROR]  at java.lang.ClassLoader.loadClas
> s(ClassLoader.java:425)
> job-server[ERROR]  at java.lang.ClassLoader.loadClas
> s(ClassLoader.java:358)
> job-server[ERROR]  ... 10 more
>
>
> what is the problem?
> do you have any solution about this?
>


Re: Looking at EMR Logs

2017-03-31 Thread Vadim Semenov
You can provide your own log directory, where Spark log will be saved, and
that you could replay afterwards.

Set in your job this: `spark.eventLog.dir=s3://bucket/some/directory` and
run it.
Note! The path `s3://bucket/some/directory` must exist before you run your
job, it'll not be created automatically.

The Spark HistoryServer on EMR won't show you anything because it's looking
for logs in `hdfs:///var/log/spark/apps` by default.

After that you can either copy the log files from s3 to the hdfs path
above, or you can copy them locally to `/tmp/spark-events` (the default
directory for spark logs) and run the history server like:
```
cd /usr/local/src/spark-1.6.1-bin-hadoop2.6
sbin/start-history-server.sh
```
and then open http://localhost:18080




On Thu, Mar 30, 2017 at 8:45 PM, Paul Tremblay 
wrote:

> I am looking for tips on evaluating my Spark job after it has run.
>
> I know that right now I can look at the history of jobs through the web
> ui. I also know how to look at the current resources being used by a
> similar web ui.
>
> However, I would like to look at the logs after the job is finished to
> evaluate such things as how many tasks were completed, how many executors
> were used, etc. I currently save my logs to S3.
>
> Thanks!
>
> Henry
>
> --
> Paul Henry Tremblay
> Robert Half Technology
>


Re: How can i remove the need for calling cache

2017-08-02 Thread Vadim Semenov
Also check the `RDD.checkpoint()` method

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1533-L1550

On Wed, Aug 2, 2017 at 8:46 PM, Vadim Semenov <vadim.seme...@datadoghq.com>
wrote:

> I'm not sure that "checkpointed" means the same thing in that sentence.
>
> You can run a simple test using `spark-shell`:
>
> sc.setCheckpointDir("/tmp/checkpoint")
> val rdd = sc.parallelize(1 to 10).map(x => {
>   Thread.sleep(1000)
>   x
> })
> rdd.checkpoint()
> rdd.foreach(println) // Will take 10 seconds
> rdd.foreach(println) // Will be instant, because the RDD is checkpointed
>
> On Wed, Aug 2, 2017 at 7:05 PM, jeff saremi <jeffsar...@hotmail.com>
> wrote:
>
>> Vadim:
>>
>> This is from the Mastering Spark book:
>>
>> *"It is strongly recommended that a checkpointed RDD is persisted in
>> memory, otherwise saving it on a file will require recomputation."*
>>
>>
>> To me that means checkpoint will not prevent the recomputation that i was
>> hoping for
>> --
>> *From:* Vadim Semenov <vadim.seme...@datadoghq.com>
>> *Sent:* Tuesday, August 1, 2017 12:05:17 PM
>> *To:* jeff saremi
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: How can i remove the need for calling cache
>>
>> You can use `.checkpoint()`:
>> ```
>> val sc: SparkContext
>> sc.setCheckpointDir("hdfs:///tmp/checkpointDirectory")
>> myrdd.checkpoint()
>> val result1 = myrdd.map(op1(_))
>> result1.count() // Will save `myrdd` to HDFS and do map(op1…
>> val result2 = myrdd.map(op2(_))
>> result2.count() // Will load `myrdd` from HDFS and do map(op2…
>> ```
>>
>> On Tue, Aug 1, 2017 at 2:05 PM, jeff saremi <jeffsar...@hotmail.com>
>> wrote:
>>
>>> Calling cache/persist fails all our jobs (i have  posted 2 threads on
>>> this).
>>>
>>> And we're giving up hope in finding a solution.
>>> So I'd like to find a workaround for that:
>>>
>>> If I save an RDD to hdfs and read it back, can I use it in more than one
>>> operation?
>>>
>>> Example: (using cache)
>>> // do a whole bunch of transformations on an RDD
>>>
>>> myrdd.cache()
>>>
>>> val result1 = myrdd.map(op1(_))
>>>
>>> val result2 = myrdd.map(op2(_))
>>>
>>> // in the above I am assuming that a call to cache will prevent all
>>> previous transformation from being calculated twice
>>>
>>> I'd like to somehow get result1 and result2 without duplicating work.
>>> How can I do that?
>>>
>>> thanks
>>>
>>> Jeff
>>>
>>
>>
>


Re: How can i remove the need for calling cache

2017-08-02 Thread Vadim Semenov
`saveAsObjectFile` doesn't save the DAG, it acts as a typical action, so it
just saves data to some destination.

`cache/persist` allow you to cache data and keep the DAG in case of some
executor that holds data goes down, so Spark would still be able to
recalculate missing partitions

`localCheckpoint` allows you to sacrifice fault-tolerance and truncate the
DAG, so if some executor goes down, the job will fail, because it has
already forgotten the DAG.
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1551-L1610

and `checkpoint` allows you to save data to some shared storage and
truncate the DAG, so if an executor goes down, the job will be able to take
missing partitions from the place where it saved the RDD
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1533-L1549

On Wed, Aug 2, 2017 at 7:20 PM, Suzen, Mehmet  wrote:

> On 3 August 2017 at 01:05, jeff saremi  wrote:
> > Vadim:
> >
> > This is from the Mastering Spark book:
> >
> > "It is strongly recommended that a checkpointed RDD is persisted in
> memory,
> > otherwise saving it on a file will require recomputation."
>
> Is this really true? I had the impression that DAG will not be carried
> out once RDD is serialized to an external file, so 'saveAsObjectFile'
> saves DAG as well?
>


Re: How can i remove the need for calling cache

2017-08-02 Thread Vadim Semenov
I'm not sure that "checkpointed" means the same thing in that sentence.

You can run a simple test using `spark-shell`:

sc.setCheckpointDir("/tmp/checkpoint")
val rdd = sc.parallelize(1 to 10).map(x => {
  Thread.sleep(1000)
  x
})
rdd.checkpoint()
rdd.foreach(println) // Will take 10 seconds
rdd.foreach(println) // Will be instant, because the RDD is checkpointed

On Wed, Aug 2, 2017 at 7:05 PM, jeff saremi <jeffsar...@hotmail.com> wrote:

> Vadim:
>
> This is from the Mastering Spark book:
>
> *"It is strongly recommended that a checkpointed RDD is persisted in
> memory, otherwise saving it on a file will require recomputation."*
>
>
> To me that means checkpoint will not prevent the recomputation that i was
> hoping for
> --
> *From:* Vadim Semenov <vadim.seme...@datadoghq.com>
> *Sent:* Tuesday, August 1, 2017 12:05:17 PM
> *To:* jeff saremi
> *Cc:* user@spark.apache.org
> *Subject:* Re: How can i remove the need for calling cache
>
> You can use `.checkpoint()`:
> ```
> val sc: SparkContext
> sc.setCheckpointDir("hdfs:///tmp/checkpointDirectory")
> myrdd.checkpoint()
> val result1 = myrdd.map(op1(_))
> result1.count() // Will save `myrdd` to HDFS and do map(op1…
> val result2 = myrdd.map(op2(_))
> result2.count() // Will load `myrdd` from HDFS and do map(op2…
> ```
>
> On Tue, Aug 1, 2017 at 2:05 PM, jeff saremi <jeffsar...@hotmail.com>
> wrote:
>
>> Calling cache/persist fails all our jobs (i have  posted 2 threads on
>> this).
>>
>> And we're giving up hope in finding a solution.
>> So I'd like to find a workaround for that:
>>
>> If I save an RDD to hdfs and read it back, can I use it in more than one
>> operation?
>>
>> Example: (using cache)
>> // do a whole bunch of transformations on an RDD
>>
>> myrdd.cache()
>>
>> val result1 = myrdd.map(op1(_))
>>
>> val result2 = myrdd.map(op2(_))
>>
>> // in the above I am assuming that a call to cache will prevent all
>> previous transformation from being calculated twice
>>
>> I'd like to somehow get result1 and result2 without duplicating work. How
>> can I do that?
>>
>> thanks
>>
>> Jeff
>>
>
>


Re: How can i remove the need for calling cache

2017-08-02 Thread Vadim Semenov
So if you just save an RDD to HDFS via 'saveAsSequenceFile', you would have
to create a new RDD that reads that data, this way you'll avoid recomputing
the RDD but may lose time on saving/loading.

Exactly same thing happens in 'checkpoint', 'checkpoint' is just a
convenient method that gives you the same RDD back, basically.

However, if your job fails, there's no way to run a new job using already
'checkpointed' data from a previous failed run. That's where having a
custom check pointer helps.

Another note: you can not delete "checkpoint"ed data in the same job, you
need to delete it somehow else.

BTW, have you tried '.persist(StorageLevel.DISK_ONLY)'? It caches data to
local disk, making more space in JVM and letting you to avoid hdfs.

On Wednesday, August 2, 2017, Vadim Semenov <vadim.seme...@datadoghq.com>
wrote:

> `saveAsObjectFile` doesn't save the DAG, it acts as a typical action, so
> it just saves data to some destination.
>
> `cache/persist` allow you to cache data and keep the DAG in case of some
> executor that holds data goes down, so Spark would still be able to
> recalculate missing partitions
>
> `localCheckpoint` allows you to sacrifice fault-tolerance and truncate the
> DAG, so if some executor goes down, the job will fail, because it has
> already forgotten the DAG. https://github.com/apache/
> spark/blob/master/core/src/main/scala/org/apache/spark/
> rdd/RDD.scala#L1551-L1610
>
> and `checkpoint` allows you to save data to some shared storage and
> truncate the DAG, so if an executor goes down, the job will be able to take
> missing partitions from the place where it saved the RDD
> https://github.com/apache/spark/blob/master/core/src/
> main/scala/org/apache/spark/rdd/RDD.scala#L1533-L1549
>
> On Wed, Aug 2, 2017 at 7:20 PM, Suzen, Mehmet <su...@acm.org
> <javascript:_e(%7B%7D,'cvml','su...@acm.org');>> wrote:
>
>> On 3 August 2017 at 01:05, jeff saremi <jeffsar...@hotmail.com
>> <javascript:_e(%7B%7D,'cvml','jeffsar...@hotmail.com');>> wrote:
>> > Vadim:
>> >
>> > This is from the Mastering Spark book:
>> >
>> > "It is strongly recommended that a checkpointed RDD is persisted in
>> memory,
>> > otherwise saving it on a file will require recomputation."
>>
>> Is this really true? I had the impression that DAG will not be carried
>> out once RDD is serialized to an external file, so 'saveAsObjectFile'
>> saves DAG as well?
>>
>
>


Re: [Spark Core] Is it possible to insert a function directly into the Logical Plan?

2017-08-14 Thread Vadim Semenov
Something like this, maybe?


import org.apache.spark.sql.Dataset
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.catalyst.encoders.RowEncoder

val df: DataFrame = ???
val spark = df.sparkSession
val rddOfInternalRows = df.queryExecution.toRdd.mapPartitions(iter => {
  log.info("Test")
  iter
})
val attributes = df.schema.map(f =>
   AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()
)
val logicalPlan = LogicalRDD(attributes, rddOfInternalRows)(spark)
val rowEncoder = RowEncoder(df.schema)
val resultingDataFrame = new Dataset[Row](spark, logicalPlan, rowEncoder)
resultingDataFrame

On Mon, Aug 14, 2017 at 2:15 PM, Lukas Bradley 
wrote:

> We have had issues with gathering status on long running jobs.  We have
> attempted to draw parallels between the Spark UI/Monitoring API and our
> code base.  Due to the separation between code and the execution plan, even
> having a guess as to where we are in the process is difficult.  The
> Job/Stage/Task information is too abstracted from our code to be easily
> digested by non Spark engineers on our team.
>
> Is there a "hook" to which I can attach a piece of code that is triggered
> when a point in the plan is reached?  This could be when a SQL command
> completes, or when a new DataSet is created, anything really...
>
> It seems Dataset.checkpoint() offers an excellent snapshot position during
> execution, but I'm concerned I'm short-circuiting the optimal execution of
> the full plan.  I really want these trigger functions to be completely
> independent of the actual processing itself.  I'm not looking to extract
> information from a Dataset, RDD, or anything else.  I essentially want to
> write independent output for status.
>
> If this doesn't exist, is there any desire on the dev team for me to
> investigate this feature?
>
> Thank you for any and all help.
>


Re: underlying checkpoint

2017-07-13 Thread Vadim Semenov
You need to trigger an action on that rdd to checkpoint it.

```
scala>spark.sparkContext.setCheckpointDir(".")

scala>val df = spark.createDataFrame(List(("Scala", 35), ("Python",
30), ("R", 15), ("Java", 20)))
df: org.apache.spark.sql.DataFrame = [_1: string, _2: int]

scala> df.rdd.checkpoint()

scala> df.rdd.isCheckpointed
res2: Boolean = false

scala> df.show()
+--+---+
|_1| _2|
+--+---+
| Scala| 35|
|Python| 30|
| R| 15|
|  Java| 20|
+--+---+


scala> df.rdd.isCheckpointed
res4: Boolean = false

scala> df.rdd.count()
res5: Long = 4

scala> df.rdd.isCheckpointed
res6: Boolean = true
```

On Thu, Jul 13, 2017 at 11:35 AM, Bernard Jesop 
wrote:

> Hi everyone, I just tried this simple program :
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> * import
> org.apache.spark.sql.SparkSession
>  object CheckpointTest extends App
> {
>val spark =
> SparkSession
>
> .builder()
>
> .appName("Toto")
>
> .getOrCreate()
>
> spark.sparkContext.setCheckpointDir(".")
>val df = spark.createDataFrame(List(("Scala", 35), ("Python", 30), ("R",
> 15), ("Java",
> 20)))
>
> df.show()
>
> df.rdd.checkpoint()
>println(if (df.rdd.isCheckpointed) "checkpointed" else "not
> checkpointed")
>  }*
> But the result is still *"not checkpointed"*.
> Do you have any idea why? (knowing that the checkpoint file is created)
>
> Best regards,
> Bernard JESOP
>


Re: How to insert a dataframe as a static partition to a partitioned table

2017-07-20 Thread Vadim Semenov
This should work:
```
ALTER TABLE `table` ADD PARTITION (partcol=1) LOCATION
'/path/to/your/dataset'
```

On Wed, Jul 19, 2017 at 6:13 PM, ctang  wrote:

> I wonder if there are any easy ways (or APIs) to insert a dataframe (or
> DataSet), which does not contain the partition columns, as a static
> partition to the table. For example,
> The DataSet with columns (col1, col2) will be inserted into a table (col1,
> col2) partitioned by column partcol as a static partition with partspec
> (partcol =1).
>
> Thanks
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/How-to-insert-a-dataframe-as-a-
> static-partition-to-a-partitioned-table-tp28882.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark, S3A, and 503 SlowDown / rate limit issues

2017-07-05 Thread Vadim Semenov
Are you sure that you use S3A?
Because EMR says that they do not support S3A

https://aws.amazon.com/premiumsupport/knowledge-center/emr-file-system-s3/
> Amazon EMR does not currently support use of the Apache Hadoop S3A file
system.

I think that the HEAD requests come from the `createBucketIfNotExists` in
the AWS S3 library that checks if the bucket exists every time you do a PUT
request, i.e. creates a HEAD request.

You can disable that by setting `fs.s3.buckets.create.enabled` to `false`
http://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-upload-s3.html

On Thu, Jun 29, 2017 at 4:56 PM, Everett Anderson 
wrote:

> Hi,
>
> We're using Spark 2.0.2 + Hadoop 2.7.3 on AWS EMR with S3A for direct I/O
> from/to S3 from our Spark jobs. We set mapreduce.
> fileoutputcommitter.algorithm.version=2 and are using encrypted S3
> buckets.
>
> This has been working fine for us, but perhaps as we've been running more
> jobs in parallel, we've started getting errors like
>
> Status Code: 503, AWS Service: Amazon S3, AWS Request ID: ..., AWS Error
> Code: SlowDown, AWS Error Message: Please reduce your request rate., S3
> Extended Request ID: ...
>
> We enabled CloudWatch S3 request metrics for one of our buckets and I was
> a little alarmed to see spikes of over 800k S3 requests over a minute or
> so, with the bulk of them HEAD requests.
>
> We read and write Parquet files, and most tables have around 50
> shards/parts, though some have up to 200. I imagine there's additional
> parallelism when reading a shard in Parquet, though.
>
> Has anyone else encountered this? How did you solve it?
>
> I'd sure prefer to avoid copying all our data in and out of HDFS for each
> job, if possible.
>
> Thanks!
>
>


Re: How can i remove the need for calling cache

2017-08-01 Thread Vadim Semenov
You can use `.checkpoint()`:
```
val sc: SparkContext
sc.setCheckpointDir("hdfs:///tmp/checkpointDirectory")
myrdd.checkpoint()
val result1 = myrdd.map(op1(_))
result1.count() // Will save `myrdd` to HDFS and do map(op1…
val result2 = myrdd.map(op2(_))
result2.count() // Will load `myrdd` from HDFS and do map(op2…
```

On Tue, Aug 1, 2017 at 2:05 PM, jeff saremi  wrote:

> Calling cache/persist fails all our jobs (i have  posted 2 threads on
> this).
>
> And we're giving up hope in finding a solution.
> So I'd like to find a workaround for that:
>
> If I save an RDD to hdfs and read it back, can I use it in more than one
> operation?
>
> Example: (using cache)
> // do a whole bunch of transformations on an RDD
>
> myrdd.cache()
>
> val result1 = myrdd.map(op1(_))
>
> val result2 = myrdd.map(op2(_))
>
> // in the above I am assuming that a call to cache will prevent all
> previous transformation from being calculated twice
>
> I'd like to somehow get result1 and result2 without duplicating work. How
> can I do that?
>
> thanks
>
> Jeff
>


Re: count exceed int.MaxValue

2017-08-08 Thread Vadim Semenov
Scala doesn't support ranges >= Int.MaxValue
https://github.com/scala/scala/blob/2.12.x/src/library/scala/collection/immutable/Range.scala?utf8=✓#L89

You can create two RDDs and unionize them:

scala> val rdd = sc.parallelize(1L to
Int.MaxValue.toLong).union(sc.parallelize(1L to Int.MaxValue.toLong))
rdd: org.apache.spark.rdd.RDD[Long] = UnionRDD[10] at union at :24

scala> rdd.count
[Stage 0:>  (0 + 4)
/ 8]


Also instead of creating the range on the driver, you can create your RDD
in parallel:

scala> :paste
// Entering paste mode (ctrl-D to finish)

val numberOfParts = 100
val numberOfElementsInEachPart = Int.MaxValue.toDouble / 100

val rdd = sc.parallelize(1 to numberOfParts).flatMap(partNum => {
  val begin = ((partNum - 1) * numberOfElementsInEachPart).toLong
  val end = (partNum * numberOfElementsInEachPart).toLong
  begin to end
})

// Exiting paste mode, now interpreting.

numberOfParts: Int = 100
numberOfElementsInEachPart: Double = 2.147483647E7
rdd: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[15] at flatMap at
:31

scala> rdd.count
res10: Long = 2147483747

On Tue, Aug 8, 2017 at 1:26 PM, makoto  wrote:

> Hello,
> I'd like to count more than Int.MaxValue. But I encountered the following
> error.
>
> scala> val rdd = sc.parallelize(1L to Int.MaxValue*2.toLong)
> rdd: org.apache.spark.rdd.RDD[Long] = ParallelCollectionRDD[28] at
> parallelize at :24
>
> scala> rdd.count
> java.lang.IllegalArgumentException: More than Int.MaxValue elements.
>   at scala.collection.immutable.NumericRange$.check$1(
> NumericRange.scala:304)
>   at scala.collection.immutable.NumericRange$.count(
> NumericRange.scala:314)
>   at scala.collection.immutable.NumericRange.numRangeElements$
> lzycompute(NumericRange.scala:52)
>   at scala.collection.immutable.NumericRange.numRangeElements(
> NumericRange.scala:51)
>   at scala.collection.immutable.NumericRange.length(NumericRange.scala:54)
>   at org.apache.spark.rdd.ParallelCollectionRDD$.slice(
> ParallelCollectionRDD.scala:145)
>   at org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(
> ParallelCollectionRDD.scala:97)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
>   at scala.Option.getOrElse(Option.scala:121)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
>   at org.apache.spark.rdd.RDD.count(RDD.scala:1158)
>   ... 48 elided
>
> How can I avoid the error ?
> A similar problem is as follows:
> scala> rdd.reduce((a,b)=> (a + b))
> java.lang.IllegalArgumentException: More than Int.MaxValue elements.
>   at scala.collection.immutable.NumericRange$.check$1(
> NumericRange.scala:304)
>   at scala.collection.immutable.NumericRange$.count(
> NumericRange.scala:314)
>   at scala.collection.immutable.NumericRange.numRangeElements$
> lzycompute(NumericRange.scala:52)
>   at scala.collection.immutable.NumericRange.numRangeElements(
> NumericRange.scala:51)
>   at scala.collection.immutable.NumericRange.length(NumericRange.scala:54)
>   at org.apache.spark.rdd.ParallelCollectionRDD$.slice(
> ParallelCollectionRDD.scala:145)
>   at org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(
> ParallelCollectionRDD.scala:97)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
>   at scala.Option.getOrElse(Option.scala:121)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2119)
>   at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1026)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:151)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>   at org.apache.spark.rdd.RDD.reduce(RDD.scala:1008)
>   ... 48 elided
>
>
>


Re: Spark <--> S3 flakiness

2017-05-11 Thread Vadim Semenov
Use the official mailing list archive

http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3ccajyeq0gh1fbhbajb9gghognhqouogydba28lnn262hfzzgf...@mail.gmail.com%3e

On Thu, May 11, 2017 at 2:50 PM, lucas.g...@gmail.com 
wrote:

> Also, and this is unrelated to the actual question... Why don't these
> messages show up in the archive?
>
> http://apache-spark-user-list.1001560.n3.nabble.com/
>
> Ideally I'd want to post a link to our internal wiki for these questions,
> but can't find them in the archive.
>
> On 11 May 2017 at 07:16, lucas.g...@gmail.com 
> wrote:
>
>> Looks like this isn't viable in spark 2.0.0 (and greater I presume).  I'm
>> pretty sure I came across this blog and ignored it due to that.
>>
>> Any other thoughts?  The linked tickets in: https://issues.apache.org/
>> jira/browse/SPARK-10063 https://issues.apache.org/jira/brows
>> e/HADOOP-13786 https://issues.apache.org/jira/browse/HADOOP-9565 look
>> relevant too.
>>
>> On 10 May 2017 at 22:24, Miguel Morales  wrote:
>>
>>> Try using the DirectParquetOutputCommiter:
>>> http://dev.sortable.com/spark-directparquetoutputcommitter/
>>>
>>> On Wed, May 10, 2017 at 10:07 PM, lucas.g...@gmail.com
>>>  wrote:
>>> > Hi users, we have a bunch of pyspark jobs that are using S3 for
>>> loading /
>>> > intermediate steps and final output of parquet files.
>>> >
>>> > We're running into the following issues on a semi regular basis:
>>> > * These are intermittent errors, IE we have about 300 jobs that run
>>> > nightly... And a fairly random but small-ish percentage of them fail
>>> with
>>> > the following classes of errors.
>>> >
>>> > S3 write errors
>>> >
>>> >> "ERROR Utils: Aborting task
>>> >> com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 404,
>>> AWS
>>> >> Service: Amazon S3, AWS Request ID: 2D3RP, AWS Error Code: null, AWS
>>> Error
>>> >> Message: Not Found, S3 Extended Request ID: BlaBlahEtc="
>>> >
>>> >
>>> >>
>>> >> "Py4JJavaError: An error occurred while calling o43.parquet.
>>> >> : com.amazonaws.services.s3.model.MultiObjectDeleteException: Status
>>> Code:
>>> >> 0, AWS Service: null, AWS Request ID: null, AWS Error Code: null, AWS
>>> Error
>>> >> Message: One or more objects could not be deleted, S3 Extended
>>> Request ID:
>>> >> null"
>>> >
>>> >
>>> >
>>> > S3 Read Errors:
>>> >
>>> >> [Stage 1:=>
>>>  (27 + 4)
>>> >> / 31]17/05/10 16:25:23 ERROR Executor: Exception in task 10.0 in
>>> stage 1.0
>>> >> (TID 11)
>>> >> java.net.SocketException: Connection reset
>>> >> at java.net.SocketInputStream.read(SocketInputStream.java:196)
>>> >> at java.net.SocketInputStream.read(SocketInputStream.java:122)
>>> >> at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
>>> >> at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
>>> >> at sun.security.ssl.InputRecord.read(InputRecord.java:509)
>>> >> at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
>>> >> at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.
>>> java:884)
>>> >> at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
>>> >> at
>>> >> org.apache.http.impl.io.AbstractSessionInputBuffer.read(Abst
>>> ractSessionInputBuffer.java:198)
>>> >> at
>>> >> org.apache.http.impl.io.ContentLengthInputStream.read(Conten
>>> tLengthInputStream.java:178)
>>> >> at
>>> >> org.apache.http.impl.io.ContentLengthInputStream.read(Conten
>>> tLengthInputStream.java:200)
>>> >> at
>>> >> org.apache.http.impl.io.ContentLengthInputStream.close(Conte
>>> ntLengthInputStream.java:103)
>>> >> at
>>> >> org.apache.http.conn.BasicManagedEntity.streamClosed(BasicMa
>>> nagedEntity.java:168)
>>> >> at
>>> >> org.apache.http.conn.EofSensorInputStream.checkClose(EofSens
>>> orInputStream.java:228)
>>> >> at
>>> >> org.apache.http.conn.EofSensorInputStream.close(EofSensorInp
>>> utStream.java:174)
>>> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>>> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>>> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>>> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>>> >> at com.amazonaws.services.s3.model.S3Object.close(S3Object.java:203)
>>> >> at org.apache.hadoop.fs.s3a.S3AInputStream.close(S3AInputStream
>>> .java:187)
>>> >
>>> >
>>> >
>>> > We have literally tons of logs we can add but it would make the email
>>> > unwieldy big.  If it would be helpful I'll drop them in a pastebin or
>>> > something.
>>> >
>>> > Our config is along the lines of:
>>> >
>>> > spark-2.1.0-bin-hadoop2.7
>>> > '--packages
>>> > com.amazonaws:aws-java-sdk:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0
>>> > pyspark-shell'
>>> >
>>> > Given the stack overflow / googling I've been doing I know we're not
>>> the
>>> > only org with these issues but I haven't found a good set of solutions
>>> in
>>> 

Re: [How-To] Custom file format as source

2017-06-12 Thread Vadim Semenov
It should be easy to start with a custom Hadoop InputFormat that reads the
file and creates a `RDD[Row]`, since you know the records size, it should
be pretty easy to make the InputFormat to produce splits, so then you could
read the file in parallel.

On Mon, Jun 12, 2017 at 6:01 AM, OBones  wrote:

> Hello,
>
> I have an application here that generates data files in a custom binary
> format that provides the following information:
>
> Column list, each column has a data type (64 bit integer, 32 bit string
> index, 64 bit IEEE float, 1 byte boolean)
> Catalogs that give modalities for some columns (ie, column 1 contains only
> the following values: A, B, C, D)
> Array for actual data, each row has a fixed size according to the columns.
>
> Here is an example:
>
> Col1, 64bit integer
> Col2, 32bit string index
> Col3, 64bit integer
> Col4, 64bit float
>
> Catalog for Col1 = 10, 20, 30, 40, 50
> Catalog for Col2 = Big, Small, Large, Tall
> Catalog for Col3 = 101, 102, 103, 500, 5000
> Catalog for Col4 = (no catalog)
>
> Data array =
> 8 bytes, 4 bytes, 8 bytes, 8 bytes,
> 8 bytes, 4 bytes, 8 bytes, 8 bytes,
> 8 bytes, 4 bytes, 8 bytes, 8 bytes,
> 8 bytes, 4 bytes, 8 bytes, 8 bytes,
> 8 bytes, 4 bytes, 8 bytes, 8 bytes,
> ...
>
> I would like to use this kind of file as a source for various ML related
> computations (CART, RandomForrest, Gradient boosting...) and Spark is very
> interesting in this area.
> However, I'm a bit lost as to what I should write to have Spark use that
> file format as a source for its computation. Considering that those files
> are quite big (100 million lines, hundreds of gigs on disk), I'd rather not
> create something that writes a new file in a built-in format, but I'd
> rather write some code that makes Spark accept the file as it is.
>
> I looked around and saw the textfile method but it is not applicable to my
> case. I also saw the spark.read.format("libsvm") syntax which tells me that
> there is a list of supported formats known to spark, which I believe are
> called Dataframes, but I could not find any tutorial on this subject.
>
> Would you have any suggestion or links to documentation that would get me
> started?
>
> Regards,
> Olivier
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: How does HashPartitioner distribute data in Spark?

2017-06-23 Thread Vadim Semenov
This is the code that chooses the partition for a key:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Partitioner.scala#L85-L88

it's basically `math.abs(key.hashCode % numberOfPartitions)`

On Fri, Jun 23, 2017 at 3:42 AM, Vikash Pareek <
vikash.par...@infoobjects.com> wrote:

> I am trying to understand how spark partitoing works.
>
> To understand this I have following piece of code on spark 1.6
>
> def countByPartition1(rdd: RDD[(String, Int)]) = {
> rdd.mapPartitions(iter => Iterator(iter.length))
> }
> def countByPartition2(rdd: RDD[String]) = {
> rdd.mapPartitions(iter => Iterator(iter.length))
> }
>
> //RDDs Creation
> val rdd1 = sc.parallelize(Array(("aa", 1), ("aa", 1), ("aa", 1), ("aa",
> 1)), 8)
> countByPartition(rdd1).collect()
> >> Array[Int] = Array(0, 1, 0, 1, 0, 1, 0, 1)
>
> val rdd2 = sc.parallelize(Array("aa", "aa", "aa", "aa"), 8)
> countByPartition(rdd2).collect()
> >> Array[Int] = Array(0, 1, 0, 1, 0, 1, 0, 1)
>
> In both the cases data is distributed uniformaly.
> I do have following questions on the basis of above observation:
>
>  1. In case of rdd1, hash partitioning should calculate hashcode of key
> (i.e. "aa" in this case), so all records should go to single partition
> instead of uniform distribution?
>  2. In case of rdd2, there is no key value pair so how hash partitoning
> going to work i.e. what is the key to calculate hashcode?
>
> I have followed @zero323 answer but not getting answer of these.
> https://stackoverflow.com/questions/31424396/how-does-hashpartitioner-work
>
>
>
>
> -
>
> __Vikash Pareek
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/How-does-HashPartitioner-distribute-data-in-Spark-
> tp28785.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: "Sharing" dataframes...

2017-06-20 Thread Vadim Semenov
You can launch one permanent spark context and then execute your jobs
within the context. And since they'll be running in the same context, they
can share data easily.

These two projects provide the functionality that you need:
https://github.com/spark-jobserver/spark-jobserver#persistent-context-mode---faster--required-for-related-jobs
https://github.com/cloudera/livy#post-sessions

On Tue, Jun 20, 2017 at 1:46 PM, Jean Georges Perrin  wrote:

> Hey,
>
> Here is my need: program A does something on a set of data and produces
> results, program B does that on another set, and finally, program C
> combines the data of A and B. Of course, the easy way is to dump all on
> disk after A and B are done, but I wanted to avoid this.
>
> I was thinking of creating a temp view, but I do not really like the temp
> aspect of it ;). Any idea (they are all worth sharing)
>
> jg
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Bizarre UI Behavior after migration

2017-05-22 Thread Vadim Semenov
I believe it shows only the tasks that have actually being executed, if
there were tasks with no data, they don't get reported.

I might be mistaken, if somebody has a good explanation, would also like to
hear.

On Fri, May 19, 2017 at 5:45 PM, Miles Crawford  wrote:

> Hey ya'll,
>
> Trying to migrate from Spark 1.6.1 to 2.1.0.
>
> I use EMR, and launched a new cluster using EMR 5.5, which runs spark
> 2.1.0.
>
> I updated my dependencies, and fixed a few API changes related to
> accumulators, and presto! my application was running on the new cluster.
>
> But the application UI shows crazy output:
> https://www.dropbox.com/s/egtj1056qeudswj/sparkwut.png?dl=0
>
> The applications seem to complete successfully, but I was wondering if
> anyone has an idea of what might be going wrong?
>
> Thanks,
> -Miles
>


Re: [Spark Core] Does spark support read from remote Hive server via JDBC

2017-06-08 Thread Vadim Semenov
Have you tried running a query? something like:

```
test.select("*").limit(10).show()
```

On Thu, Jun 8, 2017 at 4:16 AM, Даша Ковальчук 
wrote:

> Hi guys,
>
> I need to execute hive queries on remote hive server from spark, but for
> some reasons i receive only column names(without data).
> Data available in table, I checked it via HUE and java jdbc connection.
>
> Here is my code example:
> val test = spark.read
> .option("url", "jdbc:hive2://remote.hive.server:1/work_base")
> .option("user", "user")
> .option("password", "password")
> .option("dbtable", "some_table_with_data")
> .option("driver", "org.apache.hive.jdbc.HiveDriver")
> .format("jdbc")
> .load()
> test.show()
>
>
> Scala version: 2.11
> Spark version: 2.1.0, i also tried 2.1.1
> Hive version: CDH 5.7 Hive 1.1.1
> Hive JDBC version: 1.1.1
>
> But this problem available on Hive with later versions, too.
> I didn't find anything in mail group answers and StackOverflow.
> Could you, please, help me with this issue or could you help me find correct
> solution how to query remote hive from spark?
>
> Thanks in advance!
>


Re: Configuration for unit testing and sql.shuffle.partitions

2017-09-18 Thread Vadim Semenov
you can create a Super class "FunSuiteWithSparkContext" that's going to
create a Spark sessions, Spark context, and SQLContext with all the desired
properties.
Then you add the class to all the relevant test suites, and that's pretty
much it.

The other option can be is to pass it as a VM parameter like
`-Dspark.driver.memory=2g -Xmx3G -Dspark.master=local[3]`

For example, if you run your tests with sbt:

```
SBT_OPTS="-Xmx3G -Dspark.driver.memory=1536m" sbt test
```

On Sat, Sep 16, 2017 at 2:54 PM, Femi Anthony  wrote:

> How are you specifying it, as an option to spark-submit ?
>
> On Sat, Sep 16, 2017 at 12:26 PM, Akhil Das  wrote:
>
>> spark.sql.shuffle.partitions is still used I believe. I can see it in the
>> code
>> 
>>  and
>> in the documentation page
>> 
>> .
>>
>> On Wed, Sep 13, 2017 at 4:46 AM, peay  wrote:
>>
>>> Hello,
>>>
>>> I am running unit tests with Spark DataFrames, and I am looking for
>>> configuration tweaks that would make tests faster. Usually, I use a
>>> local[2] or local[4] master.
>>>
>>> Something that has been bothering me is that most of my stages end up
>>> using 200 partitions, independently of whether I repartition the input.
>>> This seems a bit overkill for small unit tests that barely have 200 rows
>>> per DataFrame.
>>>
>>> spark.sql.shuffle.partitions used to control this I believe, but it
>>> seems to be gone and I could not find any information on what
>>> mechanism/setting replaces it or the corresponding JIRA.
>>>
>>> Has anyone experience to share on how to tune Spark best for very small
>>> local runs like that?
>>>
>>> Thanks!
>>>
>>>
>>
>>
>> --
>> Cheers!
>>
>>
>
>
> --
> http://www.femibyte.com/twiki5/bin/view/Tech/
> http://www.nextmatrix.com
> "Great spirits have always encountered violent opposition from mediocre
> minds." - Albert Einstein.
>


Re: SVD computation limit

2017-09-19 Thread Vadim Semenov
This may also be related to
https://issues.apache.org/jira/browse/SPARK-22033

On Tue, Sep 19, 2017 at 3:40 PM, Mark Bittmann  wrote:

> I've run into this before. The EigenValueDecomposition creates a Java
> Array with 2*k*n elements. The Java Array is indexed with a native integer
> type, so 2*k*n cannot exceed Integer.MAX_VALUE values.
>
> The array is created here:
> https://github.com/apache/spark/blob/master/mllib/src/main/
> scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala#L84
>
> If you remove the requirement that 2*k*n fail with java.lang.NegativeArraySizeException. More here on this issue
> here:
> https://issues.apache.org/jira/browse/SPARK-5656
>
> On Tue, Sep 19, 2017 at 9:49 AM, Alexander Ovcharenko <
> shurik@gmail.com> wrote:
>
>> Hello guys,
>>
>> While trying to compute SVD using computeSVD() function, i am getting the
>> following warning with the follow up exception:
>> 17/09/14 12:29:02 WARN RowMatrix: computing svd with k=49865 and
>> n=191077, please check necessity
>> IllegalArgumentException: u'requirement failed: k = 49865 and/or n =
>> 191077 are too large to compute an eigendecomposition'
>>
>> When I try to compute first 3000 singular values, I'm getting several
>> following warnings every second:
>> 17/09/14 13:43:38 WARN TaskSetManager: Stage 4802 contains a task of very
>> large size (135 KB). The maximum recommended task size is 100 KB.
>>
>> The matrix size is 49865 x 191077 and all the singular values are needed.
>>
>> Is there a way to lift that limit and be able to compute whatever number
>> of singular values?
>>
>> Thank you.
>>
>>
>>
>


Re: What are factors need to Be considered when upgrading to Spark 2.1.0 from Spark 1.6.0

2017-09-22 Thread Vadim Semenov
1. 40s is pretty negligible unless you run your job very frequently, there
can be many factors that influence that.

2. Try to compare the CPU time instead of the wall-clock time

3. Check the stages that got slower and compare the DAGs

4. Test with dynamic allocation disabled

On Fri, Sep 22, 2017 at 2:39 PM, Gokula Krishnan D 
wrote:

> Hello All,
>
> Currently our Batch ETL Jobs are in Spark 1.6.0 and planning to upgrade
> into Spark 2.1.0.
>
> With minor code changes (like configuration and Spark Session.sc) able to
> execute the existing JOB into Spark 2.1.0.
>
> But noticed that JOB completion timings are much better in Spark 1.6.0 but
> no in Spark 2.1.0.
>
> For the instance, JOB A completed in 50s in Spark 1.6.0.
>
> And with the same input and JOB A completed in 1.5 mins in Spark 2.1.0.
>
> Is there any specific factor needs to be considered when switching to
> Spark 2.1.0 from Spark 1.6.0.
>
>
>
> Thanks & Regards,
> Gokula Krishnan* (Gokul)*
>


Re: how do you deal with datetime in Spark?

2017-10-03 Thread Vadim Semenov
I usually check the list of Hive UDFs as Spark has implemented almost all
of them
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-DateFunctions

Or/and check `org.apache.spark.sql.functions` directly:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/sql/functions.html

Also you can check the list of all Datetime functions here
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L368-L399

and what they do here
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala



On Tue, Oct 3, 2017 at 1:43 PM, Adaryl Wakefield <
adaryl.wakefi...@hotmail.com> wrote:

> I gave myself a project to start actually writing Spark programs. I’m
> using Scala and Spark 2.2.0. In my project, I had to do some grouping and
> filtering by dates. It was awful and took forever. I was trying to use
> dataframes and SQL as much as possible. I see that there are date functions
> in the dataframe API but trying to use them was frustrating. Even following
> code samples was a headache because apparently the code is different
> depending on which version of Spark you are using. I was really hoping for
> a rich set of date functions like you’d find in T-SQL but I never really
> found them.
>
>
>
> Is there a best practice for dealing with dates and time in Spark? I feel
> like taking a date/time string and converting it to a date/time object and
> then manipulating data based on the various components of the timestamp
> object (hour, day, year etc.) should be a heck of a lot easier than what
> I’m finding and perhaps I’m just not looking in the right place.
>
>
>
> You can see my work here: https://github.com/BobLovesData/Apache-Spark-In-
> 24-Hours/blob/master/src/net/massstreet/hour10/BayAreaBikeAnalysis.scala
>
>
>
> Adaryl "Bob" Wakefield, MBA
> Principal
> Mass Street Analytics, LLC
> 913.938.6685 <(913)%20938-6685>
>
> www.massstreet.net
>
> www.linkedin.com/in/bobwakefieldmba
> Twitter: @BobLovesData 
>
>
>
>
>


Re: Unable to run Spark Jobs in yarn cluster mode

2017-10-10 Thread Vadim Semenov
Try increasing the `spark.yarn.am.waitTime` parameter, it's by default set
to 100ms which might not be enough in certain cases.

On Tue, Oct 10, 2017 at 7:02 AM, Debabrata Ghosh 
wrote:

> Hi All,
> I am constantly hitting an error : "ApplicationMaster:
> SparkContext did not initialize after waiting for 100 ms" while running my
> Spark code in yarn cluster mode.
>
> Here is the command what I am using :* spark-submit --master yarn
> --deploy-mode cluster spark_code.py*
>
>  Please can you help me with a resolution at your
> convenience.
>
> Thanks in advance !
>
> Cheers,
>
> Debu
>
>
>


Re: EMR: Use extra mounted EBS volumes for spark.local.dir

2017-10-10 Thread Vadim Semenov
that's probably better be directed to the AWS support

On Sun, Oct 8, 2017 at 9:54 PM, Tushar Sudake  wrote:

> Hello everyone,
>
> I'm using 'r4.8xlarge' instances on EMR for my Spark Application.
> To each node, I'm attaching one 512 GB EBS volume.
>
> By logging in into nodes I tried verifying that this volume is being set
> for 'spark.local.dir' by EMR automatically, but couldn't find any such
> configuration.
>
> Can someone please confirm this? Do I need to do it myself though
> bootstrap steps?
>
> Thanks,
> Tushar
>


Re: Is there a difference between df.cache() vs df.rdd.cache()

2017-10-13 Thread Vadim Semenov
When you do `Dataset.rdd` you actually create a new job

here you can see what it does internally:
https://github.com/apache/spark/blob/master/sql/core/
src/main/scala/org/apache/spark/sql/Dataset.scala#L2816-L2828



On Fri, Oct 13, 2017 at 5:24 PM, Supun Nakandala 
wrote:

> Hi Weichen,
>
> Thank you for the reply.
>
> My understanding was Dataframe API is using the old RDD implementation
> under the covers though it presents a different API. And calling
> df.rdd will simply give access to the underlying RDD. Is this assumption
> wrong? I would appreciate if you can shed more insights on this issue or
> point me to documentation where I can learn them.
>
> Thank you in advance.
>
> On Fri, Oct 13, 2017 at 3:19 AM, Weichen Xu 
> wrote:
>
>> You should use `df.cache()`
>> `df.rdd.cache()` won't work, because `df.rdd` generate a new RDD from the
>> original `df`. and then cache the new RDD.
>>
>> On Fri, Oct 13, 2017 at 3:35 PM, Supun Nakandala <
>> supun.nakand...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I have been experimenting with cache/persist/unpersist methods with
>>> respect to both Dataframes and RDD APIs. However, I am experiencing
>>> different behaviors Ddataframe API compared RDD API such Dataframes are not
>>> getting cached when count() is called.
>>>
>>> Is there a difference between how these operations act wrt to Dataframe
>>> and RDD APIs?
>>>
>>> Thank You.
>>> -Supun
>>>
>>
>>
>


Re: Bizarre UI Behavior after migration

2017-09-10 Thread Vadim Semenov
Was checking mails I sent, and wanted to get back to this one in case
someone gets the same question.

We found out that the reason why we saw stages being complete without all
tasks complete is related to issues in the ListenerBus

We had to tune the event queue size, see this
https://issues.apache.org/jira/browse/SPARK-15703

and we had to disable `eventLog` completely in some cases because of this
https://issues.apache.org/jira/browse/SPARK-21460

Facebook did some improvements to that, which are discussed here and in the
related PRs https://issues.apache.org/jira/browse/SPARK-18838

You can also see them discussing that at the Spark Summit SF 2017
https://www.youtube.com/watch?v=5dga0UT4RI8





On Mon, May 22, 2017 at 8:35 PM, Miles Crawford <mil...@allenai.org> wrote:

> Well, what's happening here is that jobs become "un-finished" - they
> complete, and then later on pop back into the "Active" section showing a
> small number of complete/inprogress tasks.
>
> In my screenshot, Job #1 completed as normal, and then later on switched
> back to active with only 92 tasks... it never seems to change again, it's
> stuck in this frozen, active state.
>
>
> On Mon, May 22, 2017 at 12:50 PM, Vadim Semenov <
> vadim.seme...@datadoghq.com> wrote:
>
>> I believe it shows only the tasks that have actually being executed, if
>> there were tasks with no data, they don't get reported.
>>
>> I might be mistaken, if somebody has a good explanation, would also like
>> to hear.
>>
>> On Fri, May 19, 2017 at 5:45 PM, Miles Crawford <mil...@allenai.org>
>> wrote:
>>
>>> Hey ya'll,
>>>
>>> Trying to migrate from Spark 1.6.1 to 2.1.0.
>>>
>>> I use EMR, and launched a new cluster using EMR 5.5, which runs spark
>>> 2.1.0.
>>>
>>> I updated my dependencies, and fixed a few API changes related to
>>> accumulators, and presto! my application was running on the new cluster.
>>>
>>> But the application UI shows crazy output:
>>> https://www.dropbox.com/s/egtj1056qeudswj/sparkwut.png?dl=0
>>>
>>> The applications seem to complete successfully, but I was wondering if
>>> anyone has an idea of what might be going wrong?
>>>
>>> Thanks,
>>> -Miles
>>>
>>
>>
>


Re: More instances = slower Spark job

2017-09-28 Thread Vadim Semenov
Instead of having one job, you can try processing each file in a separate
job, but run multiple jobs in parallel within one SparkContext.
Something like this should work for you, it'll submit N jobs from the
driver, the jobs will run independently, but executors will dynamically
work on different jobs, so you'll utilize executors at full.

```
import org.apache.spark.sql.SparkSession

import scala.collection.parallel.ForkJoinTaskSupport

val spark: SparkSession
val files: Seq[String]
val filesParallelCollection = files.toParArray
val howManyFilesToProcessInParallel = math.min(50, files.length)

filesParallelCollection.tasksupport = new ForkJoinTaskSupport()(
  new
scala.concurrent.forkjoin.ForkJoinPool(howManyFilesToProcessInParallel)
)
filesParallelCollection.foreach(file => {
  spark.read.text(file).filter(…)…
})
```

On Thu, Sep 28, 2017 at 2:50 PM, Jeroen Miller 
wrote:

> More details on what I want to achieve. Maybe someone can suggest a
> course of action.
>
> My processing is extremely simple: reading .json.gz text
> files, filtering each line according a regex, and saving the surviving
> lines in a similarly named .gz file.
>
> Unfortunately changing the data format is impossible (we are dealing
> with terabytes here) so I need to process .gz files.
>
> Ideally, I would like to process a large number of such files in
> parallel, that is using n_e executors which would each take care of a
> fraction 1/n_e of all the files. The trouble is that I don't know how
> to process files in parallel without loading them in the driver first,
> which would result in a major bottleneck.
>
> Here was my naive approach in Scala-like pseudo-code:
>
> //
> // This happens on the driver
> //
> val files = List("s3://bckt/file-1.json.gz", ...,
> "s3://bckt/file-N.json.gz")
> val files_rdd = sc.parallelize(files, num_partitions)
> //
> // Now files_rdd (which only holds file names) is distributed across
> several executors
> // and/or nodes
> //
>
> files_rdd.foreach(
> //
> // It is my understanding that what is performed within the foreach
> method
> // will be parallelized on several executors / nodes
> //
> file => {
> //
> // This would happen on a given executor: a given input file
> is processed
> // entirely on a given executor
> //
> val lines = sc.read.text(file)
> val filtered_lines = lines.filter( // filter based on regex // )
> filtered_lines.write.option("compression",
> "gzip").text("a_filename_tbd")
> }
> )
>
> Unfortunately this is not possible since the Spark context sc is
> defined in the driver and cannot be shared.
>
> My problem would be entirely solved if I could manage to read files
> not from the driver, but from a given executor.
>
> Another possibility would be to read each .gz file in the driver
> (about 2GB each), materializing the whole resulting RDD on the driver
> (around 12GB) and then calling repartition on that RDD, but only the
> regex part would be parallelized, and the data shuffling will probably
> ruin the performance.
>
> Any idea?
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Massive fetch fails, io errors in TransportRequestHandler

2017-09-28 Thread Vadim Semenov
Looks like there's slowness in sending shuffle files, maybe one executor
get overwhelmed with all the other executors trying to pull data?
Try lifting `spark.network.timeout` further, we ourselves had to push it to
600s from the default 120s

On Thu, Sep 28, 2017 at 10:19 AM, Ilya Karpov 
wrote:

> Hi,
> I see strange behaviour in my job, and can’t understand what is wrong:
> the stage that uses shuffle data as an input job fails number of times
> because of org.apache.spark.shuffle.FetchFailedException seen in spark UI:
> FetchFailed(BlockManagerId(8, hostname, 11431, None), shuffleId=1,
> mapId=50192, reduceId=12698, message=
> FetchFailed(BlockManagerId(8, hostname, 11431, None), shuffleId=1,
> mapId=3, reduceId=12699, message=
>
> Digging in logs I found a scenario of task failure:
> 1. some shuffle-server-X-Y (important note: external shuffle service is
> OFF) report 'Broken pipe’ at 2017-09-26T05:40:26.484Z
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
> at sun.nio.ch.FileChannelImpl.transferToDirectlyInternal(
> FileChannelImpl.java:428)
> at sun.nio.ch.FileChannelImpl.transferToDirectly(
> FileChannelImpl.java:493)
> at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:608)
> at io.netty.channel.DefaultFileRegion.transferTo(
> DefaultFileRegion.java:139)
> at org.apache.spark.network.protocol.MessageWithHeader.
> transferTo(MessageWithHeader.java:121)
> at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(
> NioSocketChannel.java:287)
> at io.netty.channel.nio.AbstractNioByteChannel.doWrite(
> AbstractNioByteChannel.java:237)
> at io.netty.channel.socket.nio.NioSocketChannel.doWrite(
> NioSocketChannel.java:314)
> at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(
> AbstractChannel.java:802)
> at io.netty.channel.nio.AbstractNioChannel$
> AbstractNioUnsafe.flush0(AbstractNioChannel.java:313)
> at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(
> AbstractChannel.java:770)
>
> and "chunk send" errors:
> Error sending result 
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=65546478185,
> chunkIndex=0}, buffer=FileSegmentManagedBuffer{file=
> /data/1/yarn/nm/usercache/hdfs/appcache/application_
> 1505206571245_2989/blockmgr-9be47304-ffe2-443a-bb10-
> 33a89928f5b9/04/shuffle_1_3_0.data, offset=40858881, length=3208}} to
> /someClientIP:somePort; closing connection
> with exceptions:
> java.nio.channels.ClosedChannelException
> at io.netty.channel.AbstractChannel$AbstractUnsafe.write(...)(Unknown
> Source)
>
> 2. then client of this shuffle-server complains with:
> Connection to some-hostname/someIP:port has been quiet for 24 ms while
> there are outstanding requests. Assuming connection is dead; please adjust
> spark.network.timeout if this is wrong.
> and then
> Still have 3386 requests outstanding when connection from
> some-hostname/someIP:11431 is closed
> and then
> java.io.IOException: Connection from 
> shuffleServerHostname/shuffleServerIP:port
> closed
> at org.apache.spark.network.client.TransportResponseHandler.
> channelInactive(TransportResponseHandler.java:146)
> at org.apache.spark.network.server.TransportChannelHandler.
> channelInactive(TransportChannelHandler.java:108)
> at io.netty.channel.AbstractChannelHandlerContext.
> invokeChannelInactive(AbstractChannelHandlerContext.java:241)
> at io.netty.channel.AbstractChannelHandlerContext.
> invokeChannelInactive(AbstractChannelHandlerContext.java:227)
> at io.netty.channel.AbstractChannelHandlerContext.
> fireChannelInactive(AbstractChannelHandlerContext.java:220)
> at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(
> ChannelInboundHandlerAdapter.java:75)
> at io.netty.handler.timeout.IdleStateHandler.channelInactive(
> IdleStateHandler.java:278)
> at io.netty.channel.AbstractChannelHandlerContext.
> invokeChannelInactive(AbstractChannelHandlerContext.java:241)
> at io.netty.channel.AbstractChannelHandlerContext.
> invokeChannelInactive(AbstractChannelHandlerContext.java:227)
> at io.netty.channel.AbstractChannelHandlerContext.
> fireChannelInactive(AbstractChannelHandlerContext.java:220)
> at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(
> ChannelInboundHandlerAdapter.java:75)
> at io.netty.channel.AbstractChannelHandlerContext.
> invokeChannelInactive(AbstractChannelHandlerContext.java:241)
> at io.netty.channel.AbstractChannelHandlerContext.
> invokeChannelInactive(AbstractChannelHandlerContext.java:227)
> at io.netty.channel.AbstractChannelHandlerContext.
> fireChannelInactive(AbstractChannelHandlerContext.java:220)
> at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(
> ChannelInboundHandlerAdapter.java:75)
>
> this fails tasks and stage for 

Re: Loading objects only once

2017-09-28 Thread Vadim Semenov
as an alternative
```
spark-submit --files 
```

the files will be put on each executor in the working directory, so you can
then load it alongside your `map` function

Behind the scene it uses `SparkContext.addFile` method that you can use too
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala?utf8=✓#L1508-L1558

On Wed, Sep 27, 2017 at 10:08 PM, Naveen Swamy  wrote:

> Hello all,
>
> I am a new user to Spark, please bear with me if this has been discussed
> earlier.
>
> I am trying to run batch inference using DL frameworks pre-trained models
> and Spark. Basically, I want to download a model(which is usually ~500 MB)
> onto the workers and load the model and run inference on images fetched
> from the source like S3 something like this
> rdd = sc.parallelize(load_from_s3)
> rdd.map(fetch_from_s3).map(read_file).map(predict)
>
> I was able to get it running in local mode on Jupyter, However, I would
> like to load the model only once and not every map operation. A setup hook
> would have nice which loads the model once into the JVM, I came across this
> JIRA https://issues.apache.org/jira/browse/SPARK-650  which suggests that
> I can use Singleton and static initialization. I tried to do this using
> a Singleton metaclass following the thread here
> https://stackoverflow.com/questions/6760685/creating-a-singleton-in-python.
> Following this failed miserably complaining that Spark cannot serialize
> ctype objects with pointer references.
>
> After a lot of trial and error, I moved the code to a separate file by
> creating a static method for predict that checks if a class variable is set
> or not and loads the model if not set. This approach does not sound thread
> safe to me, So I wanted to reach out and see if there are established
> patterns on how to achieve something like this.
>
>
> Also, I would like to understand the executor->tasks->python process
> mapping, Does each task gets mapped to a separate python process?  The
> reason I ask is I want to be to use mapPartition method to load a batch of
> files and run inference on them separately for which I need to load the
> object once per task. Any
>
>
> Thanks for your time in answering my question.
>
> Cheers, Naveen
>
>
>


Re: Loading objects only once

2017-09-28 Thread Vadim Semenov
Something like this:

```
object Model {
   @transient lazy val modelObject = new ModelLoader("model-filename")

   def get() = modelObject
}

object SparkJob {
  def main(args: Array[String]) = {
 sc.addFile("s3://bucket/path/model-filename")

 sc.parallelize(…).map(test => {
 Model.get().use(…)
 })
  }
}
```

On Thu, Sep 28, 2017 at 3:49 PM, Vadim Semenov <vadim.seme...@datadoghq.com>
wrote:

> as an alternative
> ```
> spark-submit --files 
> ```
>
> the files will be put on each executor in the working directory, so you
> can then load it alongside your `map` function
>
> Behind the scene it uses `SparkContext.addFile` method that you can use
> too
> https://github.com/apache/spark/blob/master/core/src/
> main/scala/org/apache/spark/SparkContext.scala?utf8=✓#L1508-L1558
>
> On Wed, Sep 27, 2017 at 10:08 PM, Naveen Swamy <mnnav...@gmail.com> wrote:
>
>> Hello all,
>>
>> I am a new user to Spark, please bear with me if this has been discussed
>> earlier.
>>
>> I am trying to run batch inference using DL frameworks pre-trained models
>> and Spark. Basically, I want to download a model(which is usually ~500 MB)
>> onto the workers and load the model and run inference on images fetched
>> from the source like S3 something like this
>> rdd = sc.parallelize(load_from_s3)
>> rdd.map(fetch_from_s3).map(read_file).map(predict)
>>
>> I was able to get it running in local mode on Jupyter, However, I would
>> like to load the model only once and not every map operation. A setup hook
>> would have nice which loads the model once into the JVM, I came across this
>> JIRA https://issues.apache.org/jira/browse/SPARK-650  which suggests
>> that I can use Singleton and static initialization. I tried to do this
>> using a Singleton metaclass following the thread here
>> https://stackoverflow.com/questions/6760685/creating-a-singl
>> eton-in-python. Following this failed miserably complaining that Spark
>> cannot serialize ctype objects with pointer references.
>>
>> After a lot of trial and error, I moved the code to a separate file by
>> creating a static method for predict that checks if a class variable is set
>> or not and loads the model if not set. This approach does not sound thread
>> safe to me, So I wanted to reach out and see if there are established
>> patterns on how to achieve something like this.
>>
>>
>> Also, I would like to understand the executor->tasks->python process
>> mapping, Does each task gets mapped to a separate python process?  The
>> reason I ask is I want to be to use mapPartition method to load a batch of
>> files and run inference on them separately for which I need to load the
>> object once per task. Any
>>
>>
>> Thanks for your time in answering my question.
>>
>> Cheers, Naveen
>>
>>
>>
>


Re: Saving dataframes with partitionBy: append partitions, overwrite within each

2017-09-29 Thread Vadim Semenov
As alternative: checkpoint the dataframe, collect days, and then delete
corresponding directories using hadoop FileUtils, then write the dataframe

On Fri, Sep 29, 2017 at 10:31 AM, peay  wrote:

> Hello,
>
> I am trying to use data_frame.write.partitionBy("day").save("dataset.parquet")
> to write a dataset while splitting by day.
>
> I would like to run a Spark job  to process, e.g., a month:
> dataset.parquet/day=2017-01-01/...
> ...
>
> and then run another Spark job to add another month using the same folder
> structure, getting me
> dataset.parquet/day=2017-01-01/
> ...
> dataset.parquet/day=2017-02-01/ 
> ...
>
> However:
> - with save mode "overwrite", when I process the second month, all of
> dataset.parquet/ gets removed and I lose whatever was already computed for
> the previous month.
> - with save mode "append", then I can't get idempotence: if I run the job
> to process a given month twice, I'll get duplicate data in all the
> subfolders for that month.
>
> Is there a way to do "append in terms of the subfolders from partitionBy,
> but overwrite within each such partitions? Any help would be appreciated.
>
> Thanks!
>


Re: HDFS or NFS as a cache?

2017-09-29 Thread Vadim Semenov
How many files you produce? I believe it spends a lot of time on renaming
the files because of the output committer.
Also instead of 5x c3.2xlarge try using 2x c3.8xlarge instead because they
have 10GbE and you can get good throughput for S3.

On Fri, Sep 29, 2017 at 9:15 AM, Alexander Czech <
alexander.cz...@googlemail.com> wrote:

> I have a small EC2 cluster with 5 c3.2xlarge nodes and I want to write
> parquet files to S3. But the S3 performance for various reasons is bad when
> I access s3 through the parquet write method:
>
> df.write.parquet('s3a://bucket/parquet')
>
> Now I want to setup a small cache for the parquet output. One output is
> about 12-15 GB in size. Would it be enough to setup a NFS-directory on the
> master, write the output to it and then move it to S3? Or should I setup a
> HDFS on the Master? Or should I even opt for an additional cluster running
> a HDFS solution on more than one node?
>
> thanks!
>


Re: More instances = slower Spark job

2017-09-29 Thread Vadim Semenov
Hi Jeroen,

> However, am I correct in assuming that all the filtering will be then
performed on the driver (since the .gz files are not splittable), albeit in
several threads?

Filtering will not happen on the driver, it'll happen on executors, since
`spark.read.json(…).filter(…).write(…)` is a separate job. But you have to
submit each job in a separate thread, because each thread will get locked
until the corresponding job finishes, so that's why you have to use
`parallel collections`, you could also just use Futures, but it's just
easier to use a `ParArray`.

Internally it will work this way: once one task finishes decompressing a
file, many tasks will get scheduled (based on `spark.default.parallelism`),
and the executor that decompressed the file will start processing lines
using all available threads, and after some time additional executors may
join (based on the locality levels), and then after filtering, you would
have to repartition back to 1 partition, so you could write just one
`.gzip` file.

And for each file, there will be a separate job, but because they all run
within one Spark Context, executors will stay with the job, and will work
on all files simultaneously.
See more about scheduling within one application:
https://spark.apache.org/docs/2.2.0/job-scheduling.html#
scheduling-within-an-application

On Fri, Sep 29, 2017 at 12:58 PM, Jeroen Miller <bluedasya...@gmail.com>
wrote:

> On Thu, Sep 28, 2017 at 11:55 PM, Jeroen Miller <bluedasya...@gmail.com>
> wrote:
> > On Thu, Sep 28, 2017 at 9:16 PM, Vadim Semenov
> > <vadim.seme...@datadoghq.com> wrote:
> >> Instead of having one job, you can try processing each file in a
> separate
> >> job, but run multiple jobs in parallel within one SparkContext.
>
> Hello Vadim,
>
> Today was a bit busy and I did not have the time to play with your
> idea. However, am I correct in assuming that all the filtering will be
> then performed on the driver (since the .gz files are not splittable),
> albeit in several threads?
>
> If this is correct, then I guess the proper way to tackle this task
> would be to run without any executors, but using all the cores and
> memory of the machine for the driver?
>
> I will keep you posted on my progress,
>
> Thanks,
>
> Jeroen
>


Re: Multiple filters vs multiple conditions

2017-10-03 Thread Vadim Semenov
Since you're using Dataset API or RDD API, they won't be fused together by
the Catalyst optimizer unless you use the DF API.
Two filters will get executed within one stage, and there'll be very small
overhead on having two separate filters vs having only one.

On Tue, Oct 3, 2017 at 8:14 AM, Ahmed Mahmoud  wrote:

> Hi All,
>
> Just a quick question from an optimisation point of view:
>
> Approach 1:
> .filter (t-> t.x=1 && t.y=2)
>
> Approach 2:
> .filter (t-> t.x=1)
> .filter (t-> t.y=2)
>
> Is there a difference or one is better than the other  or both are same?
>
> Thanks!
> Ahmed Mahmoud
>
>


Re: Collecting Multiple Aggregation query result on one Column as collectAsMap

2017-08-28 Thread Vadim Semenov
I didn't tailor it to your needs, but this is what I can offer you, the
idea should be pretty clear

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_list, struct}

val spark: SparkSession

import spark.implicits._

case class Input(
  a: Int,
  b: Long,
  c: Int,
  d: Int,
  e: String
)

case class Output(
  a: Int,
  b: Long,
  data: Seq[(Int, Int, String)]
)

val df = spark.createDataFrame(Seq(
  Input(1, 1, 1, 1, "a"),
  Input(1, 1, 1, 1, "b"),
  Input(1, 1, 1, 1, "c"),
  Input(1, 2, 3, 3, "d")
))

val dfOut = df.groupBy("a", "b")
  .agg(collect_list(struct($"c", $"d", $"e")))
  .queryExecution.toRdd.mapPartitions(_.map(r => {
val a = r.getInt(0)
val b = r.getLong(1)

val list = r.getArray(2)

Output(
  a,
  b,
  (0 until list.numElements()).map(i => {
val struct = list.getStruct(i, 3)
val c = struct.getInt(0)
val d = struct.getInt(1)
val e = struct.getString(2)

(c, d, e)
  })
)
  })).toDF()
dfOut.explain(extended = true)
dfOut.show()


On Mon, Aug 28, 2017 at 10:47 AM, Patrick  wrote:

> Hi
>
> I have two lists:
>
>
>- List one: contains names of columns on which I want to do aggregate
>operations.
>- List two: contains the aggregate operations on which I want to
>perform on each column eg ( min, max, mean)
>
> I am trying to use spark 2.0 dataset to achieve this. Spark provides an
> agg() where you can pass a Map  (of column name and
> respective aggregate operation ) as input, however I want to perform
> different aggregation operations on the same column of the data and want to
> collect the result in a Map where key is the aggregate
> operation and Value is the result on the particular column.  If i add
> different agg() to same column, the key gets updated with latest value.
>
> Also I dont find any collectAsMap() operation that returns map of
> aggregated column name as key and result as value. I get collectAsList()
> but i dont know the order in which those agg() operations are run so how do
> i match which list values corresponds to which agg operation.  I am able to
> see the result using .show() but How can i collect the result in this case ?
>
> Is it possible to do different aggregation on the same column in one
> Job(i.e only one collect operation) using agg() operation?
>
>
> Thanks in advance.
>
>


Re: Referencing YARN application id, YARN container hostname, Executor ID and YARN attempt for jobs running on Spark EMR 5.7.0 in log statements?

2017-08-28 Thread Vadim Semenov
When you create a EMR cluster you can specify a S3 path where logs will be
saved after cluster, something like this:

s3://bucket/j-18ASDKLJLAKSD/containers/application_1494074597524_0001/container_1494074597524_0001_01_01/stderr.gz

http://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-manage-view-web-log-files.html

On Mon, Aug 28, 2017 at 4:43 PM, Mikhailau, Alex 
wrote:

> Does anyone have a working solution for logging YARN application id, YARN
> container hostname, Executor ID and YARN attempt for jobs running on Spark
> EMR 5.7.0 in log statements? Are there specific ENV variables available or
> other workflow for doing that?
>
>
>
> Thank you
>
>
>
> Alex
>


Re: Referencing YARN application id, YARN container hostname, Executor ID and YARN attempt for jobs running on Spark EMR 5.7.0 in log statements?

2017-08-29 Thread Vadim Semenov
Each java process for each of the executors has some environment variables
that you can used, for example:

> CONTAINER_ID=container_1503994094228_0054_01_13

The executor id gets passed as an argument to the process:

> /usr/lib/jvm/java-1.8.0/bin/java … --driver-url
spark://CoarseGrainedScheduler@:38151 *--executor-id 3 *--hostname ip-1…

And it gets printed out in the container log:

> 17/08/29 13:02:00 INFO Executor: Starting executor ID 3 on host …



On Mon, Aug 28, 2017 at 5:41 PM, Mikhailau, Alex <alex.mikhai...@mlb.com>
wrote:

> Thanks, Vadim. The issue is not access to logs. I am able to view them.
>
>
>
> I have cloudwatch logs agent push logs to elasticsearch and then into
> Kibana using json-event-layout for log4j output. I would like to also log
> applicationId, executorId, etc in those log statements for clarity. Is
> there an MDC way with spark or something other than to achieve this?
>
>
>
> Alex
>
>
>
> *From: *Vadim Semenov <vadim.seme...@datadoghq.com>
> *Date: *Monday, August 28, 2017 at 5:18 PM
> *To: *"Mikhailau, Alex" <alex.mikhai...@mlb.com>
> *Cc: *"user@spark.apache.org" <user@spark.apache.org>
> *Subject: *Re: Referencing YARN application id, YARN container hostname,
> Executor ID and YARN attempt for jobs running on Spark EMR 5.7.0 in log
> statements?
>
>
>
> When you create a EMR cluster you can specify a S3 path where logs will be
> saved after cluster, something like this:
>
>
>
> s3://bucket/j-18ASDKLJLAKSD/containers/application_
> 1494074597524_0001/container_1494074597524_0001_01_01/stderr.gz
>
>
>
> http://docs.aws.amazon.com/emr/latest/ManagementGuide/
> emr-manage-view-web-log-files.html
>
>
>
> On Mon, Aug 28, 2017 at 4:43 PM, Mikhailau, Alex <alex.mikhai...@mlb.com>
> wrote:
>
> Does anyone have a working solution for logging YARN application id, YARN
> container hostname, Executor ID and YARN attempt for jobs running on Spark
> EMR 5.7.0 in log statements? Are there specific ENV variables available or
> other workflow for doing that?
>
>
>
> Thank you
>
>
>
> Alex
>
>
>


Re: Spark Writing to parquet directory : java.io.IOException: Disk quota exceeded

2017-11-22 Thread Vadim Semenov
The error message seems self-explanatory, try to figure out what's the disk
quota you have for your user.

On Wed, Nov 22, 2017 at 8:23 AM, Chetan Khatri 
wrote:

> Anybody reply on this ?
>
> On Tue, Nov 21, 2017 at 3:36 PM, Chetan Khatri <
> chetan.opensou...@gmail.com> wrote:
>
>>
>> Hello Spark Users,
>>
>> I am getting below error, when i am trying to write dataset to parquet
>> location. I have enough disk space available. Last time i was facing same
>> kind of error which were resolved by increasing number of cores at hyper
>> parameters. Currently result set data size is almost 400Gig with below
>> hyper parameters
>>
>> Driver memory: 4g
>> Executor Memory: 16g
>> Executor cores=12
>> num executors= 8
>>
>> Still it's failing, any Idea ? that if i increase executor memory and
>> number of executors.  it could get resolved ?
>>
>>
>> 17/11/21 04:29:37 ERROR storage.DiskBlockObjectWriter: Uncaught exception
>> while reverting partial writes to file /mapr/chetan/local/david.com/t
>> mp/hadoop/nm-local-dir/usercache/david-khurana/appcache/
>> application_1509639363072_10572/blockmgr-008604e6-37cb-
>> 421f-8cc5-e94db75684e7/12/temp_shuffle_ae885911-a1ef-
>> 404f-9a6a-ded544bb5b3c
>> java.io.IOException: Disk quota exceeded
>> at java.io.FileOutputStream.close0(Native Method)
>> at java.io.FileOutputStream.access$000(FileOutputStream.java:53)
>> at java.io.FileOutputStream$1.close(FileOutputStream.java:356)
>> at java.io.FileDescriptor.closeAll(FileDescriptor.java:212)
>> at java.io.FileOutputStream.close(FileOutputStream.java:354)
>> at org.apache.spark.storage.TimeTrackingOutputStream.close(Time
>> TrackingOutputStream.java:72)
>> at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
>> at net.jpountz.lz4.LZ4BlockOutputStream.close(LZ4BlockOutputStr
>> eam.java:178)
>> at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
>> at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
>> at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$
>> anon$2.close(UnsafeRowSerializer.scala:96)
>> at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$
>> close$2.apply$mcV$sp(DiskBlockObjectWriter.scala:108)
>> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:
>> 1316)
>> at org.apache.spark.storage.DiskBlockObjectWriter.close(DiskBlo
>> ckObjectWriter.scala:107)
>> at org.apache.spark.storage.DiskBlockObjectWriter.revertPartial
>> WritesAndClose(DiskBlockObjectWriter.scala:159)
>> at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.s
>> top(BypassMergeSortShuffleWriter.java:234)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMap
>> Task.scala:85)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMap
>> Task.scala:47)
>> at org.apache.spark.scheduler.Task.run(Task.scala:86)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.
>> scala:274)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
>> Executor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
>> lExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>> 17/11/21 04:29:37 WARN netty.OneWayOutboxMessage: Failed to send one-way
>> RPC.
>> java.io.IOException: Failed to connect to /192.168.123.43:58889
>> at org.apache.spark.network.client.TransportClientFactory.creat
>> eClient(TransportClientFactory.java:228)
>> at org.apache.spark.network.client.TransportClientFactory.creat
>> eClient(TransportClientFactory.java:179)
>> at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpc
>> Env.scala:197)
>> at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:
>> 191)
>> at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:
>> 187)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
>> Executor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
>> lExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.net.ConnectException: Connection refused: /
>> 192.168.123.43:58889
>> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl
>> .java:717)
>> at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect
>> (NioSocketChannel.java:224)
>> at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fi
>> nishConnect(AbstractNioChannel.java:289)
>> at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEven
>> tLoop.java:528)
>> at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimiz
>> ed(NioEventLoop.java:468)
>> at 

Re: JDK1.8 for spark workers

2017-11-29 Thread Vadim Semenov
You can pass `JAVA_HOME` environment variable

`spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-1.8.0`

On Wed, Nov 29, 2017 at 10:54 AM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Hi,
>
> I am running cloudera version of spark2.1 and our cluster is on JDK1.7.
> For some of the libraries, I need JDK1.8, is there a way to set to run
> Spark worker in JDK1.8 without upgrading .
>
> I was able run driver in JDK 1.8 by setting the path but not the workers.
>
> 17/11/28 20:22:27 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
> 1.0 (TID 1, brksvl267.brk.navistar.com, executor 1): 
> java.lang.UnsupportedClassVersionError:
> org/wololo/geojson/GeoJSON : Unsupported major.minor version 52.0
>
> Thanks,
> Asmath
>


Re: NullPointerException while reading a column from the row

2017-12-19 Thread Vadim Semenov
getAs defined as:

def getAs[T](i: Int): T = get(i).asInstanceOf[T]

and when you do toString you call Object.toString which doesn't depend on
the type,
so asInstanceOf[T] get dropped by the compiler, i.e.

row.getAs[Int](0).toString -> row.get(0).toString

we can confirm that by writing a simple scala code:

import org.apache.spark.sql._
object Test {
  val row = Row(null)
  row.getAs[Int](0).toString
}

and then compiling it:

$ scalac -classpath $SPARK_HOME/jars/'*' -print test.scala
[[syntax trees at end of   cleanup]] // test.scala
package  {
  object Test extends Object {
private[this] val row: org.apache.spark.sql.Row = _;
  def row(): org.apache.spark.sql.Row = Test.this.row;
def (): Test.type = {
  Test.super.();
  Test.this.row =
org.apache.spark.sql.Row.apply(scala.this.Predef.genericWrapArray(Array[Object]{null}));
  *Test.this.row().getAs(0).toString();*
  ()
}
  }
}

So the proper way would be:

String.valueOf(row.getAs[Int](0))


On Tue, Dec 19, 2017 at 4:23 AM, Anurag Sharma  wrote:

> The following Scala (Spark 1.6) code for reading a value from a Row fails
> with a NullPointerException when the value is null.
>
> val test = row.getAs[Int]("ColumnName").toString
>
> while this works fine
>
> val test1 = row.getAs[Int]("ColumnName") // returns 0 for nullval test2 = 
> test1.toString // converts to String fine
>
> What is causing NullPointerException and what is the recommended way to
> handle such cases?
>
> PS: getting row from DataFrame as follows:
>
>  val myRDD = myDF
> .repartition(partitions)
> .mapPartitions {
>   rows =>
> rows.flatMap {
> row =>
>   functionWithRows(row) //has above logic to read null column 
> which fails
>   }
>   }
>
> functionWithRows has then above mentioned NullPointerException
>
> MyDF schema:
>
> root
>  |-- LDID: string (nullable = true)
>  |-- KTAG: string (nullable = true)
>  |-- ColumnName: integer (nullable = true)
>
>


Re: What does Blockchain technology mean for Big Data? And how Hadoop/Spark will play role with it?

2017-12-19 Thread Vadim Semenov
I think it means that we can replace HDFS with a blockchain-based FS, and
then offload some processing to smart contracts.

On Mon, Dec 18, 2017 at 11:59 PM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> I am looking for same answer too .. will wait for response from other
> people
>
> Sent from my iPhone
>
> > On Dec 18, 2017, at 10:56 PM, Gaurav1809 
> wrote:
> >
> > Hi All,
> >
> > Will Bigdata tools & technology work with Blockchain in future? Any
> possible
> > use cases that anyone is likely to face, please share.
> >
> > Thanks
> > Gaurav
> >
> >
> >
> > --
> > Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: /tmp fills up to 100GB when using a window function

2017-12-19 Thread Vadim Semenov
Spark doesn't remove intermediate shuffle files if they're part of the same
job.

On Mon, Dec 18, 2017 at 3:10 PM, Mihai Iacob  wrote:

> This code generates files under /tmp...blockmgr... which do not get
> cleaned up after the job finishes.
>
> Anything wrong with the code below? or are there any known issues with
> spark not cleaning up /tmp files?
>
>
> window = Window.\
>   partitionBy('***', 'date_str').\
>   orderBy(sqlDf['***'])
>
> sqlDf = sqlDf.withColumn("***",rank().over(window))
> df_w_least = sqlDf.filter("***=1")
>
>
>
>
>
> Regards,
>
> *Mihai Iacob*
> DSX Local  - Security, IBM Analytics
>
> - To
> unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: /tmp fills up to 100GB when using a window function

2017-12-19 Thread Vadim Semenov
Until after an action is done (i.e. save/count/reduce) or if you explicitly
truncate the DAG by checkpointing.

Spark needs to keep all shuffle files because if some task/stage/node fails
it'll only need to recompute missing partitions by using already computed
parts.

On Tue, Dec 19, 2017 at 10:08 AM, Mihai Iacob <mia...@ca.ibm.com> wrote:

> When does spark remove them?
>
>
> Regards,
>
> *Mihai Iacob*
> DSX Local <https://datascience.ibm.com/local> - Security, IBM Analytics
>
>
>
> - Original message -
> From: Vadim Semenov <vadim.seme...@datadoghq.com>
> To: Mihai Iacob <mia...@ca.ibm.com>
> Cc: user <user@spark.apache.org>
> Subject: Re: /tmp fills up to 100GB when using a window function
> Date: Tue, Dec 19, 2017 9:46 AM
>
> Spark doesn't remove intermediate shuffle files if they're part of the
> same job.
>
> On Mon, Dec 18, 2017 at 3:10 PM, Mihai Iacob <mia...@ca.ibm.com> wrote:
>
> This code generates files under /tmp...blockmgr... which do not get
> cleaned up after the job finishes.
>
> Anything wrong with the code below? or are there any known issues with
> spark not cleaning up /tmp files?
>
> window = Window.\
>   partitionBy('***', 'date_str').\
>   orderBy(sqlDf['***'])
>
> sqlDf = sqlDf.withColumn("***",rank().over(window))
> df_w_least = sqlDf.filter("***=1")
>
>
>
>
> Regards,
>
> *Mihai Iacob*
> DSX Local <https://datascience.ibm.com/local> - Security, IBM Analytics
>
> - To
> unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>
>


Re: Kryo not registered class

2017-11-20 Thread Vadim Semenov
Try:

Class.forName("[Lorg.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$SerializableFileStatus$SerializableBlockLocation;")

On Sun, Nov 19, 2017 at 3:24 PM, Angel Francisco Orta <
angel.francisco.o...@gmail.com> wrote:

> Hello, I'm with spark 2.1.0 with scala and I'm registering all classes
> with kryo, and I have a  problem registering this class,
>
> org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$
> SerializableFileStatus$SerializableBlockLocation[]
>
> I can't register with classOf[Array[Class.forName("org.apache.spark.sql.
> execution.datasources.PartitioningAwareFileIndex$SerializableFileStatus$
> SerializableBlockLocation").type]]
>
>
> I have tried as well creating a java class like register and registering
> the class as org.apache.spark.sql.execution.datasources.
> PartitioningAwareFileIndex$SerializableFileStatus$
> SerializableBlockLocation[].class;
>
> Any clue is appreciatted,
>
> Thanks.
>
>


Re: Process large JSON file without causing OOM

2017-11-15 Thread Vadim Semenov
There's a lot of off-heap memory involved in decompressing Snappy,
compressing ZLib.

Since you're running using `local[*]`, you process multiple tasks
simultaneously, so they all might consume memory.

I don't think that increasing heap will help, since it looks like you're
hitting system memory limits.

I'd suggest trying to run with `local[2]` and checking what's the memory
usage of the jvm process.

On Mon, Nov 13, 2017 at 7:22 PM, Alec Swan  wrote:

> Hello,
>
> I am using the Spark library to convert JSON/Snappy files to ORC/ZLIB
> format. Effectively, my Java service starts up an embedded Spark cluster
> (master=local[*]) and uses Spark SQL to convert JSON to ORC. However, I
> keep getting OOM errors with large (~1GB) files.
>
> I've tried different ways to reduce memory usage, e.g. by partitioning
> data with dataSet.partitionBy("customer).save(filePath), or capping
> memory usage by setting spark.executor.memory=1G, but to no vail.
>
> I am wondering if there is a way to avoid OOM besides splitting the source
> JSON file into multiple smaller ones and processing the small ones
> individually? Does Spark SQL have to read the JSON/Snappy (row-based) file
> in it's entirety before converting it to ORC (columnar)? If so, would it
> make sense to create a custom receiver that reads the Snappy file and use
> Spark streaming for ORC conversion?
>
> Thanks,
>
> Alec
>


Re: Spark based Data Warehouse

2017-11-12 Thread Vadim Semenov
It's actually quite simple to answer

> 1. Is Spark SQL and UDF, able to handle all the workloads?
Yes

> 2. What user interface did you provide for data scientist, data engineers
and analysts
Home-grown platform, EMR, Zeppelin

> What are the challenges in running concurrent queries, by many users,
over Spark SQL? Considering Spark still does not provide spill to disk, in
many scenarios, are there frequent query failures when executing concurrent
queries
You can run separate Spark Contexts, so jobs will be isolated

> Are there any open source implementations, which provide something
similar?
Yes, many.


On Sun, Nov 12, 2017 at 1:47 PM, Gourav Sengupta 
wrote:

> Dear Ashish,
> what you are asking for involves at least a few weeks of dedicated
> understanding of your used case and then it takes at least 3 to 4 months to
> even propose a solution. You can even build a fantastic data warehouse just
> using C++. The matter depends on lots of conditions. I just think that your
> approach and question needs a lot of modification.
>
> Regards,
> Gourav
>
> On Sun, Nov 12, 2017 at 6:19 PM, Phillip Henry 
> wrote:
>
>> Hi, Ashish.
>>
>> You are correct in saying that not *all* functionality of Spark is
>> spill-to-disk but I am not sure how this pertains to a "concurrent user
>> scenario". Each executor will run in its own JVM and is therefore isolated
>> from others. That is, if the JVM of one user dies, this should not effect
>> another user who is running their own jobs in their own JVMs. The amount of
>> resources used by a user can be controlled by the resource manager.
>>
>> AFAIK, you configure something like YARN to limit the number of cores and
>> the amount of memory in the cluster a certain user or group is allowed to
>> use for their job. This is obviously quite a coarse-grained approach as (to
>> my knowledge) IO is not throttled. I believe people generally use something
>> like Apache Ambari to keep an eye on network and disk usage to mitigate
>> problems in a shared cluster.
>>
>> If the user has badly designed their query, it may very well fail with
>> OOMEs but this can happen irrespective of whether one user or many is using
>> the cluster at a given moment in time.
>>
>> Does this help?
>>
>> Regards,
>>
>> Phillip
>>
>>
>> On Sun, Nov 12, 2017 at 5:50 PM, ashish rawat 
>> wrote:
>>
>>> Thanks Jorn and Phillip. My question was specifically to anyone who have
>>> tried creating a system using spark SQL, as Data Warehouse. I was trying to
>>> check, if someone has tried it and they can help with the kind of workloads
>>> which worked and the ones, which have problems.
>>>
>>> Regarding spill to disk, I might be wrong but not all functionality of
>>> spark is spill to disk. So it still doesn't provide DB like reliability in
>>> execution. In case of DBs, queries get slow but they don't fail or go out
>>> of memory, specifically in concurrent user scenarios.
>>>
>>> Regards,
>>> Ashish
>>>
>>> On Nov 12, 2017 3:02 PM, "Phillip Henry" 
>>> wrote:
>>>
>>> Agree with Jorn. The answer is: it depends.
>>>
>>> In the past, I've worked with data scientists who are happy to use the
>>> Spark CLI. Again, the answer is "it depends" (in this case, on the skills
>>> of your customers).
>>>
>>> Regarding sharing resources, different teams were limited to their own
>>> queue so they could not hog all the resources. However, people within a
>>> team had to do some horse trading if they had a particularly intensive job
>>> to run. I did feel that this was an area that could be improved. It may be
>>> by now, I've just not looked into it for a while.
>>>
>>> BTW I'm not sure what you mean by "Spark still does not provide spill to
>>> disk" as the FAQ says "Spark's operators spill data to disk if it does not
>>> fit in memory" (http://spark.apache.org/faq.html). So, your data will
>>> not normally cause OutOfMemoryErrors (certain terms and conditions may
>>> apply).
>>>
>>> My 2 cents.
>>>
>>> Phillip
>>>
>>>
>>>
>>> On Sun, Nov 12, 2017 at 9:14 AM, Jörn Franke 
>>> wrote:
>>>
 What do you mean all possible workloads?
 You cannot prepare any system to do all possible processing.

 We do not know the requirements of your data scientists now or in the
 future so it is difficult to say. How do they work currently without the
 new solution? Do they all work on the same data? I bet you will receive on
 your email a lot of private messages trying to sell their solution that
 solves everything - with the information you provided this is impossible to
 say.

 Then with every system: have incremental releases but have then in
 short time frames - do not engineer a big system that you will deliver in 2
 years. In the cloud you have the perfect possibility to scale feature but
 also infrastructure wise.

 Challenges with concurrent queries is 

Re: RDD[internalRow] -> DataSet

2017-12-12 Thread Vadim Semenov
not possible, but you can add your own object in your project to the
spark's package that would give you access to private methods

package org.apache.spark.sql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.types.StructType

object DataFrameUtil {
  /**
* Creates a DataFrame out of RDD[InternalRow] that you can get
using `df.queryExection.toRdd`
*/
  def createFromInternalRows(sparkSession: SparkSession, schema:
StructType, rdd: RDD[InternalRow]): DataFrame = {
val logicalPlan = LogicalRDD(schema.toAttributes, rdd)(sparkSession)
Dataset.ofRows(sparkSession, logicalPlan)
  }
}


Re: Free Column Reference with $

2018-05-04 Thread Vadim Semenov
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala#L38-L47

It's called String Interpolation
See "Advanced Usage" here
https://docs.scala-lang.org/overviews/core/string-interpolation.html

On Fri, May 4, 2018 at 10:10 AM, Christopher Piggott 
wrote:

> How does $"something" actually work (from a scala perspective) as a free
> column reference?
>
>


-- 
Sent from my iPhone


Re: Tuning Resource Allocation during runtime

2018-04-27 Thread Vadim Semenov
You can not change dynamically the number of cores per executor or cores
per task, but you can change the number of executors.

In one of my jobs I have something like this, so when I know that I don't
need more than 4 executors, I kill all other executors (assuming that they
don't hold any cached data), and they join other jobs (thanks to dynamic
allocation)


// At this point we have 1500 parquet files
// But we want 100 files, which means about 4 executors can process
everything
// assuming that they can process 30 tasks each
// So we can let other executors leave the job
val executors = SparkContextUtil.getExecutorIds(sc)
executors.take(executors.size - 4).foreach(sc.killExecutor)


package org.apache.spark
/**
* `SparkContextUtil` gives access to private methods
*/
object SparkContextUtil {
def getExecutorIds(sc: SparkContext): Seq[String] =
sc.getExecutorIds.filter(_ != SparkContext.DRIVER_IDENTIFIER)




On Fri, Apr 27, 2018 at 3:52 AM, Donni Khan 
wrote:

> Hi All,
>
> Is there any way to change the  number of executors/cores  during running
> Saprk Job.
> I have Spark Job containing two tasks: First task need many executors to
> run fastly. the second task has many input and output opeartions and
> shuffling, so it needs  few executors, otherwise it taks loong time to
> finish.
> Does anyone knows if that possible in YARN?
>
>
> Thank you.
> Donni
>



-- 
Sent from my iPhone


Re:

2018-05-16 Thread Vadim Semenov
Upon downsizing to 20 partitions some of your partitions become too big,
and I see that you're doing caching, and executors try to write big
partitions to disk, but fail because they exceed 2GiB

> Caused by: java.lang.IllegalArgumentException: Size exceeds
Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.
apply(DiskStore.scala:125)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.
apply(DiskStore.scala:124)

You can try to coalesce to 100 and reduce the number of executors to keep
the load on MySQL reasonable

On Wed, May 16, 2018 at 5:36 AM, Davide Brambilla <
davide.brambi...@contentwise.tv> wrote:

> Hi all,
>we have a dataframe with 1000 partitions and we need to write the
> dataframe into a MySQL using this command:
>
> df.coalesce(20)
> df.write.jdbc(url=url,
>   table=table,
>   mode=mode,
>   properties=properties)
>
> and we get this errors randomly
>
> java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.
> apply(DiskStore.scala:125)
> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.
> apply(DiskStore.scala:124)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:126)
> at org.apache.spark.storage.BlockManager.getLocalValues(
> BlockManager.scala:520)
> at org.apache.spark.storage.BlockManager.get(BlockManager.scala:693)
> at org.apache.spark.storage.BlockManager.getOrElseUpdate(
> BlockManager.scala:753)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:96)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$
> scheduler$DAGScheduler$$failJobAndIndependentStages(
> DAGScheduler.scala:1690)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(
> DAGScheduler.scala:1678)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(
> DAGScheduler.scala:1677)
> at scala.collection.mutable.ResizableArray$class.foreach(
> ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(
> DAGScheduler.scala:1677)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$
> handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$
> handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
> at scala.Option.foreach(Option.scala:257)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(
> DAGScheduler.scala:855)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> doOnReceive(DAGScheduler.scala:1905)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> onReceive(DAGScheduler.scala:1860)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> onReceive(DAGScheduler.scala:1849)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
> at 

Re: Time series data

2018-05-24 Thread Vadim Semenov
Yeah, it depends on what you want to do with that timeseries data. We at
Datadog process trillions of points daily using Spark, I cannot really go
about what exactly we do with the data, but just saying that Spark can
handle the volume, scale well and be fault-tolerant, albeit everything I
said comes with multiple asterisks.

On Thursday, May 24, 2018, amin mohebbi  wrote:

> Could you please help me to understand  the performance that we get from
> using spark with any nosql or TSDB ? We receive 1 mil meters x 288 readings
> = 288 mil rows (Approx. 360 GB per day) – Therefore, we will end up with
> 10's or 100's of TBs of data and I feel that NoSQL will be much quicker
> than Hadoop/Spark. This is time series data that are coming from many
> devices in form of flat files and it is currently extracted / transformed 
> /loaded
> into another database which is connected to BI tools. We might use azure
> data factory to collect the flat files and then use spark to do the ETL(not
> sure if it is correct way) and then use spark to join table or do the
> aggregations and save them into a db (preferably nosql not sure).
> Finally, connect deploy Power BI to get visualize the data from nosql db.
> My questions are :
>
> 1- Is the above mentioned correct architecture? using spark with nosql as
> I think combination of these two could help to have random access and run
> many queries by different users.
> 2- do we really need to use a time series db?
>
>
> Best Regards ... Amin
> Mohebbi PhD candidate in Software Engineering   at university of Malaysia
> Tel : +60 18 2040 017 E-Mail : tp025...@ex.apiit.edu.my
> amin_...@me.com
>


-- 
Sent from my iPhone


Writing rows directly in Tungsten format into memory

2018-06-12 Thread Vadim Semenov
Is there a way to write rows directly into off-heap memory in the Tungsten
format bypassing creating objects?

I have a lot of rows, and right now I'm creating objects, and they get
encoded, but because of the number of rows, it creates significant pressure
on GC. I'd like to avoid creating objects at all, and since I know the
schema, I was wondering if I could just somehow write rows directly into
off-heap?

Thank you.


Re: Can we get the partition Index in an UDF

2018-06-25 Thread Vadim Semenov
Try using `TaskContext`:

import org.apache.spark.TaskContext
val partitionId = TaskContext.getPartitionId()

On Mon, Jun 25, 2018 at 11:17 AM Lalwani, Jayesh
 wrote:
>
> We are trying to add a column to a Dataframe with some data that is seeded by 
> some random data. We want to be able to control the seed, so multiple runs of 
> the same transformation generate the same output. We also want to generate 
> different random numbers for each partition
>
>
>
> This is easy to do with mapPartitionsWithIndex. For each partition, we 
> generate a Random number generator that is seeded with a global seed + index 
> of partition. The problem with this is mapPartitionsWithIndex is a blackbox, 
> and any filter predicates that are added after mapPartitionsWithIndex don’t 
> get pushed down to source
>
>
>
> If we implement this function as an UDF, we can get the filters pushed down 
> to the source, but we don’t have the partition index.
>
>
>
> Yes, I know we could use the mapPartitionsWithIndex after the filter. That is 
> what we will probably end up doing. I was wondering if there is a way of 
> implementing this without having to move the filter around.
>
>
> 
>
> The information contained in this e-mail is confidential and/or proprietary 
> to Capital One and/or its affiliates and may only be used solely in 
> performance of work or services for Capital One. The information transmitted 
> herewith is intended only for use by the individual or entity to which it is 
> addressed. If the reader of this message is not the intended recipient, you 
> are hereby notified that any review, retransmission, dissemination, 
> distribution, copying or other use of, or taking of any action in reliance 
> upon this information is strictly prohibited. If you have received this 
> communication in error, please contact the sender and delete the material 
> from your computer.



-- 
Sent from my iPhone

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: spark.executor.extraJavaOptions inside application code

2018-05-02 Thread Vadim Semenov
You need to pass config before creating a session

val conf = new SparkConf()
// All three methods below are equivalent
conf.set("spark.executor.extraJavaOptions", "-Dbasicauth=myuser:mypassword")
conf.set("spark.executorEnv.basicauth", "myuser:mypassword")
conf.setExecutorEnv("basicauth", "myuser:mypassword")
val spark = SparkSession.builder().config(conf).appName("…").getOrCreate()


On Wed, May 2, 2018 at 6:59 AM, Agostino Calamita <
agostino.calam...@gmail.com> wrote:

> Hi all,
> I wrote an application that needs an environment variable. I can set this
> variable with
>
> --conf 'spark.executor.extraJavaOptions=-Dbasicauth=myuser:mypwd'
>
> in spark-submit and it works well in standalone cluster mode.
>
> But, I want set it inside the application code, because the variable
> contains a password.
>
> How can I do ?
>
> I tried with:
>
> SparkSession spark = SparkSession
>   .builder()
>   .appName("Java Spark Solr ETL")
>   .getOrCreate();
>
> 
> spark.sparkContext().conf().setExecutorEnv("spark.executor.extraJavaOptions",
> "-Dbasicauth=myuser:mypassword");
>
> but it doesn't work.
>
> Thanks.
>



-- 
Sent from my iPhone


Re: [Spark 2.x Core] .collect() size limit

2018-04-30 Thread Vadim Semenov
`.collect` returns an Array, and array's can't have more than Int.MaxValue
elements, and in most JVMs it's lower: `Int.MaxValue - 8`
So it puts upper limit, however, you can just create Array of Arrays, and
so on, basically limitless, albeit with some gymnastics.


Re: [G1GC] -XX: -ResizePLAB How to provide in Spark Submit

2018-07-03 Thread Vadim Semenov
As typical `JAVA_OPTS` you need to pass as a single parameter:

--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:-ResizePLAB"

Also you got an extra space in the parameter, there should be no space
after the colon symbol
On Tue, Jul 3, 2018 at 3:01 AM Aakash Basu  wrote:
>
> Hi,
>
> I used the below in the Spark Submit for using G1GC -
>
> --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC"
>
> Now, I want to use -XX: -ResizePLAB of the G1GC to control to avoid 
> performance degradation caused by a large number of thread communications.
>
> How to do it? I tried submitting in the similar fashion -
>
> --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" --conf 
> "spark.executor.extraJavaOptions=-XX: -ResizePLAB", but it doesn't work.
>
> Thanks,
> Aakash.



-- 
Sent from my iPhone

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Inferring Data driven Spark parameters

2018-07-03 Thread Vadim Semenov
You can't change the executor/driver cores/memory on the fly once
you've already started a Spark Context.
On Tue, Jul 3, 2018 at 4:30 AM Aakash Basu  wrote:
>
> We aren't using Oozie or similar, moreover, the end to end job shall be 
> exactly the same, but the data will be extremely different (number of 
> continuous and categorical columns, vertical size, horizontal size, etc), 
> hence, if there would have been a calculation of the parameters to arrive at 
> a conclusion that we can simply get the data and derive the respective 
> configuration/parameters, it would be great.
>
> On Tue, Jul 3, 2018 at 1:09 PM, Jörn Franke  wrote:
>>
>> Don’t do this in your job. Create for different types of jobs different jobs 
>> and orchestrate them using oozie or similar.
>>
>> On 3. Jul 2018, at 09:34, Aakash Basu  wrote:
>>
>> Hi,
>>
>> Cluster - 5 node (1 Driver and 4 workers)
>> Driver Config: 16 cores, 32 GB RAM
>> Worker Config: 8 cores, 16 GB RAM
>>
>> I'm using the below parameters from which I know the first chunk is cluster 
>> dependent and the second chunk is data/code dependent.
>>
>> --num-executors 4
>> --executor-cores 5
>> --executor-memory 10G
>> --driver-cores 5
>> --driver-memory 25G
>>
>>
>> --conf spark.sql.shuffle.partitions=100
>> --conf spark.driver.maxResultSize=2G
>> --conf "spark.executor.extraJavaOptions=-XX:+UseParallelGC"
>> --conf spark.scheduler.listenerbus.eventqueue.capacity=2
>>
>> I've come upto these values depending on my R on the properties and the 
>> issues I faced and hence the handles.
>>
>> My ask here is -
>>
>> 1) How can I infer, using some formula or a code, to calculate the below 
>> chunk dependent on the data/code?
>> 2) What are the other usable properties/configurations which I can use to 
>> shorten my job runtime?
>>
>> Thanks,
>> Aakash.
>
>


-- 
Sent from my iPhone

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Sharing spark executor pool across multiple long running spark applications

2018-02-07 Thread Vadim Semenov
The other way might be to launch a single SparkContext and then run jobs
inside of it.

You can take a look at these projects:
-
https://github.com/spark-jobserver/spark-jobserver#persistent-context-mode---faster--required-for-related-jobs
- http://livy.incubator.apache.org

Problems with this way:
- Can't update the code of your job.
- A single job can break the SparkContext.


We evaluated this way and decided to go with the dynamic allocation,
but we also had to rethink the way we write our jobs:
- Can't use caching since it locks executors, have to use checkpointing,
which adds up to computation time.
- Use some unconventional methods like reusing the same DF to write out
multiple separate things in one go.
- Sometimes remove executors from within the job, like when we know how
many we would need, so the executors could join other jobs.

On Tue, Feb 6, 2018 at 3:00 PM, Nirav Patel  wrote:

> Currently sparkContext and it's executor pool is not shareable. Each
> spakContext gets its own executor pool for entire life of an application.
> So what is the best ways to share cluster resources across multiple long
> running spark applications?
>
> Only one I see is spark dynamic allocation but it has high latency when it
> comes to real-time application.
>
>
>
>
> [image: What's New with Xactly] 
>
> 
> 
>    
> 


Re: /tmp fills up to 100GB when using a window function

2017-12-20 Thread Vadim Semenov
Ah, yes, I missed that part

it's `spark.local.dir`

spark.local.dir /tmp Directory to use for "scratch" space in Spark,
including map output files and RDDs that get stored on disk. This should be
on a fast, local disk in your system. It can also be a comma-separated list
of multiple directories on different disks. NOTE: In Spark 1.0 and later
this will be overridden by SPARK_LOCAL_DIRS (Standalone, Mesos) or
LOCAL_DIRS (YARN) environment variables set by the cluster manager.

On Wed, Dec 20, 2017 at 2:58 PM, Gourav Sengupta 
wrote:

> I do think that there is an option to set the temporary shuffle location
> to a particular directory. While working with EMR I set it to /mnt1/. Let
> me know in case you are not able to find it.
>
> On Mon, Dec 18, 2017 at 8:10 PM, Mihai Iacob  wrote:
>
>> This code generates files under /tmp...blockmgr... which do not get
>> cleaned up after the job finishes.
>>
>> Anything wrong with the code below? or are there any known issues with
>> spark not cleaning up /tmp files?
>>
>>
>> window = Window.\
>>   partitionBy('***', 'date_str').\
>>   orderBy(sqlDf['***'])
>>
>> sqlDf = sqlDf.withColumn("***",rank().over(window))
>> df_w_least = sqlDf.filter("***=1")
>>
>>
>>
>>
>>
>> Regards,
>>
>> *Mihai Iacob*
>> DSX Local  - Security, IBM Analytics
>>
>> - To
>> unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>


Re: Passing an array of more than 22 elements in a UDF

2017-12-26 Thread Vadim Semenov
Functions are still limited to 22 arguments

https://github.com/scala/scala/blob/2.13.x/src/library/scala/Function22.scala

On Tue, Dec 26, 2017 at 2:19 PM, Felix Cheung 
wrote:

> Generally the 22 limitation is from Scala 2.10.
>
> In Scala 2.11, the issue with case class is fixed, but with that said I’m
> not sure if with UDF in Java other limitation might apply.
>
> _
> From: Aakash Basu 
> Sent: Monday, December 25, 2017 9:13 PM
> Subject: Re: Passing an array of more than 22 elements in a UDF
> To: Felix Cheung 
> Cc: ayan guha , user 
>
>
>
> What's the privilege of using that specific version for this? Please throw
> some light onto it.
>
> On Mon, Dec 25, 2017 at 6:51 AM, Felix Cheung 
> wrote:
>
>> Or use it with Scala 2.11?
>>
>> --
>> *From:* ayan guha 
>> *Sent:* Friday, December 22, 2017 3:15:14 AM
>> *To:* Aakash Basu
>> *Cc:* user
>> *Subject:* Re: Passing an array of more than 22 elements in a UDF
>>
>> Hi I think you are in correct track. You can stuff all your param in a
>> suitable data structure like array or dict and pass this structure as a
>> single param in your udf.
>>
>> On Fri, 22 Dec 2017 at 2:55 pm, Aakash Basu 
>> wrote:
>>
>>> Hi,
>>>
>>> I am using Spark 2.2 using Java, can anyone please suggest me how to
>>> take more than 22 parameters in an UDF? I mean, if I want to pass all the
>>> parameters as an array of integers?
>>>
>>> Thanks,
>>> Aakash.
>>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>
>
>


Re: Spark EMR executor-core vs Vcores

2018-02-26 Thread Vadim Semenov
All used cores aren't getting reported correctly in EMR, and YARN itself
has no control over it, so whatever you put in `spark.executor.cores` will
be used,
but in the ResourceManager you will only see 1 vcore used per nodemanager.

On Mon, Feb 26, 2018 at 5:20 AM, Selvam Raman  wrote:

> Hi,
>
> spark version - 2.0.0
> spark distribution - EMR 5.0.0
>
> Spark Cluster - one master, 5 slaves
>
> Master node - m3.xlarge - 8 vCore, 15 GiB memory, 80 SSD GB storage
> Slave node - m3.2xlarge - 16 vCore, 30 GiB memory, 160 SSD GB storage
>
>
> Cluster Metrics
> Apps SubmittedApps PendingApps RunningApps CompletedContainers RunningMemory
> UsedMemory TotalMemory ReservedVCores UsedVCores TotalVCores ReservedActive
> NodesDecommissioning NodesDecommissioned NodesLost NodesUnhealthy 
> NodesRebooted
> Nodes
> 16 0 1 15 5 88.88 GB 90.50 GB 22 GB 5 79 1 5
>  0
>  0
>  5
>  0
>  0
> 
> I have submitted job with below configuration
> --num-executors 5 --executor-cores 10 --executor-memory 20g
>
>
>
> spark.task.cpus - be default 1
>
>
> My understanding is there will be 5 executore each can run 10 task at a
> time and task can share total memory of 20g. Here, i could see only 5
> vcores used which means 1 executor instance use 20g+10%overhead ram(22gb),
> 10 core(number of threads), 1 Vcore(cpu).
>
> please correct me if my understand is wrong.
>
> how can i utilize number of vcore in EMR effectively. Will Vcore boost
> performance?
>
>
> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>


Re: Spark EMR executor-core vs Vcores

2018-02-26 Thread Vadim Semenov
yeah, for some reason (unknown to me, but you can find on aws forums) they
double the actual number of cores for nodemanagers.

I assume that's done to maximize utilization, but doesn't really matter to
me, at least, since I only run Spark, so I, personally, set `total number
of cores - 1/2` saving one core for the OS/DataNode/NodeManager, because
Spark itself can create a significant load.

On Mon, Feb 26, 2018 at 4:51 PM, Selvam Raman <sel...@gmail.com> wrote:

> Thanks. That’s make sense.
>
> I want to know one more think , available vcore per machine is 16 but
> threads per node 8. Am I missing to relate here.
>
> What I m thinking now is number of vote = number of threads.
>
>
>
> On Mon, 26 Feb 2018 at 18:45, Vadim Semenov <va...@datadoghq.com> wrote:
>
>> All used cores aren't getting reported correctly in EMR, and YARN itself
>> has no control over it, so whatever you put in `spark.executor.cores` will
>> be used,
>> but in the ResourceManager you will only see 1 vcore used per nodemanager.
>>
>> On Mon, Feb 26, 2018 at 5:20 AM, Selvam Raman <sel...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> spark version - 2.0.0
>>> spark distribution - EMR 5.0.0
>>>
>>> Spark Cluster - one master, 5 slaves
>>>
>>> Master node - m3.xlarge - 8 vCore, 15 GiB memory, 80 SSD GB storage
>>> Slave node - m3.2xlarge - 16 vCore, 30 GiB memory, 160 SSD GB storage
>>>
>>>
>>> Cluster Metrics
>>> Apps SubmittedApps PendingApps RunningApps CompletedContainers RunningMemory
>>> UsedMemory TotalMemory ReservedVCores UsedVCores TotalVCores ReservedActive
>>> NodesDecommissioning NodesDecommissioned NodesLost NodesUnhealthy 
>>> NodesRebooted
>>> Nodes
>>> 16 0 1 15 5 88.88 GB 90.50 GB 22 GB 5 79 1 5
>>> <http://localhost:8088/cluster/nodes> 0
>>> <http://localhost:8088/cluster/nodes/decommissioning> 0
>>> <http://localhost:8088/cluster/nodes/decommissioned> 5
>>> <http://localhost:8088/cluster/nodes/lost> 0
>>> <http://localhost:8088/cluster/nodes/unhealthy> 0
>>> <http://localhost:8088/cluster/nodes/rebooted>
>>> I have submitted job with below configuration
>>> --num-executors 5 --executor-cores 10 --executor-memory 20g
>>>
>>>
>>>
>>> spark.task.cpus - be default 1
>>>
>>>
>>> My understanding is there will be 5 executore each can run 10 task at a
>>> time and task can share total memory of 20g. Here, i could see only 5
>>> vcores used which means 1 executor instance use 20g+10%overhead ram(22gb),
>>> 10 core(number of threads), 1 Vcore(cpu).
>>>
>>> please correct me if my understand is wrong.
>>>
>>> how can i utilize number of vcore in EMR effectively. Will Vcore boost
>>> performance?
>>>
>>>
>>> --
>>> Selvam Raman
>>> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>>>
>>
>> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>


Re: How to Create one DB connection per executor and close it after the job is done?

2018-07-30 Thread Vadim Semenov
object MyDatabseSingleton {
@transient
lazy val dbConn = DB.connect(…)

`transient` marks the variable to be excluded from serialization

and `lazy` would open connection only when it's needed and also makes
sure that the val is thread-safe

http://fdahms.com/2015/10/14/scala-and-the-transient-lazy-val-pattern/
http://code-o-matic.blogspot.com/2009/05/double-checked-locking-idiom-sweet-in.html
On Mon, Jul 30, 2018 at 1:32 PM kant kodali  wrote:
>
> Hi Patrick,
>
> This object must be serializable right? I wonder if I will access to this 
> object in my driver(since it is getting created on the executor side) so I 
> can close when I am done with my batch?
>
> Thanks!
>
> On Mon, Jul 30, 2018 at 7:37 AM, Patrick McGloin  
> wrote:
>>
>> You could use an object in Scala, of which only one instance will be created 
>> on each JVM / Executor. E.g.
>>
>> object MyDatabseSingleton {
>> var dbConn = ???
>> }
>>
>> On Sat, 28 Jul 2018, 08:34 kant kodali,  wrote:
>>>
>>> Hi All,
>>>
>>> I understand creating a connection forEachPartition but I am wondering can 
>>> I create one DB connection per executor and close it after the job is done? 
>>> any sample code would help. you can imagine I am running a simple batch 
>>> processing application.
>>>
>>> Thanks!
>
>


-- 
Sent from my iPhone

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Broadcast variable size limit?

2018-08-05 Thread Vadim Semenov
That’s the max size of a byte array in Java, limited by the length which is
defined as integer, and in most JVMS arrays can’t hold more than
Int.MaxValue - 8 elements. Other way to overcome this is to create multiple
broadcast variables

On Sunday, August 5, 2018, klrmowse  wrote:

> i don't need more, per se... i just need to watch the size of the variable;
> then, if it's within the size limit, go ahead and broadcast it; if not,
> then
> i won't broadcast...
>
> so, that would be a yes then? (2 GB, or which is it exactly?)
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
Sent from my iPhone


Re: Iterative rdd union + reduceByKey operations on small dataset leads to "No space left on device" error on account of lot of shuffle spill.

2018-07-27 Thread Vadim Semenov
`spark.worker.cleanup.enabled=true` doesn't work for YARN.
On Fri, Jul 27, 2018 at 8:52 AM dineshdharme  wrote:
>
> I am trying to do few (union + reduceByKey) operations on a hiearchical
> dataset in a iterative fashion in rdd. The first few loops run fine but on
> the subsequent loops, the operations ends up using the whole scratch space
> provided to it.
>
> I have set the spark scratch directory, i.e. SPARK_LOCAL_DIRS , to be one
> having 100 GB space.
> The heirarchical dataset, whose size is (< 400kB), remains constant
> throughout the iterations.
> I have tried the worker cleanup flag but it has no effect i.e.
> "spark.worker.cleanup.enabled=true"
>
>
>
> Error :
> Caused by: java.io.IOException: No space left on device
> at java.io.FileOutputStream.writeBytes(Native Method)
> at java.io.FileOutputStream.write(FileOutputStream.java:326)
> at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
> at java.io.DataOutputStream.writeLong(DataOutputStream.java:224)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply$mcVJ$sp(IndexShuffleBlockResolver.scala:151)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofLong.foreach(ArrayOps.scala:246)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply$mcV$sp(IndexShuffleBlockResolver.scala:149)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:153)
> at
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:109)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
>
> What I am trying to do (High Level):
>
> I have a dataset of 5 different csv ( Parent, Child1, Child2, Child21,
> Child22 ) which are related in a hierarchical fashion as shown below.
>
> Parent-> Child1 -> Child2  -> Child21
>
> Parent-> Child1 -> Child2  -> Child22
>
> Each element in the tree has 14 columns (elementid, parentelement_id, cat1,
> cat2, num1, num2,., num10)
>
> I am trying to aggregate the values of one column of Child21 into Child1
> (i.e. 2 levels up). I am doing the same for another column value of Child22
> into Child1. Then I am merging these aggregated values at the same Child1
> level.
>
> This is present in the code at location :
>
> spark.rddexample.dummyrdd.tree.child1.events.Function1
>
>
> Code which replicates the issue:
>
> 1] https://github.com/dineshdharme/SparkRddShuffleIssue
>
>
>
> Steps to reproduce the issue :
>
> 1] Clone the above repository.
>
> 2] Put the csvs in the "issue-data" folder in the above repository at a
> hadoop location "hdfs:///tree/dummy/data/"
>
> 3] Set the spark scratch directory (SPARK_LOCAL_DIRS) to a folder which has
> large space. (> 100 GB)
>
> 4] Run "sbt assembly"
>
> 5] Run the following command at the project location
>
> /path/to/spark-2.3.0-bin-hadoop2.7/bin/spark-submit \
> --class spark.rddexample.dummyrdd.FunctionExecutor \
> --master local[2] \
> --deploy-mode client \
> --executor-memory 2G \
> --driver-memory 2G \
> target/scala-2.11/rdd-shuffle-assembly-0.1.0.jar \
> 20 \
> hdfs:///tree/dummy/data/ \
> hdfs:///tree/dummy/results/
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>


-- 
Sent from my iPhone

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Vadim Semenov
`coalesce` sets the number of partitions for the last stage, so you
have to use `repartition` instead which is going to introduce an extra
shuffle stage

On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers  wrote:
>
> one small correction: lots of files leads to pressure on the spark driver 
> program when reading this data in spark.
>
> On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers  wrote:
>>
>> hi,
>>
>> i am reading data from files into a dataframe, then doing a groupBy for a 
>> given column with a count, and finally i coalesce to a smaller number of 
>> partitions before writing out to disk. so roughly:
>>
>> spark.read.format(...).load(...).groupBy(column).count().coalesce(100).write.format(...).save(...)
>>
>> i have this setting: spark.sql.shuffle.partitions=2048
>>
>> i expect to see 2048 partitions in shuffle. what i am seeing instead is a 
>> shuffle with only 100 partitions. it's like the coalesce has taken over the 
>> partitioning of the groupBy.
>>
>> any idea why?
>>
>> i am doing coalesce because it is not helpful to write out 2048 files, lots 
>> of files leads to pressure down the line on executors reading this data (i 
>> am writing to just one partition of a larger dataset), and since i have less 
>> than 100 executors i expect it to be efficient. so sounds like a good idea, 
>> no?
>>
>> but i do need 2048 partitions in my shuffle due to the operation i am doing 
>> in the groupBy (in my real problem i am not just doing a count...).
>>
>> thanks!
>> koert
>>
>


-- 
Sent from my iPhone

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: java.lang.IndexOutOfBoundsException: len is negative - when data size increases

2018-08-16 Thread Vadim Semenov
one of the spills becomes bigger than 2GiB and can't be loaded fully
(as arrays in Java can't have more than 2^32 values)

> 
> org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.loadNext(UnsafeSorterSpillReader.java:76)


You can try increasing the number of partitions, so spills would be
further smaller.

Also check if you have some skewness on the stage that precedes the
stage where it fails on

On Thu, Aug 16, 2018 at 11:25 AM Deepak Sharma  wrote:
>
> Hi All,
> I am running spark based ETL in spark 1.6  and facing this weird issue.
> The same code with same properties/configuration runs fine in other 
> environment E.g. PROD but never completes in CAT.
> The only change would be the size of data it is processing and that too be by 
> 1-2 GB.
> This is the stack trace:java.lang.IndexOutOfBoundsException: len is negative
> at org.spark-project.guava.io.ByteStreams.read(ByteStreams.java:895)
> at 
> org.spark-project.guava.io.ByteStreams.readFully(ByteStreams.java:733)
> at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.loadNext(UnsafeSorterSpillReader.java:76)
> at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator.loadNext(UnsafeExternalSorter.java:509)
> at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:136)
> at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:123)
> at 
> org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:84)
> at 
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedStreamed(SortMergeJoin.scala:272)
> at 
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextOuterJoinRows(SortMergeJoin.scala:233)
> at 
> org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceStream(SortMergeOuterJoin.scala:250)
> at 
> org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceNext(SortMergeOuterJoin.scala:283)
> at 
> org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at 
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
> at 
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:748)
>
> Did anyone faced this issue?
> If yes , what can i do to resolve this?
>
> Thanks
> Deepak



-- 
Sent from my iPhone

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Driver OutOfMemoryError in MapOutputTracker$.serializeMapStatuses for 40 TB shuffle.

2018-09-07 Thread Vadim Semenov
You have too many partitions, so when the driver is trying to gather
the status of all map outputs and send back to executors it chokes on
the size of the structure that needs to be GZipped, and since it's
bigger than 2GiB, it produces OOM.
On Fri, Sep 7, 2018 at 10:35 AM Harel Gliksman  wrote:
>
> Hi,
>
> We are running a Spark (2.3.1) job on an EMR cluster with 500 r3.2xlarge (60 
> GB, 8 vcores, 160 GB SSD ). Driver memory is set to 25GB.
>
> It processes ~40 TB of data using aggregateByKey in which we specify 
> numPartitions = 300,000.
> Map side tasks succeed, but reduce side tasks all fail.
>
> We notice the following driver error:
>
> 18/09/07 13:35:03 WARN Utils: Suppressing exception in finally: null
>
>  java.lang.OutOfMemoryError
>
> at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
> at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> at java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
> at java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
> at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1894)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1875)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.flush(ObjectOutputStream.java:1822)
> at java.io.ObjectOutputStream.flush(ObjectOutputStream.java:719)
> at java.io.ObjectOutputStream.close(ObjectOutputStream.java:740)
> at 
> org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$2.apply$mcV$sp(MapOutputTracker.scala:790)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1389)
> at 
> org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:789)
> at 
> org.apache.spark.ShuffleStatus.serializedMapStatus(MapOutputTracker.scala:174)
> at 
> org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:397)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Exception in thread "map-output-dispatcher-0" java.lang.OutOfMemoryError
> at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
> at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> at java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
> at java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
> at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1894)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1875)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
> at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at 
> org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply$mcV$sp(MapOutputTracker.scala:787)
> at 
> org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:786)
> at 
> org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:786)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1380)
> at 
> org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:789)
> at 
> org.apache.spark.ShuffleStatus.serializedMapStatus(MapOutputTracker.scala:174)
> at 
> org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:397)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Suppressed: java.lang.OutOfMemoryError
> at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
> at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> at java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
> at java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
> at 

Re: Dataframe joins - UnsupportedOperationException: Unimplemented type: IntegerType

2018-07-09 Thread Vadim Semenov
That usually happens when you have different types for a column in some
parquet files.
In this case, I think you have a column of `Long` type that got a file with
`Integer` type, I had to deal with similar problem once.
You would have to cast it yourself to Long.

On Mon, Jul 9, 2018 at 2:53 PM Nirav Patel  wrote:

> I am getting following error after performing joins between 2 dataframe.
> It happens on call to .show() method. I assume it's an issue with
> incompatible type but it's been really hard to identify which column of
> which dataframe have that incompatibility.
> Any pointers?
>
>
> 11:06:10.304 13700 [Executor task launch worker for task 16] WARN
>  o.a.s.s.e.datasources.FileScanRDD - Skipped the rest of the content in the
> corrupted file: path:
> maprfs:///user/hive/warehouse/analytics.db/myTable/BUSINESS_ID=123/part-0-b01dbc82-9bc3-43c5-89c6-4c9b2d407106.c000.snappy.parquet,
> range: 0-14248, partition values: [1085]
> java.lang.UnsupportedOperationException: Unimplemented type: IntegerType
> at
> org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBinaryBatch(VectorizedColumnReader.java:431)
> at
> org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:203)
> at
> org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:230)
> at
> org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:137)
> at
> org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:154)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown
> Source)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
> Source)
> at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> at
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
> [image: What's New with Xactly] 
>
> 
> 
>    
> 



-- 
Sent from my iPhone


Re: Dynamic allocation not releasing executors after unpersisting all cached data

2018-07-09 Thread Vadim Semenov
Try doing `unpersist(blocking=true)`
On Mon, Jul 9, 2018 at 2:59 PM Jeffrey Charles
 wrote:
>
> I'm persisting a dataframe in Zeppelin which has dynamic allocation enabled 
> to get a sense of how much memory the dataframe takes up. After I note the 
> size, I unpersist the dataframe. For some reason, Yarn is not releasing the 
> executors that were added to Zeppelin. If I don't run the persist and 
> unpersist steps, the executors that were added are removed about a minute 
> after the paragraphs complete. Looking at the storage tab in the Spark UI for 
> the Zeppelin job, I don't see anything cached.
>
> Is there any way to get Yarn to automatically remove executors after doing a 
> persist followed by an unpersist if there is no activity on the executor 
> within the configured dynamic allocation timeout (similar to how it works 
> without a persist/unpersist cycle) without having to set 
> spark.dynamicAllocation.cachedExecutorIdleTimeout? The main reason I'd like 
> to avoid setting that configuration is I do not want to the executors being 
> reclaimed if they do have cached data.



-- 
Sent from my iPhone

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: org.apache.spark.SparkException: Task failed while writing rows

2018-02-28 Thread Vadim Semenov
Who's your spark provider? EMR, Azure, Databricks, etc.? Maybe contact
them, since they've probably applied some patches

Also have you checked `stdout` for some Segfaults? I vaguely remember
getting `Task failed while writing rows at` and seeing some segfaults that
caused that

On Wed, Feb 28, 2018 at 2:07 PM, unk1102  wrote:

> Hi thanks Vadim you are right I saw that line already 468 I dont see any
> code
> it is just comment yes I am sure I am using all spark-* jar which is built
> for spark 2.2.0 and Scala 2.11. I am also stuck unfortunately with these
> errors not sure how to solve them.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: org.apache.spark.SparkException: Task failed while writing rows

2018-02-28 Thread Vadim Semenov
Yeah, without actually seeing what's happening on that line, it'd be
difficult to say for sure.

You can check what patches HortonWorks applied, or/and ask them.

And yeah, seg fault is totally possible on any size of the data. But you
should've seen it in the `stdout` (assuming that the regular logs go to
`stderr`)

On Wed, Feb 28, 2018 at 2:53 PM, unk1102  wrote:

> Hi Vadim thanks I use HortonWorks package. I dont think there are any seg
> faults are dataframe I am trying to write is very small in size. Can it
> still create seg fault?
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: org.apache.spark.SparkException: Task failed while writing rows

2018-02-28 Thread Vadim Semenov
There should be another exception trace (basically, the actual cause) after
this one, could you post it?

On Wed, Feb 28, 2018 at 1:39 PM, unk1102  wrote:

> Hi I am getting the following exception when I try to write DataFrame using
> the following code. Please guide. I am using Spark 2.2.0.
>
> df.write.format("parquet").mode(SaveMode.Append);
>
> org.apache.spark.SparkException: Task failed while writing rows at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$
> spark$sql$execution$datasources$FileFormatWriter$$
> executeTask(FileFormatWriter.scala:270)
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$
> write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:189)
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$
> write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:188)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at
> org.apache.spark.scheduler.Task.run(Task.scala:108) at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) at
> java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745) Caused by:
> java.lang.NullPointerException at
> org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:468)
> at
> org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:468)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at
> scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$
> SingleDirectoryWriteTask.execute(FileFormatWriter.scala:324)
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$
> apache$spark$sql$execution$datasources$FileFormatWriter$$
> executeTask$3.apply(FileFormatWriter.scala:256)
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$
> apache$spark$sql$execution$datasources$FileFormatWriter$$
> executeTask$3.apply(FileFormatWriter.scala:254)
> at
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCa
> llbacks(Utils.scala:1371)
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$
> spark$sql$execution$datasources$FileFormatWriter$$
> executeTask(FileFormatWriter.scala:259)
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: org.apache.spark.SparkException: Task failed while writing rows

2018-02-28 Thread Vadim Semenov
I'm sorry, didn't see `Caused by:
java.lang.NullPointerException at
org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:468)`

Are you sure that you use 2.2.0?
I don't see any code on that line
https://github.com/apache/spark/blob/v2.2.0/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala#L468

Also pretty strange that it fails there

On Wed, Feb 28, 2018 at 1:55 PM, unk1102  wrote:

> Hi thanks for the reply I only see NPE and Task failed while writing rows
> all
> over places I dont see any other errors expect SparkException job aborted
> and followed by two exception I pasted earlier.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark & S3 - Introducing random values into key names

2018-03-08 Thread Vadim Semenov
You need to put randomness into the beginning of the key, if you put it
other than into the beginning, it's not guaranteed that you're going to
have good performance.

The way we achieved this is by writing to HDFS first, and then having a
custom DistCp implemented using Spark that copies parquet files using
random keys,
and then saves the list of resulting keys to S3, and when we want to use
those parquet files, we just need to load the listing file, and then take
keys from it and pass them into the loader.

You only need to do this when you have way too many files, if the number of
keys you operate is reasonably small (let's say, in thousands), you won't
get any benefits.

Also the S3 buckets have internal optimizations, and overtime it adjusts to
the workload, i.e. some additional underlying partitions are getting added,
some splits happen, etc.
If you want to have good performance from start, you would need to use
randomization, yes.
Or alternatively, you can contact AWS and tell them about the naming schema
that you're going to have (but it must be set in stone), and then they can
try to pre-optimize the bucket for you.

On Thu, Mar 8, 2018 at 11:42 AM, Subhash Sriram 
wrote:

> Hey Spark user community,
>
> I am writing Parquet files from Spark to S3 using S3a. I was reading this
> article about improving S3 bucket performance, specifically about how it
> can help to introduce randomness to your key names so that data is written
> to different partitions.
>
> https://aws.amazon.com/premiumsupport/knowledge-
> center/s3-bucket-performance-improve/
>
> Is there a straight forward way to accomplish this randomness in Spark via
> the DataSet API? The only thing that I could think of would be to actually
> split the large set into multiple sets (based on row boundaries), and then
> write each one with the random key name.
>
> Is there an easier way that I am missing?
>
> Thanks in advance!
> Subhash
>
>
>


Re: Writing a DataFrame is taking too long and huge space

2018-03-09 Thread Vadim Semenov
But overall, I think the original approach is not correct.
If you get a single file in 10s GB, the approach is probably must be
reworked.

I don't see why you can't just write multiple CSV files using Spark, and
then concatenate them without Spark

On Fri, Mar 9, 2018 at 10:02 AM, Vadim Semenov <va...@datadoghq.com> wrote:

> You can use `.checkpoint` for that
>
> `df.sort(…).coalesce(1).write...` — `coalesce` will make `sort` to have
> only one partition, so sorting will take a lot of time
>
> `df.sort(…).repartition(1).write...` — `repartition` will add an explicit
> stage, but sorting will be lost, since it's a repartition
>
> ```
> sc.setCheckpointDir("/tmp/test")
> val checkpointedDf = df.sort(…).checkpoint(eager=true) // will save all
> partitions
> checkpointedDf.coalesce(1).write.csv(…) // will load checkpointed
> partitions in one task, concatenate them, and will write them out as a
> single file
> ```
>
> On Fri, Mar 9, 2018 at 9:47 AM, Deepak Sharma <deepakmc...@gmail.com>
> wrote:
>
>> I would suggest repartioning it to reasonable partitions  may ne 500 and
>> save it to some intermediate working directory .
>> Finally read all the files from this working dir and then coalesce as 1
>> and save to final location.
>>
>> Thanks
>> Deepak
>>
>> On Fri, Mar 9, 2018, 20:12 Vadim Semenov <va...@datadoghq.com> wrote:
>>
>>> because `coalesce` gets propagated further up in the DAG in the last
>>> stage, so your last stage only has one task.
>>>
>>> You need to break your DAG so your expensive operations would be in a
>>> previous stage before the stage with `.coalesce(1)`
>>>
>>> On Fri, Mar 9, 2018 at 5:23 AM, Md. Rezaul Karim <
>>> rezaul.ka...@insight-centre.org> wrote:
>>>
>>>> Dear All,
>>>>
>>>> I have a tiny CSV file, which is around 250MB. There are only 30
>>>> columns in the DataFrame. Now I'm trying to save the pre-processed
>>>> DataFrame as an another CSV file on disk for later usage.
>>>>
>>>> However, I'm getting pissed off as writing the resultant DataFrame is
>>>> taking too long, which is about 4 to 5 hours. Nevertheless, the size of the
>>>> file written on the disk is about 58GB!
>>>>
>>>> Here's the sample code that I tried:
>>>>
>>>> # Using repartition()
>>>> myDF.repartition(1).write.format("com.databricks.spark.csv")
>>>> .save("data/file.csv")
>>>>
>>>> # Using coalesce()
>>>> myDF. coalesce(1).write.format("com.databricks.spark.csv").save("d
>>>> ata/file.csv")
>>>>
>>>>
>>>> Any better suggestion?
>>>>
>>>>
>>>>
>>>> 
>>>> Md. Rezaul Karim, BSc, MSc
>>>> Research Scientist, Fraunhofer FIT, Germany
>>>>
>>>> Ph.D. Researcher, Information Systems, RWTH Aachen University, Germany
>>>>
>>>> eMail: rezaul.ka...@fit.fraunhofer.de
>>>> <andrea.berna...@fit.fraunhofer.de>
>>>> Tel: +49 241 80-21527 <+49%20241%208021527>
>>>>
>>>
>>>
>>>
>>> --
>>> Sent from my iPhone
>>>
>>
>
>
> --
> Sent from my iPhone
>



-- 
Sent from my iPhone


Re: Writing a DataFrame is taking too long and huge space

2018-03-09 Thread Vadim Semenov
You can use `.checkpoint` for that

`df.sort(…).coalesce(1).write...` — `coalesce` will make `sort` to have
only one partition, so sorting will take a lot of time

`df.sort(…).repartition(1).write...` — `repartition` will add an explicit
stage, but sorting will be lost, since it's a repartition

```
sc.setCheckpointDir("/tmp/test")
val checkpointedDf = df.sort(…).checkpoint(eager=true) // will save all
partitions
checkpointedDf.coalesce(1).write.csv(…) // will load checkpointed
partitions in one task, concatenate them, and will write them out as a
single file
```

On Fri, Mar 9, 2018 at 9:47 AM, Deepak Sharma <deepakmc...@gmail.com> wrote:

> I would suggest repartioning it to reasonable partitions  may ne 500 and
> save it to some intermediate working directory .
> Finally read all the files from this working dir and then coalesce as 1
> and save to final location.
>
> Thanks
> Deepak
>
> On Fri, Mar 9, 2018, 20:12 Vadim Semenov <va...@datadoghq.com> wrote:
>
>> because `coalesce` gets propagated further up in the DAG in the last
>> stage, so your last stage only has one task.
>>
>> You need to break your DAG so your expensive operations would be in a
>> previous stage before the stage with `.coalesce(1)`
>>
>> On Fri, Mar 9, 2018 at 5:23 AM, Md. Rezaul Karim <
>> rezaul.ka...@insight-centre.org> wrote:
>>
>>> Dear All,
>>>
>>> I have a tiny CSV file, which is around 250MB. There are only 30 columns
>>> in the DataFrame. Now I'm trying to save the pre-processed DataFrame as an
>>> another CSV file on disk for later usage.
>>>
>>> However, I'm getting pissed off as writing the resultant DataFrame is
>>> taking too long, which is about 4 to 5 hours. Nevertheless, the size of the
>>> file written on the disk is about 58GB!
>>>
>>> Here's the sample code that I tried:
>>>
>>> # Using repartition()
>>> myDF.repartition(1).write.format("com.databricks.spark.
>>> csv").save("data/file.csv")
>>>
>>> # Using coalesce()
>>> myDF. coalesce(1).write.format("com.databricks.spark.csv").save("
>>> data/file.csv")
>>>
>>>
>>> Any better suggestion?
>>>
>>>
>>>
>>> 
>>> Md. Rezaul Karim, BSc, MSc
>>> Research Scientist, Fraunhofer FIT, Germany
>>>
>>> Ph.D. Researcher, Information Systems, RWTH Aachen University, Germany
>>>
>>> eMail: rezaul.ka...@fit.fraunhofer.de
>>> <andrea.berna...@fit.fraunhofer.de>
>>> Tel: +49 241 80-21527 <+49%20241%208021527>
>>>
>>
>>
>>
>> --
>> Sent from my iPhone
>>
>


-- 
Sent from my iPhone


Re: Writing a DataFrame is taking too long and huge space

2018-03-09 Thread Vadim Semenov
because `coalesce` gets propagated further up in the DAG in the last stage,
so your last stage only has one task.

You need to break your DAG so your expensive operations would be in a
previous stage before the stage with `.coalesce(1)`

On Fri, Mar 9, 2018 at 5:23 AM, Md. Rezaul Karim <
rezaul.ka...@insight-centre.org> wrote:

> Dear All,
>
> I have a tiny CSV file, which is around 250MB. There are only 30 columns
> in the DataFrame. Now I'm trying to save the pre-processed DataFrame as an
> another CSV file on disk for later usage.
>
> However, I'm getting pissed off as writing the resultant DataFrame is
> taking too long, which is about 4 to 5 hours. Nevertheless, the size of the
> file written on the disk is about 58GB!
>
> Here's the sample code that I tried:
>
> # Using repartition()
> myDF.repartition(1).write.format("com.databricks.spark.
> csv").save("data/file.csv")
>
> # Using coalesce()
> myDF. coalesce(1).write.format("com.databricks.spark.csv").save("
> data/file.csv")
>
>
> Any better suggestion?
>
>
>
> 
> Md. Rezaul Karim, BSc, MSc
> Research Scientist, Fraunhofer FIT, Germany
>
> Ph.D. Researcher, Information Systems, RWTH Aachen University, Germany
>
> eMail: rezaul.ka...@fit.fraunhofer.de 
> Tel: +49 241 80-21527 <+49%20241%208021527>
>



-- 
Sent from my iPhone


Re: OutOfDirectMemoryError for Spark 2.2

2018-03-06 Thread Vadim Semenov
Do you have a trace? i.e. what's the source of `io.netty.*` calls?

And have you tried bumping `-XX:MaxDirectMemorySize`?

On Tue, Mar 6, 2018 at 12:45 AM, Chawla,Sumit 
wrote:

> Hi All
>
> I have a job which processes a large dataset.  All items in the dataset
> are unrelated.  To save on cluster resources,  I process these items in
> chunks.  Since chunks are independent of each other,  I start and shut down
> the spark context for each chunk.  This allows me to keep DAG smaller and
> not retry the entire DAG in case of failures.   This mechanism used to work
> fine with Spark 1.6.  Now,  as we have moved to 2.2,  the job started
> failing with OutOfDirectMemoryError error.
>
> 2018-03-03 22:00:59,687 WARN  [rpc-server-48-1]
> server.TransportChannelHandler 
> (TransportChannelHandler.java:exceptionCaught(78))
> - Exception in connection from /10.66.73.27:60374
>
> io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 8388608
> byte(s) of direct memory (used: 1023410176, max: 1029177344)
>
> at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(
> PlatformDependent.java:506)
>
> at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(
> PlatformDependent.java:460)
>
> at io.netty.buffer.PoolArena$DirectArena.allocateDirect(
> PoolArena.java:701)
>
> at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:690)
>
> at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:237)
>
> at io.netty.buffer.PoolArena.allocate(PoolArena.java:213)
>
> at io.netty.buffer.PoolArena.allocate(PoolArena.java:141)
>
> at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(
> PooledByteBufAllocator.java:271)
>
> at io.netty.buffer.AbstractByteBufAllocator.directBuffer(
> AbstractByteBufAllocator.java:177)
>
> at io.netty.buffer.AbstractByteBufAllocator.directBuffer(
> AbstractByteBufAllocator.java:168)
>
> at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(
> AbstractByteBufAllocator.java:129)
>
> at io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(
> AdaptiveRecvByteBufAllocator.java:104)
>
> at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(
> AbstractNioByteChannel.java:117)
>
> at io.netty.channel.nio.NioEventLoop.processSelectedKey(
> NioEventLoop.java:564)
>
> I got some clue on what is causing this from https://github.com/netty/
> netty/issues/6343,  However I am not able to add up numbers on what is
> causing 1 GB of Direct Memory to fill up.
>
> Output from jmap
>
>
> 7: 22230 1422720 io.netty.buffer.PoolSubpage
>
> 12: 1370 804640 io.netty.buffer.PoolSubpage[]
>
> 41: 3600 144000 io.netty.buffer.PoolChunkList
>
> 98: 1440 46080 io.netty.buffer.PoolThreadCache$SubPageMemoryRegionCache
>
> 113: 300 40800 io.netty.buffer.PoolArena$HeapArena
>
> 114: 300 40800 io.netty.buffer.PoolArena$DirectArena
>
> 192: 198 15840 io.netty.buffer.PoolChunk
>
> 274: 120 8320 io.netty.buffer.PoolThreadCache$MemoryRegionCache[]
>
> 406: 120 3840 io.netty.buffer.PoolThreadCache$NormalMemoryRegionCache
>
> 422: 72 3552 io.netty.buffer.PoolArena[]
>
> 458: 30 2640 io.netty.buffer.PooledUnsafeDirectByteBuf
>
> 500: 36 2016 io.netty.buffer.PooledByteBufAllocator
>
> 529: 32 1792 io.netty.buffer.UnpooledUnsafeHeapByteBuf
>
> 589: 20 1440 io.netty.buffer.PoolThreadCache
>
> 630: 37 1184 io.netty.buffer.EmptyByteBuf
>
> 703: 36 864 io.netty.buffer.PooledByteBufAllocator$PoolThreadLocalCache
>
> 852: 22 528 io.netty.buffer.AdvancedLeakAwareByteBuf
>
> 889: 10 480 io.netty.buffer.SlicedAbstractByteBuf
>
> 917: 8 448 io.netty.buffer.UnpooledHeapByteBuf
>
> 1018: 20 320 io.netty.buffer.PoolThreadCache$1
>
> 1305: 4 128 io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
>
> 1404: 1 80 io.netty.buffer.PooledUnsafeHeapByteBuf
>
> 1473: 3 72 io.netty.buffer.PoolArena$SizeClass
>
> 1529: 1 64 io.netty.buffer.AdvancedLeakAwareCompositeByteBuf
>
> 1541: 2 64 io.netty.buffer.CompositeByteBuf$Component
>
> 1568: 1 56 io.netty.buffer.CompositeByteBuf
>
> 1896: 1 32 io.netty.buffer.PoolArena$SizeClass[]
>
> 2042: 1 24 io.netty.buffer.PooledUnsafeDirectByteBuf$1
>
> 2046: 1 24 io.netty.buffer.UnpooledByteBufAllocator
>
> 2051: 1 24 io.netty.buffer.PoolThreadCache$MemoryRegionCache$1
>
> 2078: 1 24 io.netty.buffer.PooledHeapByteBuf$1
>
> 2135: 1 24 io.netty.buffer.PooledUnsafeHeapByteBuf$1
>
> 2302: 1 16 io.netty.buffer.ByteBufUtil$1
>
> 2769: 1 16 io.netty.util.internal.__matchers__.io.netty.buffer.
> ByteBufMatcher
>
>
>
> My Driver machine has 32 CPUs,  and as of now i have 15 machines in my
> cluster.   As of now, the error happens on processing 5th or 6th chunk.  I
> suspect the error is dependent on number of Executors and would happen
> early if we add more executors.
>
>
> I am trying to come up an explanation of what is filling up the Direct
> Memory and how to quanitfy it as factor of Number of Executors.  Our
> cluster is shared cluster,  And we need to understand how much Driver
> Memory to allocate for most of the jobs.
>
>
>
>
>
> 

Re: Spark Job Server application compilation issue

2018-03-14 Thread Vadim Semenov
This question should be directed to the `spark-jobserver` group:
https://github.com/spark-jobserver/spark-jobserver#contact

They also have a gitter chat.

Also include the errors you get once you're going to be asking them a
question

On Wed, Mar 14, 2018 at 1:37 PM, sujeet jog  wrote:

>
> Input is a json request, which would be decoded in myJob() & processed
> further.
>
> Not sure what is wrong with below code, it emits errors as unimplemented
> methods (runJob/validate),
> any pointers on this would be helpful,
>
> jobserver-0.8.0
>
> object MyJobServer extends SparkSessionJob {
>
>   type JobData = String
>   type JobOutput = Seq[String]
>
>   def myJob(a : String)  = {
> }
>
>   def runJob(sc: SparkContext, runtime: JobEnvironment, data: JobData):
> JobOutput = {
>myJob(a)
>}
>
>  def validate(sc: SparkContext, runtime: JobEnvironment, config: Config):
> JobData Or Every[ValidationProblem] = {
>Good(config.root().render())
>  }
>
>


-- 
Sent from my iPhone


Re: Can I get my custom spark strategy to run last?

2018-03-02 Thread Vadim Semenov
Something like this?

sparkSession.experimental.extraStrategies = Seq(Strategy)

val logicalPlan = df.logicalPlan
val newPlan: LogicalPlan = Strategy(logicalPlan)

Dataset.ofRows(sparkSession, newPlan)


On Thu, Mar 1, 2018 at 8:20 PM, Keith Chapman 
wrote:

> Hi,
>
> I'd like to write a custom Spark strategy that runs after all the existing
> Spark strategies are run. Looking through the Spark code it seems like the
> custom strategies are prepended to the list of strategies in Spark. Is
> there a way I could get it to run last?
>
> Regards,
> Keith.
>
> http://keith-chapman.com
>


Re: Does Spark have a plan to move away from sun.misc.Unsafe?

2018-10-25 Thread Vadim Semenov
Here you go:
the umbrella ticket:
https://issues.apache.org/jira/browse/SPARK-24417

and the sun.misc.unsafe one
https://issues.apache.org/jira/browse/SPARK-24421
On Wed, Oct 24, 2018 at 8:08 PM kant kodali  wrote:
>
> Hi All,
>
> Does Spark have a plan to move away from sun.misc.Unsafe to VarHandles? I am 
> trying to find a JIRA issue for this?
>
> Thanks!



-- 
Sent from my iPhone

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



  1   2   >