Re: How can i remove the need for calling cache

2017-08-02 Thread jeff saremi
Thanks Vadim. Yes, this is a good option for us. Thanks.


From: Vadim Semenov 
Sent: Wednesday, August 2, 2017 6:24:40 PM
To: Suzen, Mehmet
Cc: jeff saremi; user@spark.apache.org
Subject: Re: How can i remove the need for calling cache

So if you just save an RDD to HDFS via 'saveAsSequenceFile', you would have to 
create a new RDD that reads that data; this way you'll avoid recomputing the 
RDD, but you may lose time on saving/loading.

Exactly the same thing happens with 'checkpoint'; 'checkpoint' is basically just a 
convenience method that gives you the same RDD back.

However, if your job fails, there's no way to run a new job using the already 
'checkpointed' data from a previous failed run. That's where having a custom 
checkpointer helps.

Another note: you cannot delete checkpointed data in the same job; you need 
to delete it some other way.

BTW, have you tried '.persist(StorageLevel.DISK_ONLY)'? It caches data to local 
disk, freeing up space in the JVM and letting you avoid HDFS.

On Wednesday, August 2, 2017, Vadim Semenov wrote:
`saveAsObjectFile` doesn't save the DAG, it acts as a typical action, so it 
just saves data to some destination.

`cache/persist` allow you to cache data and keep the DAG, so in case an 
executor that holds data goes down, Spark is still able to recalculate 
missing partitions.

`localCheckpoint` allows you to sacrifice fault-tolerance and truncate the DAG, 
so if some executor goes down, the job will fail, because it has already 
forgotten the DAG. 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1551-L1610

and `checkpoint` allows you to save data to some shared storage and truncate 
the DAG, so if an executor goes down, the job will be able to take missing 
partitions from the place where it saved the RDD
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1533-L1549

On Wed, Aug 2, 2017 at 7:20 PM, Suzen, Mehmet wrote:
On 3 August 2017 at 01:05, jeff saremi wrote:
> Vadim:
>
> This is from the Mastering Spark book:
>
> "It is strongly recommended that a checkpointed RDD is persisted in memory,
> otherwise saving it on a file will require recomputation."

Is this really true? I had the impression that the DAG will not be carried
out once the RDD is serialized to an external file, so does 'saveAsObjectFile'
save the DAG as well?



Re: How can i remove the need for calling cache

2017-08-02 Thread Vadim Semenov
So if you just save an RDD to HDFS via 'saveAsSequenceFile', you would have
to create a new RDD that reads that data; this way you'll avoid recomputing
the RDD, but you may lose time on saving/loading.
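
For illustration, a minimal sketch of that manual save-and-reload pattern, assuming a hypothetical RDD[String] named `myrdd` (the path, and the op1/op2 functions borrowed from Jeff's pseudocode further down the thread, are illustrative only):

```
// Materialize the RDD on HDFS once (an ordinary action).
val path = "hdfs:///tmp/myrdd-snapshot"
myrdd.saveAsObjectFile(path)

// Build a new RDD that simply reads the saved data back.
// It has no lineage back to the original computation, so reusing it
// in several actions will not recompute `myrdd`.
val reloaded = sc.objectFile[String](path)
val result1 = reloaded.map(op1(_))
val result2 = reloaded.map(op2(_))
```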

Exactly the same thing happens with 'checkpoint'; 'checkpoint' is basically just a
convenience method that gives you the same RDD back.

However, if your job fails, there's no way to run a new job using the already
'checkpointed' data from a previous failed run. That's where having a
custom checkpointer helps.

Another note: you cannot delete checkpointed data in the same job; you
need to delete it some other way.

BTW, have you tried '.persist(StorageLevel.DISK_ONLY)'? It caches data to
local disk, freeing up space in the JVM and letting you avoid HDFS.
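
A minimal sketch of that suggestion, reusing the hypothetical `myrdd`, `op1` and `op2` from Jeff's pseudocode (illustrative only):

```
import org.apache.spark.storage.StorageLevel

// Keep the computed partitions as serialized blocks on the executors'
// local disks instead of in the JVM heap.
myrdd.persist(StorageLevel.DISK_ONLY)

val result1 = myrdd.map(op1(_))
result1.count()   // first action: computes myrdd and writes its partitions to local disk
val result2 = myrdd.map(op2(_))
result2.count()   // later action: re-reads the cached partitions from local disk
```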

On Wednesday, August 2, 2017, Vadim Semenov 
wrote:

> `saveAsObjectFile` doesn't save the DAG, it acts as a typical action, so
> it just saves data to some destination.
>
> `cache/persist` allow you to cache data and keep the DAG in case of some
> executor that holds data goes down, so Spark would still be able to
> recalculate missing partitions
>
> `localCheckpoint` allows you to sacrifice fault-tolerance and truncate the
> DAG, so if some executor goes down, the job will fail, because it has
> already forgotten the DAG.
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1551-L1610
>
> and `checkpoint` allows you to save data to some shared storage and
> truncate the DAG, so if an executor goes down, the job will be able to take
> missing partitions from the place where it saved the RDD
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1533-L1549
>
> On Wed, Aug 2, 2017 at 7:20 PM, Suzen, Mehmet wrote:
>
>> On 3 August 2017 at 01:05, jeff saremi wrote:
>> > Vadim:
>> >
>> > This is from the Mastering Spark book:
>> >
>> > "It is strongly recommended that a checkpointed RDD is persisted in
>> memory,
>> > otherwise saving it on a file will require recomputation."
>>
>> Is this really true? I had the impression that DAG will not be carried
>> out once RDD is serialized to an external file, so 'saveAsObjectFile'
>> saves DAG as well?
>>
>
>


Re: How can i remove the need for calling cache

2017-08-02 Thread Suzen, Mehmet
On 3 August 2017 at 03:00, Vadim Semenov  wrote:
> `saveAsObjectFile` doesn't save the DAG, it acts as a typical action, so it
> just saves data to some destination.

Yes, that's what I thought, so the statement "..otherwise saving it on
a file will require recomputation."  from the book is not entirely
true.




Re: How can i remove the need for calling cache

2017-08-02 Thread Vadim Semenov
`saveAsObjectFile` doesn't save the DAG, it acts as a typical action, so it
just saves data to some destination.

`cache/persist` allow you to cache data and keep the DAG, so in case an
executor that holds data goes down, Spark is still able to
recalculate missing partitions.

`localCheckpoint` allows you to sacrifice fault-tolerance and truncate the
DAG, so if some executor goes down, the job will fail, because it has
already forgotten the DAG.
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1551-L1610

and `checkpoint` allows you to save data to some shared storage and
truncate the DAG, so if an executor goes down, the job will be able to take
missing partitions from the place where it saved the RDD
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1533-L1549
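
A minimal spark-shell sketch contrasting the two (the checkpoint directory is made up for the example):

```
sc.setCheckpointDir("hdfs:///tmp/checkpoints")   // shared storage, needed only for checkpoint()

val rdd = sc.parallelize(1 to 10).map { x => Thread.sleep(1000); x }

rdd.checkpoint()         // reliable: data is written to the checkpoint dir and the DAG is truncated
// rdd.localCheckpoint() // alternative: truncates the DAG but keeps the data on the executors only

rdd.count()   // first action computes the RDD and writes the checkpoint
rdd.count()   // later actions read the checkpointed data instead of recomputing
```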

On Wed, Aug 2, 2017 at 7:20 PM, Suzen, Mehmet  wrote:

> On 3 August 2017 at 01:05, jeff saremi  wrote:
> > Vadim:
> >
> > This is from the Mastering Spark book:
> >
> > "It is strongly recommended that a checkpointed RDD is persisted in
> memory,
> > otherwise saving it on a file will require recomputation."
>
> Is this really true? I had the impression that DAG will not be carried
> out once RDD is serialized to an external file, so 'saveAsObjectFile'
> saves DAG as well?
>


Re: How can i remove the need for calling cache

2017-08-02 Thread Vadim Semenov
Also check the `RDD.checkpoint()` method

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1533-L1550

On Wed, Aug 2, 2017 at 8:46 PM, Vadim Semenov 
wrote:

> I'm not sure that "checkpointed" means the same thing in that sentence.
>
> You can run a simple test using `spark-shell`:
>
> sc.setCheckpointDir("/tmp/checkpoint")
> val rdd = sc.parallelize(1 to 10).map(x => {
>   Thread.sleep(1000)
>   x
> })
> rdd.checkpoint()
> rdd.foreach(println) // Will take 10 seconds
> rdd.foreach(println) // Will be instant, because the RDD is checkpointed
>
> On Wed, Aug 2, 2017 at 7:05 PM, jeff saremi 
> wrote:
>
>> Vadim:
>>
>> This is from the Mastering Spark book:
>>
>> *"It is strongly recommended that a checkpointed RDD is persisted in
>> memory, otherwise saving it on a file will require recomputation."*
>>
>>
>> To me that means checkpoint will not prevent the recomputation that i was
>> hoping for
>> --
>> *From:* Vadim Semenov 
>> *Sent:* Tuesday, August 1, 2017 12:05:17 PM
>> *To:* jeff saremi
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: How can i remove the need for calling cache
>>
>> You can use `.checkpoint()`:
>> ```
>> val sc: SparkContext
>> sc.setCheckpointDir("hdfs:///tmp/checkpointDirectory")
>> myrdd.checkpoint()
>> val result1 = myrdd.map(op1(_))
>> result1.count() // Will save `myrdd` to HDFS and do map(op1…
>> val result2 = myrdd.map(op2(_))
>> result2.count() // Will load `myrdd` from HDFS and do map(op2…
>> ```
>>
>> On Tue, Aug 1, 2017 at 2:05 PM, jeff saremi 
>> wrote:
>>
>>> Calling cache/persist fails all our jobs (i have  posted 2 threads on
>>> this).
>>>
>>> And we're giving up hope in finding a solution.
>>> So I'd like to find a workaround for that:
>>>
>>> If I save an RDD to hdfs and read it back, can I use it in more than one
>>> operation?
>>>
>>> Example: (using cache)
>>> // do a whole bunch of transformations on an RDD
>>>
>>> myrdd.cache()
>>>
>>> val result1 = myrdd.map(op1(_))
>>>
>>> val result2 = myrdd.map(op2(_))
>>>
>>> // in the above I am assuming that a call to cache will prevent all
>>> previous transformation from being calculated twice
>>>
>>> I'd like to somehow get result1 and result2 without duplicating work.
>>> How can I do that?
>>>
>>> thanks
>>>
>>> Jeff
>>>
>>
>>
>


Re: How can i remove the need for calling cache

2017-08-02 Thread Vadim Semenov
I'm not sure that "checkpointed" means the same thing in that sentence.

You can run a simple test using `spark-shell`:

sc.setCheckpointDir("/tmp/checkpoint")
val rdd = sc.parallelize(1 to 10).map(x => {
  Thread.sleep(1000)
  x
})
rdd.checkpoint()
rdd.foreach(println) // Will take 10 seconds
rdd.foreach(println) // Will be instant, because the RDD is checkpointed

On Wed, Aug 2, 2017 at 7:05 PM, jeff saremi  wrote:

> Vadim:
>
> This is from the Mastering Spark book:
>
> *"It is strongly recommended that a checkpointed RDD is persisted in
> memory, otherwise saving it on a file will require recomputation."*
>
>
> To me that means checkpoint will not prevent the recomputation that i was
> hoping for
> --
> *From:* Vadim Semenov 
> *Sent:* Tuesday, August 1, 2017 12:05:17 PM
> *To:* jeff saremi
> *Cc:* user@spark.apache.org
> *Subject:* Re: How can i remove the need for calling cache
>
> You can use `.checkpoint()`:
> ```
> val sc: SparkContext
> sc.setCheckpointDir("hdfs:///tmp/checkpointDirectory")
> myrdd.checkpoint()
> val result1 = myrdd.map(op1(_))
> result1.count() // Will save `myrdd` to HDFS and do map(op1…
> val result2 = myrdd.map(op2(_))
> result2.count() // Will load `myrdd` from HDFS and do map(op2…
> ```
>
> On Tue, Aug 1, 2017 at 2:05 PM, jeff saremi 
> wrote:
>
>> Calling cache/persist fails all our jobs (i have  posted 2 threads on
>> this).
>>
>> And we're giving up hope in finding a solution.
>> So I'd like to find a workaround for that:
>>
>> If I save an RDD to hdfs and read it back, can I use it in more than one
>> operation?
>>
>> Example: (using cache)
>> // do a whole bunch of transformations on an RDD
>>
>> myrdd.cache()
>>
>> val result1 = myrdd.map(op1(_))
>>
>> val result2 = myrdd.map(op2(_))
>>
>> // in the above I am assuming that a call to cache will prevent all
>> previous transformation from being calculated twice
>>
>> I'd like to somehow get result1 and result2 without duplicating work. How
>> can I do that?
>>
>> thanks
>>
>> Jeff
>>
>
>


Re: How can i remove the need for calling cache

2017-08-02 Thread Suzen, Mehmet
On 3 August 2017 at 01:05, jeff saremi  wrote:
> Vadim:
>
> This is from the Mastering Spark book:
>
> "It is strongly recommended that a checkpointed RDD is persisted in memory,
> otherwise saving it on a file will require recomputation."

Is this really true? I had the impression that the DAG will not be carried
out once the RDD is serialized to an external file, so does 'saveAsObjectFile'
save the DAG as well?




Re: How can i remove the need for calling cache

2017-08-02 Thread jeff saremi
Vadim:

This is from the Mastering Spark book:

"It is strongly recommended that a checkpointed RDD is persisted in memory, 
otherwise saving it on a file will require recomputation."


To me that means checkpoint will not prevent the recomputation that I was 
hoping for.


From: Vadim Semenov 
Sent: Tuesday, August 1, 2017 12:05:17 PM
To: jeff saremi
Cc: user@spark.apache.org
Subject: Re: How can i remove the need for calling cache

You can use `.checkpoint()`:
```
val sc: SparkContext
sc.setCheckpointDir("hdfs:///tmp/checkpointDirectory")
myrdd.checkpoint()
val result1 = myrdd.map(op1(_))
result1.count() // Will save `myrdd` to HDFS and do map(op1…
val result2 = myrdd.map(op2(_))
result2.count() // Will load `myrdd` from HDFS and do map(op2…
```

On Tue, Aug 1, 2017 at 2:05 PM, jeff saremi wrote:

Calling cache/persist fails all our jobs (I have posted 2 threads on this).

And we're giving up hope in finding a solution.
So I'd like to find a workaround for that:

If I save an RDD to hdfs and read it back, can I use it in more than one 
operation?

Example: (using cache)
// do a whole bunch of transformations on an RDD

myrdd.cache()

val result1 = myrdd.map(op1(_))

val result2 = myrdd.map(op2(_))

// in the above I am assuming that a call to cache will prevent all previous 
transformations from being calculated twice


I'd like to somehow get result1 and result2 without duplicating work. How can I 
do that?

thanks

Jeff



Re: Reparitioning Hive tables - Container killed by YARN for exceeding memory limits

2017-08-02 Thread Holden Karau
The memory overhead is based less on the total amount of data and more on
what you end up doing with the data (e.g. if you're doing a lot of off-heap
processing or using Python, you need to increase it). Honestly, most people
find this number for their job "experimentally" (e.g. they try a few
different things).

On Wed, Aug 2, 2017 at 1:52 PM, Chetan Khatri 
wrote:

> Ryan,
> Thank you for reply.
>
> For 2 TB of Data what should be the value of 
> spark.yarn.executor.memoryOverhead
> = ?
>
> with regards to this - i see issue at spark https://issues.apache.org/
> jira/browse/SPARK-18787 , not sure whether it works or not at Spark 2.0.1
>  !
>
> can you elaborate more for spark.memory.fraction setting.
>
> number of partitions = 674
> Cluster: 455 GB total memory, VCores: 288, Nodes: 17
> Given / tried memory config: executor-mem = 16g, num-executor=10, executor
> cores=6, driver mem=4g
>
> spark.default.parallelism=1000
> spark.sql.shuffle.partitions=1000
> spark.yarn.executor.memoryOverhead=2048
> spark.shuffle.io.preferDirectBufs=false
>
>
>
>
>
>
>
>
>
> On Wed, Aug 2, 2017 at 10:43 PM, Ryan Blue  wrote:
>
>> Chetan,
>>
>> When you're writing to a partitioned table, you want to use a shuffle to
>> avoid the situation where each task has to write to every partition. You
>> can do that either by adding a repartition by your table's partition keys,
>> or by adding an order by with the partition keys and then columns you
>> normally use to filter when reading the table. I generally recommend the
>> second approach because it handles skew and prepares the data for more
>> efficient reads.
>>
>> If that doesn't help, then you should look at your memory settings. When
>> you're getting killed by YARN, you should consider setting `
>> spark.shuffle.io.preferDirectBufs=false` so you use less off-heap memory
>> that the JVM doesn't account for. That is usually an easier fix than
>> increasing the memory overhead. Also, when you set executor memory, always
>> change spark.memory.fraction to ensure the memory you're adding is used
>> where it is needed. If your memory fraction is the default 60%, then 60% of
>> the memory will be used for Spark execution, not reserved whatever is
>> consuming it and causing the OOM. (If Spark's memory is too low, you'll see
>> other problems like spilling too much to disk.)
>>
>> rb
>>
>> On Wed, Aug 2, 2017 at 9:02 AM, Chetan Khatri <
>> chetan.opensou...@gmail.com> wrote:
>>
>>> Can anyone please guide me with above issue.
>>>
>>>
>>> On Wed, Aug 2, 2017 at 6:28 PM, Chetan Khatri <
>>> chetan.opensou...@gmail.com> wrote:
>>>
 Hello Spark Users,

 I have Hbase table reading and writing to Hive managed table where i
 applied partitioning by date column which worked fine but it has generate
 more number of files in almost 700 partitions but i wanted to use
 reparation to reduce File I/O by reducing number of files inside each
 partition.

 *But i ended up with below exception:*

 ExecutorLostFailure (executor 11 exited caused by one of the running
 tasks) Reason: Container killed by YARN for exceeding memory limits. 14.0
 GB of 14 GB physical memory used. Consider boosting spark.yarn.executor.
 memoryOverhead.

 Driver memory=4g, executor mem=12g, num-executors=8, executor core=8

 Do you think below setting can help me to overcome above issue:

 spark.default.parellism=1000
 spark.sql.shuffle.partitions=1000

 Because default max number of partitions are 1000.



>>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>
>


-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Re: Reparitioning Hive tables - Container killed by YARN for exceeding memory limits

2017-08-02 Thread Chetan Khatri
Ryan,
Thank you for your reply.

For 2 TB of data, what should the value of
spark.yarn.executor.memoryOverhead be?

With regards to this, I see an issue at
https://issues.apache.org/jira/browse/SPARK-18787 , but I am not sure whether it
works at Spark 2.0.1.

Can you elaborate more on the spark.memory.fraction setting?

number of partitions = 674
Cluster: 455 GB total memory, VCores: 288, Nodes: 17
Given / tried memory config: executor-mem = 16g, num-executor=10, executor
cores=6, driver mem=4g

spark.default.parallelism=1000
spark.sql.shuffle.partitions=1000
spark.yarn.executor.memoryOverhead=2048
spark.shuffle.io.preferDirectBufs=false









On Wed, Aug 2, 2017 at 10:43 PM, Ryan Blue  wrote:

> Chetan,
>
> When you're writing to a partitioned table, you want to use a shuffle to
> avoid the situation where each task has to write to every partition. You
> can do that either by adding a repartition by your table's partition keys,
> or by adding an order by with the partition keys and then columns you
> normally use to filter when reading the table. I generally recommend the
> second approach because it handles skew and prepares the data for more
> efficient reads.
>
> If that doesn't help, then you should look at your memory settings. When
> you're getting killed by YARN, you should consider setting `
> spark.shuffle.io.preferDirectBufs=false` so you use less off-heap memory
> that the JVM doesn't account for. That is usually an easier fix than
> increasing the memory overhead. Also, when you set executor memory, always
> change spark.memory.fraction to ensure the memory you're adding is used
> where it is needed. If your memory fraction is the default 60%, then 60% of
> the memory will be used for Spark execution, not reserved whatever is
> consuming it and causing the OOM. (If Spark's memory is too low, you'll see
> other problems like spilling too much to disk.)
>
> rb
>
> On Wed, Aug 2, 2017 at 9:02 AM, Chetan Khatri  > wrote:
>
>> Can anyone please guide me with above issue.
>>
>>
>> On Wed, Aug 2, 2017 at 6:28 PM, Chetan Khatri <
>> chetan.opensou...@gmail.com> wrote:
>>
>>> Hello Spark Users,
>>>
>>> I have Hbase table reading and writing to Hive managed table where i
>>> applied partitioning by date column which worked fine but it has generate
>>> more number of files in almost 700 partitions but i wanted to use
>>> reparation to reduce File I/O by reducing number of files inside each
>>> partition.
>>>
>>> *But i ended up with below exception:*
>>>
>>> ExecutorLostFailure (executor 11 exited caused by one of the running
>>> tasks) Reason: Container killed by YARN for exceeding memory limits. 14.0
>>> GB of 14 GB physical memory used. Consider boosting spark.yarn.executor.
>>> memoryOverhead.
>>>
>>> Driver memory=4g, executor mem=12g, num-executors=8, executor core=8
>>>
>>> Do you think below setting can help me to overcome above issue:
>>>
>>> spark.default.parellism=1000
>>> spark.sql.shuffle.partitions=1000
>>>
>>> Because default max number of partitions are 1000.
>>>
>>>
>>>
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


Spark Streaming: Async action scheduling inside foreachRDD

2017-08-02 Thread Andrii Biletskyi
Hi all,

What is the correct way to schedule multiple jobs inside the foreachRDD method
and, importantly, await their results to ensure those jobs have completed
successfully?
E.g.:

// needs: import scala.concurrent.{Await, Future}
//        import scala.concurrent.duration.Duration
//        import scala.concurrent.ExecutionContext.Implicits.global
kafkaDStream.foreachRDD { rdd =>
  val rdd1 = rdd.map(...)
  val rdd2 = rdd1.map(...)

  // schedule the two output actions concurrently
  val job1Future = Future {
    rdd1.saveToCassandra(...)
  }

  val job2Future = Future {
    rdd1.foreachPartition(iter => /* save to Kafka */)
  }

  // block until both actions have finished (or one has failed)
  Await.result(
    Future.sequence(Seq(job1Future, job2Future)),
    Duration.Inf)

  // commit Kafka offsets
}

In this code I'm scheduling two actions in futures and awaiting them. I
need to be sure, when I commit Kafka offsets at the end of the batch
processing, that job1 and job2 have actually executed successfully. Does the
given approach provide these guarantees, i.e. if one of the jobs fails, will
the entire batch be marked as failed too?


Thanks,
Andrii


Re: SPARK Issue in Standalone cluster

2017-08-02 Thread Gourav Sengupta
Hi Steve,

I have written a sincere note of apology to everyone in a separate email. I
sincerely request your kind forgiveness in advance if anything does sound
impolite in my emails.

Let me first start by thanking you.

I know it looks like I formed all my opinion based on that document, but
that is not the case at all. If you or anyone tries to execute the code
that I have given then they will see what I mean. Code speaks louder and
better than words for me.

So I am not saying you are wrong. I am asking you to verify, and expecting someone
will be able to correct, a set of understandings that a moron like me has
gained after long hours of not having anything better to do.


SCENARIO: there are three files, file1.csv, file2.csv and file3.csv, stored in HDFS
with replication 2, and there is a HADOOP cluster of three nodes. All these nodes
have SPARK workers (executors) running on them. The files are stored in the
following way:
--------------------------------------------
| SYSTEM 1    | SYSTEM 2    | SYSTEM 3    |
| (worker1)   | (worker2)   | (worker3)   |
| (master)    |             |             |
--------------------------------------------
| file1.csv   |             | file1.csv   |
--------------------------------------------
|             | file2.csv   | file2.csv   |
--------------------------------------------
| file3.csv   | file3.csv   |             |
--------------------------------------------

CONSIDERATION BASED ON WHICH ABOVE SCENARIO HAS BEEN DRAWN:
HDFS replication does not store the same file in all the nodes in the
cluster. So if I have three nodes and the replication is two then the same
file will be stored physically in two nodes in the cluster. Does that sound
right?

ASSUMPTION (STEVE PLEASE CLARIFY THIS):
If SPARK is trying to process the records then I am expecting that
WORKER2 should not be processing file1.csv, and similarly WORKER1 should
not be processing file2.csv and WORKER3 should not be processing file3.csv,
because if WORKER2 were trying to process file1.csv it would actually
cause unnecessary network transmission of the file.

ASSUMPTION BASED ON THE ABOVE ASSUMPTION (STEVE ONCE AGAIN, PLEASE CLARIFY
THIS):
If WORKER2 is not processing file1.csv, then how does it matter whether the
file is there or not at all on that system? Should not SPARK just ask the
workers to process the files which are available on the worker nodes? In
case both WORKER2 and WORKER3 fail and are not available, then file2.csv
will not be processed at all.

ALSO I DID POST THE CODE AND I GENUINELY THINK THAT THE CODE SHOULD BE
EXECUTED (it's been pointed out that I am learning SPARK, and even I did not
take more than 13 mins to set up the cluster and run the code).

Once you execute the code you will find that:
1. if the path starts with file:/// while reading back then there is no
error reported, but the number of records reported back is only those
records in the worker which also has the server.
2. also you will notice that once you cache the file before writing, the
partitions are distributed nicely across the workers, and while writing
back, the dataframe partitions do write properly to the worker node on
the Master, but the workers on the other system have the files written to a
_temporary folder which does not get copied back to the main folder.
In spite of this the job is not reported as failed in SPARK.

Now in my own world, if I see the following things happening,
something is going wrong (with me):
1. SPARK transfers files from different systems to process, instead of
processing them locally (I do not have code to prove this, and therefore
it's just an assumption)
2. SPARK cannot determine when the writes are failing in standalone
cluster workers and reports success (code is there for this)
3. SPARK reports back the number of records in the worker running on the master
node when count() is given, without reporting an error while using file:///,
and reports an error when I mention the path without file:/// (for SPARK
2.1.x onwards, code is there for this)


I very sincerely hope with your genuine help the bar of language and social
skills will be lowered for me. And everyone will find a way to excuse me
and not qualify this email as a means to measure my extremely versatile and
amazingly vivid social skills. It will be a lot of help to just focus on
the facts related to machines, data, error and (the language that I somehow
understand better) code.


My sincere apologies once again, as I am 100% sure that I did not meet the
required social and language skills.

Thanks a ton once again for your kindness, patience and understanding.


Regards,
Gourav Sengupta



On Wed, Aug 2, 2017 at 4:59 PM, Steve Loughran 
wrote:

>
> On 2 Aug 2017, at 14:25, Gourav Sengupta 
> wrote:
>
> Hi,
>
> I am definitely sure that at this point of time everyone who has 

Projection Pushdown and Predicate Pushdown in Parquet for Nested Column

2017-08-02 Thread Patrick
Hi,

I would like to know if Spark has support for projection pushdown
and predicate pushdown in Parquet for nested columns.

I can see two JIRA tasks with PR.

https://issues.apache.org/jira/browse/SPARK-17636

https://issues.apache.org/jira/browse/SPARK-4502


If not, will we see these features in the Spark 2.3 release?

Thanks.


Re: Reparitioning Hive tables - Container killed by YARN for exceeding memory limits

2017-08-02 Thread Ryan Blue
Chetan,

When you're writing to a partitioned table, you want to use a shuffle to
avoid the situation where each task has to write to every partition. You
can do that either by adding a repartition by your table's partition keys,
or by adding an order by with the partition keys and then columns you
normally use to filter when reading the table. I generally recommend the
second approach because it handles skew and prepares the data for more
efficient reads.
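
As a rough illustration of those two options (the DataFrame, column and table names below are hypothetical, not from the original mail):

```
import org.apache.spark.sql.functions.col

// Option 1: repartition by the table's partition key, so each task
// writes to only one (or a few) Hive partitions.
df.repartition(col("event_date"))
  .write
  .mode("append")
  .insertInto("my_partitioned_table")

// Option 2: a global sort by the partition key plus the columns commonly
// used to filter on read; this also handles skew and clusters the data.
df.sort(col("event_date"), col("customer_id"))
  .write
  .mode("append")
  .insertInto("my_partitioned_table")
```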

If that doesn't help, then you should look at your memory settings. When
you're getting killed by YARN, you should consider setting
`spark.shuffle.io.preferDirectBufs=false` so you use less off-heap memory
that the JVM doesn't account for. That is usually an easier fix than
increasing the memory overhead. Also, when you set executor memory, always
change spark.memory.fraction to ensure the memory you're adding is used
where it is needed. If your memory fraction is the default 60%, then 60% of
the memory will be used for Spark execution, not reserved for whatever is
consuming it and causing the OOM. (If Spark's memory is too low, you'll see
other problems like spilling too much to disk.)

rb

On Wed, Aug 2, 2017 at 9:02 AM, Chetan Khatri 
wrote:

> Can anyone please guide me with above issue.
>
>
> On Wed, Aug 2, 2017 at 6:28 PM, Chetan Khatri  > wrote:
>
>> Hello Spark Users,
>>
>> I have Hbase table reading and writing to Hive managed table where i
>> applied partitioning by date column which worked fine but it has generate
>> more number of files in almost 700 partitions but i wanted to use
>> reparation to reduce File I/O by reducing number of files inside each
>> partition.
>>
>> *But i ended up with below exception:*
>>
>> ExecutorLostFailure (executor 11 exited caused by one of the running
>> tasks) Reason: Container killed by YARN for exceeding memory limits. 14.0
>> GB of 14 GB physical memory used. Consider boosting spark.yarn.executor.
>> memoryOverhead.
>>
>> Driver memory=4g, executor mem=12g, num-executors=8, executor core=8
>>
>> Do you think below setting can help me to overcome above issue:
>>
>> spark.default.parellism=1000
>> spark.sql.shuffle.partitions=1000
>>
>> Because default max number of partitions are 1000.
>>
>>
>>
>


-- 
Ryan Blue
Software Engineer
Netflix


[Debug] Upgrade from 1.4.1 to 2.0.0 breaks job

2017-08-02 Thread Druhin Goel
Hi,

I just upgraded our spark version from 1.4.1 to 2.0.0. Upon running a job
(which worked perfectly with 1.4.1) to test it, I see that it fails with an
assertion error:

"assertion failed: copyAndReset must return a zero value copy”. I saw a
thread related to this on StackOverflow; however, I could not find any code
or dependency in our codebase that implements the old Accumulator. We do
not use SparkRunner either.

What are some other reasons I would be seeing this error?

Thanks,
Druhin


UNSUBSCRIBE

2017-08-02 Thread Manikandan Vijayakumar
Please Unsubscribe me.



Re: UNSUBSCRIBE

2017-08-02 Thread Andi Levin
Writing to the list user@spark.apache.org
Subscription address user-subscr...@spark.apache.org
Digest subscription address user-digest-subscr...@spark.apache.org
Unsubscription addresses user-unsubscr...@spark.apache.org
Getting help with the list user-h...@spark.apache.org
Feeds: Atom 1.0


On Wed, Aug 2, 2017 at 9:36 AM, DAS, SUTANU  wrote:

>
>
>
>
>
>
> Please Unsubscribe me.
>
>
>
>
>



-- 
Cheers,


/andi

Andi Levin
(415) 462-4490


UNSUBSCRIBE

2017-08-02 Thread DAS, SUTANU



Please Unsubscribe me.





UNSUBSCRIBE

2017-08-02 Thread DAS, SUTANU

Please Unsubscribe me.




Re: Reparitioning Hive tables - Container killed by YARN for exceeding memory limits

2017-08-02 Thread Ravindra
Either increase overall executor memory if you have scope, or try to give
a higher percentage to overhead memory than the default of .7.

Read this for more details.


On Wed, Aug 2, 2017 at 11:03 PM Chetan Khatri 
wrote:

> Can anyone please guide me with above issue.
>
>
> On Wed, Aug 2, 2017 at 6:28 PM, Chetan Khatri  > wrote:
>
>> Hello Spark Users,
>>
>> I have Hbase table reading and writing to Hive managed table where i
>> applied partitioning by date column which worked fine but it has generate
>> more number of files in almost 700 partitions but i wanted to use
>> reparation to reduce File I/O by reducing number of files inside each
>> partition.
>>
>> *But i ended up with below exception:*
>>
>> ExecutorLostFailure (executor 11 exited caused by one of the running
>> tasks) Reason: Container killed by YARN for exceeding memory limits. 14.0
>> GB of 14 GB physical memory used. Consider boosting spark.yarn.executor.
>> memoryOverhead.
>>
>> Driver memory=4g, executor mem=12g, num-executors=8, executor core=8
>>
>> Do you think below setting can help me to overcome above issue:
>>
>> spark.default.parellism=1000
>> spark.sql.shuffle.partitions=1000
>>
>> Because default max number of partitions are 1000.
>>
>>
>>
>


Re: SPARK Issue in Standalone cluster

2017-08-02 Thread Frank Austin Nothaft
> The general idea of writing to the user group is that people who know should 
> answer, and not those who do not know. 

I’d also add that if you’re going to write to the user group, you should be 
polite to people who try to answer your queries, even if you think they’re 
wrong.

This is especially true if the people you think are wrong are actually correct.

Frank Austin Nothaft
fnoth...@berkeley.edu
fnoth...@eecs.berkeley.edu
202-340-0466

> On Aug 2, 2017, at 6:25 AM, Gourav Sengupta  wrote:
> 
> Hi,
> 
> I am definitely sure that at this point of time everyone who has kindly cared 
> to respond to my query do need to go and check this link 
> https://spark.apache.org/docs/2.2.0/spark-standalone.html#spark-standalone-mode.
> 
> It does mention that SPARK standalone cluster can have multiple machines 
> running as slaves. 
> 
> The general idea of writing to the user group is that people who know should 
> answer, and not those who do not know. 
> 
> 
> 
> Regards,
> Gourav Sengupta
> 
> On Tue, Aug 1, 2017 at 4:50 AM, Mahesh Sawaiker 
> > 
> wrote:
> Gourav,
> 
> Riccardo’s answer is spot on.
> 
> What is happening is one node of spark is writing to its own directory and 
> telling a slave to read the data from there, when the slave goes to read it, 
> the part is not found.
> 
>  
> 
> Check the folder 
> Users/gouravsengupta/Development/spark/sparkdata/test1/part-1-e79273b5-9b4e-4037-92f3-2e52f523dfdf-c000.snappy.parquet
>  on the slave.
> 
> The reason it ran on spark 1.5 may have been because the executor ran on the 
> driver itself. There is not much use to a set up where you don’t have some 
> kind of distributed file system, so I would encourage you to use hdfs, or a 
> mounted file system shared by all nodes.
> 
>  
> 
> Regards,
> 
> Mahesh
> 
>  
> 
>  
> 
> From: Gourav Sengupta [mailto:gourav.sengu...@gmail.com 
> ] 
> Sent: Monday, July 31, 2017 9:54 PM
> To: Riccardo Ferrari
> Cc: user
> Subject: Re: SPARK Issue in Standalone cluster
> 
>  
> 
> Hi Riccardo,
> 
>  
> 
> I am grateful for your kind response.
> 
>  
> 
> Also I am sure that your answer is completely wrong and errorneous. SPARK 
> must be having a method so that different executors do not pick up the same 
> files to process. You also did not answer the question why was the processing 
> successful in SPARK 1.5 and not in SPARK 2.2.
> 
>  
> 
> Also the exact same directory is is present across in both the nodes. 
> 
>  
> 
> I feel quite facinated when individuals respond before even understanding the 
> issue, or trying out the code.
> 
>  
> 
> It will be of great help if someone could kindly read my email and help me 
> figure out the issue.
> 
>  
> 
>  
> 
> Regards,
> 
> Gourav Sengupta
> 
>  
> 
>  
> 
>  
> 
> On Mon, Jul 31, 2017 at 9:27 AM, Riccardo Ferrari  > wrote:
> 
> Hi Gourav,
> 
>  
> 
> The issue here is the location where you're trying to write/read from 
> :/Users/gouravsengupta/Development/spark/sparkdata/test1/p...
> 
> When dealing with clusters all the paths and resources should be available to 
> all executors (and driver), and that is reason why you generally use HDFS, 
> S3, NFS or any shared file system.
> 
>  
> 
> Spark assumes your data is generally available to all nodes and does not 
> tries to pick up the data from a selected node, it rather tries to write/read 
> in parallel from the executor nodes. Also given its control logic there is no 
> way (read. you should not care) to know what executor is doing what task.
> 
>  
> 
> Hope it helps,
> 
> Riccardo
> 
>  
> 
> On Mon, Jul 31, 2017 at 2:14 AM, Gourav Sengupta  > wrote:
> 
> Hi,
> 
>  
> 
> I am working by creating a native SPARK standalone cluster 
> (https://spark.apache.org/docs/2.2.0/spark-standalone.html 
> )
> 
>  
> 
> Therefore I  do not have a HDFS. 
> 
>  
> 
>  
> 
> EXERCISE: 
> 
> Its the most fundamental and simple exercise. Create a sample SPARK dataframe 
> and then write it to a location and then read it back.
> 
>  
> 
> SETTINGS:
> 
> So after I have installed SPARK in two physical systems with the same:
> 
> 1. SPARK version, 
> 
> 2. JAVA version, 
> 
> 3. PYTHON_PATH
> 
> 4. SPARK_HOME
> 
> 5. PYSPARK_PYTHON 
> 
> the user in both the systems is the root user therefore there are no 
> permission issues anywhere.
> 
>  
> 
> I am able to start:
> 
> 1. ./spark-2.2.0-bin-hadoop2.7/sbin/start-master.sh
> 
> 2. ./spark-2.2.0-bin-hadoop2.7/sbin/start-slave.sh (from two separate 
> computers)
> 
>  
> 
> After that I can see in the spark UI (at port 8080) two workers.
> 
>  
> 
>  
> 
> CODE:
> 
> Then I run the following 

Re: PySpark Streaming S3 checkpointing

2017-08-02 Thread Steve Loughran

On 2 Aug 2017, at 10:34, Riccardo Ferrari wrote:

Hi list!

I am working on a pyspark streaming job (ver 2.2.0) and I need to enable 
checkpointing. At high level my python script goes like this:

class StreamingJob():

def __init__(..):
...
   sparkContext._jsc.hadoopConfiguration().set('fs.s3a.access.key',)
   sparkContext._jsc.hadoopConfiguration().set('fs.s3a.secret.key',)

def doJob(self):
   ssc = StreamingContext.getOrCreate('', )

and I run it:

myJob = StreamingJob(...)
myJob.doJob()

The problem is that StreamingContext.getOrCreate is not able to have access to 
hadoop configuration configured in the constructor and fails to load from 
checkpoint with

"com.amazonaws.AmazonClientException: Unable to load AWS credentials from any 
provider in the chain"

If I export AWS credentials to the system ENV before starting the script it 
works!


Spark magically copies the env vars over for you when you launch a job


I see the Scala version has an option to provide the hadoop configuration that 
is not available in python

I don't have the whole Hadoop, just Spark, so I don't really want to configure 
hadoop's xmls and such


Set them when you set up the context, e.g. in spark-defaults.conf:

spark.hadoop.fs.s3a.access.key=access key
spark.hadoop.fs.s3a.secret.key=secret key

Reminder: Do keep your secret key a secret, avoid checking it in to any form of 
revision control.


Re: Reparitioning Hive tables - Container killed by YARN for exceeding memory limits

2017-08-02 Thread Chetan Khatri
Can anyone please guide me with above issue.


On Wed, Aug 2, 2017 at 6:28 PM, Chetan Khatri 
wrote:

> Hello Spark Users,
>
> I have Hbase table reading and writing to Hive managed table where i
> applied partitioning by date column which worked fine but it has generate
> more number of files in almost 700 partitions but i wanted to use
> reparation to reduce File I/O by reducing number of files inside each
> partition.
>
> *But i ended up with below exception:*
>
> ExecutorLostFailure (executor 11 exited caused by one of the running
> tasks) Reason: Container killed by YARN for exceeding memory limits. 14.0
> GB of 14 GB physical memory used. Consider boosting spark.yarn.executor.
> memoryOverhead.
>
> Driver memory=4g, executor mem=12g, num-executors=8, executor core=8
>
> Do you think below setting can help me to overcome above issue:
>
> spark.default.parellism=1000
> spark.sql.shuffle.partitions=1000
>
> Because default max number of partitions are 1000.
>
>
>


Re: SPARK Issue in Standalone cluster

2017-08-02 Thread Steve Loughran

On 2 Aug 2017, at 14:25, Gourav Sengupta wrote:

Hi,

I am definitely sure that at this point of time everyone who has kindly cared 
to respond to my query do need to go and check this link 
https://spark.apache.org/docs/2.2.0/spark-standalone.html#spark-standalone-mode.

I see. Well, we shall have to edit that document to make clear something which 
had been omitted:

in order for multiple spark workers to process data, they must have a shared 
store for that data, one with read/write access for all workers. This must 
be provided by a shared filesystem: HDFS, network-mounted NFS, Glusterfs, 
through an object store (S3, Azure WASB, ...), or through alternative 
datastores implementing the Hadoop Filesystem API (example: Apache Cassandra).

In your case, for a small cluster of 1-3 machines, especially if you are just 
learning to play with spark, I'd start with an NFS mounted disk accessible on 
the same path on all machines. If you aren't willing to set that up, stick to 
spark standalone on a single machine first. You don't need a shared cluster to 
use spark standalone.

Personally, I'd recommend downloading apache zeppelin and running it locally as 
the simplest out-the-box experience.


It does mention that SPARK standalone cluster can have multiple machines 
running as slaves.


Clearly it omits the small detail about the requirement for a shared store.

The general idea of writing to the user group is that people who know should 
answer, and not those who do not know.

Agreed, but if the answer doesn't appear to be correct to you, do consider that 
there may be some detail that hasn't been mentioned, rather than immediately 
concluding that the person replying is wrong.

-Steve





Regards,
Gourav Sengupta

On Tue, Aug 1, 2017 at 4:50 AM, Mahesh Sawaiker 
> wrote:
Gourav,
Riccardo’s answer is spot on.
What is happening is one node of spark is writing to its own directory and 
telling a slave to read the data from there, when the slave goes to read it, 
the part is not found.

Check the folder 
Users/gouravsengupta/Development/spark/sparkdata/test1/part-1-e79273b5-9b4e-4037-92f3-2e52f523dfdf-c000.snappy.parquet
 on the slave.
The reason it ran on spark 1.5 may have been because the executor ran on the 
driver itself. There is not much use to a set up where you don’t have some kind 
of distributed file system, so I would encourage you to use hdfs, or a mounted 
file system shared by all nodes.

Regards,
Mahesh


From: Gourav Sengupta 
[mailto:gourav.sengu...@gmail.com]
Sent: Monday, July 31, 2017 9:54 PM
To: Riccardo Ferrari
Cc: user
Subject: Re: SPARK Issue in Standalone cluster

Hi Riccardo,

I am grateful for your kind response.

Also I am sure that your answer is completely wrong and errorneous. SPARK must 
be having a method so that different executors do not pick up the same files to 
process. You also did not answer the question why was the processing successful 
in SPARK 1.5 and not in SPARK 2.2.

Also the exact same directory is is present across in both the nodes.

I feel quite facinated when individuals respond before even understanding the 
issue, or trying out the code.

It will be of great help if someone could kindly read my email and help me 
figure out the issue.


Regards,
Gourav Sengupta



On Mon, Jul 31, 2017 at 9:27 AM, Riccardo Ferrari 
> wrote:
Hi Gourav,

The issue here is the location where you're trying to write/read from 
:/Users/gouravsengupta/Development/spark/sparkdata/test1/p...
When dealing with clusters all the paths and resources should be available to 
all executors (and driver), and that is reason why you generally use HDFS, S3, 
NFS or any shared file system.

Spark assumes your data is generally available to all nodes and does not tries 
to pick up the data from a selected node, it rather tries to write/read in 
parallel from the executor nodes. Also given its control logic there is no way 
(read. you should not care) to know what executor is doing what task.

Hope it helps,
Riccardo

On Mon, Jul 31, 2017 at 2:14 AM, Gourav Sengupta 
> wrote:
Hi,

I am working by creating a native SPARK standalone cluster 
(https://spark.apache.org/docs/2.2.0/spark-standalone.html)

Therefore I  do not have a HDFS.


EXERCISE:
Its the most fundamental and simple exercise. Create a sample SPARK dataframe 
and then write it to a location and then read it back.

SETTINGS:
So after I have installed SPARK in two physical systems with the same:
1. SPARK version,
2. JAVA version,
3. PYTHON_PATH
4. SPARK_HOME
5. PYSPARK_PYTHON
the user in both the systems is the root user therefore there are no permission 
issues anywhere.

I am able to start:
1. ./spark-2.2.0-bin-hadoop2.7/sbin/start-master.sh
2. 

Re: Runnig multiple spark jobs on yarn

2017-08-02 Thread Jörn Franke
And if the YARN queues are configured as such.

> On 2. Aug 2017, at 16:47, ayan guha  wrote:
> 
> Each of your spark-submit will create separate applications in YARN and run 
> concurrently (if you have enough resource, that is)
> 
>> On Thu, Aug 3, 2017 at 12:42 AM, serkan taş  wrote:
>> Hi,
>> 
>> Where should i start to be able to run multiple spark jobs concurrent on 3 
>> node spark cluster?
>> 
>> Get Outlook for Android
>> 
> 
> 
> 
> -- 
> Best Regards,
> Ayan Guha


[PySpark] Multiple driver cores

2017-08-02 Thread Judit Planas

  
  
Hello,

I recently came across the "--driver-cores" option when, for
example, launching a PySpark shell.

Provided that there are idle CPUs on the driver's node, what would be
the benefit of having multiple driver cores? For example, will this
accelerate the "collect()" operations? Will they be done in
parallel?

Can't find any documentation about how the behavior will change from
using just 1 core for the driver.

Thanks in advance.

Judit
  





Re: Runnig multiple spark jobs on yarn

2017-08-02 Thread ayan guha
Each of your spark-submit will create separate applications in YARN and run
concurrently (if you have enough resource, that is)

On Thu, Aug 3, 2017 at 12:42 AM, serkan taş  wrote:

> Hi,
>
> Where should i start to be able to run multiple spark jobs concurrent on 3
> node spark cluster?
>
> Get Outlook for Android
>
>


-- 
Best Regards,
Ayan Guha


Runnig multiple spark jobs on yarn

2017-08-02 Thread serkan taş
Hi,

Where should I start to be able to run multiple Spark jobs concurrently on a
3-node Spark cluster?

Get Outlook for Android



UNSUBSCRIBE

2017-08-02 Thread Jnana Sagar
Please Unsubscribe me.

-- 
regards
Jnana Sagar


Reading spark-env.sh from configured directory

2017-08-02 Thread Lior Chaga
Hi,

I have multiple spark deployments using mesos.
I use spark.executor.uri to fetch the spark distribution to the executor node.

Every time I upgrade spark, I download the default distribution and just
add a custom spark-env.sh to its spark/conf folder.

Furthermore, any change I want to make in spark-env.sh forces me to
re-package the distribution.

I am trying to find a way to provide the executors with the location of the
spark conf dir by using executor.extraJavaOptions
(-DSPARK_CONF_DIR=/path/on/worker/node), but it doesn't seem to work.

Any idea how I can achieve this?

Thanks


Re: SPARK Issue in Standalone cluster

2017-08-02 Thread Gourav Sengupta
Hi,

I am definitely sure that at this point of time everyone who has kindly
cared to respond to my query does need to go and check this link:
https://spark.apache.org/docs/2.2.0/spark-standalone.html#spark-standalone-mode

It does mention that SPARK standalone cluster can have multiple machines
running as slaves.

The general idea of writing to the user group is that people who know
should answer, and not those who do not know.



Regards,
Gourav Sengupta

On Tue, Aug 1, 2017 at 4:50 AM, Mahesh Sawaiker <
mahesh_sawai...@persistent.com> wrote:

> Gourav,
>
> Riccardo’s answer is spot on.
>
> What is happening is one node of spark is writing to its own directory and
> telling a slave to read the data from there, when the slave goes to read
> it, the part is not found.
>
>
>
> Check the folder Users/gouravsengupta/Development/spark/sparkdata/
> test1/part-1-e79273b5-9b4e-4037-92f3-2e52f523dfdf-c000.snappy.parquet on
> the slave.
>
> The reason it ran on spark 1.5 may have been because the executor ran on
> the driver itself. There is not much use to a set up where you don’t have
> some kind of distributed file system, so I would encourage you to use hdfs,
> or a mounted file system shared by all nodes.
>
>
>
> Regards,
>
> Mahesh
>
>
>
>
>
> *From:* Gourav Sengupta [mailto:gourav.sengu...@gmail.com]
> *Sent:* Monday, July 31, 2017 9:54 PM
> *To:* Riccardo Ferrari
> *Cc:* user
> *Subject:* Re: SPARK Issue in Standalone cluster
>
>
>
> Hi Riccardo,
>
>
>
> I am grateful for your kind response.
>
>
>
> Also I am sure that your answer is completely wrong and errorneous. SPARK
> must be having a method so that different executors do not pick up the same
> files to process. You also did not answer the question why was the
> processing successful in SPARK 1.5 and not in SPARK 2.2.
>
>
>
> Also the exact same directory is is present across in both the nodes.
>
>
>
> I feel quite facinated when individuals respond before even understanding
> the issue, or trying out the code.
>
>
>
> It will be of great help if someone could kindly read my email and help me
> figure out the issue.
>
>
>
>
>
> Regards,
>
> Gourav Sengupta
>
>
>
>
>
>
>
> On Mon, Jul 31, 2017 at 9:27 AM, Riccardo Ferrari 
> wrote:
>
> Hi Gourav,
>
>
>
> The issue here is the location where you're trying to write/read from :
> /Users/gouravsengupta/Development/spark/sparkdata/test1/p...
>
> When dealing with clusters all the paths and resources should be available
> to all executors (and driver), and that is reason why you generally use
> HDFS, S3, NFS or any shared file system.
>
>
>
> Spark assumes your data is generally available to all nodes and does not
> tries to pick up the data from a selected node, it rather tries to
> write/read in parallel from the executor nodes. Also given its control
> logic there is no way (read. you should not care) to know what executor is
> doing what task.
>
>
>
> Hope it helps,
>
> Riccardo
>
>
>
> On Mon, Jul 31, 2017 at 2:14 AM, Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
> Hi,
>
>
>
> I am working by creating a native SPARK standalone cluster (
> https://spark.apache.org/docs/2.2.0/spark-standalone.html)
>
>
>
> Therefore I  do not have a HDFS.
>
>
>
>
>
> EXERCISE:
>
> Its the most fundamental and simple exercise. Create a sample SPARK
> dataframe and then write it to a location and then read it back.
>
>
>
> SETTINGS:
>
> So after I have installed SPARK in two physical systems with the same:
>
> 1. SPARK version,
>
> 2. JAVA version,
>
> 3. PYTHON_PATH
>
> 4. SPARK_HOME
>
> 5. PYSPARK_PYTHON
>
> the user in both the systems is the root user therefore there are no
> permission issues anywhere.
>
>
>
> I am able to start:
>
> 1. ./spark-2.2.0-bin-hadoop2.7/sbin/start-master.sh
>
> 2. ./spark-2.2.0-bin-hadoop2.7/sbin/start-slave.sh (from two separate
> computers)
>
>
>
> After that I can see in the spark UI (at port 8080) two workers.
>
>
>
>
>
> CODE:
>
> Then I run the following code:
>
>
>
> ==
>
> import findspark
> import os
> os.environ["SPARK_HOME"] = '/Users/gouravsengupta/Development/spark/spark/'
> findspark.init()
>
> import pyspark
> from pyspark.sql import SparkSession
> spark = (SparkSession.builder
> .master("spark://mastersystem.local:7077")
> .appName("gouravtest")
> .enableHiveSupport()
> .getOrCreate())
>
> import pandas, numpy
> testdf = spark.createDataFrame(pandas.DataFrame(numpy.random.randn(1, 4), columns=list('ABCD')))
> testdf.cache()
> testdf.count()
> testdf.write.save("/Users/gouravsengupta/Development/spark/sparkdata/test2")
> spark.read.load("/Users/gouravsengupta/Development/spark/sparkdata/test2").count()
>
> ==
>
>
>
>
>
> ERROR I (in above code):
>
> ERROR in line: testdf.write.save("/Users/gouravsengupta/Development/spark/sparkdata/test2")
>
> This line does not 

Re: Quick one on evaluation

2017-08-02 Thread Jean Georges Perrin
Hey Jörn,

The "pending" was more something like a flag like myDf.hasCatalystWorkToDo() or 
myDf.isPendingActions(). Maybe an access to the DAG?

I just did that:
ordersDf = ordersDf.withColumn(
"time_to_ship", 
datediff(ordersDf.col("ship_date"), ordersDf.col("order_date")));

ordersDf.printSchema();
ordersDf.show();

and the schema and data are correct, so I was wondering what triggered 
Catalyst...
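
One way to inspect the pending work without running a job is explain(), which prints the parsed/analyzed/optimized/physical plans. A minimal Scala sketch (the Java Dataset API has the same methods; ordersDf and its columns come from the snippet above):

```
import org.apache.spark.sql.functions.{col, datediff}

val withLag = ordersDf.withColumn(
  "time_to_ship",
  datediff(col("ship_date"), col("order_date")))

withLag.printSchema()   // reads the analyzed schema, no job is launched
withLag.explain(true)   // prints the query plans, i.e. the "pending" transformations
withLag.show(5)         // an action: evaluates enough of the plan to return 5 rows
```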

jg



> On Aug 2, 2017, at 8:29 AM, Jörn Franke  wrote:
> 
> I assume printschema would not trigger an evaluation. Show might partially 
> triggger an evaluation (not all data is shown only a certain number of rows 
> by default).
> Keep in mind that even a count might not trigger evaluation of all rows 
> (especially in the future) due to updates on the optimizer. 
> 
> What do you mean by pending ? You can see the status of the job in the UI. 
> 
>> On 2. Aug 2017, at 14:16, Jean Georges Perrin  wrote:
>> 
>> Hi Sparkians,
>> 
>> I understand the lazy evaluation mechanism with transformations and actions. 
>> My question is simpler: 1) are show() and/or printSchema() actions? I would 
>> assume so...
>> 
>> and optional question: 2) is there a way to know if there are 
>> transformations "pending"?
>> 
>> Thanks!
>> 
>> jg
>> 
>> 



Reparitioning Hive tables - Container killed by YARN for exceeding memory limits

2017-08-02 Thread Chetan Khatri
Hello Spark Users,

I have an HBase table that I read and write into a Hive managed table, where I
applied partitioning by a date column. This worked fine, but it generated a
large number of files across almost 700 partitions, so I wanted to use
repartitioning to reduce file I/O by reducing the number of files inside each
partition.

*But i ended up with below exception:*

ExecutorLostFailure (executor 11 exited caused by one of the running tasks)
Reason: Container killed by YARN for exceeding memory limits. 14.0 GB of 14
GB physical memory used. Consider boosting spark.yarn.executor.
memoryOverhead.

Driver memory=4g, executor mem=12g, num-executors=8, executor core=8

Do you think the settings below can help me to overcome the above issue?

spark.default.parellism=1000
spark.sql.shuffle.partitions=1000

Because the default max number of partitions is 1000.


Re: Quick one on evaluation

2017-08-02 Thread Jörn Franke
I assume printSchema would not trigger an evaluation. Show might partially 
trigger an evaluation (not all data is shown, only a certain number of rows by 
default).
Keep in mind that even a count might not trigger evaluation of all rows 
(especially in the future) due to updates on the optimizer.

What do you mean by pending? You can see the status of the job in the UI.

> On 2. Aug 2017, at 14:16, Jean Georges Perrin  wrote:
> 
> Hi Sparkians,
> 
> I understand the lazy evaluation mechanism with transformations and actions. 
> My question is simpler: 1) are show() and/or printSchema() actions? I would 
> assume so...
> 
> and optional question: 2) is there a way to know if there are transformations 
> "pending"?
> 
> Thanks!
> 
> jg
> 
> 



Quick one on evaluation

2017-08-02 Thread Jean Georges Perrin
Hi Sparkians,

I understand the lazy evaluation mechanism with transformations and actions. My 
question is simpler: 1) are show() and/or printSchema() actions? I would assume 
so...

and optional question: 2) is there a way to know if there are transformations 
"pending"?

Thanks!

jg

 



Inserting content of df to partitioned hive table (parquet format)

2017-08-02 Thread Jens Johannsen
Hi All,

I'm trying to insert the content of a dataframe to a partitioned
parquet-formatted hive table using

*df.write.mode(SaveMode.Append).insertInto(myTable)*

with *hive.exec.dynamic.partition = 'true' * and
*hive.exec.dynamic.partition.mode
= 'nonstrict'.*

I keep getting a *parquet.io.ParquetEncodingException* saying that

*"empty fields are illegal, the field should be ommited completely instead"*
.

The schema includes arrays, and the df does contain some empty entries for
these fields.

However, when I insert the df content into a *non*-partitioned table, I do
not get an error.

Can anyone give me an explanation about what is going on here ? Why does
this error appear only when the hive table is partitioned ?

Thanks,
Best Regards,
Jens


Spark-twitter Streaming

2017-08-02 Thread deependra singh
Hello everyone,


This is in reference to spark-twitter streaming.


val stream = TwitterUtils.createStream(ssc, None)

Can anybody tell me why this DStream is not a proper JSON object,
as I am not able to parse it further?


spark-version = 2.0.1
spark-api =scala
streaming jar = org.apache.bahir

Regards,
Deependra


PySpark Streaming S3 checkpointing

2017-08-02 Thread Riccardo Ferrari
Hi list!

I am working on a pyspark streaming job (ver 2.2.0) and I need to enable
checkpointing. At high level my python script goes like this:

class StreamingJob():

def __init__(..):
...
   sparkContext._jsc.hadoopConfiguration().set('fs.s3a.access.key',)
   sparkContext._jsc.hadoopConfiguration().set('fs.s3a.secret.key',)

def doJob(self):
   ssc = StreamingContext.getOrCreate('', )

and I run it:

myJob = StreamingJob(...)
myJob.doJob()

The problem is that StreamingContext.getOrCreate does not have access
to the hadoop configuration set in the constructor and fails to load
from the checkpoint with

"com.amazonaws.AmazonClientException: Unable to load AWS credentials from
any provider in the chain"

If I export AWS credentials to the system ENV before starting the script it
works!

I see the Scala version has an option to provide the hadoop configuration
that is not available in Python.

I don't have the whole Hadoop, just Spark, so I don't really want to
configure hadoop's xmls and such.

What is the cleanest way to achieve my goal?

 thanks!