Re: SizeEstimator

2018-02-26 Thread 叶先进
What is the type of the buffer you mentioned?


> On 27 Feb 2018, at 11:46 AM, David Capwell  wrote:
> 
> advancedxy , I don't remember the code as well 
> anymore but what we hit was a very simple schema (string, long). The issue is 
> the buffer had a million of these so SizeEstimator of the buffer had to keep 
> recalculating the same elements over and over again.  SizeEstimator was 
> on-cpu about 30% of the time, bounding the buffer got it to be < 5% (going 
> off memory so may be off).
> 
The class info (the size of the fields as laid out on the heap) is cached for every 
class encountered, so the size of repeated elements of the same class is not 
recalculated from scratch. However, for a Collection class (or similar) SizeEstimator 
will scan all the elements in the container (following the `next` field in LinkedList, 
for example).

Arrays are a special case: SizeEstimator will sample an array if 
array.length > ARRAY_SIZE_FOR_SAMPLING (400).
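
For reference, a minimal sketch of what such an estimate looks like (org.apache.spark.util.SizeEstimator
is a DeveloperApi; the buffer below is only an illustrative stand-in for the (string, long) case
discussed in this thread):

import org.apache.spark.util.SizeEstimator

// Per-class layout info is cached, but estimate() still follows every element
// reference (e.g. the cons cells of this list), so its cost grows with the
// number of elements in the container.
val buffer: List[(String, Long)] = List.tabulate(1000000)(i => (s"key-$i", i.toLong))
val bytes = SizeEstimator.estimate(buffer)
println(s"estimated size: $bytes bytes")
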
> The cost is really (assuming memory is O(1) which is not true) O(N × M) where 
> N is number of rows in buffer and M is size of schema.  My case could be 
> solved by not recomputing which would bring the cost to O(M) since 
> bookkeeping should be consistent time. There was logic to delay recalculating 
> bases off a change in frequency, but that didn't really do much for us, 
> bounding and spilling was the bigger win in our case.
> 
> 
> On Mon, Feb 26, 2018, 7:24 PM Xin Liu  > wrote:
> Thanks David. Another solution is to convert the protobuf object to byte 
> array, It does speed up SizeEstimator
> 
> On Mon, Feb 26, 2018 at 5:34 PM, David Capwell  > wrote:
> This is used to predict the current cost of memory so spark knows to flush or 
> not. This is very costly for us so we use a flag marked in the code as 
> private to lower the cost
> 
> spark.shuffle.spill.numElementsForceSpillThreshold (on phone hope no typo) - 
> how many records before flush
> 
> This lowers the cost because it let's us leave data in young, if we don't 
> bound we get everyone promoted to old and GC becomes a issue.  This doesn't 
> solve the fact that the walk is slow, but lowers the cost of GC. For us we 
> make sure to have spare memory on the system for page cache so spilling to 
> disk for us is a memory write 99% of the time.  If your host has less free 
> memory spilling may become more expensive.
> 
> 
> If the walk is your bottleneck and not GC then I would recommend JOL and 
> guessing to better predict memory.  
> 
> On Mon, Feb 26, 2018, 4:47 PM Xin Liu  > wrote:
> Hi folks,
> 
> We have a situation where, shuffled data is protobuf based, and SizeEstimator 
> is taking a lot of time.
> 
> We have tried to override SizeEstimator to return a constant value, which 
> speeds up things a lot.
> 
> My questions, what is the side effect of disabling SizeEstimator? Is it just 
> spark do memory reallocation, or there is more severe consequences?
> 
> Thanks!
> 



Re: SizeEstimator

2018-02-26 Thread David Capwell
advancedxy, I don't remember the code as well anymore, but what we hit was a very
simple schema (string, long). The issue is that the buffer had a million of these, so
SizeEstimator of the buffer had to keep recalculating the same elements over and over
again.  SizeEstimator was on-CPU about 30% of the time; bounding the buffer got it to
< 5% (going off memory, so I may be off).

The cost is really (assuming memory access is O(1), which is not true) O(N × M),
where N is the number of rows in the buffer and M is the size of the schema.  My case
could be solved by not recomputing, which would bring the cost to O(M), since the
bookkeeping should be constant time. There was logic to delay recalculating based on a
change in frequency, but that didn't really do much for us; bounding and spilling was
the bigger win in our case.

On Mon, Feb 26, 2018, 7:24 PM Xin Liu  wrote:

> Thanks David. Another solution is to convert the protobuf object to byte
> array, It does speed up SizeEstimator
>
> On Mon, Feb 26, 2018 at 5:34 PM, David Capwell  wrote:
>
>> This is used to predict the current cost of memory so spark knows to
>> flush or not. This is very costly for us so we use a flag marked in the
>> code as private to lower the cost
>>
>> spark.shuffle.spill.numElementsForceSpillThreshold (on phone hope no
>> typo) - how many records before flush
>>
>> This lowers the cost because it let's us leave data in young, if we don't
>> bound we get everyone promoted to old and GC becomes a issue.  This doesn't
>> solve the fact that the walk is slow, but lowers the cost of GC. For us we
>> make sure to have spare memory on the system for page cache so spilling to
>> disk for us is a memory write 99% of the time.  If your host has less free
>> memory spilling may become more expensive.
>>
>>
>> If the walk is your bottleneck and not GC then I would recommend JOL and
>> guessing to better predict memory.
>>
>> On Mon, Feb 26, 2018, 4:47 PM Xin Liu  wrote:
>>
>>> Hi folks,
>>>
>>> We have a situation where, shuffled data is protobuf based, and
>>> SizeEstimator is taking a lot of time.
>>>
>>> We have tried to override SizeEstimator to return a constant value,
>>> which speeds up things a lot.
>>>
>>> My questions, what is the side effect of disabling SizeEstimator? Is it
>>> just spark do memory reallocation, or there is more severe consequences?
>>>
>>> Thanks!
>>>
>>
>


Re: SizeEstimator

2018-02-26 Thread Xin Liu
Thanks David. Another solution is to convert the protobuf object to a byte
array; it does speed up SizeEstimator.
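
A rough sketch of that workaround (it assumes protobuf-java on the classpath; Wrapped and
wrap are hypothetical names, not from this thread): keep the serialized bytes in the shuffled
records so SizeEstimator only sees a primitive byte array instead of walking the message's
object graph.

import com.google.protobuf.MessageLite

case class Wrapped(key: String, payload: Array[Byte])

// Parse the payload back with the generated parseFrom() only where it is needed.
def wrap(key: String, msg: MessageLite): Wrapped =
  Wrapped(key, msg.toByteArray)
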

On Mon, Feb 26, 2018 at 5:34 PM, David Capwell  wrote:

> This is used to predict the current cost of memory so spark knows to flush
> or not. This is very costly for us so we use a flag marked in the code as
> private to lower the cost
>
> spark.shuffle.spill.numElementsForceSpillThreshold (on phone hope no
> typo) - how many records before flush
>
> This lowers the cost because it let's us leave data in young, if we don't
> bound we get everyone promoted to old and GC becomes a issue.  This doesn't
> solve the fact that the walk is slow, but lowers the cost of GC. For us we
> make sure to have spare memory on the system for page cache so spilling to
> disk for us is a memory write 99% of the time.  If your host has less free
> memory spilling may become more expensive.
>
>
> If the walk is your bottleneck and not GC then I would recommend JOL and
> guessing to better predict memory.
>
> On Mon, Feb 26, 2018, 4:47 PM Xin Liu  wrote:
>
>> Hi folks,
>>
>> We have a situation where, shuffled data is protobuf based, and
>> SizeEstimator is taking a lot of time.
>>
>> We have tried to override SizeEstimator to return a constant value, which
>> speeds up things a lot.
>>
>> My questions, what is the side effect of disabling SizeEstimator? Is it
>> just spark do memory reallocation, or there is more severe consequences?
>>
>> Thanks!
>>
>


Re: SizeEstimator

2018-02-26 Thread Xin Liu
Thanks!

Our protobuf object is fairly complex. Even O(N) takes a lot of time.

On Mon, Feb 26, 2018 at 6:33 PM, 叶先进  wrote:

> H Xin Liu,
>
> Could you provide a concrete user case if possible(code to reproduce
> protobuf object and comparisons between  protobuf and normal object)?
>
> I contributed a bit to SizeEstimator years ago, and to my understanding,
> the time complexity should be O(N) where N is the num of referenced fields
> recursively.
>
> We should definitely investigate this case if it indeed takes a lot of
> time on protobuf objects.
>
>
> On 27 Feb 2018, at 8:47 AM, Xin Liu  wrote:
>
> Hi folks,
>
> We have a situation where, shuffled data is protobuf based, and
> SizeEstimator is taking a lot of time.
>
> We have tried to override SizeEstimator to return a constant value, which
> speeds up things a lot.
>
> My questions, what is the side effect of disabling SizeEstimator? Is it
> just spark do memory reallocation, or there is more severe consequences?
>
> Thanks!
>
>
>


Re: SizeEstimator

2018-02-26 Thread 叶先进
Hi Xin Liu,

Could you provide a concrete use case if possible (code to reproduce the protobuf 
object, and comparisons between the protobuf object and a normal object)?

I contributed a bit to SizeEstimator years ago, and to my understanding the time 
complexity should be O(N), where N is the number of fields referenced recursively.

We should definitely investigate this case if it indeed takes a lot of time on 
protobuf objects.

> On 27 Feb 2018, at 8:47 AM, Xin Liu  wrote:
> 
> Hi folks,
> 
> We have a situation where, shuffled data is protobuf based, and SizeEstimator 
> is taking a lot of time.
> 
> We have tried to override SizeEstimator to return a constant value, which 
> speeds up things a lot.
> 
> My questions, what is the side effect of disabling SizeEstimator? Is it just 
> spark do memory reallocation, or there is more severe consequences?
> 
> Thanks!



Re: SizeEstimator

2018-02-26 Thread David Capwell
This is used to predict the current cost of memory so Spark knows whether to flush
or not. This is very costly for us, so we use a flag marked in the code as private
to lower the cost:

spark.shuffle.spill.numElementsForceSpillThreshold (on phone hope no typo)
- how many records before flush
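
As a minimal, illustrative sketch only (the threshold value is made up, and since the
property is marked private/internal it may change between Spark versions), the bound can
be set when building the session:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")  // for local testing only
  .appName("bounded-spill-sketch")
  // force a spill after this many records instead of relying on size estimation alone
  .config("spark.shuffle.spill.numElementsForceSpillThreshold", "500000")
  .getOrCreate()
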

This lowers the cost because it lets us leave data in the young generation; if we don't
bound it, everything gets promoted to the old generation and GC becomes an issue.  This
doesn't solve the fact that the walk is slow, but it lowers the cost of GC. For us, we
make sure to have spare memory on the system for the page cache, so spilling to disk is
a memory write 99% of the time.  If your host has less free memory, spilling may become
more expensive.


If the walk is your bottleneck and not GC, then I would recommend JOL and
guessing to better predict memory.
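
A small sketch of that approach (it assumes the org.openjdk.jol:jol-core dependency is
available, and Record is a stand-in for the real row type): measure one representative
object off-line and reuse that number as the estimate.

import org.openjdk.jol.info.GraphLayout

case class Record(key: String, value: Long)  // stand-in for the real schema

// Deep size of one representative instance, including the objects it references.
val bytesPerRecord = GraphLayout.parseInstance(Record("example-key", 42L)).totalSize()
println(s"approx bytes per record: $bytesPerRecord")
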

On Mon, Feb 26, 2018, 4:47 PM Xin Liu  wrote:

> Hi folks,
>
> We have a situation where, shuffled data is protobuf based, and
> SizeEstimator is taking a lot of time.
>
> We have tried to override SizeEstimator to return a constant value, which
> speeds up things a lot.
>
> My questions, what is the side effect of disabling SizeEstimator? Is it
> just spark do memory reallocation, or there is more severe consequences?
>
> Thanks!
>


SizeEstimator

2018-02-26 Thread Xin Liu
Hi folks,

We have a situation where the shuffled data is protobuf-based, and SizeEstimator is
taking a lot of time.

We have tried to override SizeEstimator to return a constant value, which speeds
things up a lot.

My question is: what are the side effects of disabling SizeEstimator? Does Spark just
do memory reallocation, or are there more severe consequences?

Thanks!


how to add columns to row when column has a different encoder?

2018-02-26 Thread David Capwell
I have a row that looks like the following pojo

case class Wrapper(var id: String, var bytes: Array[Byte])

Those bytes are a serialized pojo that looks like this

case class Inner(var stuff: String, var moreStuff: String)

I right now have encoders for both types, but I don't see how to merge the two into a
unified row that looks like the following:

struct<id: string, inner: struct<stuff: string, moreStuff: string>>

If I know how to deserialize the bytes and have an encoder, how could I get the above
schema?  I was looking at ds.withColumn("inner", ???) but wasn't sure how to go from
pojo + encoder to a column.  Is there a better way to do this?

Thanks for your time reading this email
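
One possible approach, shown only as a sketch (deserializeInner, wrapperDs and the
SparkSession named spark are placeholders, not something from this thread): deserialize
the bytes in a map and return a combined case class, letting the product encoder derive
the struct<id, inner> schema.

import org.apache.spark.sql.Dataset

case class Combined(id: String, inner: Inner)

def deserializeInner(bytes: Array[Byte]): Inner = ???  // user-supplied deserializer

import spark.implicits._
val combined: Dataset[Combined] =
  wrapperDs.map(w => Combined(w.id, deserializeInner(w.bytes)))
combined.printSchema()  // id: string, inner: struct<stuff: string, moreStuff: string>
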


Re: Out of memory Error when using Collection Accumulator Spark 2.2

2018-02-26 Thread naresh Goud
what is your driver memory?

Thanks,
Naresh
www.linkedin.com/in/naresh-dulam
http://hadoopandspark.blogspot.com/


On Mon, Feb 26, 2018 at 3:45 AM, Patrick  wrote:

> Hi,
>
> We were getting OOM error when we are accumulating the results of each
> worker. We were trying to avoid collecting data to driver node instead used
> accumulator as per below code snippet,
>
> Is there any spark config to set the accumulator settings Or am i doing
> the wrong way to collect the huge data set?
>
>   CollectionAccumulator accumulate;
>   Dataset bin;
>
> bin.foreach((ForeachFunction) row -> {
>   accumulate.add(row.get(0) + "|" + row.get(1) + "|" + row.get(2));
> });
>
> accumulate.value().forEach(element -> {
>   String[] arr = element.split("\\|");
>   String count = arr[2];
>   double percentage =
>   (total == 0.0) ? 0.0 : (Double.valueOf(count) / total);
>   PayloadBin payload = new PayloadBin(arr[0],
>   arr[1], 0, Long.valueOf(count), percentage, sortBy, sortOrder);
>   binArray.add(payload);
>
> });
>
>
> 18/02/21 17:35:23 INFO storage.BlockManagerInfo: Added taskresult_5050 in
> memory on rhlhddfrd225.fairisaac.com:41640 (size: 3.7 MB, free: 8.3 GB)
>
> 18/02/21 17:35:24 INFO storage.BlockManagerInfo: Removed taskresult_5034
> on rhlhddfrd218.fairisaac.com:46584 in memory (size: 3.7 MB, free: 8.4 GB)
>
> 18/02/21 17:35:25 INFO scheduler.TaskSetManager: Finished task 59.0 in
> stage 20.0 (TID 5034) in 9908 ms on rhlhddfrd218.fairisaac.com (executor
> 92) (14/200)
>
> Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError:
> Java heap space
>
> at java.util.Arrays.copyOf(Arrays.java:3181)
>
> at java.util.ArrayList.toArray(ArrayList.java:376)
>
> at java.util.Collections$SynchronizedCollection.
> toArray(Collections.java:2024)
>
> at java.util.ArrayList.(ArrayList.java:177)
>
> at org.apache.spark.util.CollectionAccumulator.value(
> AccumulatorV2.scala:470)
>
>


Re: Spark EMR executor-core vs Vcores

2018-02-26 Thread yncxcw
hi, all

I also noticed this problem. The reason is that YARN accounts each executor for only
1 vcore, no matter how many cores you configured, because by default YARN only uses
memory as the primary metric for resource allocation. This means that YARN will pack
as many executors onto each node as the node's free memory allows.

If you want vcores to be accounted for in resource allocation, you can configure the
resource calculator as DominantResourceCalculator, as follows:

Property: yarn.scheduler.capacity.resource-calculator
Description: The ResourceCalculator implementation to be used to compare Resources in
the scheduler. The default, org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator,
only uses Memory, while DominantResourceCalculator uses Dominant-resource to compare
multi-dimensional resources such as Memory, CPU etc. A Java ResourceCalculator class
name is expected.
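
For example, a sketch of that setting in capacity-scheduler.xml:

<property>
  <name>yarn.scheduler.capacity.resource-calculator</name>
  <value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
</property>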


Please also refer this article:
https://hortonworks.com/blog/managing-cpu-resources-in-your-hadoop-yarn-clusters/


Thanks!

Wei Chen



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: partitionBy with partitioned column in output?

2018-02-26 Thread Alex Nastetsky
Yeah, was just discussing this with a co-worker and came to the same
conclusion -- need to essentially create a copy of the partition column.
Thanks.

Hacky, but it works. Seems counter-intuitive that Spark would remove the
column from the output... should at least give you an option to keep it.
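
A minimal sketch of that copy-the-column workaround (the foo_copy name is made up):

import org.apache.spark.sql.functions.col

sc.parallelize(List((1, 10), (2, 20))).toDF("foo", "bar")
  .withColumn("foo_copy", col("foo"))         // keep a copy of the partition value in the rows
  .write.partitionBy("foo").json("json-out")  // directory layout still uses foo=...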

On Mon, Feb 26, 2018 at 5:47 PM, naresh Goud 
wrote:

> is this helps?
>
> sc.parallelize(List((1, 10), (2, 20))).map { case (foo, bar) => (foo, foo, bar) }
>   .toDF("foo", "foo_copy", "bar").write.partitionBy("foo").json("json-out")
>
>
> On Mon, Feb 26, 2018 at 4:28 PM, Alex Nastetsky 
> wrote:
>
>> Is there a way to make outputs created with "partitionBy" to contain the
>> partitioned column? When reading the output with Spark or Hive or similar,
>> it's less of an issue because those tools know how to perform partition
>> discovery. But if I were to load the output into an external data warehouse
>> or database, it would have no idea.
>>
>> Example below -- a dataframe with two columns "foo" and "bar" is
>> partitioned by "foo", but the data only contains "bar", since it expects
>> the reader to know how to derive the value of "foo" from the parent
>> directory. Note that it's the same thing with Parquet and Avro as well, I
>> just chose to use JSON in my example.
>>
>> scala> sc.parallelize(List((1,10),(2,20))).toDF("foo","bar").write.
>> partitionBy("foo").json("json-out")
>>
>>
>> $ ls json-out/
>> foo=1  foo=2  _SUCCESS
>> $ cat json-out/foo=1/part-3-18ca93d0-c3b1-424b-8ad5-291d8a2952
>> 3b.json
>> {"bar":10}
>> $ cat json-out/foo=2/part-7-18ca93d0-c3b1-424b-8ad5-291d8a2952
>> 3b.json
>> {"bar":20}
>>
>> Thanks,
>> Alex.
>>
>
>


Re: partitionBy with partitioned column in output?

2018-02-26 Thread naresh Goud
is this helps?

sc.parallelize(List((1, 10), (2, 20))).map { case (foo, bar) => (foo, foo, bar) }
  .toDF("foo", "foo_copy", "bar").write.partitionBy("foo").json("json-out")


On Mon, Feb 26, 2018 at 4:28 PM, Alex Nastetsky 
wrote:

> Is there a way to make outputs created with "partitionBy" to contain the
> partitioned column? When reading the output with Spark or Hive or similar,
> it's less of an issue because those tools know how to perform partition
> discovery. But if I were to load the output into an external data warehouse
> or database, it would have no idea.
>
> Example below -- a dataframe with two columns "foo" and "bar" is
> partitioned by "foo", but the data only contains "bar", since it expects
> the reader to know how to derive the value of "foo" from the parent
> directory. Note that it's the same thing with Parquet and Avro as well, I
> just chose to use JSON in my example.
>
> scala> sc.parallelize(List((1,10),(2,20))).toDF("foo","bar").write.
> partitionBy("foo").json("json-out")
>
>
> $ ls json-out/
> foo=1  foo=2  _SUCCESS
> $ cat json-out/foo=1/part-3-18ca93d0-c3b1-424b-8ad5-291d8a29523b.json
> {"bar":10}
> $ cat json-out/foo=2/part-7-18ca93d0-c3b1-424b-8ad5-291d8a29523b.json
> {"bar":20}
>
> Thanks,
> Alex.
>


Re: Spark EMR executor-core vs Vcores

2018-02-26 Thread Patrick Alwell
+1

AFAIK,

vCores are not the same as Cores in AWS. 
https://samrueby.com/2015/01/12/what-are-amazon-aws-vcpus/

I’ve always understood it as cores = num concurrent threads

These posts might help you with your research and why exceeding 5 cores per 
executor doesn’t make sense.

https://stackoverflow.com/questions/24622108/apache-spark-the-number-of-cores-vs-the-number-of-executors
http://site.clairvoyantsoft.com/understanding-resource-allocation-configurations-spark-application/

AWS/ EMR was always a challenge for me. Never understood why it didn’t seem to 
be using all my resources; as you noted.

I would see this as --num-executors 15 --executor-cores 5 --executor-memory 10g and then 
test my application from there.

I only got better performance out of a different class of nodes, e.g. R-series 
instance types. Costs more than the M class; but wound up using less of them 
and my jobs ran faster. I was in the 10+TB jobs territory with TPC data.  ☺ The 
links I provided have a few use cases and trials.

Hope that helps,

-Pat


From: Selvam Raman 
Date: Monday, February 26, 2018 at 1:52 PM
To: Vadim Semenov 
Cc: user 
Subject: Re: Spark EMR executor-core vs Vcores

Thanks. That’s make sense.

I want to know one more think , available vcore per machine is 16 but threads 
per node 8. Am I missing to relate here.

What I m thinking now is number of vote = number of threads.



On Mon, 26 Feb 2018 at 18:45, Vadim Semenov 
> wrote:
All used cores aren't getting reported correctly in EMR, and YARN itself has no 
control over it, so whatever you put in `spark.executor.cores` will be used,
but in the ResourceManager you will only see 1 vcore used per nodemanager.

On Mon, Feb 26, 2018 at 5:20 AM, Selvam Raman 
> wrote:
Hi,

spark version - 2.0.0
spark distribution - EMR 5.0.0

Spark Cluster - one master, 5 slaves
Master node - m3.xlarge - 8 vCore, 15 GiB memory, 80 SSD GB storage
Slave node - m3.2xlarge - 16 vCore, 30 GiB memory, 160 SSD GB storage



Cluster Metrics
Apps Submitted: 16 | Apps Pending: 0 | Apps Running: 1 | Apps Completed: 15 | Containers Running: 5
Memory Used: 88.88 GB | Memory Total: 90.50 GB | Memory Reserved: 22 GB
VCores Used: 5 | VCores Total: 79 | VCores Reserved: 1
Active Nodes: 5 | Decommissioning Nodes: 0 | Decommissioned Nodes: 0 | Lost Nodes: 5 | Unhealthy Nodes: 0 | Rebooted Nodes: 0

I have submitted job with below configuration
--num-executors 5 --executor-cores 10 --executor-memory 20g







spark.task.cpus - be default 1


My understanding is there will be 5 executore each can run 10 task at a time 
and task can share total memory of 20g. Here, i could see only 5 vcores used 
which means 1 executor instance use 20g+10%overhead ram(22gb), 10 core(number 
of threads), 1 Vcore(cpu).

please correct me if my understand is wrong.


how can i utilize number of vcore in EMR effectively. Will Vcore boost 
performance?



--
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"

--
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


partitionBy with partitioned column in output?

2018-02-26 Thread Alex Nastetsky
Is there a way to make outputs created with "partitionBy" to contain the
partitioned column? When reading the output with Spark or Hive or similar,
it's less of an issue because those tools know how to perform partition
discovery. But if I were to load the output into an external data warehouse
or database, it would have no idea.

Example below -- a dataframe with two columns "foo" and "bar" is
partitioned by "foo", but the data only contains "bar", since it expects
the reader to know how to derive the value of "foo" from the parent
directory. Note that it's the same thing with Parquet and Avro as well, I
just chose to use JSON in my example.

scala>
sc.parallelize(List((1,10),(2,20))).toDF("foo","bar").write.partitionBy("foo").json("json-out")


$ ls json-out/
foo=1  foo=2  _SUCCESS
$ cat json-out/foo=1/part-3-18ca93d0-c3b1-424b-8ad5-291d8a29523b.json
{"bar":10}
$ cat json-out/foo=2/part-7-18ca93d0-c3b1-424b-8ad5-291d8a29523b.json
{"bar":20}

Thanks,
Alex.


Re: Spark EMR executor-core vs Vcores

2018-02-26 Thread Vadim Semenov
yeah, for some reason (unknown to me, but you can find on aws forums) they
double the actual number of cores for nodemanagers.

I assume that's done to maximize utilization, but doesn't really matter to
me, at least, since I only run Spark, so I, personally, set `total number
of cores - 1/2` saving one core for the OS/DataNode/NodeManager, because
Spark itself can create a significant load.

On Mon, Feb 26, 2018 at 4:51 PM, Selvam Raman  wrote:

> Thanks. That’s make sense.
>
> I want to know one more think , available vcore per machine is 16 but
> threads per node 8. Am I missing to relate here.
>
> What I m thinking now is number of vote = number of threads.
>
>
>
> On Mon, 26 Feb 2018 at 18:45, Vadim Semenov  wrote:
>
>> All used cores aren't getting reported correctly in EMR, and YARN itself
>> has no control over it, so whatever you put in `spark.executor.cores` will
>> be used,
>> but in the ResourceManager you will only see 1 vcore used per nodemanager.
>>
>> On Mon, Feb 26, 2018 at 5:20 AM, Selvam Raman  wrote:
>>
>>> Hi,
>>>
>>> spark version - 2.0.0
>>> spark distribution - EMR 5.0.0
>>>
>>> Spark Cluster - one master, 5 slaves
>>>
>>> Master node - m3.xlarge - 8 vCore, 15 GiB memory, 80 SSD GB storage
>>> Slave node - m3.2xlarge - 16 vCore, 30 GiB memory, 160 SSD GB storage
>>>
>>>
>>> Cluster Metrics
>>> Apps Submitted: 16 | Apps Pending: 0 | Apps Running: 1 | Apps Completed: 15 | Containers Running: 5
>>> Memory Used: 88.88 GB | Memory Total: 90.50 GB | Memory Reserved: 22 GB
>>> VCores Used: 5 | VCores Total: 79 | VCores Reserved: 1
>>> Active Nodes: 5 | Decommissioning Nodes: 0 | Decommissioned Nodes: 0 | Lost Nodes: 5 | Unhealthy Nodes: 0 | Rebooted Nodes: 0
>>>
>>> I have submitted job with below configuration
>>> --num-executors 5 --executor-cores 10 --executor-memory 20g
>>>
>>>
>>>
>>> spark.task.cpus - be default 1
>>>
>>>
>>> My understanding is there will be 5 executore each can run 10 task at a
>>> time and task can share total memory of 20g. Here, i could see only 5
>>> vcores used which means 1 executor instance use 20g+10%overhead ram(22gb),
>>> 10 core(number of threads), 1 Vcore(cpu).
>>>
>>> please correct me if my understand is wrong.
>>>
>>> how can i utilize number of vcore in EMR effectively. Will Vcore boost
>>> performance?
>>>
>>>
>>> --
>>> Selvam Raman
>>> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>>>
>>
>> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>


Re: Spark EMR executor-core vs Vcores

2018-02-26 Thread Selvam Raman
Thanks. That’s make sense.

I want to know one more think , available vcore per machine is 16 but
threads per node 8. Am I missing to relate here.

What I m thinking now is number of vote = number of threads.



On Mon, 26 Feb 2018 at 18:45, Vadim Semenov  wrote:

> All used cores aren't getting reported correctly in EMR, and YARN itself
> has no control over it, so whatever you put in `spark.executor.cores` will
> be used,
> but in the ResourceManager you will only see 1 vcore used per nodemanager.
>
> On Mon, Feb 26, 2018 at 5:20 AM, Selvam Raman  wrote:
>
>> Hi,
>>
>> spark version - 2.0.0
>> spark distribution - EMR 5.0.0
>>
>> Spark Cluster - one master, 5 slaves
>>
>> Master node - m3.xlarge - 8 vCore, 15 GiB memory, 80 SSD GB storage
>> Slave node - m3.2xlarge - 16 vCore, 30 GiB memory, 160 SSD GB storage
>>
>>
>> Cluster Metrics
>> Apps Submitted: 16 | Apps Pending: 0 | Apps Running: 1 | Apps Completed: 15 | Containers Running: 5
>> Memory Used: 88.88 GB | Memory Total: 90.50 GB | Memory Reserved: 22 GB
>> VCores Used: 5 | VCores Total: 79 | VCores Reserved: 1
>> Active Nodes: 5 | Decommissioning Nodes: 0 | Decommissioned Nodes: 0 | Lost Nodes: 5 | Unhealthy Nodes: 0 | Rebooted Nodes: 0
>>
>> I have submitted job with below configuration
>> --num-executors 5 --executor-cores 10 --executor-memory 20g
>>
>>
>>
>> spark.task.cpus - be default 1
>>
>>
>> My understanding is there will be 5 executore each can run 10 task at a
>> time and task can share total memory of 20g. Here, i could see only 5
>> vcores used which means 1 executor instance use 20g+10%overhead ram(22gb),
>> 10 core(number of threads), 1 Vcore(cpu).
>>
>> please correct me if my understand is wrong.
>>
>> how can i utilize number of vcore in EMR effectively. Will Vcore boost
>> performance?
>>
>>
>> --
>> Selvam Raman
>> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>>
>
> --
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: Spark on K8s - using files fetched by init-container?

2018-02-26 Thread Jenna Hoole
Oh, duh. I completely forgot that file:// is a prefix I can use. Up and
running now :)

Thank you so much!
Jenna

On Mon, Feb 26, 2018 at 1:00 PM, Yinan Li  wrote:

> OK, it looks like you will need to use 
> `file:///var/spark-data/spark-files/flights.csv`
> instead. The 'file://' scheme must be explicitly used as it seems it
> defaults to 'hdfs' in your setup.
>
> On Mon, Feb 26, 2018 at 12:57 PM, Jenna Hoole 
> wrote:
>
>> Thank you for the quick response! However, I'm still having problems.
>>
>> When I try to look for /var/spark-data/spark-files/flights.csv I get
>> told:
>>
>> Error: Error in loadDF : analysis error - Path does not exist: hdfs://
>> 192.168.0.1:8020/var/spark-data/spark-files/flights.csv;
>>
>> Execution halted
>>
>> Exception in thread "main" org.apache.spark.SparkUserAppException: User
>> application exited with 1
>>
>> at org.apache.spark.deploy.RRunner$.main(RRunner.scala:104)
>>
>> at org.apache.spark.deploy.RRunner.main(RRunner.scala)
>>
>> And when I try to look for local:///var/spark-data/spark-files/flights.csv,
>> I get:
>>
>> Error in file(file, "rt") : cannot open the connection
>>
>> Calls: read.csv -> read.table -> file
>>
>> In addition: Warning message:
>>
>> In file(file, "rt") :
>>
>>   cannot open file 'local:///var/spark-data/spark-files/flights.csv': No
>> such file or directory
>>
>> Execution halted
>>
>> Exception in thread "main" org.apache.spark.SparkUserAppException: User
>> application exited with 1
>>
>> at org.apache.spark.deploy.RRunner$.main(RRunner.scala:104)
>>
>> at org.apache.spark.deploy.RRunner.main(RRunner.scala)
>>
>> I can see from a kubectl describe that the directory is getting mounted.
>>
>> Mounts:
>>
>>   /etc/hadoop/conf from hadoop-properties (rw)
>>
>>   /var/run/secrets/kubernetes.io/serviceaccount from
>> spark-token-pxz79 (ro)
>>
>>   /var/spark-data/spark-files from download-files (rw)
>>
>>   /var/spark-data/spark-jars from download-jars-volume (rw)
>>
>>   /var/spark/tmp from spark-local-dir-0-tmp (rw)
>>
>> Is there something else I need to be doing in my set up?
>>
>> Thanks,
>> Jenna
>>
>> On Mon, Feb 26, 2018 at 12:02 PM, Yinan Li  wrote:
>>
>>> The files specified through --files are localized by the init-container
>>> to /var/spark-data/spark-files by default. So in your case, the file should
>>> be located at /var/spark-data/spark-files/flights.csv locally in the
>>> container.
>>>
>>> On Mon, Feb 26, 2018 at 10:51 AM, Jenna Hoole 
>>> wrote:
>>>
 This is probably stupid user error, but I can't for the life of me
 figure out how to access the files that are staged by the init-container.

 I'm trying to run the SparkR example data-manipulation.R which requires
 the path to its datafile. I supply the hdfs location via --files and then
 the full hdfs path.


 --files hdfs://192.168.0.1:8020/user/jhoole/flights.csv
 local:///opt/spark/examples/src/main/r/data-manipulation.R hdfs://
 192.168.0.1:8020/user/jhoole/flights.csv

 The init-container seems to load my file.

 18/02/26 18:29:09 INFO spark.SparkContext: Added file hdfs://
 192.168.0.1:8020/user/jhoole/flights.csv at hdfs://
 192.168.0.1:8020/user/jhoole/flights.csv with timestamp 1519669749519

 18/02/26 18:29:09 INFO util.Utils: Fetching hdfs://
 192.168.0.1:8020/user/jhoole/flights.csv to
 /var/spark/tmp/spark-d943dae6-9b95-4df0-87a3-9f7978d6d4d2/us
 erFiles-4112b7aa-b9e7-47a9-bcbc-7f7a01f93e38/fetchFileTemp78
 72615076522023165.tmp

 However, I get an error that my file does not exist.

 Error in file(file, "rt") : cannot open the connection

 Calls: read.csv -> read.table -> file

 In addition: Warning message:

 In file(file, "rt") :

   cannot open file 'hdfs://192.168.0.1:8020/user/jhoole/flights.csv':
 No such file or directory

 Execution halted

 Exception in thread "main" org.apache.spark.SparkUserAppException:
 User application exited with 1

 at org.apache.spark.deploy.RRunner$.main(RRunner.scala:104)

 at org.apache.spark.deploy.RRunner.main(RRunner.scala)

 If I try supplying just flights.csv, I get a different error

 --files hdfs://192.168.0.1:8020/user/jhoole/flights.csv
 local:///opt/spark/examples/src/main/r/data-manipulation.R flights.csv

 Error: Error in loadDF : analysis error - Path does not exist: hdfs://
 192.168.0.1:8020/user/root/flights.csv;

 Execution halted

 Exception in thread "main" org.apache.spark.SparkUserAppException:
 User application exited with 1

 at org.apache.spark.deploy.RRunner$.main(RRunner.scala:104)

 at org.apache.spark.deploy.RRunner.main(RRunner.scala)

 If the path /user/root/flights.csv does exist and I only supply
 "flights.csv" as the file path, 

Re: Unsubscribe

2018-02-26 Thread Romero, Saul
Unsubscribe


On Mon, Feb 26, 2018 at 8:58 AM, Mallanagouda Patil <
mallanagouda.c.pa...@gmail.com> wrote:

> Unsubscribe
>


Re: Spark on K8s - using files fetched by init-container?

2018-02-26 Thread Yinan Li
The files specified through --files are localized by the init-container
to /var/spark-data/spark-files by default. So in your case, the file should
be located at /var/spark-data/spark-files/flights.csv locally in the
container.

On Mon, Feb 26, 2018 at 10:51 AM, Jenna Hoole  wrote:

> This is probably stupid user error, but I can't for the life of me figure
> out how to access the files that are staged by the init-container.
>
> I'm trying to run the SparkR example data-manipulation.R which requires
> the path to its datafile. I supply the hdfs location via --files and then
> the full hdfs path.
>
>
> --files hdfs://192.168.0.1:8020/user/jhoole/flights.csv
> local:///opt/spark/examples/src/main/r/data-manipulation.R hdfs://
> 192.168.0.1:8020/user/jhoole/flights.csv
>
> The init-container seems to load my file.
>
> 18/02/26 18:29:09 INFO spark.SparkContext: Added file hdfs://
> 192.168.0.1:8020/user/jhoole/flights.csv at hdfs://192.168.0.1:8020/user/
> jhoole/flights.csv with timestamp 1519669749519
>
> 18/02/26 18:29:09 INFO util.Utils: Fetching hdfs://192.168.0.1:8020/user/
> jhoole/flights.csv to /var/spark/tmp/spark-d943dae6-
> 9b95-4df0-87a3-9f7978d6d4d2/userFiles-4112b7aa-b9e7-47a9-
> bcbc-7f7a01f93e38/fetchFileTemp7872615076522023165.tmp
>
> However, I get an error that my file does not exist.
>
> Error in file(file, "rt") : cannot open the connection
>
> Calls: read.csv -> read.table -> file
>
> In addition: Warning message:
>
> In file(file, "rt") :
>
>   cannot open file 'hdfs://192.168.0.1:8020/user/jhoole/flights.csv': No
> such file or directory
>
> Execution halted
>
> Exception in thread "main" org.apache.spark.SparkUserAppException: User
> application exited with 1
>
> at org.apache.spark.deploy.RRunner$.main(RRunner.scala:104)
>
> at org.apache.spark.deploy.RRunner.main(RRunner.scala)
>
> If I try supplying just flights.csv, I get a different error
>
> --files hdfs://192.168.0.1:8020/user/jhoole/flights.csv
> local:///opt/spark/examples/src/main/r/data-manipulation.R flights.csv
>
> Error: Error in loadDF : analysis error - Path does not exist: hdfs://
> 192.168.0.1:8020/user/root/flights.csv;
>
> Execution halted
>
> Exception in thread "main" org.apache.spark.SparkUserAppException: User
> application exited with 1
>
> at org.apache.spark.deploy.RRunner$.main(RRunner.scala:104)
>
> at org.apache.spark.deploy.RRunner.main(RRunner.scala)
>
> If the path /user/root/flights.csv does exist and I only supply
> "flights.csv" as the file path, it runs to completion successfully.
> However, if I provide the file path as "hdfs://192.168.0.1:8020/user/
> root/flights.csv," I get the same "No such file or directory" error as I
> do initially.
>
> Since I obviously can't put all my hdfs files under /user/root, how do I
> get it to use the file that the init-container is fetching?
>
> Thanks,
> Jenna
>


Spark on K8s - using files fetched by init-container?

2018-02-26 Thread Jenna Hoole
This is probably stupid user error, but I can't for the life of me figure
out how to access the files that are staged by the init-container.

I'm trying to run the SparkR example data-manipulation.R which requires the
path to its datafile. I supply the hdfs location via --files and then the
full hdfs path.


--files hdfs://192.168.0.1:8020/user/jhoole/flights.csv
local:///opt/spark/examples/src/main/r/data-manipulation.R hdfs://
192.168.0.1:8020/user/jhoole/flights.csv

The init-container seems to load my file.

18/02/26 18:29:09 INFO spark.SparkContext: Added file hdfs://
192.168.0.1:8020/user/jhoole/flights.csv at hdfs://
192.168.0.1:8020/user/jhoole/flights.csv with timestamp 1519669749519

18/02/26 18:29:09 INFO util.Utils: Fetching hdfs://
192.168.0.1:8020/user/jhoole/flights.csv to
/var/spark/tmp/spark-d943dae6-9b95-4df0-87a3-9f7978d6d4d2/userFiles-4112b7aa-b9e7-47a9-bcbc-7f7a01f93e38/fetchFileTemp7872615076522023165.tmp

However, I get an error that my file does not exist.

Error in file(file, "rt") : cannot open the connection

Calls: read.csv -> read.table -> file

In addition: Warning message:

In file(file, "rt") :

  cannot open file 'hdfs://192.168.0.1:8020/user/jhoole/flights.csv': No
such file or directory

Execution halted

Exception in thread "main" org.apache.spark.SparkUserAppException: User
application exited with 1

at org.apache.spark.deploy.RRunner$.main(RRunner.scala:104)

at org.apache.spark.deploy.RRunner.main(RRunner.scala)

If I try supplying just flights.csv, I get a different error

--files hdfs://192.168.0.1:8020/user/jhoole/flights.csv
local:///opt/spark/examples/src/main/r/data-manipulation.R flights.csv

Error: Error in loadDF : analysis error - Path does not exist: hdfs://
192.168.0.1:8020/user/root/flights.csv;

Execution halted

Exception in thread "main" org.apache.spark.SparkUserAppException: User
application exited with 1

at org.apache.spark.deploy.RRunner$.main(RRunner.scala:104)

at org.apache.spark.deploy.RRunner.main(RRunner.scala)

If the path /user/root/flights.csv does exist and I only supply
"flights.csv" as the file path, it runs to completion successfully.
However, if I provide the file path as "hdfs://
192.168.0.1:8020/user/root/flights.csv," I get the same "No such file or
directory" error as I do initially.

Since I obviously can't put all my hdfs files under /user/root, how do I
get it to use the file that the init-container is fetching?

Thanks,
Jenna


Unsubscribe

2018-02-26 Thread purna pradeep


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark EMR executor-core vs Vcores

2018-02-26 Thread akshay naidu
Putting in all the cores alone won't serve the purpose; you'll have to set the number
of executors as well as the executor memory accordingly.

On Tue 27 Feb, 2018, 12:15 AM Vadim Semenov,  wrote:

> All used cores aren't getting reported correctly in EMR, and YARN itself
> has no control over it, so whatever you put in `spark.executor.cores` will
> be used,
> but in the ResourceManager you will only see 1 vcore used per nodemanager.
>
> On Mon, Feb 26, 2018 at 5:20 AM, Selvam Raman  wrote:
>
>> Hi,
>>
>> spark version - 2.0.0
>> spark distribution - EMR 5.0.0
>>
>> Spark Cluster - one master, 5 slaves
>>
>> Master node - m3.xlarge - 8 vCore, 15 GiB memory, 80 SSD GB storage
>> Slave node - m3.2xlarge - 16 vCore, 30 GiB memory, 160 SSD GB storage
>>
>>
>> Cluster Metrics
>> Apps Submitted: 16 | Apps Pending: 0 | Apps Running: 1 | Apps Completed: 15 | Containers Running: 5
>> Memory Used: 88.88 GB | Memory Total: 90.50 GB | Memory Reserved: 22 GB
>> VCores Used: 5 | VCores Total: 79 | VCores Reserved: 1
>> Active Nodes: 5 | Decommissioning Nodes: 0 | Decommissioned Nodes: 0 | Lost Nodes: 5 | Unhealthy Nodes: 0 | Rebooted Nodes: 0
>>
>> I have submitted job with below configuration
>> --num-executors 5 --executor-cores 10 --executor-memory 20g
>>
>>
>>
>> spark.task.cpus - be default 1
>>
>>
>> My understanding is there will be 5 executore each can run 10 task at a
>> time and task can share total memory of 20g. Here, i could see only 5
>> vcores used which means 1 executor instance use 20g+10%overhead ram(22gb),
>> 10 core(number of threads), 1 Vcore(cpu).
>>
>> please correct me if my understand is wrong.
>>
>> how can i utilize number of vcore in EMR effectively. Will Vcore boost
>> performance?
>>
>>
>> --
>> Selvam Raman
>> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>>
>
>


Re: Spark EMR executor-core vs Vcores

2018-02-26 Thread Vadim Semenov
All used cores aren't getting reported correctly in EMR, and YARN itself
has no control over it, so whatever you put in `spark.executor.cores` will
be used,
but in the ResourceManager you will only see 1 vcore used per nodemanager.

On Mon, Feb 26, 2018 at 5:20 AM, Selvam Raman  wrote:

> Hi,
>
> spark version - 2.0.0
> spark distribution - EMR 5.0.0
>
> Spark Cluster - one master, 5 slaves
>
> Master node - m3.xlarge - 8 vCore, 15 GiB memory, 80 SSD GB storage
> Slave node - m3.2xlarge - 16 vCore, 30 GiB memory, 160 SSD GB storage
>
>
> Cluster Metrics
> Apps Submitted: 16 | Apps Pending: 0 | Apps Running: 1 | Apps Completed: 15 | Containers Running: 5
> Memory Used: 88.88 GB | Memory Total: 90.50 GB | Memory Reserved: 22 GB
> VCores Used: 5 | VCores Total: 79 | VCores Reserved: 1
> Active Nodes: 5 | Decommissioning Nodes: 0 | Decommissioned Nodes: 0 | Lost Nodes: 5 | Unhealthy Nodes: 0 | Rebooted Nodes: 0
>
> I have submitted job with below configuration
> --num-executors 5 --executor-cores 10 --executor-memory 20g
>
>
>
> spark.task.cpus - be default 1
>
>
> My understanding is there will be 5 executore each can run 10 task at a
> time and task can share total memory of 20g. Here, i could see only 5
> vcores used which means 1 executor instance use 20g+10%overhead ram(22gb),
> 10 core(number of threads), 1 Vcore(cpu).
>
> please correct me if my understand is wrong.
>
> how can i utilize number of vcore in EMR effectively. Will Vcore boost
> performance?
>
>
> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>


Re: Can spark handle this scenario?

2018-02-26 Thread Lian Jiang
Thanks Vijay. After changing the programming model (create a context class
for the workers), it finally worked for me. Cheers.
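
For reference, a minimal sketch of that pattern (urls is assumed to be an RDD[String];
the quoted explanation below is the reasoning behind it): the non-serializable connection
state is created inside mapPartitions, i.e. on the executor, instead of being captured
from the driver.

import java.net.{HttpURLConnection, URL}

val responseCodes = urls.mapPartitions { iter =>
  iter.map { u =>
    // Built here, on the executor, so nothing non-serializable crosses the wire.
    val conn = new URL(u).openConnection().asInstanceOf[HttpURLConnection]
    try {
      conn.setRequestMethod("GET")
      conn.getResponseCode
    } finally conn.disconnect()
  }
}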

On Fri, Feb 23, 2018 at 5:42 PM, vijay.bvp  wrote:

> when HTTP connection is opened you are opening a connection between
> specific
> machine (with IP and NIC card) to another specific machine, so this can't
> be
> serialized and used on other machine right!!
>
> This isn't spark limitation.
>
> I made a simple diagram if it helps. The Objects created at driver and
> passed to worker need to be serialized. The objects created at workers need
> not.
>
> In the diagram you have to create HTTPConnection on the executors
> independently of the driver.
> The HTTPConnection created at Executor-1 can be used for partitions P1-P3
> of
> RDD available on that executor.
>
> Spark is tolerant and does allow passing objects from driver to workers,
> but
> in case if it reports "Task not serializable"  it does indicate some object
> is having issue. mark the class as Serializable if you think if the object
> of it can be serialized. As I said in the beginning not everything could
> serializable particularly http connections, JDBC connections etc..
>
>  file/t8878/Picture1.png>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Unsubscribe

2018-02-26 Thread Mallanagouda Patil
Unsubscribe


RE: spark 2 new stuff

2018-02-26 Thread Stefan Panayotov
To me Delta is very valuable.

Stefan Panayotov, PhD
spanayo...@outlook.com
spanayo...@comcast.net
Cell: 610-517-5586

From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
Sent: Monday, February 26, 2018 9:26 AM
To: user @spark 
Subject: spark 2 new stuff

just a quick query.

From a practitioner's point of view what new stuff of Spark 2 have been most 
value for money.

I hear different and often conflicting stuff. However, I was wondering if the 
user group has more practical takes.


regards,




Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




spark 2 new stuff

2018-02-26 Thread Mich Talebzadeh
just a quick query.

From a practitioner's point of view, what new features of Spark 2 have been the most
value for money?

I hear different and often conflicting stuff. However, I was wondering if the user
group has more practical takes.


regards,



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: Trigger.ProcessingTime("10 seconds") & Trigger.Continuous(10.seconds)

2018-02-26 Thread naresh Goud
Thanks, I'll check it out.
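
For quick reference, a minimal sketch of the two triggers against that API (df is assumed
to be a streaming Dataset from a continuous-capable source such as Kafka, and the console
sink is used only for illustration):

import org.apache.spark.sql.streaming.Trigger

// Micro-batch mode: a batch is planned and executed every 10 seconds.
val microBatch = df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

// Continuous mode (new in 2.3): records are processed as they arrive; the interval
// controls how often progress (offsets) is checkpointed, not batch boundaries.
val continuous = df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("10 seconds"))
  .start()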

On Mon, Feb 26, 2018 at 12:11 AM Tathagata Das 
wrote:

> The continuous one is our new low latency continuous processing engine in
> Structured Streaming (to be released in 2.3).
> Here is the pre-release doc -
> https://dist.apache.org/repos/dist/dev/spark/v2.3.0-rc5-docs/_site/structured-streaming-programming-guide.html#continuous-processing
>
> On Sun, Feb 25, 2018 at 12:26 PM, naresh Goud 
> wrote:
>
>> Hello Spark Experts,
>>
>> What is the difference between Trigger.Continuous(10.seconds) and
>> Trigger.ProcessingTime("10 seconds") ?
>>
>>
>>
>> Thank you,
>> Naresh
>>
>
>


Re: Spark EMR executor-core vs Vcores

2018-02-26 Thread Selvam Raman
Hi Fawze,

Yes, it is true that I am running in YARN mode; the 5 containers represent 4
executors and 1 application master. But I was not expecting these details, as I am
already aware of them. What I want to know is the relationship between vcores (EMR
YARN) and executor-cores (Spark).

From my slave configuration I understand that only 8 threads are available on my
slave machine, which means at most 8 threads run at a time:

Thread(s) per core:    8
Core(s) per socket:    1
Socket(s):             1

So I don't think it is valid to give executor-cores=10 in my spark-submit.

On Mon, Feb 26, 2018 at 10:54 AM, Fawze Abujaber  wrote:

> It's recommended to sue executor-cores of 5.
>
> Each executor here will utilize 20 GB which mean the spark job will
> utilize 50 cpu cores and 100GB memory.
>
> You can not run more than 4 executors because your cluster doesn't have
> enough memory.
>
> Use see 5 executor because 4 for the job and one for the application
> master.
>
> serr the used menory and the total memory.
>
> On Mon, Feb 26, 2018 at 12:20 PM, Selvam Raman  wrote:
>
>> Hi,
>>
>> spark version - 2.0.0
>> spark distribution - EMR 5.0.0
>>
>> Spark Cluster - one master, 5 slaves
>>
>> Master node - m3.xlarge - 8 vCore, 15 GiB memory, 80 SSD GB storage
>> Slave node - m3.2xlarge - 16 vCore, 30 GiB memory, 160 SSD GB storage
>>
>>
>> Cluster Metrics
>> Apps Submitted: 16 | Apps Pending: 0 | Apps Running: 1 | Apps Completed: 15 | Containers Running: 5
>> Memory Used: 88.88 GB | Memory Total: 90.50 GB | Memory Reserved: 22 GB
>> VCores Used: 5 | VCores Total: 79 | VCores Reserved: 1
>> Active Nodes: 5 | Decommissioning Nodes: 0 | Decommissioned Nodes: 0 | Lost Nodes: 5 | Unhealthy Nodes: 0 | Rebooted Nodes: 0
>>
>> I have submitted job with below configuration
>> --num-executors 5 --executor-cores 10 --executor-memory 20g
>>
>>
>>
>> spark.task.cpus - be default 1
>>
>>
>> My understanding is there will be 5 executore each can run 10 task at a
>> time and task can share total memory of 20g. Here, i could see only 5
>> vcores used which means 1 executor instance use 20g+10%overhead ram(22gb),
>> 10 core(number of threads), 1 Vcore(cpu).
>>
>> please correct me if my understand is wrong.
>>
>> how can i utilize number of vcore in EMR effectively. Will Vcore boost
>> performance?
>>
>>
>> --
>> Selvam Raman
>> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>>
>
>


-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Unsubscribe

2018-02-26 Thread purna pradeep


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Data loss in spark job

2018-02-26 Thread Faraz Mateen
 Hi,

I think I have a situation where spark is silently failing to write data to
my Cassandra table. Let me explain my current situation.

I have a table consisting of around 402 million records. The table consists
of 84 columns. Table schema is something like this:


id (text)  |  datetime (timestamp)  |  field1 (text)  |  ...  |  field84 (text)


To optimize queries on the data, I am splitting it into multiple tables
using spark job mentioned below. Each separated table must have data from
just one field from the source table. New table has the following structure:


id (text)  |  datetime (timestamp)  |  day (date)  |  value (text)


where, "value" column will contain the field column from the source table.
Source table has around *402 million* records which is around *85 GB* of
data distributed on *3 nodes (27 + 32 + 26)*. New table being populated is
supposed to have the same number of records but it is missing some data.

Initially, I assumed some problem with the data in source table. So, I
copied 1 weeks of data from the source table into another table with the
same schema. Then I split the data like I did before but this time, field
specific table had the same number of records as the source table. I
repeated this again with another data set from another time period and
again number of records in field specific table  were equal to number of
records in the source table.

This has led me to believe that there is some problem with spark's handling
of large data set. Here is my spark submit command to separate the data:

~/spark-2.1.0-bin-hadoop2.7/bin/spark-submit --master spark://10.128.0.18:7077
  --packages datastax:spark-cassandra-connector:2.0.1-s_2.11
  --conf spark.cassandra.connection.host="10.128.1.1,10.128.1.2,10.128.1.3"
  --conf "spark.storage.memoryFraction=1" --conf spark.local.dir=/media/db/
  --executor-memory 10G --num-executors=6 --executor-cores=3
  --total-executor-cores 18 split_data.py


*split_data.py* is the name of my pyspark application. It is essentially
executing the following query:


*("select id,datetime,DATE_FORMAT(datetime,'-MM-dd') as day, "+field+"
as value  from data  " )*

The spark job does not crash after these errors and warnings. However when
I check the number of records in the new table, it is always less than the
number of records in source table. Moreover, the number of records in
destination table is not the same after each run of the query. I changed
logging level for spark submit to WARN and saw the following WARNINGS and
ERRORS on the console:

https://gist.github.com/anonymous/e05f1aaa131348c9a5a9a2db6d
141f8c#file-gistfile1-txt

My cluster consists of *3 gcloud VMs*. A spark and a cassandra node is
deployed on each VM.
Each VM has *8 cores* of CPU and* 30 GB* RAM. Spark is deployed in
standalone cluster mode.
Spark version is *2.1.0*
I am using datastax spark cassandra connector version *2.0.1*
Cassandra Version is *3.9*
Each spark executor is allowed 10 GB of RAM and there are 2 executors
running on each node.

Is the problem related to my machine resources? How can I root cause or fix
this?
Any help will be greatly appreciated.

Thanks,
Faraz


Re: Spark EMR executor-core vs Vcores

2018-02-26 Thread Fawze Abujaber
It's recommended to use executor-cores of 5.

Each executor here will utilize 20 GB, which means the Spark job will utilize
50 CPU cores and 100 GB of memory.

You cannot run more than 4 executors because your cluster doesn't have
enough memory.

You see 5 executors because 4 are for the job and one is for the application master.

See the used memory and the total memory.

On Mon, Feb 26, 2018 at 12:20 PM, Selvam Raman  wrote:

> Hi,
>
> spark version - 2.0.0
> spark distribution - EMR 5.0.0
>
> Spark Cluster - one master, 5 slaves
>
> Master node - m3.xlarge - 8 vCore, 15 GiB memory, 80 SSD GB storage
> Slave node - m3.2xlarge - 16 vCore, 30 GiB memory, 160 SSD GB storage
>
>
> Cluster Metrics
> Apps Submitted: 16 | Apps Pending: 0 | Apps Running: 1 | Apps Completed: 15 | Containers Running: 5
> Memory Used: 88.88 GB | Memory Total: 90.50 GB | Memory Reserved: 22 GB
> VCores Used: 5 | VCores Total: 79 | VCores Reserved: 1
> Active Nodes: 5 | Decommissioning Nodes: 0 | Decommissioned Nodes: 0 | Lost Nodes: 5 | Unhealthy Nodes: 0 | Rebooted Nodes: 0
>
> I have submitted job with below configuration
> --num-executors 5 --executor-cores 10 --executor-memory 20g
>
>
>
> spark.task.cpus - be default 1
>
>
> My understanding is there will be 5 executore each can run 10 task at a
> time and task can share total memory of 20g. Here, i could see only 5
> vcores used which means 1 executor instance use 20g+10%overhead ram(22gb),
> 10 core(number of threads), 1 Vcore(cpu).
>
> please correct me if my understand is wrong.
>
> how can i utilize number of vcore in EMR effectively. Will Vcore boost
> performance?
>
>
> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>


Re: Spark EMR executor-core vs Vcores

2018-02-26 Thread Selvam Raman
Master Node details:
lscpu
Architecture:  x86_64
CPU op-mode(s):32-bit, 64-bit
Byte Order:Little Endian
CPU(s):4
On-line CPU(s) list:   0-3
Thread(s) per core:4
Core(s) per socket:1
Socket(s): 1
NUMA node(s):  1
Vendor ID: GenuineIntel
CPU family:6
Model: 62
Model name:Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Stepping:  4
CPU MHz:   2494.066
BogoMIPS:  4988.13
Hypervisor vendor: Xen
Virtualization type:   full
L1d cache: 32K
L1i cache: 32K
L2 cache:  256K
L3 cache:  25600K
NUMA node0 CPU(s): 0-3




Slave Node Details:
Architecture:  x86_64
CPU op-mode(s):32-bit, 64-bit
Byte Order:Little Endian
CPU(s):8
On-line CPU(s) list:   0-7
Thread(s) per core:8
Core(s) per socket:1
Socket(s): 1
NUMA node(s):  1
Vendor ID: GenuineIntel
CPU family:6
Model: 62
Model name:Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Stepping:  4
CPU MHz:   2500.054
BogoMIPS:  5000.10
Hypervisor vendor: Xen
Virtualization type:   full
L1d cache: 32K
L1i cache: 32K
L2 cache:  256K
L3 cache:  25600K
NUMA node0 CPU(s): 0-7

On Mon, Feb 26, 2018 at 10:20 AM, Selvam Raman  wrote:

> Hi,
>
> spark version - 2.0.0
> spark distribution - EMR 5.0.0
>
> Spark Cluster - one master, 5 slaves
>
> Master node - m3.xlarge - 8 vCore, 15 GiB memory, 80 SSD GB storage
> Slave node - m3.2xlarge - 16 vCore, 30 GiB memory, 160 SSD GB storage
>
>
> Cluster Metrics
> Apps Submitted: 16 | Apps Pending: 0 | Apps Running: 1 | Apps Completed: 15 | Containers Running: 5
> Memory Used: 88.88 GB | Memory Total: 90.50 GB | Memory Reserved: 22 GB
> VCores Used: 5 | VCores Total: 79 | VCores Reserved: 1
> Active Nodes: 5 | Decommissioning Nodes: 0 | Decommissioned Nodes: 0 | Lost Nodes: 5 | Unhealthy Nodes: 0 | Rebooted Nodes: 0
>
> I have submitted job with below configuration
> --num-executors 5 --executor-cores 10 --executor-memory 20g
>
>
>
> spark.task.cpus - be default 1
>
>
> My understanding is there will be 5 executore each can run 10 task at a
> time and task can share total memory of 20g. Here, i could see only 5
> vcores used which means 1 executor instance use 20g+10%overhead ram(22gb),
> 10 core(number of threads), 1 Vcore(cpu).
>
> please correct me if my understand is wrong.
>
> how can i utilize number of vcore in EMR effectively. Will Vcore boost
> performance?
>
>
> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>



-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Spark EMR executor-core vs Vcores

2018-02-26 Thread Selvam Raman
Hi,

spark version - 2.0.0
spark distribution - EMR 5.0.0

Spark Cluster - one master, 5 slaves

Master node - m3.xlarge - 8 vCore, 15 GiB memory, 80 SSD GB storage
Slave node - m3.2xlarge - 16 vCore, 30 GiB memory, 160 SSD GB storage


Cluster Metrics
Apps Submitted: 16 | Apps Pending: 0 | Apps Running: 1 | Apps Completed: 15 | Containers Running: 5
Memory Used: 88.88 GB | Memory Total: 90.50 GB | Memory Reserved: 22 GB
VCores Used: 5 | VCores Total: 79 | VCores Reserved: 1
Active Nodes: 5 | Decommissioning Nodes: 0 | Decommissioned Nodes: 0 | Lost Nodes: 5 | Unhealthy Nodes: 0 | Rebooted Nodes: 0

I have submitted the job with the below configuration:
--num-executors 5 --executor-cores 10 --executor-memory 20g

spark.task.cpus - by default 1


My understanding is that there will be 5 executors, each of which can run 10 tasks at
a time, and the tasks share a total memory of 20g. Here, I could see only 5 vcores
used, which means 1 executor instance uses 20g + 10% overhead RAM (22gb), 10 cores
(number of threads), and 1 vcore (CPU).

Please correct me if my understanding is wrong.

How can I utilize the number of vcores in EMR effectively? Will vcores boost
performance?


-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Out of memory Error when using Collection Accumulator Spark 2.2

2018-02-26 Thread Patrick
Hi,

We were getting OOM error when we are accumulating the results of each
worker. We were trying to avoid collecting data to driver node instead used
accumulator as per below code snippet,

Is there any spark config to set the accumulator settings Or am i doing the
wrong way to collect the huge data set?

  CollectionAccumulator<String> accumulate;
  Dataset<Row> bin;

  bin.foreach((ForeachFunction<Row>) row -> {
    accumulate.add(row.get(0) + "|" + row.get(1) + "|" + row.get(2));
  });

  accumulate.value().forEach(element -> {
    String[] arr = element.split("\\|");
    String count = arr[2];
    double percentage =
        (total == 0.0) ? 0.0 : (Double.valueOf(count) / total);
    PayloadBin payload = new PayloadBin(arr[0],
        arr[1], 0, Long.valueOf(count), percentage, sortBy, sortOrder);
    binArray.add(payload);
  });


18/02/21 17:35:23 INFO storage.BlockManagerInfo: Added taskresult_5050 in
memory on rhlhddfrd225.fairisaac.com:41640 (size: 3.7 MB, free: 8.3 GB)

18/02/21 17:35:24 INFO storage.BlockManagerInfo: Removed taskresult_5034 on
rhlhddfrd218.fairisaac.com:46584 in memory (size: 3.7 MB, free: 8.4 GB)

18/02/21 17:35:25 INFO scheduler.TaskSetManager: Finished task 59.0 in
stage 20.0 (TID 5034) in 9908 ms on rhlhddfrd218.fairisaac.com (executor
92) (14/200)

Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError:
Java heap space

at java.util.Arrays.copyOf(Arrays.java:3181)

at java.util.ArrayList.toArray(ArrayList.java:376)

at java.util.Collections$SynchronizedCollection.toArray(Collections.java:2024)

at java.util.ArrayList.<init>(ArrayList.java:177)

at org.apache.spark.util.CollectionAccumulator.value(AccumulatorV2.scala:470)
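
One common alternative, shown only as a Scala sketch (the column positions mirror the
snippet above and are otherwise assumptions): skip the accumulator and pull the result
back one partition at a time, so the driver never has to hold all task results at once.

import scala.collection.JavaConverters._

bin.toLocalIterator().asScala.foreach { row =>
  val key   = row.get(0) + "|" + row.get(1)
  val count = row.get(2).toString
  // build the PayloadBin equivalent here, one row at a time
}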


Is there a way to query dataframe views directly without going through scheduler?

2018-02-26 Thread kant kodali
Hi All,

I wonder if there is a way to query DataFrame views directly without going through
the scheduler? For example, say I have the following code:

Dataset<Row> kafkaDf = session.readStream().format("kafka").load();
kafkaDf.createOrReplaceTempView("table");

Now, can I query the view "table" without going through the job scheduling
process?

Flink, for example, has some called QueryableState and also a Client which
will ask the JobManager where QueryableState is and just get the value but
it is limited to just K/V queries. It would be a lot more powerful if that
state can be queried using SQL since that would eliminate writing to an
external data store at least for a streaming dataset.

The problem currently is that I know I can query the view "table" in Spark by
submitting another job, but the response is not in milliseconds (in other words, not
realtime), so we would have to write to an external datastore and query it from there.

Thanks!