Quick question about hive-exec 1.2.1.spark2

2016-08-03 Thread Tao Li
Hi,

The spark-hive module has a dependency on the hive-exec module (a
custom-built module from the "Hive on Spark" project). Can someone point me
to the source code repo of this hive-exec module? Thanks.

Here is the maven repo link: 
https://mvnrepository.com/artifact/org.spark-project.hive/hive-exec/1.2.1.spark2
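
For reference, a minimal sbt coordinate for the artifact in question, taken
from the Maven repo link above:

    libraryDependencies += "org.spark-project.hive" % "hive-exec" % "1.2.1.spark2"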


Re: AccumulatorV2 += operator

2016-08-03 Thread Holden Karau
Ah, in that case the programming guide's text is still describing the
deprecated accumulator API despite having an updated code sample (the way
it suggests creating an accumulator is also deprecated). I think the fix is
updating the programming guide rather than adding += to the API.
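
For reference, a minimal sketch of what the updated guide text could show,
assuming a SparkSession named spark (the built-in longAccumulator is just
one example):

    // Spark 2.0 AccumulatorV2 usage: add() instead of the old += operator
    val acc = spark.sparkContext.longAccumulator("records seen")
    spark.sparkContext.parallelize(1 to 100).foreach { _ => acc.add(1L) }
    println(acc.value)  // 100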

On Wednesday, August 3, 2016, Bryan Cutler  wrote:

> No, I was referring to the programming guide section on accumulators, it
> says " Tasks running on a cluster can then add to it using the add method
> or the += operator (in Scala and Python)."
>
> On Aug 2, 2016 2:52 PM, "Holden Karau" wrote:
>
>> I believe it was intentional with the idea that it would be more unified
> between Java and Scala APIs. If you're talking about the javadoc mention in
>> https://github.com/apache/spark/pull/14466/files - I believe the += is
>> meant to refer to what the internal implementation of the add function can
>> be for someone extending the accumulator (but it certainly could cause
>> confusion).
>>
>> Reynold can provide a more definitive answer in this case.
>>
>> On Tue, Aug 2, 2016 at 1:46 PM, Bryan Cutler wrote:
>>
>>> It seems like the += operator is missing from the new accumulator API,
>>> although the docs still make reference to it.  Anyone know if it was
>>> intentionally not put in?  I'm happy to do a PR for it or update the docs
>>> to just use the add() method, just want to check if there was some reason
>>> first.
>>>
>>> Bryan
>>>
>>
>>
>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
>>
>

-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Re: How does MapWithStateRDD distribute the data

2016-08-03 Thread Cody Koeninger
Are you using KafkaUtils.createDirectStream?
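
For reference, a minimal sketch of a direct stream feeding mapWithState
(Spark 1.6 streaming API; the broker address, topic name, and counting state
function are hypothetical; numPartitions is the knob that controls how the
MapWithStateRDD is partitioned across executors):

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka.KafkaUtils

    val ssc = new StreamingContext(sc, Seconds(10))
    ssc.checkpoint("/tmp/checkpoint")  // mapWithState requires checkpointing

    // With the direct stream, there is one RDD partition per Kafka topic partition.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, Map("metadata.broker.list" -> "broker:9092"), Set("events"))

    // Hypothetical per-key counter kept in state.
    def count(key: String, value: Option[String], state: State[Long]): (String, Long) = {
      val sum = state.getOption.getOrElse(0L) + 1L
      state.update(sum)
      (key, sum)
    }

    // By default the state is hash-partitioned with defaultParallelism partitions;
    // numPartitions overrides that.
    val counts = stream.mapWithState(StateSpec.function(count _).numPartitions(8))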

On Wed, Aug 3, 2016 at 9:42 AM, Soumitra Johri
 wrote:
> Hi,
>
> I am running a streaming job with 4 executors and 16 cores, so that each
> executor has two cores to work with. The input Kafka topic has 4 partitions.
> With this configuration I was expecting MapWithStateRDD to be evenly
> distributed across all executors; however, I see that the MapWithStateRDD
> data is distributed across only two executors. Sometimes the data goes to
> only one executor.
>
> How can this be explained? I'm pretty sure there is some math that would
> explain this behavior.
>
> I am using the standard standalone 1.6.2 cluster.
>
> Thanks
> Soumitra




Re: AccumulatorV2 += operator

2016-08-03 Thread Bryan Cutler
No, I was referring to the programming guide section on accumulators, it
says " Tasks running on a cluster can then add to it using the add method
or the += operator (in Scala and Python)."

On Aug 2, 2016 2:52 PM, "Holden Karau"  wrote:

> I believe it was intentional with the idea that it would be more unified
> between Java and Scala APIs. If you're talking about the javadoc mention in
> https://github.com/apache/spark/pull/14466/files - I believe the += is
> meant to refer to what the internal implementation of the add function can
> be for someone extending the accumulator (but it certainly could cause
> confusion).
>
> Reynold can provide a more definitive answer in this case.
>
> On Tue, Aug 2, 2016 at 1:46 PM, Bryan Cutler  wrote:
>
>> It seems like the += operator is missing from the new accumulator API,
>> although the docs still make reference to it.  Anyone know if it was
>> intentionally not put in?  I'm happy to do a PR for it or update the docs
>> to just use the add() method, just want to check if there was some reason
>> first.
>>
>> Bryan
>>
>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>


How does MapWithStateRDD distribute the data

2016-08-03 Thread Soumitra Johri
Hi,

I am running a streaming job with 4 executors and 16 cores, so that each
executor has two cores to work with. The input Kafka topic has 4 partitions.
With this configuration I was expecting MapWithStateRDD to be evenly
distributed across all executors; however, I see that the MapWithStateRDD
data is distributed across only two executors. Sometimes the data goes to
only one executor.

How can this be explained? I'm pretty sure there is some math that would
explain this behavior.

I am using the standard standalone 1.6.2 cluster.

Thanks
Soumitra


Re: What happens in Dataset limit followed by rdd

2016-08-03 Thread Maciej Szymkiewicz
Pushing limits down across mapping would be great. If you're used to SQL or
work frequently with lazy collections, this is a behavior you learn to
expect.

On 08/02/2016 02:12 PM, Sun Rui wrote:
> Spark does optimise subsequent limits, for example:
>
> scala> df1.limit(3).limit(1).explain
> == Physical Plan ==
> CollectLimit 1
> +- *SerializeFromObject [assertnotnull(input[0,
> $line14.$read$$iw$$iw$my, true], top level non-flat input
> object).x AS x#2]
>+- Scan ExternalRDDScan[obj#1]
>
> However, limit cannot simply be pushed down across mapping functions,
> because the number of rows may change across functions, for example
> flatMap().
>
> It seems that limit could be pushed across map(), which won't change the
> number of rows. Maybe there is room here for Spark optimisation.
>
>> On Aug 2, 2016, at 18:51, Maciej Szymkiewicz wrote:
>>
>> Thank you for your prompt response and great examples, Sun Rui, but I am
>> still confused about one thing. Do you see any particular reason not to
>> merge subsequent limits? The following case
>>
>>(limit n (map f (limit m ds)))
>>
>> could be optimized to:
>>
>>(map f (limit n (limit m ds)))
>>
>> and further to
>>
>>(map f (limit (min n m) ds))
>>
>> couldn't it?
>>
>>
>> On 08/02/2016 11:57 AM, Sun Rui wrote:
>>> Based on your code, here is simpler test case on Spark 2.0
>>>
>>>case class my (x: Int)
>>>val rdd = sc.parallelize(0.until(1), 1000).map { x => my(x) }
>>>val df1 = spark.createDataFrame(rdd)
>>>val df2 = df1.limit(1)
>>>df1.map { r => r.getAs[Int](0) }.first
>>>df2.map { r => r.getAs[Int](0) }.first // Much slower than the
>>>previous line
>>>
>>> Actually, Dataset.first is equivalent to Dataset.limit(1).collect, so
>>> check the physical plan of the two cases:
>>>
>>>scala> df1.map { r => r.getAs[Int](0) }.limit(1).explain
>>>== Physical Plan ==
>>>CollectLimit 1
>>>+- *SerializeFromObject [input[0, int, true] AS value#124]
>>>   +- *MapElements , obj#123: int
>>>  +- *DeserializeToObject createexternalrow(x#74,
>>>StructField(x,IntegerType,false)), obj#122: org.apache.spark.sql.Row
>>> +- Scan ExistingRDD[x#74]
>>>
>>>scala> df2.map { r => r.getAs[Int](0) }.limit(1).explain
>>>== Physical Plan ==
>>>CollectLimit 1
>>>+- *SerializeFromObject [input[0, int, true] AS value#131]
>>>   +- *MapElements , obj#130: int
>>>  +- *DeserializeToObject createexternalrow(x#74,
>>>StructField(x,IntegerType,false)), obj#129: org.apache.spark.sql.Row
>>> +- *GlobalLimit 1
>>>+- Exchange SinglePartition
>>>   +- *LocalLimit 1
>>>  +- Scan ExistingRDD[x#74]
>>>
>>>
>>> For the first case, it is related to an optimisation in
>>> the CollectLimitExec physical operator. That is, it will first fetch
>>> the first partition to get the limit number of rows, 1 in this case; if
>>> that is not enough, it fetches more partitions until the desired limit
>>> is reached. So generally, if the first partition is not empty, only the
>>> first partition will be computed and fetched; other partitions will not
>>> even be computed.
>>>
>>> However, in the second case, the optimisation in CollectLimitExec
>>> does not help, because the previous limit operation involves a shuffle.
>>> All partitions will be computed, with LocalLimit(1) running on each
>>> partition to get 1 row, and then all partitions are shuffled into a
>>> single partition. CollectLimitExec will fetch 1 row from the resulting
>>> single partition.
>>>
>>>
 On Aug 2, 2016, at 09:08, Maciej Szymkiewicz wrote:

 Hi everyone,

 This doesn't look like something expected, does it?

 http://stackoverflow.com/q/38710018/1560062

 A quick glance at the UI suggests that there is a shuffle involved and
 the input for first is a ShuffledRowRDD.
 -- 
 Best regards,
 Maciej Szymkiewicz
>>>
>>
>> -- 
>> Maciej Szymkiewicz
>

-- 
Maciej Szymkiewicz
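
For reference, a minimal sketch of the workaround implied by Sun Rui's
analysis above (df1 as defined in the quoted code): applying the map before
the limit keeps CollectLimit as the only limiting operator, so the
incremental partition-fetch optimisation still applies.

    val fast = df1.map { r => r.getAs[Int](0) }.limit(1)  // no shuffle in the plan
    val slow = df1.limit(1).map { r => r.getAs[Int](0) }  // plan keeps GlobalLimit + Exchange
    fast.first  // generally computes only the first non-empty partition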



Spark SQL and Kryo registration

2016-08-03 Thread Olivier Girardot
Hi everyone, I'm currently trying to use Spark 2.0.0 and make DataFrames
work with kryo.registrationRequired=true. Is it even possible at all,
considering the codegen?
Regards,
Olivier Girardot | Partner
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94
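
For reference, a minimal sketch of the configuration in question (Point is
a hypothetical user class; whether registrationRequired can be satisfied for
all the classes the Dataset codegen touches is exactly the open question):

    import org.apache.spark.SparkConf

    case class Point(x: Double, y: Double)  // hypothetical user type

    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrationRequired", "true")
      .registerKryoClasses(Array(classOf[Point]))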

Re: Spark on yarn, only 1 or 2 vcores getting allocated to the containers getting created.

2016-08-03 Thread Saisai Shao
Using the dominant resource calculator instead of the default resource
calculator will give you the vcore allocation you expect. Basically, by
default YARN does not honor CPU cores as a resource, so you will always see
vcores as 1 no matter what number of cores you set in Spark.
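
For reference, a minimal sketch of the relevant setting, assuming the
CapacityScheduler (it goes in capacity-scheduler.xml):

    <property>
      <name>yarn.scheduler.capacity.resource-calculator</name>
      <value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
    </property>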

On Wed, Aug 3, 2016 at 12:11 PM, satyajit vegesna <
satyajit.apas...@gmail.com> wrote:

> Hi All,
>
> I am trying to run a Spark job using YARN, and I specify the
> --executor-cores value as 20.
> But when I check the "nodes of the cluster" page at
> http://hostname:8088/cluster/nodes, I see 4 containers getting
> created on each node in the cluster.
>
> But I can only see 1 vcore assigned to each container, even when I
> specify --executor-cores 20 while submitting the job using spark-submit.
>
> yarn-site.xml:
> <property>
>   <name>yarn.scheduler.maximum-allocation-mb</name>
>   <value>6</value>
> </property>
> <property>
>   <name>yarn.scheduler.minimum-allocation-vcores</name>
>   <value>1</value>
> </property>
> <property>
>   <name>yarn.scheduler.maximum-allocation-vcores</name>
>   <value>40</value>
> </property>
> <property>
>   <name>yarn.nodemanager.resource.memory-mb</name>
>   <value>7</value>
> </property>
> <property>
>   <name>yarn.nodemanager.resource.cpu-vcores</name>
>   <value>20</value>
> </property>
>
>
> Has anyone faced the same issue?
>
> Regards,
> Satyajit.
>