dataset algos slow because of too many shuffles

2017-02-02 Thread Koert Kuipers
we noticed that some algos we ported from rdd to dataset are significantly
slower, and the main reason seems to be extra shuffles that we successfully
avoid for rdds through careful partitioning. this seems to be dataset
specific, as it works ok for dataframe.

see also here:
http://blog.hydronitrogen.com/2016/05/13/shuffle-free-joins-in-spark-sql/

it kind of boils down to this... if i partition and sort dataframes that
get used repeatedly for joins, i can avoid shuffles:

import org.apache.spark.sql.functions.col
import org.apache.spark.storage.StorageLevel

System.setProperty("spark.sql.autoBroadcastJoinThreshold", "-1")

val df1 = Seq((0, 0), (1, 1)).toDF("key", "value")
  .repartition(col("key")).sortWithinPartitions(col("key"))
  .persist(StorageLevel.DISK_ONLY)
val df2 = Seq((0, 0), (1, 1)).toDF("key2", "value2")
  .repartition(col("key2")).sortWithinPartitions(col("key2"))
  .persist(StorageLevel.DISK_ONLY)

val joined = df1.join(df2, col("key") === col("key2"))
joined.explain

== Physical Plan ==
*SortMergeJoin [key#5], [key2#27], Inner
:- InMemoryTableScan [key#5, value#6]
:     +- InMemoryRelation [key#5, value#6], true, 1, StorageLevel(disk, 1 replicas)
:           +- *Sort [key#5 ASC NULLS FIRST], false, 0
:              +- Exchange hashpartitioning(key#5, 4)
:                 +- LocalTableScan [key#5, value#6]
+- InMemoryTableScan [key2#27, value2#28]
      +- InMemoryRelation [key2#27, value2#28], true, 1, StorageLevel(disk, 1 replicas)
            +- *Sort [key2#27 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(key2#27, 4)
                  +- LocalTableScan [key2#27, value2#28]

notice how the persisted dataframes are no longer shuffled or sorted
before being used in the join. however, if i try to do the same with
datasets i have no luck:

val ds1 = Seq((0, 0), (1, 1)).toDS
  .repartition(col("_1")).sortWithinPartitions(col("_1"))
  .persist(StorageLevel.DISK_ONLY)
val ds2 = Seq((0, 0), (1, 1)).toDS
  .repartition(col("_1")).sortWithinPartitions(col("_1"))
  .persist(StorageLevel.DISK_ONLY)

val joined1 = ds1.joinWith(ds2, ds1("_1") === ds2("_1"))
joined1.explain

== Physical Plan ==
*SortMergeJoin [_1#105._1], [_2#106._1], Inner
:- *Sort [_1#105._1 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(_1#105._1, 4)
:     +- *Project [named_struct(_1, _1#83, _2, _2#84) AS _1#105]
:        +- InMemoryTableScan [_1#83, _2#84]
:              +- InMemoryRelation [_1#83, _2#84], true, 1, StorageLevel(disk, 1 replicas)
:                    +- *Sort [_1#83 ASC NULLS FIRST], false, 0
:                       +- Exchange hashpartitioning(_1#83, 4)
:                          +- LocalTableScan [_1#83, _2#84]
+- *Sort [_2#106._1 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(_2#106._1, 4)
      +- *Project [named_struct(_1, _1#100, _2, _2#101) AS _2#106]
         +- InMemoryTableScan [_1#100, _2#101]
               +- InMemoryRelation [_1#100, _2#101], true, 1, StorageLevel(disk, 1 replicas)
                     +- *Sort [_1#83 ASC NULLS FIRST], false, 0
                        +- Exchange hashpartitioning(_1#83, 4)
                           +- LocalTableScan [_1#83, _2#84]

notice how my persisted Datasets are shuffled and sorted again. part of the
issue seems to be in joinWith, which does some preprocessing that seems to
confuse the planner. if i change joinWith to join (which returns a
dataframe) it looks a little better, in that only one side gets shuffled
again, but it is still not optimal:

val ds1 = Seq((0, 0), (1, 1)).toDS
  .repartition(col("_1")).sortWithinPartitions(col("_1"))
  .persist(StorageLevel.DISK_ONLY)
val ds2 = Seq((0, 0), (1, 1)).toDS
  .repartition(col("_1")).sortWithinPartitions(col("_1"))
  .persist(StorageLevel.DISK_ONLY)

val joined1 = ds1.join(ds2, ds1("_1") === ds2("_1"))
joined1.explain

== Physical Plan ==
*SortMergeJoin [_1#83], [_1#100], Inner
:- InMemoryTableScan [_1#83, _2#84]
:     +- InMemoryRelation [_1#83, _2#84], true, 1, StorageLevel(disk, 1 replicas)
:           +- *Sort [_1#83 ASC NULLS FIRST], false, 0
:              +- Exchange hashpartitioning(_1#83, 4)
:                 +- LocalTableScan [_1#83, _2#84]
+- *Sort [_1#100 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(_1#100, 4)
      +- InMemoryTableScan [_1#100, _2#101]
            +- InMemoryRelation [_1#100, _2#101], true, 1, StorageLevel(disk, 1 replicas)
                  +- *Sort [_1#83 ASC NULLS FIRST], false, 0
                     +- Exchange hashpartitioning(_1#83, 4)
                        +- LocalTableScan [_1#83, _2#84]
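
one way to sidestep this for now (just a sketch based on the dataframe
example above; i have not verified that it avoids the extra exchange on
every spark version) is to keep the join itself at the dataframe level and
only switch back to a typed dataset after the join:

// join at the dataframe level, where the persisted plans line up with the join keys
val joinedDf = df1.join(df2, col("key") === col("key2"))
// only go back to a typed dataset after the join
val typed = joinedDf.select(col("key"), col("value"), col("value2")).as[(Int, Int, Int)]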


persistence iops and throughput check? Re: Running a spark code on multiple machines using google cloud platform

2017-02-02 Thread Heji Kim
Dear Anahita,

When we run performance tests for Spark/YARN clusters on GCP, we have to
make sure we are within iops and throughput limits.  Depending on disk type
(standard or SSD) and size of disk, you will only get so many max sustained
iops and throughput per sec. The GCP instance metrics graphs are not great
but enough to determine if you are over the limit.

https://cloud.google.com/compute/docs/disks/performance

Heji

On Thu, Feb 2, 2017 at 4:29 AM, Anahita Talebi 
wrote:

> Dear all,
>
> I am trying to run a spark code on multiple machines using submit job in
> google cloud platform.
> As the inputs of my code, I have a training and testing datasets.
>
> When I use a small training data set (about 10 KB), the code runs
> successfully on Google Cloud, but when I use a large data set of about
> 50 GB, I receive the following error:
>
> 17/02/01 19:08:06 ERROR org.apache.spark.scheduler.LiveListenerBus: 
> SparkListenerBus has already stopped! Dropping event 
> SparkListenerTaskEnd(2,0,ResultTask,TaskKilled,org.apache.spark.scheduler.TaskInfo@3101f3b3,null)
>
> Can anyone give me a hint on how I can solve this problem?
>
> PS: I cannot use a smaller training data set because I have an optimization code 
> that needs to use all the data.
>
> I have to use Google Cloud Platform because I need to run the code on 
> multiple machines.
>
> Thanks a lot,
>
> Anahita
>
>


Re: Dynamic resource allocation to Spark on Mesos

2017-02-02 Thread ji yan
got it, thanks for clarifying!

On Thu, Feb 2, 2017 at 2:57 PM, Michael Gummelt 
wrote:

> Yes, that's expected.  spark.executor.cores sizes a single executor.  It
> doesn't limit the number of executors.  For that, you need spark.cores.max
> (--total-executor-cores).
>
> And rdd.parallelize does not specify the number of executors.  It
> specifies the number of partitions, which relates to the number of tasks,
> not executors.  Unless you're running with dynamic allocation enabled, the
> number of executors for your job is static, and determined at start time.
> It's not influenced by your job itself.
>
>
> On Thu, Feb 2, 2017 at 2:42 PM, Ji Yan  wrote:
>
>> I tried setting spark.executor.cores per executor, but Spark seems to be
>> spinning up as many executors as possible up to spark.cores.max or however
>> many cpu cores available on the cluster, and this may be undesirable
>> because the number of executors in rdd.parallelize(collection, # of
>> partitions) is being overriden
>>
>> On Thu, Feb 2, 2017 at 1:30 PM, Michael Gummelt 
>> wrote:
>>
>>> As of Spark 2.0, Mesos mode does support setting cores on the executor
>>> level, but you might need to set the property directly (--conf
>>> spark.executor.cores=).  I've written about this here:
>>> https://docs.mesosphere.com/1.8/usage/service-guides/spark/j
>>> ob-scheduling/.  That doc is for DC/OS, but the configuration is the
>>> same.
>>>
>>> On Thu, Feb 2, 2017 at 1:06 PM, Ji Yan  wrote:
>>>
 I was mainly confused why this is the case with memory, but with cpu
 cores, it is not specified on per executor level

 On Thu, Feb 2, 2017 at 1:02 PM, Michael Gummelt  wrote:

> It sounds like you've answered your own question, right?
> --executor-memory means the memory per executor.  If you have no executor
> w/ 200GB memory, then the driver will accept no offers.
>
> On Thu, Feb 2, 2017 at 1:01 PM, Ji Yan  wrote:
>
>> sorry, to clarify, i was using --executor-memory for memory,
>> and --total-executor-cores for cpu cores
>>
>> On Thu, Feb 2, 2017 at 12:56 PM, Michael Gummelt <
>> mgumm...@mesosphere.io> wrote:
>>
>>> What CLI args are your referring to?  I'm aware of spark-submit's
>>> arguments (--executor-memory, --total-executor-cores, and 
>>> --executor-cores)
>>>
>>> On Thu, Feb 2, 2017 at 12:41 PM, Ji Yan  wrote:
>>>
 I have done a experiment on this today. It shows that only CPUs are
 tolerant of insufficient cluster size when a job starts. On my 
 cluster, I
 have 180Gb of memory and 64 cores, when I run spark-submit ( on mesos )
 with --cpu_cores set to 1000, the job starts up with 64 cores. but 
 when I
 set --memory to 200Gb, the job fails to start with "Initial job
 has not accepted any resources; check your cluster UI to ensure that
 workers are registered and have sufficient resources"

 Also it is confusing to me that --cpu_cores specifies the number of
 cpu cores across all executors, but --memory specifies per executor 
 memory
 requirement.

 On Mon, Jan 30, 2017 at 11:34 AM, Michael Gummelt <
 mgumm...@mesosphere.io> wrote:

>
>
> On Mon, Jan 30, 2017 at 9:47 AM, Ji Yan  wrote:
>
>> Tasks begin scheduling as soon as the first executor comes up
>>
>>
>> Thanks all for the clarification. Is this the default behavior of
>> Spark on Mesos today? I think this is what we are looking for because
>> sometimes a job can take up lots of resources and later jobs could 
>> not get
>> all the resources that it asks for. If a Spark job starts with only a
>> subset of resources that it asks for, does it know to expand its 
>> resources
>> later when more resources become available?
>>
>
> Yes.
>
>
>>
>> Launch each executor with at least 1GB RAM, but if mesos offers
>>> 2GB at some moment, then launch an executor with 2GB RAM
>>
>>
>> This is less useful in our use case. But I am also quite
>> interested in cases in which this could be helpful. I think this 
>> will also
>> help with overall resource utilization on the cluster if when 
>> another job
>> starts up that has a hard requirement on resources, the extra 
>> resources to
>> the first job can be flexibly re-allocated to the second job.
>>
>> On Sat, Jan 28, 2017 at 2:32 PM, Michael Gummelt <
>> mgumm...@mesosphere.io> wrote:
>>
>>> We've talked about that, but it hasn't become a priority because
>>> 

Re: eager? in dataframe's checkpoint

2017-02-02 Thread Jean Georges Perrin
i wrote this piece based on all that, hopefully it will help:
http://jgp.net/2017/02/02/what-are-spark-checkpoints-on-dataframes/ 
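
for reference, a minimal sketch of the API being discussed (assuming a
Spark 2.1 shell; the checkpoint directory path is just an example):

// a checkpoint directory must be set before calling checkpoint
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")

val df = spark.range(10).toDF("id")
val eagerCp = df.checkpoint()      // eager = true (default): writes the files and cuts the lineage now
val lazyCp  = df.checkpoint(false) // eager = false: defers the work until the result is first computed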


> On Jan 31, 2017, at 4:18 PM, Burak Yavuz  wrote:
> 
> Hi Koert,
> 
> When eager is true, we return you a new DataFrame that depends on the files 
> written out to the checkpoint directory.
> All previous operations on the checkpointed DataFrame are gone forever. You 
> basically start fresh. AFAIK, when eager is true, the method will not return 
> until the DataFrame is completely checkpointed. If you look at the 
> RDD.checkpoint implementation, the checkpoint location is updated 
> synchronously therefore during the count, `isCheckpointed` will be true.
> 
> Best,
> Burak
> 
> On Tue, Jan 31, 2017 at 12:52 PM, Koert Kuipers  > wrote:
> i understand that checkpoint cuts the lineage, but i am not fully sure i 
> understand the role of eager. 
> 
> eager simply seems to materialize the rdd early with a count, right after the 
> rdd has been checkpointed. but why is that useful? rdd.checkpoint is 
> asynchronous, so when the rdd.count happens most likely rdd.isCheckpointed 
> will be false, and the count will be on the rdd before it was checkpointed. 
> what is the benefit of that?
> 
> 
> On Thu, Jan 26, 2017 at 11:19 PM, Burak Yavuz  > wrote:
> Hi,
> 
> One of the goals of checkpointing is to cut the RDD lineage. Otherwise you 
> run into StackOverflowExceptions. If you eagerly checkpoint, you basically 
> cut the lineage there, and the next operations all depend on the checkpointed 
> DataFrame. If you don't checkpoint, you continue to build the lineage, 
> therefore while that lineage is being resolved, you may hit the 
> StackOverflowException.
> 
> HTH,
> Burak
> 
> On Thu, Jan 26, 2017 at 10:36 AM, Jean Georges Perrin  > wrote:
> Hey Sparkers,
> 
> Trying to understand the Dataframe's checkpoint (not in the context of 
> streaming) 
> https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Dataset.html#checkpoint(boolean)
>  
> 
> 
> What is the goal of the eager flag?
> 
> Thanks!
> 
> jg
> 
> 
> 



Re: Dynamic resource allocation to Spark on Mesos

2017-02-02 Thread Michael Gummelt
Yes, that's expected.  spark.executor.cores sizes a single executor.  It
doesn't limit the number of executors.  For that, you need spark.cores.max
(--total-executor-cores).

And rdd.parallelize does not specify the number of executors.  It specifies
the number of partitions, which relates to the number of tasks, not
executors.  Unless you're running with dynamic allocation enabled, the
number of executors for your job is static, and determined at start time.
It's not influenced by your job itself.
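
As a concrete illustration (a sketch only; the values are made up, and
spark.cores.max applies to Mesos coarse-grained and standalone mode):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
  .set("spark.executor.cores", "4")  // cores per executor
  .set("spark.cores.max", "32")      // total cores across all executors, i.e. at most 8 executors here
val spark = SparkSession.builder.config(conf).getOrCreate()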

On Thu, Feb 2, 2017 at 2:42 PM, Ji Yan  wrote:

> I tried setting spark.executor.cores per executor, but Spark seems to be
> spinning up as many executors as possible up to spark.cores.max or however
> many cpu cores available on the cluster, and this may be undesirable
> because the number of executors in rdd.parallelize(collection, # of
> partitions) is being overriden
>
> On Thu, Feb 2, 2017 at 1:30 PM, Michael Gummelt 
> wrote:
>
>> As of Spark 2.0, Mesos mode does support setting cores on the executor
>> level, but you might need to set the property directly (--conf
>> spark.executor.cores=).  I've written about this here:
>> https://docs.mesosphere.com/1.8/usage/service-guides/spark/j
>> ob-scheduling/.  That doc is for DC/OS, but the configuration is the
>> same.
>>
>> On Thu, Feb 2, 2017 at 1:06 PM, Ji Yan  wrote:
>>
>>> I was mainly confused why this is the case with memory, but with cpu
>>> cores, it is not specified on per executor level
>>>
>>> On Thu, Feb 2, 2017 at 1:02 PM, Michael Gummelt 
>>> wrote:
>>>
 It sounds like you've answered your own question, right?
 --executor-memory means the memory per executor.  If you have no executor
 w/ 200GB memory, then the driver will accept no offers.

 On Thu, Feb 2, 2017 at 1:01 PM, Ji Yan  wrote:

> sorry, to clarify, i was using --executor-memory for memory,
> and --total-executor-cores for cpu cores
>
> On Thu, Feb 2, 2017 at 12:56 PM, Michael Gummelt <
> mgumm...@mesosphere.io> wrote:
>
>> What CLI args are your referring to?  I'm aware of spark-submit's
>> arguments (--executor-memory, --total-executor-cores, and 
>> --executor-cores)
>>
>> On Thu, Feb 2, 2017 at 12:41 PM, Ji Yan  wrote:
>>
>>> I have done a experiment on this today. It shows that only CPUs are
>>> tolerant of insufficient cluster size when a job starts. On my cluster, 
>>> I
>>> have 180Gb of memory and 64 cores, when I run spark-submit ( on mesos )
>>> with --cpu_cores set to 1000, the job starts up with 64 cores. but when 
>>> I
>>> set --memory to 200Gb, the job fails to start with "Initial job has
>>> not accepted any resources; check your cluster UI to ensure that workers
>>> are registered and have sufficient resources"
>>>
>>> Also it is confusing to me that --cpu_cores specifies the number of
>>> cpu cores across all executors, but --memory specifies per executor 
>>> memory
>>> requirement.
>>>
>>> On Mon, Jan 30, 2017 at 11:34 AM, Michael Gummelt <
>>> mgumm...@mesosphere.io> wrote:
>>>


 On Mon, Jan 30, 2017 at 9:47 AM, Ji Yan  wrote:

> Tasks begin scheduling as soon as the first executor comes up
>
>
> Thanks all for the clarification. Is this the default behavior of
> Spark on Mesos today? I think this is what we are looking for because
> sometimes a job can take up lots of resources and later jobs could 
> not get
> all the resources that it asks for. If a Spark job starts with only a
> subset of resources that it asks for, does it know to expand its 
> resources
> later when more resources become available?
>

 Yes.


>
> Launch each executor with at least 1GB RAM, but if mesos offers
>> 2GB at some moment, then launch an executor with 2GB RAM
>
>
> This is less useful in our use case. But I am also quite
> interested in cases in which this could be helpful. I think this will 
> also
> help with overall resource utilization on the cluster if when another 
> job
> starts up that has a hard requirement on resources, the extra 
> resources to
> the first job can be flexibly re-allocated to the second job.
>
> On Sat, Jan 28, 2017 at 2:32 PM, Michael Gummelt <
> mgumm...@mesosphere.io> wrote:
>
>> We've talked about that, but it hasn't become a priority because
>> we haven't had a driving use case.  If anyone has a good argument for
>> "variable" resource allocation like this, please let me know.
>>
>> On Sat, Jan 28, 2017 at 9:17 AM, Shuai Lin <
>> 

Re: HBase Spark

2017-02-02 Thread Benjamin Kim
Hi Asher,

I modified the pom to be the same Spark (1.6.0), HBase (1.2.0), and Java (1.8) 
version as our installation. The Scala (2.10.5) version is already the same as 
ours. But I’m still getting the same error. Can you think of anything else?

Cheers,
Ben
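
One quick sanity check (just a sketch, run from spark-shell): print the
Scala version the running JVMs actually loaded, on the driver and inside an
executor, to rule out a mixed 2.10/2.11 classpath:

// driver-side Scala version
println(scala.util.Properties.versionString)

// executor-side Scala version
sc.parallelize(1 to 1, 1)
  .map(_ => scala.util.Properties.versionString)
  .collect()
  .foreach(println)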


> On Feb 2, 2017, at 11:06 AM, Asher Krim  wrote:
> 
> Ben,
> 
> That looks like a scala version mismatch. Have you checked your dep tree?
> 
> Asher Krim
> Senior Software Engineer
> 
> 
> On Thu, Feb 2, 2017 at 1:28 PM, Benjamin Kim  > wrote:
> Elek,
> 
> Can you give me some sample code? I can’t get mine to work.
> 
> import org.apache.spark.sql.{SQLContext, _}
> import org.apache.spark.sql.execution.datasources.hbase._
> import org.apache.spark.{SparkConf, SparkContext}
> 
> def cat = s"""{
> |"table":{"namespace":"ben", "name":"dmp_test", 
> "tableCoder":"PrimitiveType"},
> |"rowkey":"key",
> |"columns":{
> |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
> |"col1":{"cf":"d", "col":"google_gid", "type":"string"}
> |}
> |}""".stripMargin
> 
> import sqlContext.implicits._
> 
> def withCatalog(cat: String): DataFrame = {
> sqlContext
> .read
> .options(Map(HBaseTableCatalog.tableCatalog->cat))
> .format("org.apache.spark.sql.execution.datasources.hbase")
> .load()
> }
> 
> val df = withCatalog(cat)
> df.show
> 
> It gives me this error.
> 
> java.lang.NoSuchMethodError: 
> scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
>   at 
> org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:232)
>   at 
> org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.<init>(HBaseRelation.scala:77)
>   at 
> org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:51)
>   at 
> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
> 
> If you can please help, I would be grateful.
> 
> Cheers,
> Ben
> 
> 
>> On Jan 31, 2017, at 1:02 PM, Marton, Elek > > wrote:
>> 
>> 
>> I tested this one with hbase 1.2.4:
>> 
>> https://github.com/hortonworks-spark/shc 
>> 
>> 
>> Marton
>> 
>> On 01/31/2017 09:17 PM, Benjamin Kim wrote:
>>> Does anyone know how to backport the HBase Spark module to HBase 1.2.0? I 
>>> tried to build it from source, but I cannot get it to work.
>>> 
>>> Thanks,
>>> Ben
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>>> 
>>> 
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>> 
>> 
> 
> 



Re: Dynamic resource allocation to Spark on Mesos

2017-02-02 Thread Ji Yan
I tried setting spark.executor.cores per executor, but Spark seems to be
spinning up as many executors as possible, up to spark.cores.max or however
many cpu cores are available on the cluster, and this may be undesirable
because the number of executors in rdd.parallelize(collection, # of
partitions) is being overridden

On Thu, Feb 2, 2017 at 1:30 PM, Michael Gummelt 
wrote:

> As of Spark 2.0, Mesos mode does support setting cores on the executor
> level, but you might need to set the property directly (--conf
> spark.executor.cores=).  I've written about this here:
> https://docs.mesosphere.com/1.8/usage/service-guides/spark/job-scheduling/.
> That doc is for DC/OS, but the configuration is the same.
>
> On Thu, Feb 2, 2017 at 1:06 PM, Ji Yan  wrote:
>
>> I was mainly confused why this is the case with memory, but with cpu
>> cores, it is not specified on per executor level
>>
>> On Thu, Feb 2, 2017 at 1:02 PM, Michael Gummelt 
>> wrote:
>>
>>> It sounds like you've answered your own question, right?
>>> --executor-memory means the memory per executor.  If you have no executor
>>> w/ 200GB memory, then the driver will accept no offers.
>>>
>>> On Thu, Feb 2, 2017 at 1:01 PM, Ji Yan  wrote:
>>>
 sorry, to clarify, i was using --executor-memory for memory,
 and --total-executor-cores for cpu cores

 On Thu, Feb 2, 2017 at 12:56 PM, Michael Gummelt <
 mgumm...@mesosphere.io> wrote:

> What CLI args are your referring to?  I'm aware of spark-submit's
> arguments (--executor-memory, --total-executor-cores, and 
> --executor-cores)
>
> On Thu, Feb 2, 2017 at 12:41 PM, Ji Yan  wrote:
>
>> I have done a experiment on this today. It shows that only CPUs are
>> tolerant of insufficient cluster size when a job starts. On my cluster, I
>> have 180Gb of memory and 64 cores, when I run spark-submit ( on mesos )
>> with --cpu_cores set to 1000, the job starts up with 64 cores. but when I
>> set --memory to 200Gb, the job fails to start with "Initial job has
>> not accepted any resources; check your cluster UI to ensure that workers
>> are registered and have sufficient resources"
>>
>> Also it is confusing to me that --cpu_cores specifies the number of
>> cpu cores across all executors, but --memory specifies per executor 
>> memory
>> requirement.
>>
>> On Mon, Jan 30, 2017 at 11:34 AM, Michael Gummelt <
>> mgumm...@mesosphere.io> wrote:
>>
>>>
>>>
>>> On Mon, Jan 30, 2017 at 9:47 AM, Ji Yan  wrote:
>>>
 Tasks begin scheduling as soon as the first executor comes up


 Thanks all for the clarification. Is this the default behavior of
 Spark on Mesos today? I think this is what we are looking for because
 sometimes a job can take up lots of resources and later jobs could not 
 get
 all the resources that it asks for. If a Spark job starts with only a
 subset of resources that it asks for, does it know to expand its 
 resources
 later when more resources become available?

>>>
>>> Yes.
>>>
>>>

 Launch each executor with at least 1GB RAM, but if mesos offers 2GB
> at some moment, then launch an executor with 2GB RAM


 This is less useful in our use case. But I am also quite interested
 in cases in which this could be helpful. I think this will also help 
 with
 overall resource utilization on the cluster if when another job starts 
 up
 that has a hard requirement on resources, the extra resources to the 
 first
 job can be flexibly re-allocated to the second job.

 On Sat, Jan 28, 2017 at 2:32 PM, Michael Gummelt <
 mgumm...@mesosphere.io> wrote:

> We've talked about that, but it hasn't become a priority because
> we haven't had a driving use case.  If anyone has a good argument for
> "variable" resource allocation like this, please let me know.
>
> On Sat, Jan 28, 2017 at 9:17 AM, Shuai Lin  > wrote:
>
>> An alternative behavior is to launch the job with the best
>>> resource offer Mesos is able to give
>>
>>
>> Michael has just made an excellent explanation about dynamic
>> allocation support in mesos. But IIUC, what you want to achieve is
>> something like (using RAM as an example) : "Launch each executor 
>> with at
>> least 1GB RAM, but if mesos offers 2GB at some moment, then launch an
>> executor with 2GB RAM".
>>
>> I wonder what's benefit of that? To reduce the "resource
>> fragmentation"?
>>
>> Anyway, that is not supported at this 

4 days left to submit your abstract to Spark Summit SF

2017-02-02 Thread Scott walent
We are just 4 days away from closing the CFP for Spark Summit 2017.

We have expanded the tracks in SF to include sessions that focus on AI,
Machine Learning and a 60 min deep dive track with technical demos.

Submit your presentation today and join us for the 10th Spark Summit!
Hurry, the CFP closes on February 6th!

https://spark-summit.org/2017/call-for-presentations/


Spark: Scala Shell Very Slow (Unresponsive)

2017-02-02 Thread jimitkr
Friends,

After I launch spark-shell, the default Scala shell appears but is
unresponsive.

When I type any command in the shell, nothing appears on my screen; the
shell is completely unresponsive.

My server has 32 GB of memory and approximately 18 GB is free after launching
spark-shell, so it may not be a memory issue. Is there some JVM size I need
to change somewhere?

How do I get the Scala shell to work as designed?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Scala-Shell-Very-Slow-Unresponsive-tp28358.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Dynamic resource allocation to Spark on Mesos

2017-02-02 Thread Michael Gummelt
As of Spark 2.0, Mesos mode does support setting cores on the executor
level, but you might need to set the property directly (--conf
spark.executor.cores=).  I've written about this here:
https://docs.mesosphere.com/1.8/usage/service-guides/spark/job-scheduling/.
That doc is for DC/OS, but the configuration is the same.

On Thu, Feb 2, 2017 at 1:06 PM, Ji Yan  wrote:

> I was mainly confused why this is the case with memory, but with cpu
> cores, it is not specified on per executor level
>
> On Thu, Feb 2, 2017 at 1:02 PM, Michael Gummelt 
> wrote:
>
>> It sounds like you've answered your own question, right?
>> --executor-memory means the memory per executor.  If you have no executor
>> w/ 200GB memory, then the driver will accept no offers.
>>
>> On Thu, Feb 2, 2017 at 1:01 PM, Ji Yan  wrote:
>>
>>> sorry, to clarify, i was using --executor-memory for memory,
>>> and --total-executor-cores for cpu cores
>>>
>>> On Thu, Feb 2, 2017 at 12:56 PM, Michael Gummelt >> > wrote:
>>>
 What CLI args are your referring to?  I'm aware of spark-submit's
 arguments (--executor-memory, --total-executor-cores, and --executor-cores)

 On Thu, Feb 2, 2017 at 12:41 PM, Ji Yan  wrote:

> I have done a experiment on this today. It shows that only CPUs are
> tolerant of insufficient cluster size when a job starts. On my cluster, I
> have 180Gb of memory and 64 cores, when I run spark-submit ( on mesos )
> with --cpu_cores set to 1000, the job starts up with 64 cores. but when I
> set --memory to 200Gb, the job fails to start with "Initial job has
> not accepted any resources; check your cluster UI to ensure that workers
> are registered and have sufficient resources"
>
> Also it is confusing to me that --cpu_cores specifies the number of
> cpu cores across all executors, but --memory specifies per executor memory
> requirement.
>
> On Mon, Jan 30, 2017 at 11:34 AM, Michael Gummelt <
> mgumm...@mesosphere.io> wrote:
>
>>
>>
>> On Mon, Jan 30, 2017 at 9:47 AM, Ji Yan  wrote:
>>
>>> Tasks begin scheduling as soon as the first executor comes up
>>>
>>>
>>> Thanks all for the clarification. Is this the default behavior of
>>> Spark on Mesos today? I think this is what we are looking for because
>>> sometimes a job can take up lots of resources and later jobs could not 
>>> get
>>> all the resources that it asks for. If a Spark job starts with only a
>>> subset of resources that it asks for, does it know to expand its 
>>> resources
>>> later when more resources become available?
>>>
>>
>> Yes.
>>
>>
>>>
>>> Launch each executor with at least 1GB RAM, but if mesos offers 2GB
 at some moment, then launch an executor with 2GB RAM
>>>
>>>
>>> This is less useful in our use case. But I am also quite interested
>>> in cases in which this could be helpful. I think this will also help 
>>> with
>>> overall resource utilization on the cluster if when another job starts 
>>> up
>>> that has a hard requirement on resources, the extra resources to the 
>>> first
>>> job can be flexibly re-allocated to the second job.
>>>
>>> On Sat, Jan 28, 2017 at 2:32 PM, Michael Gummelt <
>>> mgumm...@mesosphere.io> wrote:
>>>
 We've talked about that, but it hasn't become a priority because we
 haven't had a driving use case.  If anyone has a good argument for
 "variable" resource allocation like this, please let me know.

 On Sat, Jan 28, 2017 at 9:17 AM, Shuai Lin 
 wrote:

> An alternative behavior is to launch the job with the best
>> resource offer Mesos is able to give
>
>
> Michael has just made an excellent explanation about dynamic
> allocation support in mesos. But IIUC, what you want to achieve is
> something like (using RAM as an example) : "Launch each executor with 
> at
> least 1GB RAM, but if mesos offers 2GB at some moment, then launch an
> executor with 2GB RAM".
>
> I wonder what's benefit of that? To reduce the "resource
> fragmentation"?
>
> Anyway, that is not supported at this moment. In all the supported
> cluster managers of spark (mesos, yarn, standalone, and the 
> up-to-coming
> spark on kubernetes), you have to specify the cores and memory of each
> executor.
>
> It may not be supported in the future, because only mesos has the
> concepts of offers because of its two-level scheduling model.
>
>
> On Sat, Jan 28, 2017 at 1:35 AM, Ji Yan  wrote:
>
>> Dear Spark Users,
>>

Re: Spark 2 + Java + UDF + unknown return type...

2017-02-02 Thread Jörn Franke
 Not sure what your udf is exactly doing, but why not on udf / type ? You avoid 
issues converting it, it is more obvious for the user of your udf etc
You could of course return a complex type with one long, one string and one 
double and you fill them in the udf as needed, but this would be probably not a 
clean solution...
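
If you go the "play it safe and always return a string" route mentioned in the
question below, a minimal sketch of what that could look like (Scala for brevity;
the function and column names are illustrative, not from the original code, and
assume a spark-shell session where df has a numeric column x):

import org.apache.spark.sql.functions.col

// the UDF always returns a String, whatever the "real" type of the result is
spark.udf.register("f2_str", (a: String, b: Long) => (b * 2).toString)

// cast back to the expected type once the caller knows it
val out = df.selectExpr("*", "f2_str('x1', x) as op")
  .withColumn("op", col("op").cast("long"))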

> On 2 Feb 2017, at 22:05, Jean Georges Perrin  wrote:
> 
> Hi fellow Sparkans,
> 
> I am building a UDF (in Java) that can return various data types, basically 
> the signature of the function itself is:
> 
>   public Object call(String a, Object b, String c, Object d, String e) 
> throws Exception
> 
> When I register my function, I need to provide a type, e.g.:
> 
>   spark.udf().register("f2", new Udf5(), DataTypes.LongType);
> 
> In my test it is a long now, but can become a string or a float. Of course, I 
> do not know the expected return type before I call the function, which I call 
> like:
> 
>   df = df.selectExpr("*", "f2('x1', x, 'c2', y, 'op') as op");
> 
> Is there a way to have an Object being returned from a UDF and to store an 
> Object in a Dataset/dataframe? I don't need to know the datatype at that 
> point and can leave it hanging for now? Or should I play it safe and always 
> return a DataTypes.StringType (and then try to transform it if needed)?
> 
> I hope I am clear enough :).
> 
> Thanks for any tip/idea/comment...
> 
> jg


Re: Dynamic resource allocation to Spark on Mesos

2017-02-02 Thread Ji Yan
I was mainly confused why this is the case with memory, but with cpu cores,
it is not specified on a per-executor level

On Thu, Feb 2, 2017 at 1:02 PM, Michael Gummelt 
wrote:

> It sounds like you've answered your own question, right?
> --executor-memory means the memory per executor.  If you have no executor
> w/ 200GB memory, then the driver will accept no offers.
>
> On Thu, Feb 2, 2017 at 1:01 PM, Ji Yan  wrote:
>
>> sorry, to clarify, i was using --executor-memory for memory,
>> and --total-executor-cores for cpu cores
>>
>> On Thu, Feb 2, 2017 at 12:56 PM, Michael Gummelt 
>> wrote:
>>
>>> What CLI args are your referring to?  I'm aware of spark-submit's
>>> arguments (--executor-memory, --total-executor-cores, and --executor-cores)
>>>
>>> On Thu, Feb 2, 2017 at 12:41 PM, Ji Yan  wrote:
>>>
 I have done a experiment on this today. It shows that only CPUs are
 tolerant of insufficient cluster size when a job starts. On my cluster, I
 have 180Gb of memory and 64 cores, when I run spark-submit ( on mesos )
 with --cpu_cores set to 1000, the job starts up with 64 cores. but when I
 set --memory to 200Gb, the job fails to start with "Initial job has
 not accepted any resources; check your cluster UI to ensure that workers
 are registered and have sufficient resources"

 Also it is confusing to me that --cpu_cores specifies the number of cpu
 cores across all executors, but --memory specifies per executor memory
 requirement.

 On Mon, Jan 30, 2017 at 11:34 AM, Michael Gummelt <
 mgumm...@mesosphere.io> wrote:

>
>
> On Mon, Jan 30, 2017 at 9:47 AM, Ji Yan  wrote:
>
>> Tasks begin scheduling as soon as the first executor comes up
>>
>>
>> Thanks all for the clarification. Is this the default behavior of
>> Spark on Mesos today? I think this is what we are looking for because
>> sometimes a job can take up lots of resources and later jobs could not 
>> get
>> all the resources that it asks for. If a Spark job starts with only a
>> subset of resources that it asks for, does it know to expand its 
>> resources
>> later when more resources become available?
>>
>
> Yes.
>
>
>>
>> Launch each executor with at least 1GB RAM, but if mesos offers 2GB
>>> at some moment, then launch an executor with 2GB RAM
>>
>>
>> This is less useful in our use case. But I am also quite interested
>> in cases in which this could be helpful. I think this will also help with
>> overall resource utilization on the cluster if when another job starts up
>> that has a hard requirement on resources, the extra resources to the 
>> first
>> job can be flexibly re-allocated to the second job.
>>
>> On Sat, Jan 28, 2017 at 2:32 PM, Michael Gummelt <
>> mgumm...@mesosphere.io> wrote:
>>
>>> We've talked about that, but it hasn't become a priority because we
>>> haven't had a driving use case.  If anyone has a good argument for
>>> "variable" resource allocation like this, please let me know.
>>>
>>> On Sat, Jan 28, 2017 at 9:17 AM, Shuai Lin 
>>> wrote:
>>>
 An alternative behavior is to launch the job with the best resource
> offer Mesos is able to give


 Michael has just made an excellent explanation about dynamic
 allocation support in mesos. But IIUC, what you want to achieve is
 something like (using RAM as an example) : "Launch each executor with 
 at
 least 1GB RAM, but if mesos offers 2GB at some moment, then launch an
 executor with 2GB RAM".

 I wonder what's benefit of that? To reduce the "resource
 fragmentation"?

 Anyway, that is not supported at this moment. In all the supported
 cluster managers of spark (mesos, yarn, standalone, and the 
 up-to-coming
 spark on kubernetes), you have to specify the cores and memory of each
 executor.

 It may not be supported in the future, because only mesos has the
 concepts of offers because of its two-level scheduling model.


 On Sat, Jan 28, 2017 at 1:35 AM, Ji Yan  wrote:

> Dear Spark Users,
>
> Currently is there a way to dynamically allocate resources to
> Spark on Mesos? Within Spark we can specify the CPU cores, memory 
> before
> running job. The way I understand is that the Spark job will not run 
> if the
> CPU/Mem requirement is not met. This may lead to decrease in overall
> utilization of the cluster. An alternative behavior is to launch the 
> job
> with the best resource offer Mesos is able to give. Is this possible 

Spark 2 + Java + UDF + unknown return type...

2017-02-02 Thread Jean Georges Perrin
Hi fellow Sparkans,

I am building a UDF (in Java) that can return various data types, basically the 
signature of the function itself is:

public Object call(String a, Object b, String c, Object d, String e) 
throws Exception

When I register my function, I need to provide a type, e.g.:

spark.udf().register("f2", new Udf5(), DataTypes.LongType);

In my test it is a long now, but can become a string or a float. Of course, I 
do not know the expected return type before I call the function, which I call 
like:

df = df.selectExpr("*", "f2('x1', x, 'c2', y, 'op') as op");

Is there a way to have an Object returned from a UDF and to store an Object in 
a Dataset/DataFrame, so that I don't need to know the datatype at that point and 
can leave it undetermined for now? Or should I play it safe and always return a 
DataTypes.StringType (and then try to transform it if needed)?

I hope I am clear enough :).

Thanks for any tip/idea/comment...

jg

Re: Dynamic resource allocation to Spark on Mesos

2017-02-02 Thread Michael Gummelt
It sounds like you've answered your own question, right?  --executor-memory
means the memory per executor.  If you have no executor w/ 200GB memory,
then the driver will accept no offers.

On Thu, Feb 2, 2017 at 1:01 PM, Ji Yan  wrote:

> sorry, to clarify, i was using --executor-memory for memory,
> and --total-executor-cores for cpu cores
>
> On Thu, Feb 2, 2017 at 12:56 PM, Michael Gummelt 
> wrote:
>
>> What CLI args are your referring to?  I'm aware of spark-submit's
>> arguments (--executor-memory, --total-executor-cores, and --executor-cores)
>>
>> On Thu, Feb 2, 2017 at 12:41 PM, Ji Yan  wrote:
>>
>>> I have done a experiment on this today. It shows that only CPUs are
>>> tolerant of insufficient cluster size when a job starts. On my cluster, I
>>> have 180Gb of memory and 64 cores, when I run spark-submit ( on mesos )
>>> with --cpu_cores set to 1000, the job starts up with 64 cores. but when I
>>> set --memory to 200Gb, the job fails to start with "Initial job has not
>>> accepted any resources; check your cluster UI to ensure that workers are
>>> registered and have sufficient resources"
>>>
>>> Also it is confusing to me that --cpu_cores specifies the number of cpu
>>> cores across all executors, but --memory specifies per executor memory
>>> requirement.
>>>
>>> On Mon, Jan 30, 2017 at 11:34 AM, Michael Gummelt <
>>> mgumm...@mesosphere.io> wrote:
>>>


 On Mon, Jan 30, 2017 at 9:47 AM, Ji Yan  wrote:

> Tasks begin scheduling as soon as the first executor comes up
>
>
> Thanks all for the clarification. Is this the default behavior of
> Spark on Mesos today? I think this is what we are looking for because
> sometimes a job can take up lots of resources and later jobs could not get
> all the resources that it asks for. If a Spark job starts with only a
> subset of resources that it asks for, does it know to expand its resources
> later when more resources become available?
>

 Yes.


>
> Launch each executor with at least 1GB RAM, but if mesos offers 2GB at
>> some moment, then launch an executor with 2GB RAM
>
>
> This is less useful in our use case. But I am also quite interested in
> cases in which this could be helpful. I think this will also help with
> overall resource utilization on the cluster if when another job starts up
> that has a hard requirement on resources, the extra resources to the first
> job can be flexibly re-allocated to the second job.
>
> On Sat, Jan 28, 2017 at 2:32 PM, Michael Gummelt <
> mgumm...@mesosphere.io> wrote:
>
>> We've talked about that, but it hasn't become a priority because we
>> haven't had a driving use case.  If anyone has a good argument for
>> "variable" resource allocation like this, please let me know.
>>
>> On Sat, Jan 28, 2017 at 9:17 AM, Shuai Lin 
>> wrote:
>>
>>> An alternative behavior is to launch the job with the best resource
 offer Mesos is able to give
>>>
>>>
>>> Michael has just made an excellent explanation about dynamic
>>> allocation support in mesos. But IIUC, what you want to achieve is
>>> something like (using RAM as an example) : "Launch each executor with at
>>> least 1GB RAM, but if mesos offers 2GB at some moment, then launch an
>>> executor with 2GB RAM".
>>>
>>> I wonder what's benefit of that? To reduce the "resource
>>> fragmentation"?
>>>
>>> Anyway, that is not supported at this moment. In all the supported
>>> cluster managers of spark (mesos, yarn, standalone, and the up-to-coming
>>> spark on kubernetes), you have to specify the cores and memory of each
>>> executor.
>>>
>>> It may not be supported in the future, because only mesos has the
>>> concepts of offers because of its two-level scheduling model.
>>>
>>>
>>> On Sat, Jan 28, 2017 at 1:35 AM, Ji Yan  wrote:
>>>
 Dear Spark Users,

 Currently is there a way to dynamically allocate resources to Spark
 on Mesos? Within Spark we can specify the CPU cores, memory before 
 running
 job. The way I understand is that the Spark job will not run if the 
 CPU/Mem
 requirement is not met. This may lead to decrease in overall 
 utilization of
 the cluster. An alternative behavior is to launch the job with the best
 resource offer Mesos is able to give. Is this possible with the current
 implementation?

 Thanks
 Ji

 The information in this email is confidential and may be legally
 privileged. It is intended solely for the addressee. Access to this 
 email
 by anyone else is unauthorized. If you are not the intended recipient, 
 any
 

Re: Dynamic resource allocation to Spark on Mesos

2017-02-02 Thread Ji Yan
sorry, to clarify, i was using --executor-memory for memory,
and --total-executor-cores for cpu cores

On Thu, Feb 2, 2017 at 12:56 PM, Michael Gummelt 
wrote:

> What CLI args are your referring to?  I'm aware of spark-submit's
> arguments (--executor-memory, --total-executor-cores, and --executor-cores)
>
> On Thu, Feb 2, 2017 at 12:41 PM, Ji Yan  wrote:
>
>> I have done a experiment on this today. It shows that only CPUs are
>> tolerant of insufficient cluster size when a job starts. On my cluster, I
>> have 180Gb of memory and 64 cores, when I run spark-submit ( on mesos )
>> with --cpu_cores set to 1000, the job starts up with 64 cores. but when I
>> set --memory to 200Gb, the job fails to start with "Initial job has not
>> accepted any resources; check your cluster UI to ensure that workers are
>> registered and have sufficient resources"
>>
>> Also it is confusing to me that --cpu_cores specifies the number of cpu
>> cores across all executors, but --memory specifies per executor memory
>> requirement.
>>
>> On Mon, Jan 30, 2017 at 11:34 AM, Michael Gummelt > > wrote:
>>
>>>
>>>
>>> On Mon, Jan 30, 2017 at 9:47 AM, Ji Yan  wrote:
>>>
 Tasks begin scheduling as soon as the first executor comes up


 Thanks all for the clarification. Is this the default behavior of Spark
 on Mesos today? I think this is what we are looking for because sometimes a
 job can take up lots of resources and later jobs could not get all the
 resources that it asks for. If a Spark job starts with only a subset of
 resources that it asks for, does it know to expand its resources later when
 more resources become available?

>>>
>>> Yes.
>>>
>>>

 Launch each executor with at least 1GB RAM, but if mesos offers 2GB at
> some moment, then launch an executor with 2GB RAM


 This is less useful in our use case. But I am also quite interested in
 cases in which this could be helpful. I think this will also help with
 overall resource utilization on the cluster if when another job starts up
 that has a hard requirement on resources, the extra resources to the first
 job can be flexibly re-allocated to the second job.

 On Sat, Jan 28, 2017 at 2:32 PM, Michael Gummelt <
 mgumm...@mesosphere.io> wrote:

> We've talked about that, but it hasn't become a priority because we
> haven't had a driving use case.  If anyone has a good argument for
> "variable" resource allocation like this, please let me know.
>
> On Sat, Jan 28, 2017 at 9:17 AM, Shuai Lin 
> wrote:
>
>> An alternative behavior is to launch the job with the best resource
>>> offer Mesos is able to give
>>
>>
>> Michael has just made an excellent explanation about dynamic
>> allocation support in mesos. But IIUC, what you want to achieve is
>> something like (using RAM as an example) : "Launch each executor with at
>> least 1GB RAM, but if mesos offers 2GB at some moment, then launch an
>> executor with 2GB RAM".
>>
>> I wonder what's benefit of that? To reduce the "resource
>> fragmentation"?
>>
>> Anyway, that is not supported at this moment. In all the supported
>> cluster managers of spark (mesos, yarn, standalone, and the up-to-coming
>> spark on kubernetes), you have to specify the cores and memory of each
>> executor.
>>
>> It may not be supported in the future, because only mesos has the
>> concepts of offers because of its two-level scheduling model.
>>
>>
>> On Sat, Jan 28, 2017 at 1:35 AM, Ji Yan  wrote:
>>
>>> Dear Spark Users,
>>>
>>> Currently is there a way to dynamically allocate resources to Spark
>>> on Mesos? Within Spark we can specify the CPU cores, memory before 
>>> running
>>> job. The way I understand is that the Spark job will not run if the 
>>> CPU/Mem
>>> requirement is not met. This may lead to decrease in overall 
>>> utilization of
>>> the cluster. An alternative behavior is to launch the job with the best
>>> resource offer Mesos is able to give. Is this possible with the current
>>> implementation?
>>>
>>> Thanks
>>> Ji
>>>
>>> The information in this email is confidential and may be legally
>>> privileged. It is intended solely for the addressee. Access to this 
>>> email
>>> by anyone else is unauthorized. If you are not the intended recipient, 
>>> any
>>> disclosure, copying, distribution or any action taken or omitted to be
>>> taken in reliance on it, is prohibited and may be unlawful.
>>>
>>
>>
>
>
> --
> Michael Gummelt
> Software Engineer
> Mesosphere
>


 The information in this email is confidential and may be legally
 privileged. It is intended solely 

Re: Dynamic resource allocation to Spark on Mesos

2017-02-02 Thread Michael Gummelt
What CLI args are you referring to?  I'm aware of spark-submit's arguments
(--executor-memory, --total-executor-cores, and --executor-cores)

On Thu, Feb 2, 2017 at 12:41 PM, Ji Yan  wrote:

> I have done a experiment on this today. It shows that only CPUs are
> tolerant of insufficient cluster size when a job starts. On my cluster, I
> have 180Gb of memory and 64 cores, when I run spark-submit ( on mesos )
> with --cpu_cores set to 1000, the job starts up with 64 cores. but when I
> set --memory to 200Gb, the job fails to start with "Initial job has not
> accepted any resources; check your cluster UI to ensure that workers are
> registered and have sufficient resources"
>
> Also it is confusing to me that --cpu_cores specifies the number of cpu
> cores across all executors, but --memory specifies per executor memory
> requirement.
>
> On Mon, Jan 30, 2017 at 11:34 AM, Michael Gummelt 
> wrote:
>
>>
>>
>> On Mon, Jan 30, 2017 at 9:47 AM, Ji Yan  wrote:
>>
>>> Tasks begin scheduling as soon as the first executor comes up
>>>
>>>
>>> Thanks all for the clarification. Is this the default behavior of Spark
>>> on Mesos today? I think this is what we are looking for because sometimes a
>>> job can take up lots of resources and later jobs could not get all the
>>> resources that it asks for. If a Spark job starts with only a subset of
>>> resources that it asks for, does it know to expand its resources later when
>>> more resources become available?
>>>
>>
>> Yes.
>>
>>
>>>
>>> Launch each executor with at least 1GB RAM, but if mesos offers 2GB at
 some moment, then launch an executor with 2GB RAM
>>>
>>>
>>> This is less useful in our use case. But I am also quite interested in
>>> cases in which this could be helpful. I think this will also help with
>>> overall resource utilization on the cluster if when another job starts up
>>> that has a hard requirement on resources, the extra resources to the first
>>> job can be flexibly re-allocated to the second job.
>>>
>>> On Sat, Jan 28, 2017 at 2:32 PM, Michael Gummelt >> > wrote:
>>>
 We've talked about that, but it hasn't become a priority because we
 haven't had a driving use case.  If anyone has a good argument for
 "variable" resource allocation like this, please let me know.

 On Sat, Jan 28, 2017 at 9:17 AM, Shuai Lin 
 wrote:

> An alternative behavior is to launch the job with the best resource
>> offer Mesos is able to give
>
>
> Michael has just made an excellent explanation about dynamic
> allocation support in mesos. But IIUC, what you want to achieve is
> something like (using RAM as an example) : "Launch each executor with at
> least 1GB RAM, but if mesos offers 2GB at some moment, then launch an
> executor with 2GB RAM".
>
> I wonder what's benefit of that? To reduce the "resource
> fragmentation"?
>
> Anyway, that is not supported at this moment. In all the supported
> cluster managers of spark (mesos, yarn, standalone, and the up-to-coming
> spark on kubernetes), you have to specify the cores and memory of each
> executor.
>
> It may not be supported in the future, because only mesos has the
> concepts of offers because of its two-level scheduling model.
>
>
> On Sat, Jan 28, 2017 at 1:35 AM, Ji Yan  wrote:
>
>> Dear Spark Users,
>>
>> Currently is there a way to dynamically allocate resources to Spark
>> on Mesos? Within Spark we can specify the CPU cores, memory before 
>> running
>> job. The way I understand is that the Spark job will not run if the 
>> CPU/Mem
>> requirement is not met. This may lead to decrease in overall utilization 
>> of
>> the cluster. An alternative behavior is to launch the job with the best
>> resource offer Mesos is able to give. Is this possible with the current
>> implementation?
>>
>> Thanks
>> Ji
>>
>> The information in this email is confidential and may be legally
>> privileged. It is intended solely for the addressee. Access to this email
>> by anyone else is unauthorized. If you are not the intended recipient, 
>> any
>> disclosure, copying, distribution or any action taken or omitted to be
>> taken in reliance on it, is prohibited and may be unlawful.
>>
>
>


 --
 Michael Gummelt
 Software Engineer
 Mesosphere

>>>
>>>
>>> The information in this email is confidential and may be legally
>>> privileged. It is intended solely for the addressee. Access to this email
>>> by anyone else is unauthorized. If you are not the intended recipient, any
>>> disclosure, copying, distribution or any action taken or omitted to be
>>> taken in reliance on it, is prohibited and may be unlawful.
>>>
>>
>>
>>
>> --
>> Michael Gummelt
>> Software Engineer

Spark 2 - Creating datasets from dataframes with extra columns

2017-02-02 Thread Don Drake
In 1.6, when you created a Dataset from a Dataframe that had extra columns,
the columns not in the case class were dropped from the Dataset.

For example in 1.6, the column c4 is gone:

scala> case class F(f1: String, f2: String, f3: String)
defined class F

scala> import sqlContext.implicits._
import sqlContext.implicits._

scala> val df = Seq(("a","b","c","x"), ("d", "e", "f","y"), ("h", "i", "j","z")).toDF("f1", "f2", "f3", "c4")
df: org.apache.spark.sql.DataFrame = [f1: string, f2: string, f3: string, c4: string]

scala> val ds = df.as[F]
ds: org.apache.spark.sql.Dataset[F] = [f1: string, f2: string, f3: string]

scala> ds.show
+---+---+---+
| f1| f2| f3|
+---+---+---+
|  a|  b|  c|
|  d|  e|  f|
|  h|  i|  j|
+---+---+---+


This seems to have changed in Spark 2.0 and also 2.1:

Spark 2.1.0:

scala> case class F(f1: String, f2: String, f3:String)
defined class F

scala> import spark.implicits._
import spark.implicits._

scala> val df = Seq(("a","b","c","x"), ("d", "e", "f","y"), ("h", "i", "j","z")).toDF("f1", "f2", "f3", "c4")
df: org.apache.spark.sql.DataFrame = [f1: string, f2: string ... 2 more fields]

scala> val ds = df.as[F]
ds: org.apache.spark.sql.Dataset[F] = [f1: string, f2: string ... 2 more fields]

scala> ds.show
+---+---+---+---+
| f1| f2| f3| c4|
+---+---+---+---+
|  a|  b|  c|  x|
|  d|  e|  f|  y|
|  h|  i|  j|  z|
+---+---+---+---+

Is there a way to get a Dataset that conforms to the case class in Spark
2.1.0?  Basically, I'm attempting to use the case class to define an output
schema, and these extra columns are getting in the way.
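
A possible workaround (a sketch only, not verified against 2.1.0
specifically) is to select the case class fields explicitly before
converting, so the extra column is dropped up front:

val ds = df.select($"f1", $"f2", $"f3").as[F]
ds.show  // only f1, f2, f3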

Thanks.

-Don

-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake 
800-733-2143


Re: Dynamic resource allocation to Spark on Mesos

2017-02-02 Thread Ji Yan
I have done an experiment on this today. It shows that only CPUs are
tolerant of an insufficient cluster size when a job starts. On my cluster, I
have 180GB of memory and 64 cores. When I run spark-submit (on mesos)
with --cpu_cores set to 1000, the job starts up with 64 cores, but when I
set --memory to 200GB, the job fails to start with "Initial job has not
accepted any resources; check your cluster UI to ensure that workers are
registered and have sufficient resources"

Also it is confusing to me that --cpu_cores specifies the number of cpu
cores across all executors, but --memory specifies the per-executor memory
requirement.

On Mon, Jan 30, 2017 at 11:34 AM, Michael Gummelt 
wrote:

>
>
> On Mon, Jan 30, 2017 at 9:47 AM, Ji Yan  wrote:
>
>> Tasks begin scheduling as soon as the first executor comes up
>>
>>
>> Thanks all for the clarification. Is this the default behavior of Spark
>> on Mesos today? I think this is what we are looking for because sometimes a
>> job can take up lots of resources and later jobs could not get all the
>> resources that it asks for. If a Spark job starts with only a subset of
>> resources that it asks for, does it know to expand its resources later when
>> more resources become available?
>>
>
> Yes.
>
>
>>
>> Launch each executor with at least 1GB RAM, but if mesos offers 2GB at
>>> some moment, then launch an executor with 2GB RAM
>>
>>
>> This is less useful in our use case. But I am also quite interested in
>> cases in which this could be helpful. I think this will also help with
>> overall resource utilization on the cluster if when another job starts up
>> that has a hard requirement on resources, the extra resources to the first
>> job can be flexibly re-allocated to the second job.
>>
>> On Sat, Jan 28, 2017 at 2:32 PM, Michael Gummelt 
>> wrote:
>>
>>> We've talked about that, but it hasn't become a priority because we
>>> haven't had a driving use case.  If anyone has a good argument for
>>> "variable" resource allocation like this, please let me know.
>>>
>>> On Sat, Jan 28, 2017 at 9:17 AM, Shuai Lin 
>>> wrote:
>>>
 An alternative behavior is to launch the job with the best resource
> offer Mesos is able to give


 Michael has just made an excellent explanation about dynamic allocation
 support in mesos. But IIUC, what you want to achieve is something like
 (using RAM as an example) : "Launch each executor with at least 1GB RAM,
 but if mesos offers 2GB at some moment, then launch an executor with 2GB
 RAM".

 I wonder what's benefit of that? To reduce the "resource fragmentation"?

 Anyway, that is not supported at this moment. In all the supported
 cluster managers of spark (mesos, yarn, standalone, and the up-to-coming
 spark on kubernetes), you have to specify the cores and memory of each
 executor.

 It may not be supported in the future, because only mesos has the
 concepts of offers because of its two-level scheduling model.


 On Sat, Jan 28, 2017 at 1:35 AM, Ji Yan  wrote:

> Dear Spark Users,
>
> Currently is there a way to dynamically allocate resources to Spark on
> Mesos? Within Spark we can specify the CPU cores, memory before running
> job. The way I understand is that the Spark job will not run if the 
> CPU/Mem
> requirement is not met. This may lead to decrease in overall utilization 
> of
> the cluster. An alternative behavior is to launch the job with the best
> resource offer Mesos is able to give. Is this possible with the current
> implementation?
>
> Thanks
> Ji
>
> The information in this email is confidential and may be legally
> privileged. It is intended solely for the addressee. Access to this email
> by anyone else is unauthorized. If you are not the intended recipient, any
> disclosure, copying, distribution or any action taken or omitted to be
> taken in reliance on it, is prohibited and may be unlawful.
>


>>>
>>>
>>> --
>>> Michael Gummelt
>>> Software Engineer
>>> Mesosphere
>>>
>>
>>
>> The information in this email is confidential and may be legally
>> privileged. It is intended solely for the addressee. Access to this email
>> by anyone else is unauthorized. If you are not the intended recipient, any
>> disclosure, copying, distribution or any action taken or omitted to be
>> taken in reliance on it, is prohibited and may be unlawful.
>>
>
>
>
> --
> Michael Gummelt
> Software Engineer
> Mesosphere
>

-- 
 

The information in this email is confidential and may be legally 
privileged. It is intended solely for the addressee. Access to this email 
by anyone else is unauthorized. If you are not the intended recipient, any 
disclosure, copying, distribution or any action taken or omitted to be 
taken in reliance on it, is prohibited and may be 

Re: pivot over non numerical data

2017-02-02 Thread Darshan Pandya
Thanks Kevin,
Worked like a charm.
FYI for readers,
val temp1 = temp.groupBy("reference_id").pivot("char_name").agg(max($"char_value"))

I didn't know I could use 'agg' with max on a string column. I was using it
incorrectly, as below:
temp.groupBy("reference_id").pivot("char_name").max("char_value")
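
For completeness, a minimal, self-contained sketch of the working pattern (the column names and values below are made up for illustration):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.max

val spark = SparkSession.builder().master("local[*]").appName("pivot-demo").getOrCreate()
import spark.implicits._

// long format: one row per (reference_id, char_name, char_value)
val temp = Seq(
  ("r1", "color", "red"),
  ("r1", "size",  "XL"),
  ("r2", "color", "blue")
).toDF("reference_id", "char_name", "char_value")

// pivot turns the distinct char_name values into columns; max picks the
// (single) string value per cell, which is why agg(max(...)) works on text
val temp1 = temp.groupBy("reference_id").pivot("char_name").agg(max($"char_value"))
temp1.show()
// +------------+-----+----+
// |reference_id|color|size|   (row order may vary)
// +------------+-----+----+
// |          r1|  red|  XL|
// |          r2| blue|null|
// +------------+-----+----+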

On Wed, Feb 1, 2017 at 11:56 PM, Kevin Mellott 
wrote:

> This should work for non-numerical data as well - can you please elaborate
> on the error you are getting and provide a code sample? As a preliminary
> hint, you can "aggregate" text values using *max*.
>
> df.groupBy("someCol")
>   .pivot("anotherCol")
>   .agg(max($"textCol"))
>
> Thanks,
> Kevin
>
> On Wed, Feb 1, 2017 at 2:02 PM, Darshan Pandya 
> wrote:
>
>> Hello,
>>
>> I am trying to transpose some data using groupBy/pivot/agg as mentioned
>> in this blog:
>> https://databricks.com/blog/2016/02/09/reshaping-data-with-pivot-in-apache-spark.html
>>
>> But this works only for numerical data.
>> Any hints for doing the same thing for non-numerical data?
>>
>>
>> --
>> Sincerely,
>> Darshan
>>
>>
>


-- 
Sincerely,
Darshan


Re: frustration with field names in Dataset

2017-02-02 Thread Koert Kuipers
great it's an easy fix. i will create jira and pullreq

On Thu, Feb 2, 2017 at 2:13 PM, Michael Armbrust 
wrote:

> That might be reasonable.  At least I can't think of any problems with
> doing that.
>
> On Thu, Feb 2, 2017 at 7:39 AM, Koert Kuipers  wrote:
>
>> since a dataset is a typed object you ideally don't have to think about
>> field names.
>>
>> however there are operations on Dataset that require you to provide a
>> Column, like for example joinWith (and joinWith returns a strongly typed
>> Dataset, not DataFrame). once you have to provide a Column you are back to
>> thinking in field names, and worrying about duplicate field names, which is
>> something that can easily happen in a Dataset without you realizing it.
>>
>> so under the hood Dataset has unique identifiers for every column, as in
>> dataset.queryExecution.logical.output, but these are expressions
>> (attributes) that i cannot turn back into columns since the constructors
>> for this are private in spark.
>>
>> so how about having Dataset.apply(i: Int): Column to allow me to pick
>> columns by position without having to worry about (duplicate) field names?
>> then i could do something like:
>>
>> dataset.joinWith(otherDataset, dataset(0) === otherDataset(0), joinType)
>>
>
>


Re: frustration with field names in Dataset

2017-02-02 Thread Michael Armbrust
That might be reasonable.  At least I can't think of any problems with
doing that.

On Thu, Feb 2, 2017 at 7:39 AM, Koert Kuipers  wrote:

> since a dataset is a typed object you ideally don't have to think about
> field names.
>
> however there are operations on Dataset that require you to provide a
> Column, like for example joinWith (and joinWith returns a strongly typed
> Dataset, not DataFrame). once you have to provide a Column you are back to
> thinking in field names, and worrying about duplicate field names, which is
> something that can easily happen in a Dataset without you realizing it.
>
> so under the hood Dataset has unique identifiers for every column, as in
> dataset.queryExecution.logical.output, but these are expressions
> (attributes) that i cannot turn back into columns since the constructors
> for this are private in spark.
>
> so how about having Dataset.apply(i: Int): Column to allow me to pick
> columns by position without having to worry about (duplicate) field names?
> then i could do something like:
>
> dataset.joinWith(otherDataset, dataset(0) === otherDataset(0), joinType)
>


Re: HBase Spark

2017-02-02 Thread Asher Krim
Ben,

That looks like a scala version mismatch. Have you checked your dep tree?

Asher Krim
Senior Software Engineer
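
A sketch of what to check, assuming an sbt build (the connector coordinates are illustrative; the point is that Spark and the HBase connector must be built against the same Scala binary version):

// build.sbt (sketch) -- scala.runtime.ObjectRef.create appeared in Scala 2.11,
// so a NoSuchMethodError on it usually means classes compiled for Scala 2.11
// are running against a 2.10 runtime (or the versions are otherwise mixed).
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "1.6.3" % "provided",
  // hypothetical coordinates for the shc connector; the _2.10/_2.11 suffix
  // must match the Scala version of the Spark build on the cluster
  "com.hortonworks" % "shc-core" % "1.0.1-1.6-s_2.11"
)

// Inspect the resolved tree with `mvn dependency:tree` (Maven) or the
// sbt-dependency-graph plugin's dependencyTree task (sbt) and look for
// mixed _2.10 / _2.11 artifacts.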

On Thu, Feb 2, 2017 at 1:28 PM, Benjamin Kim  wrote:

> Elek,
>
> Can you give me some sample code? I can’t get mine to work.
>
> import org.apache.spark.sql.{SQLContext, _}
> import org.apache.spark.sql.execution.datasources.hbase._
> import org.apache.spark.{SparkConf, SparkContext}
>
> def cat = s"""{
> |"table":{"namespace":"ben", "name":"dmp_test",
> "tableCoder":"PrimitiveType"},
> |"rowkey":"key",
> |"columns":{
> |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
> |"col1":{"cf":"d", "col":"google_gid", "type":"string"}
> |}
> |}""".stripMargin
>
> import sqlContext.implicits._
>
> def withCatalog(cat: String): DataFrame = {
> sqlContext
> .read
> .options(Map(HBaseTableCatalog.tableCatalog->cat))
> .format("org.apache.spark.sql.execution.datasources.hbase")
> .load()
> }
>
> val df = withCatalog(cat)
> df.show
>
>
> It gives me this error.
>
> java.lang.NoSuchMethodError: scala.runtime.ObjectRef.
> create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
> at org.apache.spark.sql.execution.datasources.hbase.
> HBaseTableCatalog$.apply(HBaseTableCatalog.scala:232)
> at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.<init>(
> HBaseRelation.scala:77)
> at org.apache.spark.sql.execution.datasources.hbase.
> DefaultSource.createRelation(HBaseRelation.scala:51)
> at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(
> ResolvedDataSource.scala:158)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
>
>
> If you can please help, I would be grateful.
>
> Cheers,
> Ben
>
>
> On Jan 31, 2017, at 1:02 PM, Marton, Elek  wrote:
>
>
> I tested this one with hbase 1.2.4:
>
> https://github.com/hortonworks-spark/shc
>
> Marton
>
> On 01/31/2017 09:17 PM, Benjamin Kim wrote:
>
> Does anyone know how to backport the HBase Spark module to HBase 1.2.0? I
> tried to build it from source, but I cannot get it to work.
>
> Thanks,
> Ben
>
>
>
>
>


Re: HBase Spark

2017-02-02 Thread Benjamin Kim
Elek,

Can you give me some sample code? I can’t get mine to work.

import org.apache.spark.sql.{SQLContext, _}
import org.apache.spark.sql.execution.datasources.hbase._
import org.apache.spark.{SparkConf, SparkContext}

def cat = s"""{
|"table":{"namespace":"ben", "name":"dmp_test", 
"tableCoder":"PrimitiveType"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"d", "col":"google_gid", "type":"string"}
|}
|}""".stripMargin

import sqlContext.implicits._

def withCatalog(cat: String): DataFrame = {
sqlContext
.read
.options(Map(HBaseTableCatalog.tableCatalog->cat))
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()
}

val df = withCatalog(cat)
df.show

It gives me this error.

java.lang.NoSuchMethodError: 
scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
at 
org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:232)
at 
org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.<init>(HBaseRelation.scala:77)
at 
org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:51)
at 
org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)

If you can please help, I would be grateful.

Cheers,
Ben


> On Jan 31, 2017, at 1:02 PM, Marton, Elek  wrote:
> 
> 
> I tested this one with hbase 1.2.4:
> 
> https://github.com/hortonworks-spark/shc
> 
> Marton
> 
> On 01/31/2017 09:17 PM, Benjamin Kim wrote:
>> Does anyone know how to backport the HBase Spark module to HBase 1.2.0? I 
>> tried to build it from source, but I cannot get it to work.
>> 
>> Thanks,
>> Ben
>> 
> 
> 



[ML] MLeap: Deploy Spark ML Pipelines w/o SparkContext

2017-02-02 Thread Hollin Wilkins
Hey everyone,


Some of you may have seen Mikhail and me talk at Spark/Hadoop Summits about
MLeap and how you can use it to build production services from your
Spark-trained ML pipelines. MLeap is an open-source technology that allows
Data Scientists and Engineers to deploy Spark-trained ML Pipelines and
Models to a scoring engine instantly. The MLeap execution engine has no
dependencies on a Spark context and the serialization format is entirely
based on Protobuf 3 and JSON.


The recent 0.5.0 release provides serialization and inference support for
close to 100% of Spark transformers (we don’t yet support ALS and LDA).


MLeap is open-source, take a look at our Github page:

https://github.com/combust/mleap


Or join the conversation on Gitter:

https://gitter.im/combust/mleap


We have a set of documentation to help get you started here:

http://mleap-docs.combust.ml/


We even have a set of demos, for training ML Pipelines and linear, logistic
and random forest models:

https://github.com/combust/mleap-demo


Check out our latest MLeap-serving Docker image, which allows you to expose
a REST interface to your Spark ML pipeline models:

http://mleap-docs.combust.ml/mleap-serving/


Several companies are using MLeap in production and even more are currently
evaluating it. Take a look and tell us what you think! We hope to talk with
you soon and welcome feedback/suggestions!


Sincerely,

Hollin and Mikhail


Re: frustration with field names in Dataset

2017-02-02 Thread Koert Kuipers
another example is if i have a Dataset[(K, V)] and i want to repartition it
by the key K. repartition requires a Column which means i am suddenly back
to worrying about duplicate field names. i would like to be able to say:

dataset.repartition(dataset(0))

On Thu, Feb 2, 2017 at 10:39 AM, Koert Kuipers  wrote:

> since a dataset is a typed object you ideally don't have to think about
> field names.
>
> however there are operations on Dataset that require you to provide a
> Column, like for example joinWith (and joinWith returns a strongly typed
> Dataset, not DataFrame). once you have to provide a Column you are back to
> thinking in field names, and worrying about duplicate field names, which is
> something that can easily happen in a Dataset without you realizing it.
>
> so under the hood Dataset has unique identifiers for every column, as in
> dataset.queryExecution.logical.output, but these are expressions
> (attributes) that i cannot turn back into columns since the constructors
> for this are private in spark.
>
> so how about having Dataset.apply(i: Int): Column to allow me to pick
> columns by position without having to worry about (duplicate) field names?
> then i could do something like:
>
> dataset.joinWith(otherDataset, dataset(0) === otherDataset(0), joinType)
>


frustration with field names in Dataset

2017-02-02 Thread Koert Kuipers
since a dataset is a typed object you ideally don't have to think about
field names.

however there are operations on Dataset that require you to provide a
Column, like for example joinWith (and joinWith returns a strongly typed
Dataset, not DataFrame). once you have to provide a Column you are back to
thinking in field names, and worrying about duplicate field names, which is
something that can easily happen in a Dataset without you realizing it.

so under the hood Dataset has unique identifiers for every column, as in
dataset.queryExecution.logical.output, but these are expressions
(attributes) that i cannot turn back into columns since the constructors
for this are private in spark.

so how about having Dataset.apply(i: Int): Column to allow me to pick
columns by position without having to worry about (duplicate) field names?
then i could do something like:

dataset.joinWith(otherDataset, dataset(0) === otherDataset(0), joinType)


Re: Running a spark code on multiple machines using google cloud platform

2017-02-02 Thread Anahita Talebi
Thanks for your answer.
Do you mean Amazon EMR?

On Thu, Feb 2, 2017 at 2:30 PM, Marco Mistroni  wrote:

> You can use EMR if you want to run on a cluster.
> Kr
>
> On 2 Feb 2017 12:30 pm, "Anahita Talebi" 
> wrote:
>
>> Dear all,
>>
>> I am trying to run Spark code on multiple machines using the submit-job
>> feature of Google Cloud Platform.
>> As inputs to my code, I have a training and a testing dataset.
>>
>> When I use a small training data set (around 10 KB), the code runs
>> successfully on Google Cloud, but when I use a large data set of around
>> 50 GB, I receive the following error:
>>
>> 17/02/01 19:08:06 ERROR org.apache.spark.scheduler.LiveListenerBus:
>> SparkListenerBus has already stopped! Dropping event
>> SparkListenerTaskEnd(2,0,ResultTask,TaskKilled,org.apache.spark.scheduler.TaskInfo@3101f3b3,null)
>>
>> Can anyone give me a hint how I can solve my problem?
>>
>> PS: I cannot use a small training data set, because I have an optimization code
>> which needs to use all the data.
>>
>> I have to use Google Cloud Platform because I need to run the code on
>> multiple machines.
>>
>> Thanks a lot,
>>
>> Anahita
>>
>>


Re: Running a spark code on multiple machines using google cloud platform

2017-02-02 Thread Marco Mistroni
You can use EMR if you want to run on a cluster.
Kr

On 2 Feb 2017 12:30 pm, "Anahita Talebi"  wrote:

> Dear all,
>
> I am trying to run Spark code on multiple machines using the submit-job
> feature of Google Cloud Platform.
> As inputs to my code, I have a training and a testing dataset.
>
> When I use a small training data set (around 10 KB), the code runs
> successfully on Google Cloud, but when I use a large data set of around
> 50 GB, I receive the following error:
>
> 17/02/01 19:08:06 ERROR org.apache.spark.scheduler.LiveListenerBus:
> SparkListenerBus has already stopped! Dropping event
> SparkListenerTaskEnd(2,0,ResultTask,TaskKilled,org.apache.spark.scheduler.TaskInfo@3101f3b3,null)
>
> Can anyone give me a hint how I can solve my problem?
>
> PS: I cannot use a small training data set, because I have an optimization code
> which needs to use all the data.
>
> I have to use Google Cloud Platform because I need to run the code on
> multiple machines.
>
> Thanks a lot,
>
> Anahita
>
>


Running a spark code on multiple machines using google cloud platform

2017-02-02 Thread Anahita Talebi
Dear all,

I am trying to run Spark code on multiple machines using the submit-job
feature of Google Cloud Platform.
As inputs to my code, I have a training and a testing dataset.

When I use a small training data set (around 10 KB), the code runs
successfully on Google Cloud, but when I use a large data set of around
50 GB, I receive the following error:

17/02/01 19:08:06 ERROR org.apache.spark.scheduler.LiveListenerBus:
SparkListenerBus has already stopped! Dropping event
SparkListenerTaskEnd(2,0,ResultTask,TaskKilled,org.apache.spark.scheduler.TaskInfo@3101f3b3,null)

Can anyone give me a hint how I can solve my problem?

PS: I cannot use a small training data set, because I have an
optimization code which needs to use all the data.

I have to use Google Cloud Platform because I need to run the code on
multiple machines.

Thanks a lot,

Anahita


Re: FP growth - Items in a transaction must be unique

2017-02-02 Thread Patrick Plaatje
Hi,

 

This indicates you have duplicate products per row in your dataframe; the FP
implementation only allows unique products per row, so you will need to dedupe
the products in each row before running the FPGrowth algorithm.
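
A minimal sketch of that dedup step, based on the transaction-building code quoted below (it assumes `names` is the single-column DataFrame from the original message):

import org.apache.spark.mllib.fpm.FPGrowth

// build the transactions as in the quoted code, but make the items of each
// transaction unique (and trimmed) before handing them to FPGrowth
val transactions = names.as[String].rdd
  .map(_.split(",").map(_.trim).distinct)

val fpg = new FPGrowth()
  .setMinSupport(0.3)
  .setNumPartitions(100)

val model = fpg.run(transactions)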

 

Best,

Patrick

 

From: "Devi P.V" 
Date: Thursday, 2 February 2017 at 07:17
To: "user @spark" 
Subject: FP growth - Items in a transaction must be unique

 

Hi all,

I am trying to run the FP-growth algorithm using Spark and Scala. A sample input
dataframe follows:

+-----------------------------------------------------------------------------------------------+
|productName                                                                                      |
+-----------------------------------------------------------------------------------------------+
|Apple Iphone 7 128GB Jet Black with Facetime                                                     |
|Levi’s Blue Slim Fit Jeans- L5112,Rimmel London Lasting Finish Matte by Kate Moss 101 Dusky      |
|Iphone 6 Plus (5.5",Limited Stocks, TRA Oman Approved)                                           |
+-----------------------------------------------------------------------------------------------+

Each row contains unique items.

 

I converted it into an RDD as follows:
val transactions = names.as[String].rdd.map(s => s.split(","))

val fpg = new FPGrowth().
  setMinSupport(0.3).
  setNumPartitions(100)


val model = fpg.run(transactions)
But I got this error:

WARN TaskSetManager: Lost task 2.0 in stage 27.0 (TID 622, localhost):
org.apache.spark.SparkException: 
Items in a transaction must be unique but got WrappedArray(
Huawei GR3 Dual Sim 16GB 13MP 5Inch 4G,
 Huawei G8 Gold 32GB,  4G,  
5.5 Inches, HTC Desire 816 (Dual Sim, 3G, 8GB),
 Samsung Galaxy S7 Single Sim - 32GB,  4G LTE,  
Gold, Huawei P8 Lite 16GB,  4G LTE, Huawei Y625, 
Samsung Galaxy Note 5 - 32GB,  4G LTE, 
Samsung Galaxy S7 Dual Sim - 32GB)

How to solve this?

Thanks



 

 



Re: Surprised!!!!! Spark-shell showing inconsistent results

2017-02-02 Thread Marco Mistroni
Hi,
Have you tried sorting the results before comparing?


On 2 Feb 2017 10:03 am, "Alex"  wrote:

> Hi, as shown below, the same query run back to back shows inconsistent
> results.
>
> testtable1 is Avro Serde table...
>
> [image: Inline image 1]
>
>
>
>  hc.sql("select * from testtable1 order by col1 limit 1").collect;
> res14: Array[org.apache.spark.sql.Row] = Array([1570,3364,201607,Y,APJ,PHILIPPINES,8518944,null,null,null,null,-15.992583,0.0,-15.992583,null,null,MONTH_ITEM_GROUP])
>
> scala> hc.sql("select * from testtable1 order by col1 limit 1").collect;
> res15: Array[org.apache.spark.sql.Row] = Array([1570,485888,20163,N,AMERICAS,BRAZIL,null,null,null,null,null,6019.2999,17198.0,6019.2999,null,null,QUARTER_GROUP])
>
> scala> hc.sql("select * from testtable1 order by col1 limit 1").collect;
> res16: Array[org.apache.spark.sql.Row] = Array([1570,3930,201607,Y,APJ,INDIA SUB-CONTINENT,8741220,null,null,null,null,-208.485216,0.0,-208.485216,null,null,MONTH_ITEM_GROUP])
>
>


Re: Surprised!!!!! Spark-shell showing inconsistent results

2017-02-02 Thread Didac Gil
Is 1570 the value of Col1?
If so, you have ordered by that column and selected only the first item. It 
seems that both results have the same Col1 value, therefore any of them would 
be a right answer to return. Right?
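
A minimal sketch of how to make the result deterministic: add a tiebreaker to the ORDER BY (col2 below stands in for whatever secondary column makes rows unique):

// With only col1 in the ORDER BY, every row sharing the minimum col1 value is
// an equally valid answer, so repeated runs may legitimately return different
// rows. Adding further sort keys (ideally down to a unique key) pins the result:
hc.sql("select * from testtable1 order by col1, col2 limit 1").collect()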

> On 2 Feb 2017, at 11:03, Alex  wrote:
> 
> Hi, as shown below, the same query run back to back shows inconsistent
> results.
> 
> testtable1 is Avro Serde table... 
> 
> 
> 
> 
> 
>  hc.sql("select * from testtable1 order by col1 limit 1").collect;
> res14: Array[org.apache.spark.sql.Row] = 
> Array([1570,3364,201607,Y,APJ,PHILIPPINES,8518944,null,null,null,null,-15.992583,0.0,-15.992583,null,null,MONTH_ITEM_GROUP])
> 
> scala> hc.sql("select * from testtable1 order by col1 limit 1").collect;
> res15: Array[org.apache.spark.sql.Row] = 
> Array([1570,485888,20163,N,AMERICAS,BRAZIL,null,null,null,null,null,6019.2999,17198.0,6019.2999,null,null,QUARTER_GROUP])
> 
> scala> hc.sql("select * from testtable1 order by col1 limit 1").collect;
> res16: Array[org.apache.spark.sql.Row] = Array([1570,3930,201607,Y,APJ,INDIA 
> SUB-CONTINENT,8741220,null,null,null,null,-208.485216,0.0,-208.485216,null,null,MONTH_ITEM_GROUP])
> 





RE: filters Pushdown

2017-02-02 Thread vincent gromakowski
There are some native connectors (covered in the docs) and some third-party ones
(on spark packages: https://spark-packages.org/?q=tags%3A"Data%20Sources;)
Parquet is the preferred native source. Cassandra/FiloDB provides the most advanced
pushdown.
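
One way to see filter pushdown in action for a native source is to inspect the physical plan; a minimal sketch with Parquet (the path and column names are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("pushdown-check").getOrCreate()
import spark.implicits._

// numeric and string predicates on Parquet are typically pushed down to the
// reader (support varies by type and Spark version); explain() lists them
// under PushedFilters in the scan node
val df = spark.read.parquet("/path/to/events")   // placeholder path
df.filter($"userId" === 42L && $"country" === "NL").explain()
// look for something like: PushedFilters: [IsNotNull(userId), EqualTo(userId,42), ...]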

On 2 Feb 2017 11:23 AM, "Peter Shmukler"  wrote:

> Hi Vincent,
>
> Thank you for the answer. (I don’t see your answer in the mailing list, so I’m
> replying directly.)
>
>
>
> What connectors can I work with from Spark?
>
> Can you provide any link to read about them? I see nothing in the Spark
> documentation.
>
>
>
>
>
> *From:* vincent gromakowski [mailto:vincent.gromakow...@gmail.com]
> *Sent:* Thursday, February 2, 2017 12:12 PM
> *To:* Peter Shmukler 
> *Cc:* user@spark.apache.org
> *Subject:* Re: filters Pushdown
>
>
>
> Pushdowns depend on the source connector.
> Join pushdown works with Cassandra only.
> Filter pushdown works with most sources, with some source-specific constraints.
>
>
>
On 2 Feb 2017 10:42 AM, "Peter Sg"  wrote:
>
> Can the community help me figure out some details about Spark:
> -   Does Spark support filter Pushdown for these types:
>   o Int/Long
>   o DateTime
>   o String
> -   Does Spark support Pushdown of join operations for partitioned tables
> (in case the join condition includes the partitioning field)?
> -   Does Spark support Pushdown on Parquet, ORC?
>   o Should I use Hadoop, or is NTFS/NFS an option as well?
>
>
>
>
>
>
>


Re: filters Pushdown

2017-02-02 Thread ayan guha
Look for the spark-packages website. If your questions were targeted at Hive,
then I think in general all the answers are yes.
On Thu, 2 Feb 2017 at 9:23 pm, Peter Shmukler  wrote:

> Hi Vincent,
>
> Thank you for the answer. (I don’t see your answer in the mailing list, so I’m
> replying directly.)
>
>
>
> What connectors can I work with from Spark?
>
> Can you provide any link to read about them? I see nothing in the Spark
> documentation.
>
>
>
>
>
> *From:* vincent gromakowski [mailto:vincent.gromakow...@gmail.com]
> *Sent:* Thursday, February 2, 2017 12:12 PM
> *To:* Peter Shmukler 
> *Cc:* user@spark.apache.org
> *Subject:* Re: filters Pushdown
>
>
>
> Pushdowns depend on the source connector.
> Join pushdown works with Cassandra only.
> Filter pushdown works with most sources, with some source-specific constraints.
>
>
>
> On 2 Feb 2017 10:42 AM, "Peter Sg"  wrote:
>
> Can the community help me figure out some details about Spark:
> -   Does Spark support filter Pushdown for these types:
>   o Int/Long
>   o DateTime
>   o String
> -   Does Spark support Pushdown of join operations for partitioned tables
> (in case the join condition includes the partitioning field)?
> -   Does Spark support Pushdown on Parquet, ORC?
>   o Should I use Hadoop, or is NTFS/NFS an option as well?
>
>
>
>
>
>
>
-- 
Best Regards,
Ayan Guha


RE: filters Pushdown

2017-02-02 Thread Peter Shmukler
Hi Vincent,
Thank you for the answer. (I don’t see your answer in the mailing list, so I’m
replying directly.)

What connectors can I work with from Spark?
Can you provide any link to read about them? I see nothing in the Spark
documentation.


From: vincent gromakowski [mailto:vincent.gromakow...@gmail.com]
Sent: Thursday, February 2, 2017 12:12 PM
To: Peter Shmukler 
Cc: user@spark.apache.org
Subject: Re: filters Pushdown


Pushdowns depend on the source connector.
Join pushdown works with Cassandra only.
Filter pushdown works with most sources, with some source-specific constraints.

On 2 Feb 2017 10:42 AM, "Peter Sg"  wrote:
Can the community help me figure out some details about Spark:
-   Does Spark support filter Pushdown for these types:
  o Int/Long
  o DateTime
  o String
-   Does Spark support Pushdown of join operations for partitioned tables
(in case the join condition includes the partitioning field)?
-   Does Spark support Pushdown on Parquet, ORC?
  o Should I use Hadoop, or is NTFS/NFS an option as well?









Re: filters Pushdown

2017-02-02 Thread vincent gromakowski
Pushdowns depend on the source connector.
Join pushdown works with Cassandra only.
Filter pushdown works with most sources, with some source-specific constraints.

On 2 Feb 2017 10:42 AM, "Peter Sg"  wrote:

> Can the community help me figure out some details about Spark:
> -   Does Spark support filter Pushdown for these types:
>   o Int/Long
>   o DateTime
>   o String
> -   Does Spark support Pushdown of join operations for partitioned tables
> (in case the join condition includes the partitioning field)?
> -   Does Spark support Pushdown on Parquet, ORC?
>   o Should I use Hadoop, or is NTFS/NFS an option as well?
>
>
>
>
>
>
>


Re: Closing resources in the executor

2017-02-02 Thread Appu K
https://mid.mail-archive.com/search?l=user@spark.apache.org=subject:%22Executor+shutdown+hook+and+initialization%22=newest=1

I see this thread where it is mentioned that per-partition resource
management is recommended over global state (within an executor).
What would be the way to achieve this with DataFrames?

Is a shutdown hook the only solution right now?

thanks
sajith
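
A minimal sketch of the per-partition pattern on a Dataset, as an alternative to executor-global state (RedisPool and RedisConnection below are made up for the sketch; the point is that open/use/close is scoped to a single partition):

import org.apache.spark.sql.{Dataset, SparkSession}

// hypothetical connection API, defined only for the sketch
trait RedisConnection { def get(key: String): String; def close(): Unit }
object RedisPool { def borrow(): RedisConnection = ??? /* hand out a pooled connection */ }

final case class Enriched(key: String, value: String)

def enrich(ds: Dataset[String])(implicit spark: SparkSession): Dataset[Enriched] = {
  import spark.implicits._
  ds.mapPartitions { keys =>
    val conn = RedisPool.borrow()        // opened once per partition, on the executor
    try {
      // materialize the partition so the connection can be closed afterwards
      // (fine for modest partitions; large ones need a closing iterator instead)
      keys.map(k => Enriched(k, conn.get(k))).toList.iterator
    } finally {
      conn.close()                       // released when the partition is done
    }
  }
}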


On 2 February 2017 at 11:58:27 AM, Appu K (kut...@gmail.com) wrote:



What would be the recommended way to close resources opened or shared by
executors?

A few use cases

#1) Let's say the enrichment process needs to convert an IP / lat+long to
city/country. To achieve this, executors could open a file in HDFS and
build a map, or use a memory-mapped file; the implementation could be a
transient lazy val singleton or something similar. The UDF defined would
then perform lookups on these data structures and return geo data.

#2) Let's say there is a need to do a lookup on a KV store like Redis from
the executor. Each executor would create a connection pool and provide
connections for the tasks running in it to perform lookups.

In scenarios like this, when the executor is closed, what would be the best
way to close the open resources (streams, connections, etc.)?


Any pointers to places where i could read up a bit more about the best
practices around it would be highly appreciated!

thanks
appu


Surprised!!!!! Spark-shell showing inconsistent results

2017-02-02 Thread Alex
Hi, as shown below, the same query run back to back shows inconsistent
results.

testtable1 is Avro Serde table...

[image: Inline image 1]



 hc.sql("select * from testtable1 order by col1 limit 1").collect;
res14: Array[org.apache.spark.sql.Row] =
Array([1570,3364,201607,Y,APJ,PHILIPPINES,8518944,null,null,null,null,-15.992583,0.0,-15.992583,null,null,MONTH_ITEM_GROUP])

scala> hc.sql("select * from testtable1 order by col1 limit 1").collect;
res15: Array[org.apache.spark.sql.Row] =
Array([1570,485888,20163,N,AMERICAS,BRAZIL,null,null,null,null,null,6019.2999,17198.0,6019.2999,null,null,QUARTER_GROUP])

scala> hc.sql("select * from testtable1 order by col1 limit 1").collect;
res16: Array[org.apache.spark.sql.Row] =
Array([1570,3930,201607,Y,APJ,INDIA
SUB-CONTINENT,8741220,null,null,null,null,-208.485216,0.0,-208.485216,null,null,MONTH_ITEM_GROUP])


filters Pushdown

2017-02-02 Thread Peter Sg
Can the community help me figure out some details about Spark:
-   Does Spark support filter Pushdown for these types:
  o Int/Long
  o DateTime
  o String
-   Does Spark support Pushdown of join operations for partitioned tables
(in case the join condition includes the partitioning field)?
-   Does Spark support Pushdown on Parquet, ORC?
  o Should I use Hadoop, or is NTFS/NFS an option as well?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/filters-Pushdown-tp28357.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Is it okay to run Hive Java UDFS in Spark-sql. Anybody's still doing it?

2017-02-02 Thread Jörn Franke

There are many performance aspects here which may be related not only to the UDF
itself, but also to the configuration of the platform, the data, etc.

You seem to have a performance problem with your UDFs. Maybe you can elaborate
on
1) what data you process (format, etc.)
2) what you try to analyse
3) how you implemented your UDFs. Maybe the implementation is not optimal, and
then simply moving it from Hive to Spark does not give you any benefits. Bad
code is still bad code in Spark SQL.
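
For concreteness, the two options being discussed look roughly like this (a sketch; the function, class, table and column names are made-up examples, and hc stands for a HiveContext):

// Option 1: keep the existing Hive Java UDF and register it in Spark SQL.
// The UDF jar must be on the classpath; the class name here is hypothetical.
hc.sql("CREATE TEMPORARY FUNCTION clean_str AS 'com.example.hive.udf.CleanString'")
hc.sql("SELECT clean_str(col1) FROM some_table").show()

// Option 2: recode it as a native Scala UDF and register that instead.
val cleanStr = (s: String) => if (s == null) null else s.trim.toLowerCase
hc.udf.register("clean_str_scala", cleanStr)
hc.sql("SELECT clean_str_scala(col1) FROM some_table").show()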


> On 2 Feb 2017, at 09:33, Alex  wrote:
> 
> Hi Team,
> 
> Do you really think that if we make Hive Java UDFs run on spark-sql it will
> make a performance difference??? Is anybody here actually doing it, i.e.
> converting Hive UDFs to run on Spark SQL?
> 
> What would be your approach if asked to make a Hive Java UDFs project run on
> spark-sql?
> 
> Would you run the same Java UDFs using Spark SQL,
> 
> or 
> 
> would you recode all the Java UDFs as Scala UDFs and then run them?
> 
> 
> Regards,
> Alex




Is it okay to run Hive Java UDFS in Spark-sql. Anybody's still doing it?

2017-02-02 Thread Alex
Hi Team,

Do you really think that if we make Hive Java UDFs run on spark-sql it will
make a performance difference??? Is anybody here actually doing it, i.e.
converting Hive UDFs to run on Spark SQL?

What would be your approach if asked to make a Hive Java UDFs project run on
spark-sql?

Would you run the same Java UDFs using Spark SQL,

or

would you recode all the Java UDFs as Scala UDFs and then run them?


Regards,
Alex