Re: Spark 2.4.1 on Kubernetes - DNS resolution of driver fails

2019-05-03 Thread Olivier Girardot
Hi,
I have not tried another vendor, so I can't say whether this is specific to
GKE. And no, I did not notice anything unusual in the kubelet or kube-dns
processes...

Regards

On Fri, May 3, 2019 at 03:05, Li Gao wrote:

> Hi Olivier,
>
> This seems to be a GKE-specific issue? Have you tried other vendors? Also,
> on the kubelet nodes, did you notice any pressure on the DNS side?
>
> Li
>
>
> On Mon, Apr 29, 2019, 5:43 AM Olivier Girardot <
> o.girar...@lateral-thoughts.com> wrote:
>
>> Hi everyone,
>> I have ~300 Spark jobs on Kubernetes (GKE) using the cluster auto-scaler,
>> and sometimes while running these jobs a pretty bad thing happens: the
>> driver (in cluster mode) gets scheduled on Kubernetes and launches many
>> executor pods.
>> So far so good, but the k8s "Service" associated with the driver does not
>> seem to be propagated in terms of DNS resolution, so all the executors fail
>> with a "spark-application-..cluster.svc.local" does not exist error.
>>
>> With all executors failing, the driver should fail too, but it considers
>> this a "pending" initial allocation and stays stuck forever in a loop
>> of "Initial job has not accepted any resources, please check Cluster UI".
>>
>> Has anyone else observed this kind of behaviour?
>> We had it on 2.3.1 and I upgraded to 2.4.1, but the issue still seems to
>> exist even after the "big refactoring" of the Kubernetes cluster scheduler
>> backend.
>>
>> I can work on a fix / workaround, but I'd like to check with you the
>> proper way forward:
>>
>>    - Some processes (like the Airflow Helm recipe) rely on a "sleep 30s"
>>      before launching the dependent pods (that could be added to
>>      /opt/entrypoint.sh used in the Kubernetes packaging)
>>    - We can add a simple step to the init container that tries the DNS
>>      resolution and fails after 60s if it did not work
>>
>> But these steps won't change the fact that the driver will stay stuck,
>> thinking we're still in the initial allocation delay.
>>
>> Thoughts?
>>
>> --
>> *Olivier Girardot*
>> o.girar...@lateral-thoughts.com
>>
>


Spark 2.4.1 on Kubernetes - DNS resolution of driver fails

2019-04-29 Thread Olivier Girardot
Hi everyone,
I have ~300 Spark jobs on Kubernetes (GKE) using the cluster auto-scaler,
and sometimes while running these jobs a pretty bad thing happens: the
driver (in cluster mode) gets scheduled on Kubernetes and launches many
executor pods.
So far so good, but the k8s "Service" associated with the driver does not
seem to be propagated in terms of DNS resolution, so all the executors fail
with a "spark-application-..cluster.svc.local" does not exist error.

With all executors failing, the driver should fail too, but it considers
this a "pending" initial allocation and stays stuck forever in a loop
of "Initial job has not accepted any resources, please check Cluster UI".

Has anyone else observed this kind of behaviour?
We had it on 2.3.1 and I upgraded to 2.4.1, but the issue still seems to
exist even after the "big refactoring" of the Kubernetes cluster scheduler
backend.

I can work on a fix / workaround, but I'd like to check with you the proper
way forward:

   - Some processes (like the Airflow Helm recipe) rely on a "sleep 30s"
     before launching the dependent pods (that could be added to
     /opt/entrypoint.sh used in the Kubernetes packaging)
   - We can add a simple step to the init container that tries the DNS
     resolution and fails after 60s if it did not work

But these steps won't change the fact that the driver will stay stuck,
thinking we're still in the initial allocation delay.
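
To make the second option concrete, here is a minimal sketch of such a DNS
check, written in Python for illustration only. The function name
wait_for_dns, the 2s polling interval and the injectable resolve/sleep/clock
parameters are assumptions of this sketch, not part of the Spark image or
its entrypoint.

```python
import socket
import time


def wait_for_dns(hostname, timeout_s=60.0, interval_s=2.0,
                 resolve=socket.gethostbyname, sleep=time.sleep,
                 clock=time.monotonic):
    """Poll DNS until `hostname` resolves; raise TimeoutError after `timeout_s`."""
    deadline = clock() + timeout_s
    while True:
        try:
            # Returns the resolved address as soon as the lookup succeeds.
            return resolve(hostname)
        except OSError:
            # socket.gaierror (a subclass of OSError) signals a failed lookup.
            if clock() >= deadline:
                raise TimeoutError(
                    "could not resolve %s within %.0fs" % (hostname, timeout_s))
            sleep(interval_s)
```

An executor would call this with the driver service hostname before starting,
and exit with an error if TimeoutError is raised. As noted above, this only
makes the executors fail fast; it does not unblock the driver.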

Thoughts ?

-- 
*Olivier Girardot*
o.girar...@lateral-thoughts.com


Back to SQL

2018-10-03 Thread Olivier Girardot
Hi everyone,
Is there any known way to go from a Spark SQL logical plan (possibly the
optimised one) back to a SQL query?

Regards,

Olivier.


Spark Structured Streaming and compacted topic in Kafka

2017-09-06 Thread Olivier Girardot
Hi everyone,
I'm aware of the issue regarding the direct stream 0.10 consumer in Spark and
compacted topics (cf. https://issues.apache.org/jira/browse/SPARK-17147).
Is there any chance that the Spark Structured Streaming Kafka source is
compatible with compacted topics?

Regards,

-- 
*Olivier Girardot*


Nested "struct" function call creates a compilation error in Spark SQL

2017-06-15 Thread Olivier Girardot
Hi everyone,
when we create nested calls to "struct" (up to 5 levels) to extend a
complex data structure, we end up with the following compilation error:

org.codehaus.janino.JaninoRuntimeException: Code of method
"(I[Lscala/collection/Iterator;)V" of class
"org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator"
grows beyond 64 KB

The CreateStruct code itself is properly using the ctx.splitExpression
command, but the "end result" of the df.select(struct(struct(struct(...))))
ends up being too much.

Should I open a JIRA, or is there a workaround?

Regards,

-- 
*Olivier Girardot* | Associé
o.girar...@lateral-thoughts.com


Re: Pyspark 2.1.0 weird behavior with repartition

2017-03-11 Thread Olivier Girardot
I more or less reproduced this with pyspark 2.1, also built for Hadoop 2.6, and
with Python 3.x.
I'll look into it a bit more once I've fixed a few other issues regarding
the salting of strings on the cluster.

2017-01-30 20:19 GMT+01:00 Blaž Šnuderl <snud...@gmail.com>:

> I am loading a simple text file using pyspark. Repartitioning it seems to
> produce garbage data.
>
> I got these results using Spark 2.1 prebuilt for Hadoop 2.7, in the pyspark
> shell.
>
> >>> sc.textFile("outc").collect()
> [u'a', u'b', u'c', u'd', u'e', u'f', u'g', u'h', u'i', u'j', u'k', u'l']
> >>> sc.textFile("outc", use_unicode=False).collect()
> ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l']
>
> Repartitioning seems to produce garbage and also only 2 records here:
> >>> sc.textFile("outc", use_unicode=False).repartition(10).collect()
> ['\x80\x02]q\x01(U\x01aU\x01bU\x01cU\x01dU\x01eU\x01fU\x01ge.',
> '\x80\x02]q\x01(U\x01hU\x01iU\x01jU\x01kU\x01le.']
> >>> sc.textFile("outc", use_unicode=False).repartition(10).count()
> 2
>
>
> Without setting use_unicode=False we can't even repartition at all:
> >>> sc.textFile("outc").repartition(19).collect()
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File "/home/snuderl/scrappstore/thirdparty/spark-2.1.0-bin-hadoop2.7/python/pyspark/rdd.py", line 810, in collect
>     return list(_load_from_socket(port, self._jrdd_deserializer))
>   File "/home/snuderl/scrappstore/thirdparty/spark-2.1.0-bin-hadoop2.7/python/pyspark/rdd.py", line 140, in _load_from_socket
>     for item in serializer.load_stream(rf):
>   File "/home/snuderl/scrappstore/thirdparty/spark-2.1.0-bin-hadoop2.7/python/pyspark/serializers.py", line 529, in load_stream
>     yield self.loads(stream)
>   File "/home/snuderl/scrappstore/thirdparty/spark-2.1.0-bin-hadoop2.7/python/pyspark/serializers.py", line 524, in loads
>     return s.decode("utf-8") if self.use_unicode else s
>   File "/home/snuderl/scrappstore/virtualenv/lib/python2.7/encodings/utf_8.py", line 16, in decode
>     return codecs.utf_8_decode(input, errors, True)
> UnicodeDecodeError: 'utf8' codec can't decode byte 0x80 in position 0: invalid start byte
>
>
>
> Input file contents:
> a
> b
> c
> d
> e
> f
> g
> h
> i
> j
> k
> l
>
>
>
>
>
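
For what it's worth, the two "garbage" records quoted above start with
\x80\x02, which is the pickle protocol 2 header, so they look like pickled
batches of the original lines rather than random bytes. The sketch below only
decodes one of them with the standard library; that the batches come from
Spark's own serializer is an assumption here, not something the thread
confirms.

```python
import pickle

# One of the two records returned by repartition(10).collect() above.
record = b'\x80\x02]q\x01(U\x01aU\x01bU\x01cU\x01dU\x01eU\x01fU\x01ge.'

# Unpickling the record yields a list holding several input lines at once.
batch = pickle.loads(record)
print(batch)
```

This would also be consistent with count() returning 2: each record is one
batch of lines instead of one line.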


-- 
*Olivier Girardot* | Associé
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94


Re: Nested ifs in sparksql

2017-01-10 Thread Olivier Girardot
Are you using the "case when" functions? What do you mean by slow? Can you
share a snippet?
 





On Tue, Jan 10, 2017 8:15 PM, Georg Heiler georg.kf.hei...@gmail.com
wrote:
Maybe you can create a UDF?
On Tue, Jan 10, 2017 at 20:04, Raghavendra Pandey <raghavendra.pan...@gmail.com>
wrote:
I have around 41 levels of nested if/else in Spark SQL. I have programmed it
using the DataFrame APIs, but it takes too much time.
Is there anything I can do to improve the run time here?



Olivier Girardot| Associé
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94

Re: Could not parse Master URL for Mesos on Spark 2.1.0

2017-01-10 Thread Olivier Girardot
Nope, there is no "distribution", no spark-submit at the start of my process.
But I found the problem: the behavior when loading the Mesos native dependency
changed, and the static initialization block inside
org.apache.mesos.MesosSchedulerDriver needed a specific reference to
libmesos-1.0.0.so.
So just for the record, setting the env variable
MESOS_NATIVE_JAVA_LIBRARY="//libmesos-1.0.0.so" fixed the whole thing.
Thanks for the help!
@michael if you want to talk about the setup we're using, we can talk about it
directly.
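
Since the failure only shows up as a NoClassDefFoundError deep inside the
driver, a small pre-flight check before launching can make the cause more
obvious. This is a sketch only: check_mesos_native_lib is a hypothetical
helper, and it assumes the driver is launched from a Python wrapper (as the
bash_operator logs suggest, but the thread does not say how).

```python
import os


def check_mesos_native_lib(env=None):
    """Return the library path if MESOS_NATIVE_JAVA_LIBRARY points at a file."""
    env = os.environ if env is None else env
    path = env.get("MESOS_NATIVE_JAVA_LIBRARY")
    if not path:
        raise RuntimeError("MESOS_NATIVE_JAVA_LIBRARY is not set")
    if not os.path.isfile(path):
        raise RuntimeError(
            "MESOS_NATIVE_JAVA_LIBRARY does not point to a file: " + path)
    return path
```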
 





On Tue, Jan 10, 2017 9:31 PM, Michael Gummelt mgumm...@mesosphere.io
wrote:
What do you mean your driver has all the dependencies packaged?  What are "all
the dependencies"?  Is the distribution you use to launch your driver built with
-Pmesos?

On Tue, Jan 10, 2017 at 12:18 PM, Olivier Girardot <
o.girar...@lateral-thoughts.com>  wrote:
Hi Michael,
I did so, but that's not exactly the problem. You see, my driver has all the
dependencies packaged, and only the executors fetch the tgz via
spark.executor.uri.
The strange thing is that I see in my classpath the
org.apache.mesos:mesos-1.0.0-shaded-protobuf dependency packaged in the final
dist of my app…
So everything should work in theory.

 





On Tue, Jan 10, 2017 7:22 PM, Michael Gummelt mgumm...@mesosphere.io
wrote:
Just build with -Pmesos:
http://spark.apache.org/docs/latest/building-spark.html#building-with-mesos-support

On Tue, Jan 10, 2017 at 8:56 AM, Olivier Girardot <
o.girar...@lateral-thoughts.com>  wrote:
I had the same problem, added spark-mesos as dependency and now I get :
[2017-01-10 17:45:16,575] {bash_operator.py:77} INFO - Exception in thread "main" java.lang.NoClassDefFoundError: Could not initialize class org.apache.mesos.MesosSchedulerDriver
[2017-01-10 17:45:16,576] {bash_operator.py:77} INFO - at org.apache.spark.scheduler.cluster.mesos.MesosSchedulerUtils$class.createSchedulerDriver(MesosSchedulerUtils.scala:105)
[2017-01-10 17:45:16,576] {bash_operator.py:77} INFO - at org.apache.spark.scheduler.cluster.mesos.MesosCoarseGrainedSchedulerBackend.createSchedulerDriver(MesosCoarseGrainedSchedulerBackend.scala:48)
[2017-01-10 17:45:16,576] {bash_operator.py:77} INFO - at org.apache.spark.scheduler.cluster.mesos.MesosCoarseGrainedSchedulerBackend.start(MesosCoarseGrainedSchedulerBackend.scala:155)
[2017-01-10 17:45:16,577] {bash_operator.py:77} INFO - at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:156)
[2017-01-10 17:45:16,577] {bash_operator.py:77} INFO - at org.apache.spark.SparkContext.<init>(SparkContext.scala:509)
[2017-01-10 17:45:16,577] {bash_operator.py:77} INFO - at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2313)
[2017-01-10 17:45:16,577] {bash_operator.py:77} INFO - at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:868)
[2017-01-10 17:45:16,577] {bash_operator.py:77} INFO - at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:860)
[2017-01-10 17:45:16,578] {bash_operator.py:77} INFO - at scala.Option.getOrElse(Option.scala:121)
[2017-01-10 17:45:16,578] {bash_operator.py:77} INFO - at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:860)
Is there any other dependency to add for spark 2.1.0 ?

 





On Tue, Jan 10, 2017 1:26 AM, Abhishek Bhandari abhi10...@gmail.com
wrote:
Glad that you found it.
On Mon, Jan 9, 2017 at 3:29 PM, Richard Siebeling <rsiebel...@gmail.com>  wrote:
Probably found it. It turns out that Mesos must be explicitly added when
building Spark; I assumed I could use the old build command that I used for
building Spark 2.0.0... Didn't see the two lines added in the documentation...
Maybe this kind of change could be added to the changelog under changes of
behaviour or changes in the build process or something like that.
Kind regards,
Richard

On 9 January 2017 at 22:55, Richard Siebeling <rsiebel...@gmail.com>  wrote:
Hi,
I'm setting up Apache Spark 2.1.0 on Mesos and I am getting a "Could not parse
Master URL: 'mesos://xx.xx.xxx.xxx:5050'" error.
Mesos is running fine (both the master and the slave; it's a single-machine
configuration).
I really don't understand why this is happening, since the same configuration
using Spark 2.0.0 runs fine within Vagrant.
Could someone please help?
Thanks in advance,
Richard






-- 
Abhishek J Bhandari
Mobile No. +1 510 493 6205 (USA)
Mobile No. +91 96387 93021 (IND)
R & D Department
Valent Software Inc. CA
Email: abhis...@valent-software.com

 

Olivier Girardot| Associé
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94
 



Re: Could not parse Master URL for Mesos on Spark 2.1.0

2017-01-10 Thread Olivier Girardot
Hi Michael,
I did so, but that's not exactly the problem. You see, my driver has all the
dependencies packaged, and only the executors fetch the tgz via
spark.executor.uri.
The strange thing is that I see in my classpath the
org.apache.mesos:mesos-1.0.0-shaded-protobuf dependency packaged in the final
dist of my app…
So everything should work in theory.
 





On Tue, Jan 10, 2017 7:22 PM, Michael Gummelt mgumm...@mesosphere.io
wrote:
Just build with -Pmesos 
http://spark.apache.org/docs/latest/building-spark.html#building-with-mesos-support

On Tue, Jan 10, 2017 at 8:56 AM, Olivier Girardot <
o.girar...@lateral-thoughts.com>  wrote:
I had the same problem, added spark-mesos as dependency and now I get :
[2017-01-10 17:45:16,575] {bash_operator.py:77} INFO - Exception in thread "main" java.lang.NoClassDefFoundError: Could not initialize class org.apache.mesos.MesosSchedulerDriver
[2017-01-10 17:45:16,576] {bash_operator.py:77} INFO - at org.apache.spark.scheduler.cluster.mesos.MesosSchedulerUtils$class.createSchedulerDriver(MesosSchedulerUtils.scala:105)
[2017-01-10 17:45:16,576] {bash_operator.py:77} INFO - at org.apache.spark.scheduler.cluster.mesos.MesosCoarseGrainedSchedulerBackend.createSchedulerDriver(MesosCoarseGrainedSchedulerBackend.scala:48)
[2017-01-10 17:45:16,576] {bash_operator.py:77} INFO - at org.apache.spark.scheduler.cluster.mesos.MesosCoarseGrainedSchedulerBackend.start(MesosCoarseGrainedSchedulerBackend.scala:155)
[2017-01-10 17:45:16,577] {bash_operator.py:77} INFO - at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:156)
[2017-01-10 17:45:16,577] {bash_operator.py:77} INFO - at org.apache.spark.SparkContext.<init>(SparkContext.scala:509)
[2017-01-10 17:45:16,577] {bash_operator.py:77} INFO - at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2313)
[2017-01-10 17:45:16,577] {bash_operator.py:77} INFO - at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:868)
[2017-01-10 17:45:16,577] {bash_operator.py:77} INFO - at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:860)
[2017-01-10 17:45:16,578] {bash_operator.py:77} INFO - at scala.Option.getOrElse(Option.scala:121)
[2017-01-10 17:45:16,578] {bash_operator.py:77} INFO - at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:860)
Is there any other dependency to add for spark 2.1.0 ?

 





On Tue, Jan 10, 2017 1:26 AM, Abhishek Bhandari abhi10...@gmail.com
wrote:
Glad that you found it.
On Mon, Jan 9, 2017 at 3:29 PM, Richard Siebeling <rsiebel...@gmail.com>  wrote:
Probably found it. It turns out that Mesos must be explicitly added when
building Spark; I assumed I could use the old build command that I used for
building Spark 2.0.0... Didn't see the two lines added in the documentation...
Maybe this kind of change could be added to the changelog under changes of
behaviour or changes in the build process or something like that.
Kind regards,
Richard

On 9 January 2017 at 22:55, Richard Siebeling <rsiebel...@gmail.com>  wrote:
Hi,
I'm setting up Apache Spark 2.1.0 on Mesos and I am getting a "Could not parse
Master URL: 'mesos://xx.xx.xxx.xxx:5050'" error.
Mesos is running fine (both the master and the slave; it's a single-machine
configuration).
I really don't understand why this is happening, since the same configuration
using Spark 2.0.0 runs fine within Vagrant.
Could someone please help?
Thanks in advance,
Richard






-- 
Abhishek J Bhandari
Mobile No. +1 510 493 6205 (USA)
Mobile No. +91 96387 93021 (IND)
R & D Department
Valent Software Inc. CA
Email: abhis...@valent-software.com

 

Olivier Girardot| Associé
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94
 



Re: Could not parse Master URL for Mesos on Spark 2.1.0

2017-01-10 Thread Olivier Girardot
I had the same problem, added spark-mesos as dependency and now I get :
[2017-01-10 17:45:16,575] {bash_operator.py:77} INFO - Exception in thread "main" java.lang.NoClassDefFoundError: Could not initialize class org.apache.mesos.MesosSchedulerDriver
[2017-01-10 17:45:16,576] {bash_operator.py:77} INFO - at org.apache.spark.scheduler.cluster.mesos.MesosSchedulerUtils$class.createSchedulerDriver(MesosSchedulerUtils.scala:105)
[2017-01-10 17:45:16,576] {bash_operator.py:77} INFO - at org.apache.spark.scheduler.cluster.mesos.MesosCoarseGrainedSchedulerBackend.createSchedulerDriver(MesosCoarseGrainedSchedulerBackend.scala:48)
[2017-01-10 17:45:16,576] {bash_operator.py:77} INFO - at org.apache.spark.scheduler.cluster.mesos.MesosCoarseGrainedSchedulerBackend.start(MesosCoarseGrainedSchedulerBackend.scala:155)
[2017-01-10 17:45:16,577] {bash_operator.py:77} INFO - at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:156)
[2017-01-10 17:45:16,577] {bash_operator.py:77} INFO - at org.apache.spark.SparkContext.<init>(SparkContext.scala:509)
[2017-01-10 17:45:16,577] {bash_operator.py:77} INFO - at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2313)
[2017-01-10 17:45:16,577] {bash_operator.py:77} INFO - at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:868)
[2017-01-10 17:45:16,577] {bash_operator.py:77} INFO - at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:860)
[2017-01-10 17:45:16,578] {bash_operator.py:77} INFO - at scala.Option.getOrElse(Option.scala:121)
[2017-01-10 17:45:16,578] {bash_operator.py:77} INFO - at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:860)
Is there any other dependency to add for spark 2.1.0 ?
 





On Tue, Jan 10, 2017 1:26 AM, Abhishek Bhandari abhi10...@gmail.com
wrote:
Glad that you found it.
On Mon, Jan 9, 2017 at 3:29 PM, Richard Siebeling <rsiebel...@gmail.com>  wrote:
Probably found it. It turns out that Mesos must be explicitly added when
building Spark; I assumed I could use the old build command that I used for
building Spark 2.0.0... Didn't see the two lines added in the documentation...
Maybe this kind of change could be added to the changelog under changes of
behaviour or changes in the build process or something like that.
Kind regards,
Richard

On 9 January 2017 at 22:55, Richard Siebeling <rsiebel...@gmail.com>  wrote:
Hi,
I'm setting up Apache Spark 2.1.0 on Mesos and I am getting a "Could not parse
Master URL: 'mesos://xx.xx.xxx.xxx:5050'" error.
Mesos is running fine (both the master and the slave; it's a single-machine
configuration).
I really don't understand why this is happening, since the same configuration
using Spark 2.0.0 runs fine within Vagrant.
Could someone please help?
Thanks in advance,
Richard






-- 
Abhishek J Bhandari
Mobile No. +1 510 493 6205 (USA)
Mobile No. +91 96387 93021 (IND)
R & D Department
Valent Software Inc. CA
Email: abhis...@valent-software.com

 

Olivier Girardot| Associé
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94

Re: Spark SQL - Applying transformation on a struct inside an array

2017-01-05 Thread Olivier Girardot
So, the only way I have found for now is to handle the Row instances
recursively and directly, but to do that I have to go back to RDDs. I've put
together a simple test case demonstrating the problem:
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.scalatest.{FlatSpec, Matchers}

class DFInPlaceTransform extends FlatSpec with Matchers {
  val spark = SparkSession.builder().appName("local").master("local[*]").getOrCreate()

  it should "access and mutate deeply nested arrays/structs" in {
    // One record with a struct nested inside an array: a: array<struct<b: string>>
    val df = spark.read.json(spark.sparkContext.parallelize(List(
      """{"a":[{"b" : "toto" }]}""".stripMargin)))
    df.show()
    df.printSchema()

    val result = transformInPlace("a.b", df)

    result.printSchema()
    result.show()

    // The schema must be unchanged and only the leaf value upper-cased
    result.schema should be (df.schema)
    val res = result.toJSON.take(1)
    res should be (Array("""{"a":[{"b":"TOTO"}]}"""))
  }

  def transformInPlace(path: String, df: DataFrame): DataFrame = {
    val udf = spark.udf.register("transform", (s: String) => s.toUpperCase)
    val paths = path.split('.')
    val root = paths.head
    import org.apache.spark.sql.functions._
    df.withColumn(root, udf(df(path))) // does not work of course
  }
}

The three other solutions I see are:
 * to create a dedicated Expression for in-place modifications of nested
   arrays and structs,
 * to use heavy explode/lateral views/group by computations, but that's
   bound to be inefficient,
 * or to generate bytecode using the schema to do the nested
   "getRow, getSeq…" calls and re-create the rows once the transformation
   is applied.
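
To illustrate what the recursive handling has to do, here is a minimal sketch
on plain Python dicts and lists standing in for Rows and arrays. It is not
Spark code: transform_in_place is a hypothetical name, and the rule that
arrays are traversed transparently (so "a.b" reaches b inside every element
of a) is an assumption of this sketch.

```python
def transform_in_place(value, path, fn):
    """Return a copy of `value` with `fn` applied to every leaf reached by `path`."""
    if not path:
        return fn(value)
    if isinstance(value, list):
        # Arrays are traversed transparently: apply the same path to each element.
        return [transform_in_place(item, path, fn) for item in value]
    head, rest = path[0], path[1:]
    out = dict(value)  # shallow copy, so the input record is left untouched
    out[head] = transform_in_place(value[head], rest, fn)
    return out


record = {"a": [{"b": "toto"}]}
result = transform_in_place(record, "a.b".split("."), str.upper)
print(result)  # {'a': [{'b': 'TOTO'}]}
```

The rebuild-on-the-way-up step is the part that is awkward with immutable
Rows, since every enclosing struct and array has to be re-created.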

I'd like to open an issue regarding that use case because it's not the first or
last time it comes up, and I still don't see any generic solution using
DataFrames.
Thanks for your time,
Regards,
Olivier
 





On Fri, Sep 16, 2016 10:19 AM, Olivier Girardot o.girar...@lateral-thoughts.com
wrote:
Hi Michael,
Well, for nested structs, I saw in the tests the behaviour defined by
SPARK-12512 for the "a.b.c" handling in withColumn, and even if it's not ideal
for me, I managed to make it work anyway like this:
> df.withColumn("a", struct(struct(myUDF(df("a.b.c"))))) // I didn't put back the aliases but you see what I mean.
What I'd like to make work, in essence, is something like this:
> val someFunc : String => String = ???
> val myUDF = udf(someFunc)
> df.withColumn("a.b[*].c", myUDF(df("a.b[*].c")))
The fact is that, in order to be consistent with the previous API, maybe I'd
have to put something like a struct(array(struct(…, which would be troublesome
because I'd have to parse the arbitrary input string and create something like
"a.b[*].c" => struct(array(struct(
I realise the ambiguity implied in this kind of column expression, but for now
there doesn't seem to be a clean way to update data in place at an arbitrary
depth.
I'll try to work on a PR that would make this possible, but any pointers would
be appreciated.
Regards,
Olivier.
 





On Fri, Sep 16, 2016 12:42 AM, Michael Armbrust mich...@databricks.com
wrote:
Is what you are looking for a withColumn that support in place modification of
nested columns? or is it some other problem?
On Wed, Sep 14, 2016 at 11:07 PM, Olivier Girardot <
o.girar...@lateral-thoughts.com>  wrote:
I tried to use the RowEncoder but got stuck along the way.
The main issue really is that even if it's possible (however tedious) to
pattern match generically on Row(s) and target the nested field that you need
to modify, Rows are immutable data structures without a method like a case
class's copy or any kind of lens to create a brand new object. I ended up stuck
at the step "target and extract the field to update" without any way to update
the original Row with the new value.
To sum up, I tried:
 * using only the DataFrame API itself + my udf, which works for nested
   structs as long as no arrays are along the way
 * trying to create a udf that can apply on Row and pattern match recursively
   the path I needed to explore/modify
 * trying to create a UDT, but we seem to be stuck in a strange middle-ground
   with 2.0 because some parts of the API ended up private while some stayed
   public, making it impossible to use it now (I'd be glad if I'm mistaken)

All of these failed for me and I ended up converting the rows to JSON and
updating them using JSONPath, which is… something I'd like to avoid, 'pretty
please'.





On Thu, Sep 15, 2016 5:20 AM, Michael Allman mich...@videoamp.com
wrote:
Hi Guys,
Have you tried org.apache.spark.sql.catalyst.encoders.RowEncoder? It's not a
public API, but it is publicly accessible. I used it recently to correct some
bad data in a few nested columns in a dataframe. It wasn't an easy job, but it
made it possible. In my particular case I was not working with arrays.
Olivier, I'm interested in seeing what you come up with.
Than

Re: Help in generating unique Id in spark row

2017-01-05 Thread Olivier Girardot
There is a way: you can use
org.apache.spark.sql.functions.monotonicallyIncreasingId, which will give each
row of your DataFrame a unique ID.
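
As a rough illustration of why these IDs are unique but not stable across
re-runs: the Spark documentation describes the generated value as the
partition ID in the upper 31 bits and the record number within the partition
in the lower 33 bits. The sketch below mimics that layout in plain Python;
monotonic_id is a hypothetical helper, not a Spark API.

```python
def monotonic_id(partition_id, row_in_partition):
    """Mimic the documented layout: partition ID in the upper 31 bits,
    record number within the partition in the lower 33 bits."""
    return (partition_id << 33) + row_in_partition


# Two partitions with three rows each.
ids = [monotonic_id(p, r) for p in range(2) for r in range(3)]
print(ids)  # [0, 1, 2, 8589934592, 8589934593, 8589934594]
```

Because the value depends on which partition a row lands in and on its
position there, recomputing the DataFrame after a failure or a repartition
can give the same row a different ID.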
 





On Tue, Oct 18, 2016 10:36 AM, ayan guha guha.a...@gmail.com
wrote:
Do you have any primary key or unique identifier in your data, even if it takes
multiple columns to make a composite key? In other words, can your data have
two exactly identical rows with different unique IDs? Also, do you have to have
a numeric ID?

You may want to use a hashing algorithm, such as one of the SHA family, to
convert single or composite unique columns to an ID.
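
A minimal sketch of that hashing idea in plain Python, assuming the key
columns are available as ordinary values. row_id is a hypothetical helper,
and SHA-256 with length-prefixed fields is just one reasonable choice; it is
not something the thread prescribes.

```python
import hashlib


def row_id(*key_columns):
    """Derive a stable hex ID from the values of a (composite) key."""
    h = hashlib.sha256()
    for value in key_columns:
        data = str(value).encode("utf-8")
        # Length-prefix each value so ("ab", "c") and ("a", "bc") do not collide.
        h.update(len(data).to_bytes(8, "big"))
        h.update(data)
    return h.hexdigest()
```

Unlike a random UUID, the same key always yields the same ID, so the value
survives recomputation of the DataFrame.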

On 18 Oct 2016 15:32, "Saurav Sinha" <sauravsinh...@gmail.com> wrote:
Can anyone help me out?
On Mon, Oct 17, 2016 at 7:27 PM, Saurav Sinha <sauravsinh...@gmail.com>  wrote:
Hi,
I am in a situation where I want to generate a unique ID for each row.
I have used monotonicallyIncreasingId, but it gives increasing values and
starts generating from the start again if it fails.
I have two questions here:
Q1. Does this method give me a unique ID even in a failure situation? I want
to use that ID as my Solr id.
Q2. If the answer to the previous question is no, is there a way to generate a
UUID for each row which is unique and not updatable?
I have run into a situation where the UUID is updated:

val idUDF = udf(() => UUID.randomUUID().toString)
val a = withColumn("alarmUUID", lit(idUDF()))
a.persist(StorageLevel.MEMORY_AND_DISK)
rawDataDf.registerTempTable("rawAlarms")

// I do some joins
but as I reach further below, I do something like this (b is a transformation
of a):
sqlContext.sql("""Select a.alarmUUID, b.alarmUUID
                  from a right outer join b
                  on a.alarmUUID = b.alarmUUID""")
and it gives this output:
+--------------------+--------------------+
|           alarmUUID|           alarmUUID|
+--------------------+--------------------+
|7d33a516-5532-410...|                null|
|                null|2439d6db-16a2-44b...|
+--------------------+--------------------+


-- 
Thanks and Regards,
Saurav Sinha
Contact: 9742879062


-- 
Thanks and Regards,
Saurav Sinha
Contact: 9742879062

 

Olivier Girardot| Associé
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94

Re: Using Spark as a Maven dependency but with Hadoop 2.6

2016-09-29 Thread Olivier Girardot
I know that the code itself would not be different, but it would be useful to
at least have different pom/build.sbt transitive dependencies when fetching the
artifact with a specific classifier, don't you think?
For now I've overridden them myself using the dependency versions defined in
Spark's pom.xml.
So it's not a blocking issue. It may be useful to document it, but a blog post
would be sufficient, I think.
 





On Wed, Sep 28, 2016 7:21 PM, Sean Owen so...@cloudera.com
wrote:
I guess I'm claiming the artifacts wouldn't even be different in the first
place, because the Hadoop APIs that are used are all the same across these
versions. That would be the thing that makes you need multiple versions of the
artifact under multiple classifiers.
On Wed, Sep 28, 2016 at 1:16 PM, Olivier Girardot <
o.girar...@lateral-thoughts.com>  wrote:
OK, but don't you think it could be published with just different classifiers:
hadoop-2.6
hadoop-2.4
hadoop-2.2 being the current default.
So for now, I should just override Spark 2.0.0's dependencies with the ones
defined in the pom profile.

 





On Thu, Sep 22, 2016 11:17 AM, Sean Owen so...@cloudera.com
wrote:
There can be just one published version of the Spark artifacts and they have to
depend on something, though in truth they'd be binary-compatible with anything
2.2+. So you merely manage the dependency versions up to the desired version in
your .
On Thu, Sep 22, 2016 at 7:05 AM, Olivier Girardot <
o.girar...@lateral-thoughts.com>  wrote:
Hi,
When we fetch Spark 2.0.0 as a Maven dependency, we automatically end up
with Hadoop 2.2 as a transitive dependency. I know multiple profiles are used to
generate the different tar.gz bundles that we can download. Are there, by any
chance, publications of Spark 2.0.0 with different classifiers for the
different versions of Hadoop?
Thanks for your time!
Olivier Girardot

 


Olivier Girardot| Associé
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94
 



Re: Using Spark as a Maven dependency but with Hadoop 2.6

2016-09-28 Thread Olivier Girardot
OK, but don't you think it could be published with just different classifiers:
hadoop-2.6
hadoop-2.4
hadoop-2.2 being the current default.
So for now, I should just override Spark 2.0.0's dependencies with the ones
defined in the pom profile.
 





On Thu, Sep 22, 2016 11:17 AM, Sean Owen so...@cloudera.com
wrote:
There can be just one published version of the Spark artifacts and they have to
depend on something, though in truth they'd be binary-compatible with anything
2.2+. So you merely manage the dependency versions up to the desired version in
your .
On Thu, Sep 22, 2016 at 7:05 AM, Olivier Girardot <
o.girar...@lateral-thoughts.com>  wrote:
Hi,
When we fetch Spark 2.0.0 as a Maven dependency, we automatically end up
with Hadoop 2.2 as a transitive dependency. I know multiple profiles are used to
generate the different tar.gz bundles that we can download. Are there, by any
chance, publications of Spark 2.0.0 with different classifiers for the
different versions of Hadoop?
Thanks for your time!
Olivier Girardot

 


Olivier Girardot| Associé
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94

Using Spark as a Maven dependency but with Hadoop 2.6

2016-09-22 Thread Olivier Girardot

Hi,
When we fetch Spark 2.0.0 as a Maven dependency, we automatically end up
with Hadoop 2.2 as a transitive dependency. I know multiple profiles are used to
generate the different tar.gz bundles that we can download. Are there, by any
chance, publications of Spark 2.0.0 with different classifiers for the
different versions of Hadoop?
Thanks for your time!
Olivier Girardot

Re: Spark SQL - Applying transformation on a struct inside an array

2016-09-16 Thread Olivier Girardot
Hi Michael,
Well, for nested structs, I saw in the tests the behaviour defined by
SPARK-12512 for the "a.b.c" handling in withColumn, and even if it's not ideal
for me, I managed to make it work anyway like this:
> df.withColumn("a", struct(struct(myUDF(df("a.b.c." // I didn't put back the aliases but you see what I mean.
What I'd like to make work, in essence, is something like this:
> val someFunc : String => String = ???
> val myUDF = udf(someFunc)
> df.withColumn("a.b[*].c", myUDF(df("a.b[*].c")))
The fact is that, in order to be consistent with the previous API, maybe I'd
have to put something like a struct(array(struct(…, which would be troublesome
because I'd have to parse the arbitrary input string and create something like
"a.b[*].c" => struct(array(struct(
I realise the ambiguity implied in this kind of column expression, but for now
there doesn't seem to be a clean way to update data in place at an arbitrary
depth.
I'll try to work on a PR that would make this possible, but any pointers would
be appreciated.
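To make the intent of the "a.b[*].c" notation concrete, here is a minimal
sketch in plain Scala, outside Spark. It assumes structs are modelled as
Map[String, Any] and arrays as Seq[Any]; the names NestedUpdate, parsePath and
update are hypothetical and only illustrate the traversal. This is not a Spark
API and not the behaviour of withColumn.

```scala
object NestedUpdate {
  // Hypothetical model: a struct is a Map[String, Any], an array is a Seq[Any].
  // A path such as "a.b[*].c" is split into steps; "[*]" means "every element".
  def parsePath(path: String): List[String] =
    path.replace("[*]", ".[*]").split('.').filter(_.nonEmpty).toList

  // Rebuilds the nested value, applying f to whatever sits at the end of the path.
  def update(value: Any, steps: List[String], f: Any => Any): Any =
    (steps, value) match {
      case (Nil, v) => f(v)
      case ("[*]" :: rest, xs: Seq[_]) => xs.map(x => update(x, rest, f))
      case (field :: rest, m: Map[_, _]) =>
        val struct = m.asInstanceOf[Map[String, Any]]
        struct.get(field) match {
          case Some(child) => struct.updated(field, update(child, rest, f))
          case None => struct // unknown field: leave the struct untouched
        }
      case (_, other) => other // path does not match the data: leave untouched
    }

  def main(args: Array[String]): Unit = {
    val row: Map[String, Any] =
      Map("a" -> Map("b" -> Seq(Map("c" -> "x", "d" -> 1), Map("c" -> "y", "d" -> 2))))
    val upper: Any => Any = { case s: String => s.toUpperCase; case o => o }
    println(update(row, parsePath("a.b[*].c"), upper))
  }
}
```

Because every level is rebuilt rather than mutated, the same idea would carry
over to immutable rows, at the cost of copying each struct along the path.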
Regards,
Olivier.
 





On Fri, Sep 16, 2016 12:42 AM, Michael Armbrust mich...@databricks.com
wrote:
Is what you are looking for a withColumn that supports in-place modification of
nested columns? Or is it some other problem?
On Wed, Sep 14, 2016 at 11:07 PM, Olivier Girardot <
o.girar...@lateral-thoughts.com>  wrote:
I tried to use the RowEncoder but got stuck along the way.
The main issue really is that even if it's possible (however tedious) to
pattern match generically on Row(s) and target the nested field that you need
to modify, Rows are immutable data structures without a method like a case
class's copy or any kind of lens to create a brand new object. I ended up stuck
at the step "target and extract the field to update" without any way to update
the original Row with the new value.
To sum up, I tried:
 * using only the dataframe API itself + my udf - which works
   for nested structs as long as no arrays are along the way
 * trying to create a udf that can apply on Row and pattern
   match recursively the path I needed to explore/modify
 * trying to create a UDT - but we seem to be stuck in a
   strange middle-ground with 2.0 because some parts of the API ended up private
   while some stayed public, making it impossible to use it now (I'd be glad if
   I'm mistaken)

All of these failed for me and I ended up converting the rows to JSON and
updating them using JSONPath, which is... something I'd like to avoid 'pretty
please'





On Thu, Sep 15, 2016 5:20 AM, Michael Allman mich...@videoamp.com
wrote:
Hi Guys,
Have you tried org.apache.spark.sql.catalyst.encoders.RowEncoder? It's not a
public API, but it is publicly accessible. I used it recently to correct some
bad data in a few nested columns in a dataframe. It wasn't an easy job, but it
made it possible. In my particular case I was not working with arrays.
Olivier, I'm interested in seeing what you come up with.
Thanks,
Michael

On Sep 14, 2016, at 10:44 AM, Fred Reiss <freiss@gmail.com> wrote:
+1 to this request. I talked last week with a product group within IBM that is
struggling with the same issue. It's pretty common in data cleaning applications
for data in the early stages to have nested lists or sets inconsistent or
incomplete schema information.
Fred
On Tue, Sep 13, 2016 at 8:08 AM, Olivier Girardot <
o.girar...@lateral-thoughts.com>  wrote:
Hi everyone,
I'm currently trying to create a generic transformation mechanism on
a Dataframe to modify an arbitrary column regardless of the underlying
schema.
It's "relatively" straightforward for complex types like struct<struct<…>> to
apply an arbitrary UDF on the column and replace the data "inside" the struct,
however I'm struggling to make it work for complex types containing arrays along
the way like struct<array<struct<…>>>.
Michael Armbrust seemed to allude on the mailing list/forum to a way of using
Encoders to do that, I'd be interested in any pointers, especially considering
that it's not possible to output any Row or GenericRowWithSchema from a UDF
(thanks to
https://github.com/apache/spark/blob/v2.0.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala#L657
it seems).
To sum up, I'd like to find a way to apply a transformation on complex nested
datatypes (arrays and struct) on a Dataframe updating the value itself.
Regards,
Olivier Girardot

 



Olivier Girardot| Associé
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94
 



Re: Spark SQL - Applying transformation on a struct inside an array

2016-09-15 Thread Olivier Girardot
I tried to use the RowEncoder but got stuck along the way.
The main issue really is that even if it's possible (however tedious) to
pattern match generically on Row(s) and target the nested field that you need
to modify, Rows are immutable data structures without a method like a case
class's copy or any kind of lens to create a brand new object. I ended up stuck
at the step "target and extract the field to update" without any way to update
the original Row with the new value.
To sum up, I tried:
 * using only the dataframe API itself + my udf - which works
   for nested structs as long as no arrays are along the way
 * trying to create a udf that can apply on Row and pattern
   match recursively the path I needed to explore/modify
 * trying to create a UDT - but we seem to be stuck in a
   strange middle-ground with 2.0 because some parts of the API ended up private
   while some stayed public, making it impossible to use it now (I'd be glad if
   I'm mistaken)

All of these failed for me and I ended up converting the rows to JSON and
updating them using JSONPath, which is... something I'd like to avoid 'pretty
please'





On Thu, Sep 15, 2016 5:20 AM, Michael Allman mich...@videoamp.com
wrote:
Hi Guys,
Have you tried org.apache.spark.sql.catalyst.encoders.RowEncoder? It's not a
public API, but it is publicly accessible. I used it recently to correct some
bad data in a few nested columns in a dataframe. It wasn't an easy job, but it
made it possible. In my particular case I was not working with arrays.
Olivier, I'm interested in seeing what you come up with.
Thanks,
Michael

On Sep 14, 2016, at 10:44 AM, Fred Reiss <freiss@gmail.com> wrote:
+1 to this request. I talked last week with a product group within IBM that is
struggling with the same issue. It's pretty common in data cleaning applications
for data in the early stages to have nested lists or sets inconsistent or
incomplete schema information.
Fred
On Tue, Sep 13, 2016 at 8:08 AM, Olivier Girardot <
o.girar...@lateral-thoughts.com>  wrote:
Hi everyone,
I'm currently trying to create a generic transformation mechanism on
a Dataframe to modify an arbitrary column regardless of the underlying
schema.
It's "relatively" straightforward for complex types like struct<struct<…>> to
apply an arbitrary UDF on the column and replace the data "inside" the struct,
however I'm struggling to make it work for complex types containing arrays along
the way like struct<array<struct<…>>>.
Michael Armbrust seemed to allude on the mailing list/forum to a way of using
Encoders to do that, I'd be interested in any pointers, especially considering
that it's not possible to output any Row or GenericRowWithSchema from a UDF
(thanks to
https://github.com/apache/spark/blob/v2.0.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala#L657
it seems).
To sum up, I'd like to find a way to apply a transformation on complex nested
datatypes (arrays and struct) on a Dataframe updating the value itself.
Regards,
Olivier Girardot

 



Olivier Girardot| Associé
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94

Spark SQL - Applying transformation on a struct inside an array

2016-09-13 Thread Olivier Girardot
Hi everyone,
I'm currently trying to create a generic transformation mechanism on
a Dataframe to modify an arbitrary column regardless of the underlying
schema.
It's "relatively" straightforward for complex types like struct<struct<…>> to
apply an arbitrary UDF on the column and replace the data "inside" the struct,
however I'm struggling to make it work for complex types containing arrays along
the way like struct<array<struct<…>>>.
Michael Armbrust seemed to allude on the mailing list/forum to a way of using
Encoders to do that, I'd be interested in any pointers, especially considering
that it's not possible to output any Row or GenericRowWithSchema from a UDF
(thanks to
https://github.com/apache/spark/blob/v2.0.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala#L657
it seems).
To sum up, I'd like to find a way to apply a transformation on complex nested
datatypes (arrays and struct) on a Dataframe updating the value itself.
Regards,
Olivier Girardot

Re: Aggregations with scala pairs

2016-08-18 Thread Olivier Girardot
CC'ing dev list, you should open a Jira and a PR related to it to discuss it 
c.f.
https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-ContributingCodeChanges





On Wed, Aug 17, 2016 4:01 PM, Andrés Ivaldi iaiva...@gmail.com wrote:
Hello, I'd like to report incorrect behaviour in the Dataset API, but I don't
know how to do that: my Jira account doesn't allow me to create an issue.
I'm using Apache Spark 2.0.0, but the problem has existed since at least version
1.4 (according to the docs, since 1.3).
The problem is simple to reproduce, and so is the workaround. If we apply agg
over a Dataset with Scala pairs on the same column, only one aggregation over
that column is actually used. This is because toMap reduces the pair values of
the same key to one, overwriting the value.
Class:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala

def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
  agg((aggExpr +: aggExprs).toMap)
}

Rewritten as something like this, it should work:

def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
  toDF((aggExpr +: aggExprs).map { pairExpr =>
    strToExpr(pairExpr._2)(df(pairExpr._1).expr)
  }.toSeq)
}
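The effect of toMap described above can be reproduced in plain Scala, without
Spark. This is only a sketch of the collection behaviour; the column name
"price" and the object name AggPairs are made up for the illustration.

```scala
object AggPairs {
  // Two aggregations requested on the same column, as (column, function) pairs.
  val requested: Seq[(String, String)] = Seq("price" -> "min", "price" -> "max")

  // Converting to a Map keeps one entry per key; the later pair overwrites the earlier one.
  val collapsed: Map[String, String] = requested.toMap

  def main(args: Array[String]): Unit = {
    println(requested.size) // 2
    println(collapsed)      // Map(price -> max)
  }
}
```

Keeping the pairs as a sequence, as in the proposed rewrite, avoids losing the
first aggregation.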

regards --
Ing. Ivaldi Andres


Olivier Girardot | Associé
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94

Re: Spark DF CacheTable method. Will it save data to disk?

2016-08-18 Thread Olivier Girardot
That's another "pipeline" step to add, whereas persist is only relevant during
the lifetime of your job, and the data is stored not in HDFS but on the local
disks of your executors.





On Wed, Aug 17, 2016 5:56 PM, neil90 neilp1...@icloud.com wrote:
From the Spark documentation
(http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence)
yes, you can use persist on a dataframe instead of cache. All cache is, is a
shorthand for the default persist storage level "MEMORY_ONLY". If you want
to persist the dataframe to disk you should do
dataframe.persist(StorageLevel.DISK_ONLY).

IMO, if reads are expensive against the DB and you're afraid of failure, why not
just save the data as Parquet on your cluster in Hive and read from there?










--

View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-DF-CacheTable-method-Will-it-save-data-to-disk-tp27533p27551.html

Sent from the Apache Spark User List mailing list archive at Nabble.com.




-

To unsubscribe e-mail: user-unsubscr...@spark.apache.org









Olivier Girardot | Associé
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94

Re: error when running spark from oozie launcher

2016-08-18 Thread Olivier Girardot
this is not the full stacktrace, please post the full stacktrace if you want
some help





On Wed, Aug 17, 2016 7:24 PM, tkg_cangkul yuza.ras...@gmail.com wrote:
Hi, I'm trying to submit a Spark job with Oozie, but I've got one problem:
when I submit the same job, sometimes it succeeds and sometimes it fails.

I get this error message when the job fails:

org.apache.spark.launcher.CommandBuilderUtils.addPermGenSizeOpt(Ljava/util/List;)V

Can anyone help me solve this? I've tried setting -XX:MaxPermSize=512m
-XX:PermSize=256m in the spark.driver.extraJavaOptions property, but this
doesn't help.

Olivier Girardot | Associé
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94

Re: Spark SQL 1.6.1 issue

2016-08-18 Thread Olivier Girardot
Your executors/driver must not have multiple versions of Spark on the classpath.
It may come from the Cassandra connector: check the pom dependencies of the
version you fetched and whether it's compatible with your Spark version.





On Thu, Aug 18, 2016 6:05 AM, thbeh th...@thbeh.com wrote:
Running the query below I have been hitting a "local class incompatible"
exception. Does anyone know the cause?

val rdd = csc.cassandraSql("""select *, concat('Q', d_qoy) as qoy from
store_sales join date_dim on ss_sold_date_sk = d_date_sk join item on
ss_item_sk = i_item_sk""").groupBy("i_category").pivot("qoy").agg(round(sum("ss_sales_price")/100,2))




The source data is from TPCDS test data and I am running in Zeppelin.







INFO [2016-08-18 03:15:58,429] ({task-result-getter-2}
Logging.scala[logInfo]:58) - Lost task 3.0 in stage 3.0 (TID 52) on executor
ceph5.example.my: java.io.InvalidClassException
(org.apache.spark.sql.catalyst.expressions.Literal; local class
incompatible: stream classdesc serialVersionUID = 3305180847846277455, local
class serialVersionUID = -4259705229845269663) [duplicate 1]
INFO [2016-08-18 03:15:58,429] ({task-result-getter-3}
Logging.scala[logInfo]:58) - Lost task 2.0 in stage 3.0 (TID 51) on executor
ceph5.example.my: java.io.InvalidClassException
(org.apache.spark.sql.catalyst.expressions.Literal; local class
incompatible: stream classdesc serialVersionUID = 3305180847846277455, local
class serialVersionUID = -4259705229845269663) [duplicate 2]
INFO [2016-08-18 03:15:58,430] ({task-result-getter-3}
Logging.scala[logInfo]:58) - Lost task 6.0 in stage 3.0 (TID 55) on executor
ceph5.example.my: java.io.InvalidClassException
(org.apache.spark.sql.catalyst.expressions.Literal; local class
incompatible: stream classdesc serialVersionUID = 3305180847846277455, local
class serialVersionUID = -4259705229845269663) [duplicate 3]




Thanks










--

View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-1-6-1-issue-tp27554.html










Olivier Girardot | Associé
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94

Re: tpcds for spark2.0

2016-07-29 Thread Olivier Girardot
I have the same kind of issue (not using spark-sql-perf), just trying to deploy
2.0.0 on mesos. I'll keep you posted as I investigate





On Wed, Jul 27, 2016 1:06 PM, kevin kiss.kevin...@gmail.com wrote:
Hi all, I want to test the 99 TPC-DS SQL queries on Spark 2.0. I use
https://github.com/databricks/spark-sql-perf
With the master version, when I run val tpcds = new TPCDS (sqlContext =
sqlContext) I get this error:
scala> val tpcds = new TPCDS (sqlContext = sqlContext)
error: missing or invalid dependency detected while loading class file
'Benchmarkable.class'.
Could not access term typesafe in package com,
because it (or its dependencies) are missing. Check your build definition for
missing or conflicting dependencies. (Re-run with -Ylog-classpath to see the 
problematic classpath.)
A full rebuild may help if 'Benchmarkable.class' was compiled against an
incompatible version of com.
error: missing or invalid dependency detected while loading class file
'Benchmarkable.class'.
Could not access term scalalogging in value com.typesafe,
because it (or its dependencies) are missing. Check your build definition for
missing or conflicting dependencies. (Re-run with -Ylog-classpath to see the 
problematic classpath.)
A full rebuild may help if 'Benchmarkable.class' was compiled against an
incompatible version of com.typesafe.

With spark-sql-perf-0.4.3, when I run
tables.genData("hdfs://master1:9000/tpctest", "parquet", true, false, false, false, false)
I get this error:
Generating table catalog_sales in database to
hdfs://master1:9000/tpctest/catalog_sales with save mode Overwrite.
16/07/27 18:59:59 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
slave1): java.lang.ClassCastException: cannot assign instance of
scala.collection.immutable.List$SerializationProxy to field
org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type
scala.collection.Seq in instance
of org.apache.spark.rdd.MapPartitionsRDD


Olivier Girardot | Associé
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94

Re: OOM exception during Broadcast

2016-03-08 Thread Olivier Girardot
Stream.java:370)
>>>> at
>>>> scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>> at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>> at
>>>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>>>> at
>>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
>>>> at
>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>> at
>>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>> at
>>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>> at
>>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>> at
>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>> at
>>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>> at
>>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>> at
>>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>> at
>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>> at
>>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>> at
>>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>> at
>>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>> at
>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>> at
>>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>> at
>>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>> at
>>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>> at
>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>> at
>>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>> at
>>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>> at
>>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>>
>>>>
>>>> I'm using spark 1.5.2. Cluster nodes are amazon r3.2xlarge. The spark
>>>> property maximizeResourceAllocation is set to true (executor.memory = 48G
>>>> according to spark ui environment). We're also using kryo serialization and
>>>> Yarn is the resource manager.
>>>>
>>>> Any ideas as what might be going wrong and how to debug this?
>>>>
>>>> Thanks,
>>>> Arash
>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>


-- 
*Olivier Girardot* | Associé
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94


Re: Spark Certification

2016-02-14 Thread Olivier Girardot
It does not contain (as of yet) anything > 1.3 (for example in depth
knowledge of the Dataframe API)
but you need to know about all the modules (Core, Streaming, SQL, MLLib,
GraphX)

Regards,

Olivier.

2016-02-11 19:31 GMT+01:00 Prem Sure <premsure...@gmail.com>:

> I did recently. it includes MLib & Graphx too and I felt like exam content
> covered all topics till 1.3 and not the > 1.3 versions of spark.
>
>
> On Thu, Feb 11, 2016 at 9:39 AM, Janardhan Karri <jkarri@gmail.com>
> wrote:
>
>> I am planning to do that with databricks
>> http://go.databricks.com/spark-certified-developer
>>
>> Regards,
>> Janardhan
>>
>> On Thu, Feb 11, 2016 at 2:00 PM, Timothy Spann <tim.sp...@airisdata.com>
>> wrote:
>>
>>> I was wondering that as well.
>>>
>>> Also is it fully updated for 1.6?
>>>
>>> Tim
>>> http://airisdata.com/
>>> http://sparkdeveloper.com/
>>>
>>>
>>> From: naga sharathrayapati <sharathrayap...@gmail.com>
>>> Date: Wednesday, February 10, 2016 at 11:36 PM
>>> To: "user@spark.apache.org" <user@spark.apache.org>
>>> Subject: Spark Certification
>>>
>>> Hello All,
>>>
>>> I am planning on taking Spark Certification and I was wondering If one
>>> has to be well equipped with  MLib & GraphX as well or not ?
>>>
>>> Please advise
>>>
>>> Thanks
>>>
>>
>>
>


-- 
*Olivier Girardot* | Associé
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94


Re: Spark Application Master on Yarn client mode - Virtual memory limit

2016-02-14 Thread Olivier Girardot
You can also enable detailed GC logging to get more information.
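As a rough sketch of what enabling GC logging could look like, the snippet
below assembles a value for the spark.executor.extraJavaOptions setting. It
assumes a Java 7/8-era HotSpot JVM, where these flags exist; the log path and
the object name GcLoggingOptions are hypothetical.

```scala
object GcLoggingOptions {
  // HotSpot GC logging flags for Java 7/8-era JVMs; the log path is a made-up example.
  val flags: Seq[String] = Seq(
    "-verbose:gc",
    "-XX:+PrintGCDetails",
    "-XX:+PrintGCTimeStamps",
    "-Xloggc:/tmp/executor-gc.log"
  )

  // Single space-separated string, as expected by spark.executor.extraJavaOptions.
  val extraJavaOptions: String = flags.mkString(" ")

  def main(args: Array[String]): Unit =
    println(s"--conf spark.executor.extraJavaOptions='$extraJavaOptions'")
}
```

The printed line can be appended to a spark-submit command; the GC log is then
written on each executor host, not on the driver.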

2016-02-11 7:43 GMT+01:00 Shiva Ramagopal <tr.s...@gmail.com>:

> How are you submitting/running the job - via spark-submit or as a plain
> old Java program?
>
> If you are using spark-submit, you can control the memory setting via the
> configuration parameter spark.executor.memory in spark-defaults.conf.
>
> If you are running it as a Java program, use -Xmx to set the maximum heap
> size.
>
> On Thu, Feb 11, 2016 at 5:46 AM, Nirav Patel <npa...@xactlycorp.com>
> wrote:
>
>> In YARN we have the following settings enabled so that the job can use virtual
>> memory to have a capacity beyond physical memory, of course.
>>
>> <property>
>>   <name>yarn.nodemanager.vmem-check-enabled</name>
>>   <value>false</value>
>> </property>
>>
>> <property>
>>   <name>yarn.nodemanager.pmem-check-enabled</name>
>>   <value>false</value>
>> </property>
>>
>> The vmem to pmem ratio is 2:1. However, Spark doesn't seem to be able to
>> utilize this vmem limit;
>> we are getting the following heap space error, which seems to be contained
>> within the Spark executor.
>>
>> 16/02/09 23:08:06 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED
>> SIGNAL 15: SIGTERM
>> 16/02/09 23:08:06 ERROR executor.Executor: Exception in task 4.0 in stage
>> 7.6 (TID 22363)
>> java.lang.OutOfMemoryError: Java heap space
>> at java.util.IdentityHashMap.resize(IdentityHashMap.java:469)
>> at java.util.IdentityHashMap.put(IdentityHashMap.java:445)
>> at
>> org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:159)
>> at
>> org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:203)
>> at
>> org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:202)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at
>> org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:202)
>> at
>> org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:186)
>> at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:54)
>> at
>> org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
>> at
>> org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
>> at
>> org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
>> at
>> org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)
>> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
>> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:744)
>>
>>
>>
>> The YARN resource manager doesn't give any indication of whether the container
>> ran out of physical or virtual memory limits.
>>
>> Also how to profile this container memory usage? We know our data is
>> skewed so some of the executor will have large data (~2M RDD objects) to
>> process. I used following as executorJavaOpts but it doesn't seem to work.
>> -XX:-HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError='kill -3 %p'
>> -XX:HeapDumpPath=/opt/cores/spark
>>
>>
>>
>>
>>
>>
>
>
>


-- 
*Olivier Girardot* | Associé
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94


Spark 1.6 - Datasets and Avro Encoders

2016-01-05 Thread Olivier Girardot
Hi everyone,
considering the new Datasets API, will there be Encoders defined for
reading and writing Avro files ? Will it be possible to use already
generated Avro classes ?

Regards,

-- 
*Olivier Girardot*


Re: Spark 1.6 - Datasets and Avro Encoders

2016-01-05 Thread Olivier Girardot
I'll do, but if you want my two cents, creating a dedicated "optimised"
encoder for Avro would be great (especially if it's possible to do better
than plain AvroKeyValueOutputFormat with saveAsNewAPIHadoopFile :) )

Thanks for your time Michael, and happy new year :-)

Regards,

Olivier.

2016-01-05 19:01 GMT+01:00 Michael Armbrust <mich...@databricks.com>:

> You could try with the `Encoders.bean` method.  It detects classes that
> have getters and setters.  Please report back!
>
> On Tue, Jan 5, 2016 at 9:45 AM, Olivier Girardot <
> o.girar...@lateral-thoughts.com> wrote:
>
>> Hi everyone,
>> considering the new Datasets API, will there be Encoders defined for
>> reading and writing Avro files ? Will it be possible to use already
>> generated Avro classes ?
>>
>> Regards,
>>
>> --
>> *Olivier Girardot*
>>
>
>


-- 
*Olivier Girardot* | Associé
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94


Re: Lookup / Access of master data in spark streaming

2015-10-06 Thread Olivier Girardot
That's great! Thanks!
So to sum up, to do some kind of "always up-to-date" lookup we can use
broadcast variables and re-broadcast when the data has changed, using
either the "transform" RDD-to-RDD transformation, "foreachRDD" or
"transformWith".

Thank you for your time

Regards,

2015-10-05 23:49 GMT+02:00 Tathagata Das <t...@databricks.com>:

> Yes, when old broadcast objects are not referenced any more in the driver,
> then the associated data in the driver AND the executors will get cleared.
>
> On Mon, Oct 5, 2015 at 1:40 PM, Olivier Girardot <
> o.girar...@lateral-thoughts.com> wrote:
>
>> @td does that mean that the "old" broadcasted data will in any way be
>> "garbage collected" at some point if no RDD or transformation is using it
>> anymore ?
>>
>> Regards,
>>
>> Olivier.
>>
>> 2015-04-09 21:49 GMT+02:00 Amit Assudani <aassud...@impetus.com>:
>>
>>> Thanks a lot TD for detailed answers. The answers lead to few more
>>> questions,
>>>
>>>
>>>    1. "the transform RDD-to-RDD function runs on the driver" - I
>>>    didn't understand this. Does it mean that when I use the transform
>>>    function on a DStream, it is not parallelized? Surely I'm missing
>>>    something here.
>>>    2. updateStateByKey, I think, won't work in this use case: I have
>>>    three separate attribute streams (with different frequencies) that make
>>>    up the combined state (i.e. the Entity) at a point in time on which I
>>>    want to do some processing. Do you think otherwise?
>>>    3. transform+join seems the only option so far, but any guesstimate of
>>>    how this would perform / react on a cluster? Assume the master data is
>>>    in the 100s of GBs and the join is based on some row key. We are talking
>>>    about a slice of stream data being joined with 100s of GBs of master
>>>    data continuously. Is it something that can be done but should not be
>>>    done?
>>>
>>> Regards,
>>> Amit
>>>
>>> From: Tathagata Das <t...@databricks.com>
>>> Date: Thursday, April 9, 2015 at 3:13 PM
>>> To: amit assudani <aassud...@impetus.com>
>>> Cc: "user@spark.apache.org" <user@spark.apache.org>
>>> Subject: Re: Lookup / Access of master data in spark streaming
>>>
>>> Responses inline. Hope they help.
>>>
>>> On Thu, Apr 9, 2015 at 8:20 AM, Amit Assudani <aassud...@impetus.com>
>>> wrote:
>>>
>>>> Hi Friends,
>>>>
>>>> I am trying to solve a use case in spark streaming, I need help on
>>>> getting to right approach on lookup / update the master data.
>>>>
>>>> Use case ( simplified )
>>>> I’ve a dataset of entity with three attributes and identifier/row key
>>>> in a persistent store.
>>>>
>>>> Each attribute along with row key come from a different stream let’s
>>>> say, effectively 3 source streams.
>>>>
>>>> Now whenever any attribute comes up, I want to update/sync the
>>>> persistent store and do some processing, but the processing would require
>>>> the latest state of entity with latest values of three attributes.
>>>>
>>>> I wish if I have the all the entities cached in some sort of
>>>> centralized cache ( like we have data in hdfs ) within spark streaming
>>>> which may be used for data local processing. But I assume there is no such
>>>> thing.
>>>>
>>>> potential approaches I m thinking of, I suspect first two are not
>>>> feasible, but I want to confirm,
>>>>   1.  Is Broadcast Variables mutable ? If yes, can I use it as
>>>> cache for all entities sizing  around 100s of GBs provided i have a cluster
>>>> with enough RAM.
>>>>
>>>
>>> Broadcast variables are not mutable. But you can always create a new
>>> broadcast variable when you want and use the "latest" broadcast variable in
>>> your computation.
>>>
>>> dstream.transform { rdd =>
>>>   // fetch existing or update+create new and return
>>>   val latestBroadcast = getLatestBroadcastVariable()
>>>   // use latestBroadcast in RDD transformations
>>>   val transformedRDD = rdd. ..
>>>   transformedRDD
>>> }
>>>
>>> Since the transform RDD-to-RDD function runs on the driver every batch
>>> interval, it will always use the latest broadcast variable that

Re: ClassCastException using DataFrame only when num-executors > 2 ...

2015-08-31 Thread Olivier Girardot
tested now against Spark 1.5.0 rc2, and same exceptions happen when
num-executors > 2 :

15/08/25 10:31:10 WARN scheduler.TaskSetManager: Lost task 0.1 in stage 5.0
(TID 501, xxx): java.lang.ClassCastException: java.lang.Double cannot
be cast to java.lang.Long
at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110)
at
org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getLong(rows.scala:41)
at
org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getLong(rows.scala:220)
at
org.apache.spark.sql.catalyst.expressions.JoinedRow.getLong(JoinedRow.scala:85)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown
Source)
at
org.apache.spark.sql.execution.Window$$anonfun$8$$anon$1.next(Window.scala:325)
at
org.apache.spark.sql.execution.Window$$anonfun$8$$anon$1.next(Window.scala:252)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
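The top frame of this trace, BoxesRunTime.unboxToLong, is the same call that
plain Scala uses when a value typed as Any is read as a Long. The sketch below
only reproduces that JVM-level failure outside Spark; it says nothing about why
a Double ends up in a Long slot in the job above, and the object name UnboxDemo
is made up.

```scala
import scala.util.Try

object UnboxDemo {
  // A value whose static type is Any but which actually holds a boxed Double.
  val boxed: Any = 1.5d

  // Reading it as a Long goes through BoxesRunTime.unboxToLong, which rejects a boxed Double.
  val asLong: Try[Long] = Try(boxed.asInstanceOf[Long])

  def main(args: Array[String]): Unit =
    println(asLong.isFailure)
}
```

In other words, the exception indicates a mismatch between the declared type of
a column and the values actually stored in the row at that position.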


2015-08-26 11:47 GMT+02:00 Olivier Girardot <ssab...@gmail.com>:

> Hi everyone,
> I know this "post title" doesn't seem very logical and I agree.
> We have a very complex computation using "only" pyspark dataframes, and
> we launch it on a CDH 5.3 cluster using Spark 1.5.0 rc1
> (the problem is also reproduced with 1.4.x).
> If the number of executors is the default 2, the computation is very long
> but doesn't fail.
> If the number of executors is 3 or more (tested up to 20), then the
> computation fails very quickly with the following error:
>
> *Caused by: java.lang.ClassCastException: java.lang.Double cannot be cast
> to java.lang.Long*
>
> The complete stacktrace being:
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1267)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1255)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1254)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1254)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:684)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:684)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:684)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1480)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1442)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1431)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:554)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1805)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1818)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1831)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1902)
> at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:905)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
> at org.apache.spark.rdd.RDD.collect(RDD.scala:904)
> at org.apache.spark.RangePartitioner$.sketch(Partitioner.scala:264)
> at org.apache.spark.RangePartitioner.(Partitioner.scala:126)
> at
> org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:156)
> at
> org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:141)
> at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
> ... 138 more
> *Caused by: java.lang.ClassCastException: java.lang.Double cannot be cast
> to java.lang.Long*
> at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110)
> at
> org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getLong(rows.scala:41)
> at
> org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getLong(rows.scala:220)
> at
> org.apache.spark.sql.catalyst.expressions.JoinedRow.getLong(JoinedRow.scala:85)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown
> Source)
> at
> org.apache.spark.sql.execution.Window$

Re: Spark stages very slow to complete

2015-08-25 Thread Olivier Girardot
I have pretty much the same symptoms: the computation itself is pretty
fast, but most of the run time is spent in JavaToPython steps (~15 min).
I'm using Spark 1.5.0-rc1 with DataFrames and ML Pipelines.
Any insights into what these steps are exactly?

2015-06-02 9:18 GMT+02:00 Karlson ksonsp...@siberie.de:

 Hi, the code is some hundreds lines of Python. I can try to compose a
 minimal example as soon as I find the time, though. Any ideas until then?


 Would you mind posting the code?
 On 2 Jun 2015 00:53, Karlson ksonsp...@siberie.de wrote:

 Hi,

 In all (pyspark) Spark jobs, that become somewhat more involved, I am
 experiencing the issue that some stages take a very long time to complete
 and sometimes don't at all. This clearly correlates with the size of my
 input data. Looking at the stage details for one such stage, I am
 wondering
 where Spark spends all this time. Take this table of the stages task
 metrics for example:

 Metric                     Min          25th percentile  Median       75th percentile  Max
 Duration                   1.4 min      1.5 min          1.7 min      1.9 min          2.3 min
 Scheduler Delay            1 ms         3 ms             4 ms         5 ms             23 ms
 Task Deserialization Time  1 ms         2 ms             3 ms         8 ms             22 ms
 GC Time                    0 ms         0 ms             0 ms         0 ms             0 ms
 Result Serialization Time  0 ms         0 ms             0 ms         0 ms             1 ms
 Getting Result Time        0 ms         0 ms             0 ms         0 ms             0 ms
 Input Size / Records       23.9 KB / 1  24.0 KB / 1      24.1 KB / 1  24.1 KB / 1      24.3 KB / 1

 Why is the overall duration almost 2min? Where is all this time spent,
 when no progress of the stages is visible? The progress bar simply
 displays
 0 succeeded tasks for a very long time before sometimes slowly
 progressing.

 Also, the name of the stage displayed above is `javaToPython at null:-1`,
 which I find very uninformative. I don't even know which action exactly
 is
 responsible for this stage. Does anyone experience similar issues or have
 any advice for me?

 Thanks!

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







-- 
*Olivier Girardot* | Partner
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94


Re: Classifier for Big Data Mining

2015-07-21 Thread Olivier Girardot
depends on your data and I guess the time/performance goals you have for
both training/prediction, but for a quick answer : yes :)

2015-07-21 11:22 GMT+02:00 Chintan Bhatt chintanbhatt...@charusat.ac.in:

 Which classifier can be useful for mining massive datasets in spark?
 Decision Tree can be good choice as per scalability?

 --
 CHINTAN BHATT http://in.linkedin.com/pub/chintan-bhatt/22/b31/336/
 Assistant Professor,
 U  P U Patel Department of Computer Engineering,
 Chandubhai S. Patel Institute of Technology,
 Charotar University of Science And Technology (CHARUSAT),
 Changa-388421, Gujarat, INDIA.
 http://www.charusat.ac.in
 *Personal Website*: https://sites.google.com/a/ecchanga.ac.in/chintan/



Re: coalesce on dataFrame

2015-07-01 Thread Olivier Girardot
PySpark or Spark (scala) ?
When you use coalesce with anything but a column you must use a literal
like that in PySpark :

from pyspark.sql import functions as F

F.coalesce(df.a, F.lit(True))

On Wed, Jul 1, 2015 at 12:03, Ewan Leith ewan.le...@realitymine.com wrote:

 It's in spark 1.4.0, or should be at least:

 https://issues.apache.org/jira/browse/SPARK-6972

 Ewan

 -Original Message-
 From: Hafiz Mujadid [mailto:hafizmujadi...@gmail.com]
 Sent: 01 July 2015 08:23
 To: user@spark.apache.org
 Subject: coalesce on dataFrame

 How can we use coalesce(1, true) on dataFrame?


 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/coalesce-on-dataFrame-tp23564.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Check for null in PySpark DataFrame

2015-07-01 Thread Olivier Girardot
I must admit I've been using the same fall-back-to-SQL strategy for now :p
So I'd be glad to have insights into that too.

On Tue, Jun 30, 2015 at 23:28, pedro ski.rodrig...@gmail.com wrote:

 I am trying to find what is the correct way to programmatically check for
 null values for rows in a dataframe. For example, below is the code using
 pyspark and sql:

 df = sqlContext.createDataFrame(sc.parallelize([(1, None), (2, 'a'), (3,
 'b'), (4, None)]))
 df.where('_2 is not null').count()

 However, this won't work
 df.where(df._2 != None).count()

 It seems there is no native Python way with DataFrames to do this, but I
 find that difficult to believe and more likely that I am missing the right
 way to do this.
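
A likely explanation, sketched here in plain Python rather than Spark: the DataFrame comparison is evaluated with SQL semantics, where comparing anything with NULL yields NULL rather than true, and a filter keeps only rows whose predicate is true. The helper names below (`sql_not_equal`, `sql_where`) are hypothetical and only model that behaviour; they are not part of any Spark API. In the DataFrame API itself, `Column.isNotNull()` expresses the `is not null` predicate, e.g. `df.where(df._2.isNotNull())`.

```python
# Plain-Python model of SQL three-valued logic (not Spark code).
rows = [(1, None), (2, "a"), (3, "b"), (4, None)]


def sql_not_equal(left, right):
    """SQL-style `!=`: the result is NULL (None) if either side is NULL."""
    if left is None or right is None:
        return None
    return left != right


def sql_where(data, predicate):
    """Keep only rows whose predicate is exactly True, as a SQL WHERE does."""
    return [row for row in data if predicate(row) is True]


# Models df.where(df._2 != None): every comparison is NULL, so nothing is kept.
compared_with_null = sql_where(rows, lambda row: sql_not_equal(row[1], None))

# Models df.where('_2 is not null'): an explicit null test keeps rows 2 and 3.
explicit_null_test = sql_where(rows, lambda row: row[1] is not None)

print(len(compared_with_null), len(explicit_null_test))
```

The point of the sketch is only that a comparison against NULL can never select a row, so an explicit null test is needed whichever API is used.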



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Check-for-null-in-PySpark-DataFrame-tp23553.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Spark Shell Hive Context and Kerberos ticket

2015-06-26 Thread Olivier Girardot
Nope, I have not, but I'm glad I'm not the only one :p

On Fri, Jun 26, 2015 at 07:54, Tao Li litao.bupt...@gmail.com wrote:

 Hi Olivier, have you fixed this problem yet? I still have this fasterxml
 NoSuchMethodError.

 2015-06-18 3:08 GMT+08:00 Olivier Girardot 
 o.girar...@lateral-thoughts.com:

 Ok what was wrong was that the spark-env did not contain the
 HADOOP_CONF_DIR properly set to /etc/hadoop/conf/
 With that fixed, this issue is gone, but I can't seem to get Spark SQL
 1.4.0 with Hive working on CDH 5.3 or 5.4 :
 Using this command line :
 IPYTHON=1 /.../spark-1.4.0-bin-hadoop2.4/bin/pyspark  --master
 yarn-client --driver-class-path `hadoop classpath`

 I end up with this issue :
 : java.lang.NoSuchMethodError:
 com.fasterxml.jackson.module.scala.deser.BigDecimalDeserializer$.handledType()Ljava/lang/Class;
 at
 com.fasterxml.jackson.module.scala.deser.NumberDeserializers$.<init>(ScalaNumberDeserializersModule.scala:49)
 at
 com.fasterxml.jackson.module.scala.deser.NumberDeserializers$.<clinit>(ScalaNumberDeserializersModule.scala)
 at
 com.fasterxml.jackson.module.scala.deser.ScalaNumberDeserializersModule$class.$init$(ScalaNumberDeserializersModule.scala:61)
 at
 com.fasterxml.jackson.module.scala.DefaultScalaModule.<init>(DefaultScalaModule.scala:19)
 at
 com.fasterxml.jackson.module.scala.DefaultScalaModule$.<init>(DefaultScalaModule.scala:35)
 at
 com.fasterxml.jackson.module.scala.DefaultScalaModule$.<clinit>(DefaultScalaModule.scala)
 at
 org.apache.spark.rdd.RDDOperationScope$.<init>(RDDOperationScope.scala:78)
 at
 org.apache.spark.rdd.RDDOperationScope$.<clinit>(RDDOperationScope.scala)
 at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
 at
 org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:118)
 at
 org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:125)
 at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1255)
 at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1189)
 at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1248)
 at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:176)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
 at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
 at py4j.Gateway.invoke(Gateway.java:259)
 at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
 at py4j.commands.CallCommand.execute(CallCommand.java:79)
 at py4j.GatewayConnection.run(GatewayConnection.java:207)
 at java.lang.Thread.run(Thread.java:745)

 This seems to be related to this Jira Issue :
 https://issues.apache.org/jira/browse/SPARK-8332

 This is a blocker for me to deploy a Spark dataframe based app on an
 existing cluster, any input regarding how to create a proper classpath
 would be great.

 Regards,

 Olivier.


 On Wed, Jun 17, 2015 at 11:37, Olivier Girardot
 o.girar...@lateral-thoughts.com wrote:

 Hi everyone,
 After copying the hive-site.xml from a CDH5 cluster, I can't seem to
 connect to the hive metastore using spark-shell, here's a part of the stack
 trace I get :

 15/06/17 04:41:57 ERROR TSaslTransport: SASL negotiation failure
 javax.security.sasl.SaslException: GSS initiate failed [Caused by
 GSSException: No valid credentials provided (Mechanism level: Failed to
 find any Kerberos tgt)]
 at
 com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
 at
 org.apache.thrift.transport.TSaslClientTransport.handleSaslStartMessage(TSaslClientTransport.java:94)
 at
 org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:253)
 at
 org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:37)
 at
 org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:52)
 at
 org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:49)
 at java.security.AccessController.doPrivileged(Native Method)

 The user has a non-expired ticket, I can execute hadoop fs -ls, all in
 all I should have access to this.
 I am stuck with this issue on Spark 1.4.0, did not try a version
 before...

 Any guess regarding what might be wrong ?

 Regards,

 Olivier.





Re: Accessing Kerberos Secured HDFS Resources from Spark on Mesos

2015-06-26 Thread Olivier Girardot
I would pretty much need exactly this kind of feature too

On Fri, Jun 26, 2015 at 21:17, Dave Ariens dari...@blackberry.com wrote:

  Hi Timothy,



 Because I'm running Spark on Mesos alongside a secured Hadoop cluster, I
 need to ensure that my tasks running on the slaves perform a Kerberos login
 before accessing any HDFS resources.  To login, they just need the name of
 the principal (username) and a keytab file.  Then they just need to invoke
 the following java:



 import org.apache.hadoop.security.UserGroupInformation

 UserGroupInformation.loginUserFromKeytab(adminPrincipal, adminKeytab)



 This is done in the driver in my Gist below, but I don't know how to run
 it within each executor on the slaves as tasks are run.



 Any help would be appreciated!





 *From:* Timothy Chen [mailto:t...@mesosphere.io]
 *Sent:* Friday, June 26, 2015 12:50 PM
 *To:* Dave Ariens
 *Cc:* user@spark.apache.org
 *Subject:* Re: Accessing Kerberos Secured HDFS Resources from Spark on
 Mesos



 Hi Dave,



 I don't understand Kerberos much, but if you know the exact steps that
 need to happen, I can see how we can make that happen with the Spark
 framework.



 Tim


 On Jun 26, 2015, at 8:49 AM, Dave Ariens dari...@blackberry.com wrote:

  I understand that Kerberos support for accessing Hadoop resources in Spark
 only works when running Spark on YARN.  However, I'd really like to hack
 something together for Spark on Mesos running alongside a secured Hadoop
 cluster.  My simplified application (gist:
 https://gist.github.com/ariens/2c44c30e064b1790146a) receives a Kerberos
 principal and keytab when submitted.  The static main method currently
 performs a UserGroupInformation.loginUserFromKeytab(userPrincipal,
 userKeytab) and authenticates to Hadoop.  This works on YARN (curiously,
 even without having to kinit first), but not on Mesos.  Is there a
 way to have the slaves running the tasks perform the same Kerberos login
 before they attempt to access HDFS?



 Putting aside the security of Spark/Mesos and how that keytab would get 
 distributed, I'm just looking for a working POC.



 Is there a way to leverage the Broadcast capability to send a function that 
 performs this?



 https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.broadcast.Broadcast



 Ideally, I'd love for this to not incur much overhead and just simply allow 
 me to work around the absent Kerberos support...



 Thanks,



 Dave




Re: GSSException when submitting Spark job in yarn-cluster mode with HiveContext APIs on Kerberos cluster

2015-06-22 Thread Olivier Girardot
Hi,
I can't get this to work using CDH 5.4, Spark 1.4.0 in yarn cluster mode.
@andrew did you manage to get it to work with the latest version?

On Tue, Apr 21, 2015 at 00:02, Andrew Lee alee...@hotmail.com wrote:

 Hi Marcelo,

 Exactly what I need to track, thanks for the JIRA pointer.


  Date: Mon, 20 Apr 2015 14:03:55 -0700
  Subject: Re: GSSException when submitting Spark job in yarn-cluster mode
 with HiveContext APIs on Kerberos cluster
  From: van...@cloudera.com
  To: alee...@hotmail.com
  CC: user@spark.apache.org

 
  I think you want to take a look at:
  https://issues.apache.org/jira/browse/SPARK-6207
 
  On Mon, Apr 20, 2015 at 1:58 PM, Andrew Lee alee...@hotmail.com wrote:
   Hi All,
  
   Affected version: spark 1.2.1 / 1.2.2 / 1.3-rc1
  
   Posting this problem to user group first to see if someone is
 encountering
   the same problem.
  
   When submitting spark jobs that invokes HiveContext APIs on a Kerberos
   Hadoop + YARN (2.4.1) cluster,
   I'm getting this error.
  
   javax.security.sasl.SaslException: GSS initiate failed [Caused by
   GSSException: No valid credentials provided (Mechanism level: Failed
 to find
   any Kerberos tgt)]
  
   Apparently, the Kerberos ticket is not on the remote data node nor
 computing
   node since we don't
   deploy Kerberos tickets, and that is not a good practice either. On the
   other hand, we can't just SSH to every machine and run kinit for that
 users.
   This is not practical and it is insecure.
  
   The point here is that shouldn't there be a delegation token during
 the doAs
   to use the token instead of the ticket ?
   I'm trying to understand what is missing in Spark's HiveContext API
 while a
   normal MapReduce job that invokes Hive APIs will work, but not in
 Spark SQL.
   Any insights or feedback are appreciated.
  
   Anyone got this running without pre-deploying (pre-initializing) all
 tickets
   node by node? Is this worth filing a JIRA?
  
  
  
   15/03/25 18:59:08 INFO hive.metastore: Trying to connect to metastore
 with
   URI thrift://alee-cluster.test.testserver.com:9083
   15/03/25 18:59:08 ERROR transport.TSaslTransport: SASL negotiation
 failure
   javax.security.sasl.SaslException: GSS initiate failed [Caused by
   GSSException: No valid credentials provided (Mechanism level: Failed
 to find
   any Kerberos tgt)]
   at
  
 com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
   at
  
 org.apache.thrift.transport.TSaslClientTransport.handleSaslStartMessage(TSaslClientTransport.java:94)
   at
 org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:253)
   at
  
 org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:37)
   at
  
 org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:52)
   at
  
 org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:49)
   at java.security.AccessController.doPrivileged(Native Method)
   at javax.security.auth.Subject.doAs(Subject.java:415)
   at
  
 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
   at
  
 org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport.open(TUGIAssumingTransport.java:49)
   at
  
 org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:336)
   at
  
 org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:214)
   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
 Method)
   at
  
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
   at
  
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
   at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
   at
  
 org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1410)
   at
  
 org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:62)
   at
  
 org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:72)
   at
  
 org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2453)
   at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465)
   at
  
 org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:340)
   at
  
 org.apache.spark.sql.hive.HiveContext$$anonfun$4.apply(HiveContext.scala:235)
   at
  
 org.apache.spark.sql.hive.HiveContext$$anonfun$4.apply(HiveContext.scala:231)
   at scala.Option.orElse(Option.scala:257)
   at
  
 org.apache.spark.sql.hive.HiveContext.x$3$lzycompute(HiveContext.scala:231)
   at org.apache.spark.sql.hive.HiveContext.x$3(HiveContext.scala:229)
   at
  
 org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:229)
   at
 org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:229)
   at
  
 

Re: Spark Shell Hive Context and Kerberos ticket

2015-06-17 Thread Olivier Girardot
Ok what was wrong was that the spark-env did not contain the
HADOOP_CONF_DIR properly set to /etc/hadoop/conf/
With that fixed, this issue is gone, but I can't seem to get Spark SQL
1.4.0 with Hive working on CDH 5.3 or 5.4 :
Using this command line :
IPYTHON=1 /.../spark-1.4.0-bin-hadoop2.4/bin/pyspark  --master yarn-client
--driver-class-path `hadoop classpath`

I end up with this issue :
: java.lang.NoSuchMethodError:
com.fasterxml.jackson.module.scala.deser.BigDecimalDeserializer$.handledType()Ljava/lang/Class;
at
com.fasterxml.jackson.module.scala.deser.NumberDeserializers$.<init>(ScalaNumberDeserializersModule.scala:49)
at
com.fasterxml.jackson.module.scala.deser.NumberDeserializers$.<clinit>(ScalaNumberDeserializersModule.scala)
at
com.fasterxml.jackson.module.scala.deser.ScalaNumberDeserializersModule$class.$init$(ScalaNumberDeserializersModule.scala:61)
at
com.fasterxml.jackson.module.scala.DefaultScalaModule.<init>(DefaultScalaModule.scala:19)
at
com.fasterxml.jackson.module.scala.DefaultScalaModule$.<init>(DefaultScalaModule.scala:35)
at
com.fasterxml.jackson.module.scala.DefaultScalaModule$.<clinit>(DefaultScalaModule.scala)
at
org.apache.spark.rdd.RDDOperationScope$.<init>(RDDOperationScope.scala:78)
at org.apache.spark.rdd.RDDOperationScope$.<clinit>(RDDOperationScope.scala)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:118)
at
org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:125)
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1255)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1189)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1248)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:176)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)

This seems to be related to this Jira Issue :
https://issues.apache.org/jira/browse/SPARK-8332

This is a blocker for me to deploy a Spark dataframe based app on an
existing cluster, any input regarding how to create a proper classpath
would be great.

Regards,

Olivier.


On Wed, Jun 17, 2015 at 11:37, Olivier Girardot
o.girar...@lateral-thoughts.com wrote:

 Hi everyone,
 After copying the hive-site.xml from a CDH5 cluster, I can't seem to
 connect to the hive metastore using spark-shell, here's a part of the stack
 trace I get :

 15/06/17 04:41:57 ERROR TSaslTransport: SASL negotiation failure
 javax.security.sasl.SaslException: GSS initiate failed [Caused by
 GSSException: No valid credentials provided (Mechanism level: Failed to
 find any Kerberos tgt)]
 at
 com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
 at
 org.apache.thrift.transport.TSaslClientTransport.handleSaslStartMessage(TSaslClientTransport.java:94)
 at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:253)
 at
 org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:37)
 at
 org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:52)
 at
 org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:49)
 at java.security.AccessController.doPrivileged(Native Method)

 The user has a non-expired ticket, I can execute hadoop fs -ls, all in all
 I should have access to this.
 I am stuck with this issue on Spark 1.4.0, did not try a version before...

 Any guess regarding what might be wrong ?

 Regards,

 Olivier.



Spark Shell Hive Context and Kerberos ticket

2015-06-17 Thread Olivier Girardot
Hi everyone,
After copying the hive-site.xml from a CDH5 cluster, I can't seem to
connect to the hive metastore using spark-shell, here's a part of the stack
trace I get :

15/06/17 04:41:57 ERROR TSaslTransport: SASL negotiation failure
javax.security.sasl.SaslException: GSS initiate failed [Caused by
GSSException: No valid credentials provided (Mechanism level: Failed to
find any Kerberos tgt)]
at
com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
at
org.apache.thrift.transport.TSaslClientTransport.handleSaslStartMessage(TSaslClientTransport.java:94)
at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:253)
at
org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:37)
at
org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:52)
at
org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:49)
at java.security.AccessController.doPrivileged(Native Method)

The user has a non-expired ticket, I can execute hadoop fs -ls, all in all
I should have access to this.
I am stuck with this issue on Spark 1.4.0, did not try a version before...

Any guess regarding what might be wrong ?

Regards,

Olivier.


Re: How to share large resources like dictionaries while processing data with Spark ?

2015-06-04 Thread Olivier Girardot
You can use it as a broadcast variable, but if it's too large (more than
1 GB, I guess), you may need to share it by joining it to the other RDDs
on some kind of key.
Still, this is the kind of thing broadcast variables were designed for.

Regards,

Olivier.

On Thu, Jun 4, 2015 at 23:50, dgoldenberg dgoldenberg...@gmail.com wrote:

 We have some pipelines defined where sometimes we need to load potentially
 large resources such as dictionaries.

 What would be the best strategy for sharing such resources among the
 transformations/actions within a consumer?  Can they be shared somehow
 across the RDD's?

 I'm looking for a way to load such a resource once into the cluster memory
 and have it be available throughout the lifecycle of a consumer...

 Thanks.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-large-resources-like-dictionaries-while-processing-data-with-Spark-tp23162.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Compute Median in Spark Dataframe

2015-06-02 Thread Olivier Girardot
Nice to hear from you Holden ! I ended up trying exactly that (Column) -
but I may have done it wrong :

In [5]: g.agg(Column("percentile(value, 0.5)"))
Py4JError: An error occurred while calling o97.agg. Trace:
py4j.Py4JException: Method agg([class java.lang.String, class
scala.collection.immutable.Nil$]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)

Any idea ?

Olivier.
On Tue, Jun 2, 2015 at 18:02, Holden Karau hol...@pigscanfly.ca wrote:

 Not super easily, the GroupedData class uses a strToExpr function which
 has a pretty limited set of functions so we can't pass in the name of an
 arbitrary Hive UDAF (unless I'm missing something). We can instead
 construct a column with the expression you want and then pass it in to
 agg() that way (although then you need to call the Hive UDAF there). There
 are some private classes in hiveUdfs.scala which expose Hive UDAFs as Spark
 SQL AggregateExpressions, but they are private.

 On Tue, Jun 2, 2015 at 8:28 AM, Olivier Girardot 
 o.girar...@lateral-thoughts.com wrote:

 I've finally come to the same conclusion, but isn't there any way to call
 these Hive UDAFs from the agg(percentile(key,0.5)) ??

 On Tue, Jun 2, 2015 at 15:37, Yana Kadiyska yana.kadiy...@gmail.com wrote:

 Like this...sqlContext should be a HiveContext instance

 case class KeyValue(key: Int, value: String)
 val df = sc.parallelize(1 to 50).map(i => KeyValue(i, i.toString)).toDF
 df.registerTempTable("table")
 sqlContext.sql("select percentile(key,0.5) from table").show()

 On Tue, Jun 2, 2015 at 8:07 AM, Olivier Girardot 
 o.girar...@lateral-thoughts.com wrote:

 Hi everyone,
 Is there any way to compute a median on a column using Spark's
 DataFrame? I know you can use stats on an RDD, but I'd rather stay within a
 DataFrame.
 Hive seems to imply that using ntile one can compute percentiles,
 quartiles and therefore a median.
 Does anyone have experience with this ?

 Regards,

 Olivier.





 --
 Cell : 425-233-8271
 Twitter: https://twitter.com/holdenkarau
 Linked In: https://www.linkedin.com/in/holdenkarau



Re: Best strategy for Pandas - Spark

2015-06-02 Thread Olivier Girardot
Thanks for the answer, I'm currently doing exactly that.
I'll try to sum up the usual Pandas = Spark Dataframe caveats soon.

Regards,

Olivier.

On Tue, Jun 2, 2015 at 02:38, Davies Liu dav...@databricks.com wrote:

 The second one sounds reasonable, I think.

 On Thu, Apr 30, 2015 at 1:42 AM, Olivier Girardot
 o.girar...@lateral-thoughts.com wrote:
  Hi everyone,
  Let's assume I have a complex workflow of more than 10 datasources as
  input - 20 computations (some creating intermediary datasets and some
  merging everything for the final computation) - some taking on average
  1 minute to complete and some taking more than 30 minutes.

  What would be, for you, the best strategy to port this to Apache Spark?

  1. Transform the whole flow into a Spark job (PySpark or Scala)
  2. Transform only part of the flow (the heavy-lifting ~30 min parts)
     using the same language (PySpark)
  3. Transform only part of the flow and pipe the rest from Scala to Python
 
  Regards,
 
  Olivier.



Re: Compute Median in Spark Dataframe

2015-06-02 Thread Olivier Girardot
I've finally come to the same conclusion, but isn't there any way to call
these Hive UDAFs from the agg(percentile(key,0.5)) ??

On Tue, Jun 2, 2015 at 15:37, Yana Kadiyska yana.kadiy...@gmail.com wrote:

 Like this...sqlContext should be a HiveContext instance

 case class KeyValue(key: Int, value: String)
 val df = sc.parallelize(1 to 50).map(i => KeyValue(i, i.toString)).toDF
 df.registerTempTable("table")
 sqlContext.sql("select percentile(key,0.5) from table").show()

 On Tue, Jun 2, 2015 at 8:07 AM, Olivier Girardot 
 o.girar...@lateral-thoughts.com wrote:

 Hi everyone,
 Is there any way to compute a median on a column using Spark's DataFrame?
 I know you can use stats on an RDD, but I'd rather stay within a DataFrame.
 Hive seems to imply that using ntile one can compute percentiles,
 quartiles and therefore a median.
 Does anyone have experience with this ?

 Regards,

 Olivier.
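
As a local cross-check of what `percentile(key, 0.5)` should return for the keys 1 to 50 used in the reply above, here is a minimal plain-Python sketch. It assumes the exact percentile is obtained by linear interpolation between the two neighbouring ranks, which is my understanding of the Hive UDAF rather than something stated in this thread; `exact_percentile` is a hypothetical helper name, not a Spark or Hive function.

```python
import math


def exact_percentile(values, p):
    """Exact p-th percentile (0 <= p <= 1) with linear interpolation."""
    if not values:
        raise ValueError("values must not be empty")
    ordered = sorted(values)
    position = p * (len(ordered) - 1)   # fractional rank in the sorted data
    lower = math.floor(position)
    upper = math.ceil(position)
    if lower == upper:
        return float(ordered[lower])
    weight = position - lower
    return ordered[lower] * (1 - weight) + ordered[upper] * weight


keys = list(range(1, 51))            # same keys as `1 to 50` in the Scala snippet
print(exact_percentile(keys, 0.5))   # median of 1..50
```

A local check like this only makes sense for small data; on a real DataFrame the computation should stay in the SQL query.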





Re: RandomSplit with Spark-ML and Dataframe

2015-05-19 Thread Olivier Girardot
Thank you !

On Tue, May 19, 2015 at 21:08, Xiangrui Meng men...@gmail.com wrote:

 In 1.4, we added RAND as a DataFrame expression, which can be used for
 random split. Please check the example here:

 https://github.com/apache/spark/blob/master/python/pyspark/ml/tuning.py#L214

 -Xiangrui

 On Thu, May 7, 2015 at 8:39 AM, Olivier Girardot
 o.girar...@lateral-thoughts.com wrote:
  Hi,
  is there any best practice for doing a randomSplit into
  training/cross-validation sets, as in MLlib, with DataFrames and the
  pipeline API?
 
  Regards
 
  Olivier.



Re: [SparkSQL 1.4.0] groupBy columns are always nullable?

2015-05-18 Thread Olivier Girardot
PR is opened : https://github.com/apache/spark/pull/6237

Le ven. 15 mai 2015 à 17:55, Olivier Girardot ssab...@gmail.com a écrit :

 yes, please do and send me the link.
 @rxin I have trouble building master, but the code is done...


 Le ven. 15 mai 2015 à 01:27, Haopu Wang hw...@qilinsoft.com a écrit :

  Thank you, should I open a JIRA for this issue?


  --

 *From:* Olivier Girardot [mailto:ssab...@gmail.com]
 *Sent:* Tuesday, May 12, 2015 5:12 AM
 *To:* Reynold Xin
 *Cc:* Haopu Wang; user
 *Subject:* Re: [SparkSQL 1.4.0] groupBy columns are always nullable?



 I'll look into it - not sure yet what I can get out of exprs :p



 Le lun. 11 mai 2015 à 22:35, Reynold Xin r...@databricks.com a écrit :

 Thanks for catching this. I didn't read carefully enough.



 It'd make sense to have the udaf result be non-nullable, if the exprs are
 indeed non-nullable.



 On Mon, May 11, 2015 at 1:32 PM, Olivier Girardot ssab...@gmail.com
 wrote:

 Hi Haopu,
 actually here `key` is nullable because this is your input's schema :

 scala> result.printSchema

 root
 |-- key: string (nullable = true)
 |-- SUM(value): long (nullable = true)

 scala> df.printSchema
 root
 |-- key: string (nullable = true)
 |-- value: long (nullable = false)



 I tried it with a schema where the key is not flagged as nullable, and
 the schema is actually respected. What you can argue however is that
 SUM(value) should also be not nullable since value is not nullable.



 @rxin do you think it would be reasonable to flag the Sum aggregation
 function as nullable (or not) depending on the input expression's schema ?



 Regards,



 Olivier.

 Le lun. 11 mai 2015 à 22:07, Reynold Xin r...@databricks.com a écrit :

 Not by design. Would you be interested in submitting a pull request?



 On Mon, May 11, 2015 at 1:48 AM, Haopu Wang hw...@qilinsoft.com wrote:

 I try to get the result schema of aggregate functions using DataFrame
 API.

 However, I find that the result fields of groupBy columns are always nullable
 even when the source field is not nullable.

 I want to know if this is by design, thank you! Below is the simple code
 to show the issue.

 ==

   import sqlContext.implicits._
   import org.apache.spark.sql.functions._
   case class Test(key: String, value: Long)
   val df = sc.makeRDD(Seq(Test("k1",2),Test("k1",1))).toDF

   val result = df.groupBy("key").agg($"key", sum("value"))

   // From the output, you can see the key column is nullable, why??
   result.printSchema
 //root
 // |-- key: string (nullable = true)
 // |-- SUM(value): long (nullable = true)


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org








Re: Why so slow

2015-05-12 Thread Olivier Girardot
can you post the explain too ?

Le mar. 12 mai 2015 à 12:11, Jianshi Huang jianshi.hu...@gmail.com a
écrit :

 Hi,

 I have a SQL query on tables containing big Map columns (thousands of
 keys). I found it to be very slow.

 select meta['is_bad'] as is_bad, count(*) as count, avg(nvar['var1']) as
 avg
 from test
 where date between '2014-04-01' and '2014-04-30'
 group by meta['is_bad']

 =

 +---------+-----------+-----------------------+
 | is_bad  |   count   |          avg          |
 +---------+-----------+-----------------------+
 | 0       | 17024396  | 0.16257395850742645   |
 | 1       | 179729    | -0.37626256661125485  |
 | 2       | 28128     | 0.11674427263203344   |
 | 3       | 116327    | -0.6398689187187386   |
 | 4       | 87715     | -0.5349632960030563   |
 | 5       | 169771    | 0.40812641191854626   |
 | 6       | 542447    | 0.5238256418341465    |
 | 7       | 160324    | 0.29442847034840386   |
 | 8       | 2099      | -0.9165701665162977   |
 | 9       | 3104      | 0.3845685004598235    |
 +---------+-----------+-----------------------+
 10 rows selected (130.5 seconds)


 The total number of rows is less than 20M. Why so slow?

 I'm running on Spark 1.4.0-SNAPSHOT with 100 executors each having 4GB ram
 and 2 CPU core.

 Looks like https://issues.apache.org/jira/browse/SPARK-5446 is still
 open, when can we have it fixed? :)

 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github & Blog: http://huangjs.github.com/



Re: value toDF is not a member of RDD object

2015-05-12 Thread Olivier Girardot
you need to instantiate a SQLContext :
val sc : SparkContext = ...
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

Le mar. 12 mai 2015 à 12:29, SLiZn Liu sliznmail...@gmail.com a écrit :

 I added `libraryDependencies += "org.apache.spark" % "spark-sql_2.11" %
 "1.3.1"` to `build.sbt` but the error remains. Do I need to import modules
 other than `import org.apache.spark.sql.{ Row, SQLContext }`?

 On Tue, May 12, 2015 at 5:56 PM Olivier Girardot ssab...@gmail.com
 wrote:

 toDF is part of spark SQL so you need Spark SQL dependency + import
 sqlContext.implicits._ to get the toDF method.
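
 A build.sbt fragment along these lines should pull in Spark SQL. The versions mirror the ones quoted in this thread; using %% (which appends the Scala binary version to the artifact name) is a choice made for this sketch, not something the original posts prescribe.

```scala
// build.sbt fragment (sketch): Spark core plus the Spark SQL module that provides toDF.
scalaVersion := "2.11.6"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.3.1" % "provided",
  "org.apache.spark" %% "spark-sql"  % "1.3.1"
)
```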

 Regards,

 Olivier.

 Le mar. 12 mai 2015 à 11:36, SLiZn Liu sliznmail...@gmail.com a écrit :

 Hi User Group,

 I’m trying to reproduce the example on Spark SQL Programming Guide
 https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection,
 and got a compile error when packaging with sbt:

 [error] myfile.scala:30: value toDF is not a member of org.apache.spark.rdd.RDD[Person]
 [error] val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
 [error]   ^
 [error] one error found
 [error] (compile:compileIncremental) Compilation failed
 [error] Total time: 3 s, completed May 12, 2015 4:11:53 PM

 I double checked my code includes import sqlContext.implicits._ after
 reading this post
 https://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3c1426522113299-22083.p...@n3.nabble.com%3E
 on spark mailing list, even tried to use toDF("col1", "col2") suggested
 by Xiangrui Meng in that post and got the same error.

 The Spark version is specified in build.sbt file as follows:

 scalaVersion := "2.11.6"
 libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.3.1" % "provided"
 libraryDependencies += "org.apache.spark" % "spark-mllib_2.11" % "1.3.1"

 Does anyone have an idea what causes this error?

 REGARDS,
 Todd Leo




Re: [SparkSQL 1.4.0] groupBy columns are always nullable?

2015-05-11 Thread Olivier Girardot
Hi Haopu,
actually here `key` is nullable because this is your input's schema :

scala> result.printSchema
root
|-- key: string (nullable = true)
|-- SUM(value): long (nullable = true)

scala> df.printSchema
root
|-- key: string (nullable = true)
|-- value: long (nullable = false)

I tried it with a schema where the key is not flagged as nullable, and the
schema is actually respected. What you can argue however is that SUM(value)
should also be not nullable since value is not nullable.
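
A sketch of how an input schema with a non-nullable key can be declared explicitly, assuming the `sc` and `sqlContext` values provided by spark-shell on Spark 1.3 or later. The column names follow the example in this thread; the row values are illustrative.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Both columns are declared non-nullable up front instead of being inferred.
val schema = StructType(Seq(
  StructField("key", StringType, nullable = false),
  StructField("value", LongType, nullable = false)
))

// Assumption: sc and sqlContext come from spark-shell.
val rows = sc.parallelize(Seq(Row("k1", 2L), Row("k1", 1L)))
val df = sqlContext.createDataFrame(rows, schema)
df.printSchema() // key should now be reported as nullable = false
```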

@rxin do you think it would be reasonable to flag the Sum aggregation
function as nullable (or not) depending on the input expression's schema ?

Regards,

Olivier.
Le lun. 11 mai 2015 à 22:07, Reynold Xin r...@databricks.com a écrit :

 Not by design. Would you be interested in submitting a pull request?

 On Mon, May 11, 2015 at 1:48 AM, Haopu Wang hw...@qilinsoft.com wrote:

 I try to get the result schema of aggregate functions using DataFrame
 API.

 However, I find that the result fields of groupBy columns are always nullable
 even when the source field is not nullable.

 I want to know if this is by design, thank you! Below is the simple code
 to show the issue.

 ==

   import sqlContext.implicits._
   import org.apache.spark.sql.functions._
   case class Test(key: String, value: Long)
   val df = sc.makeRDD(Seq(Test("k1",2),Test("k1",1))).toDF

   val result = df.groupBy("key").agg($"key", sum("value"))

   // From the output, you can see the key column is nullable, why??
   result.printSchema
 //root
 // |-- key: string (nullable = true)
 // |-- SUM(value): long (nullable = true)







RandomSplit with Spark-ML and Dataframe

2015-05-07 Thread Olivier Girardot
Hi,
is there a best practice for doing a randomSplit into training and
cross-validation sets, as in MLlib, with DataFrames and the pipeline API?

Regards

Olivier.


Re: Spark 1.3.1 and Parquet Partitions

2015-05-07 Thread Olivier Girardot
hdfs://some ip:8029/dataset/*/*.parquet doesn't work for you ?

Le jeu. 7 mai 2015 à 03:32, vasuki vax...@gmail.com a écrit :

 Spark 1.3.1 -
 i have a parquet file on hdfs partitioned by some string looking like this
 /dataset/city=London/data.parquet
 /dataset/city=NewYork/data.parquet
 /dataset/city=Paris/data.parquet
 ….

 I am trying to get to load it using sqlContext using
 sqlcontext.parquetFile(
 hdfs://some ip:8029/dataset/ what do i put here 

 No leads so far. Is there a way I can load the partitions? I am running on a
 cluster and not locally.
 -V



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-1-and-Parquet-Partitions-tp22792.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Re: sparksql running slow while joining 2 tables.

2015-05-05 Thread Olivier Girardot
Can you activate your eventLogs and send them to us?
Thank you !

Le mar. 5 mai 2015 à 04:56, luohui20001 luohui20...@sina.com a écrit :

 Yes, just 1 executor by default. Thanks.



 Sent from my Xiaomi phone
 On May 4, 2015 at 10:01 PM, ayan guha guha.a...@gmail.com wrote:

 Are you using only 1 executor?

 On Mon, May 4, 2015 at 11:07 PM, luohui20...@sina.com wrote:

 hi Olivier

 spark1.3.1, with java1.8.0.45

 and add 2 pics .

 It seems like a GC issue. I also tried different parameters such as the
 memory size of the driver & executor, memory fraction, java opts...

 but this issue still happens.


 

 Thanks & Best regards!
 罗辉 San.Luo

 - Original Message -
 From: Olivier Girardot ssab...@gmail.com
 To: luohui20...@sina.com, user user@spark.apache.org
 Subject: Re: sparksql running slow while joining 2 tables.
 Date: 2015-05-04 20:46

 Hi,
 What is your Spark version ?

 Regards,

 Olivier.

 Le lun. 4 mai 2015 à 11:03, luohui20...@sina.com a écrit :

 hi guys

 when i am running a sql  like select a.name,a.startpoint,a.endpoint,
 a.piece from db a join sample b on (a.name = b.name) where (b.startpoint
  a.startpoint + 25); I found Spark SQL running slowly (taking minutes), which may
 be caused by very long GC and shuffle time.


 Table db is created from a 56 MB txt file, while table sample is 26 MB;
 both are small.

 My Spark cluster is a standalone pseudo-distributed Spark
 cluster with an 8g executor and a 4g driver manager.

 Any advice? Thank you guys.



 

 Thanks & Best regards!
 罗辉 San.Luo








 --
 Best Regards,
 Ayan Guha




Re: sparksql running slow while joining 2 tables.

2015-05-04 Thread Olivier Girardot
Hi,
What is your Spark version ?

Regards,

Olivier.

Le lun. 4 mai 2015 à 11:03, luohui20...@sina.com a écrit :

 hi guys

 when i am running a sql  like select a.name,a.startpoint,a.endpoint,
 a.piece from db a join sample b on (a.name = b.name) where (b.startpoint
  a.startpoint + 25); I found Spark SQL running slowly (taking minutes), which may
 be caused by very long GC and shuffle time.


 Table db is created from a 56 MB txt file, while table sample is 26 MB;
 both are small.

 My Spark cluster is a standalone pseudo-distributed Spark
 cluster with an 8g executor and a 4g driver manager.

 Any advice? Thank you guys.



 

 Thanks & Best regards!
 罗辉 San.Luo



Re: AJAX with Apache Spark

2015-05-04 Thread Olivier Girardot
Hi Sergio,
you shouldn't architect it this way; rather, have Spark Streaming update a
store that your Play app will query.
For example a Cassandra table, or Redis, or anything that will be able to
answer you in milliseconds, rather than querying the Spark Streaming
program.
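
The idea can be sketched as follows, assuming the streaming job produces (key, count) pairs. `KeyValueStore` and `connect` are hypothetical stand-ins for whatever Cassandra or Redis client is actually used; they are not part of Spark.

```scala
import org.apache.spark.streaming.dstream.DStream

// Hypothetical client interface standing in for a Cassandra, Redis or similar driver.
trait KeyValueStore {
  def put(key: String, value: Long): Unit
  def close(): Unit
}

// Assumption: counts is computed by the streaming job, and connect() opens a
// connection to the external store from wherever it is called.
def publish(counts: DStream[(String, Long)], connect: () => KeyValueStore): Unit = {
  counts.foreachRDD { rdd =>
    rdd.foreachPartition { partition =>
      val store = connect() // one connection per partition, opened on the executor
      try partition.foreach { case (key, value) => store.put(key, value) }
      finally store.close()
    }
  }
}
```

The web application then reads the latest values from the store and never talks to the streaming job directly.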

Regards,

Olivier.

Le lun. 4 mai 2015 à 20:08, Sergio Jiménez Barrio drarse.a...@gmail.com a
écrit :

 Hi,

  I am trying to create a dashboard for an Apache Spark job. I need to run Spark
 Streaming 24/7 and, when it receives an AJAX request, answer with the current
 state of the job. I have created the client and the Spark program. I
 tried to create the response service with Play, but this runs the program
 on each request. I want to send the accumulator of the Spark program in
 response to a request.

 Sorry for my explanation. Any idea? Maybe with Play?

 Thanks



Re: Drop a column from the DataFrame.

2015-05-03 Thread Olivier Girardot
great thx

Le sam. 2 mai 2015 à 23:58, Ted Yu yuzhih...@gmail.com a écrit :

 This is coming in 1.4.0
 https://issues.apache.org/jira/browse/SPARK-7280



 On May 2, 2015, at 2:27 PM, Olivier Girardot ssab...@gmail.com wrote:

 Sounds like a patch for a drop method...

 Le sam. 2 mai 2015 à 21:03, dsgriffin dsgrif...@gmail.com a écrit :

 Just use select() to create a new DataFrame with only the columns you
 want.
 Sort of the opposite of what you want -- but you can select all the
 columns minus the one you don't want. You could even use a filter to
 remove just that one column on the fly:

 myDF.select(myDF.columns.filter(_ != "column_you_do_not_want").map(colname
 => new Column(colname)).toList : _* )



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Drop-a-column-from-the-DataFrame-tp22711p22737.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Can I group elements in RDD into different groups and let each group share some elements?

2015-05-02 Thread Olivier Girardot
Did you look at the cogroup transformation or the cartesian transformation ?
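
As a sketch of the pair-RDD approach, the snippet below uses `join` (which is built on cogroup) rather than calling cogroup directly. It assumes the elements are keyed by an id and that group membership is available as (elementId, groupId) pairs; the types and the sum aggregation are illustrative.

```scala
import org.apache.spark.SparkContext._ // pair-RDD implicits, needed before Spark 1.3
import org.apache.spark.rdd.RDD

// Assumptions: elements holds (elementId, value) pairs and membership holds
// (elementId, groupId) pairs; one element may belong to several groups.
def aggregateByGroup(elements: RDD[(String, Double)],
                     membership: RDD[(String, Int)]): RDD[(Int, Double)] = {
  membership
    .join(elements) // (elementId, (groupId, value))
    .map { case (_, (groupId, value)) => (groupId, value) }
    .reduceByKey(_ + _) // one aggregated value per group
}
```

This replaces 2000 separate filter passes with a single shuffle.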

Regards,

Olivier.

Le sam. 2 mai 2015 à 22:01, Franz Chien franzj...@gmail.com a écrit :

 Hi all,

 Can I group elements in RDD into different groups and let each group share
 elements? For example, I have 10,000 elements in RDD from e1 to e10000, and
 I want to group and aggregate them by another mapping with size of 2000,
 ex: ( (e1,e42), (e1,e554), (e3, e554)…… (2000th group))

 My first approach was to filter the RDD with mapping rules for 2000 times,
 and then union them together. However, it ran forever. Does SPARK provide a
 way to group elements in RDD like this please?


 Thanks,


 Franz



Re: to split an RDD to multiple ones?

2015-05-02 Thread Olivier Girardot
I guess :

val srdd_s1 = srdd.filter(_.startsWith("s1_")).sortBy(x => x)
val srdd_s2 = srdd.filter(_.startsWith("s2_")).sortBy(x => x)
val srdd_s3 = srdd.filter(_.startsWith("s3_")).sortBy(x => x)
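
One caveat: sorting the strings themselves is lexicographic, so s1_10 would come before s1_2. A small sketch of a numeric sort key follows, written against plain Scala collections so that it runs without a cluster; the sample data and the `suffix` helper are illustrative.

```scala
// Plain Scala collections are used here so the snippet runs without Spark.
val data = Seq("s1_0", "s3_0", "s2_1", "s2_2", "s3_1", "s1_3", "s1_2", "s1_10")

// Numeric sort key: the integer that follows the underscore.
def suffix(s: String): Int = s.substring(s.indexOf('_') + 1).toInt

val s1 = data.filter(_.startsWith("s1_")).sortBy(suffix)
println(s1) // List(s1_0, s1_2, s1_3, s1_10)
```

The same key function should be usable with the RDD version, as in srdd.filter(...).sortBy(suffix).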

Regards,

Olivier.

Le sam. 2 mai 2015 à 22:53, Yifan LI iamyifa...@gmail.com a écrit :

 Hi,

 I have an RDD *srdd* containing (unordered-)data like this:
 s1_0, s3_0, s2_1, s2_2, s3_1, s1_3, s1_2, …

 What I want is (it will be much better if they could be in ascending
 order):
 *srdd_s1*:
 s1_0, s1_1, s1_2, …, s1_n
 *srdd_s2*:
 s2_0, s2_1, s2_2, …, s2_n
 *srdd_s3*:
 s3_0, s3_1, s3_2, …, s3_n
 …
 …

 Have any idea? Thanks in advance! :)


 Best,
 Yifan LI








Re: Drop a column from the DataFrame.

2015-05-02 Thread Olivier Girardot
Sounds like a patch for a drop method...

Le sam. 2 mai 2015 à 21:03, dsgriffin dsgrif...@gmail.com a écrit :

 Just use select() to create a new DataFrame with only the columns you want.
 Sort of the opposite of what you want -- but you can select all the
 columns minus the one you don't want. You could even use a filter to
 remove just that one column on the fly:

 myDF.select(myDF.columns.filter(_ != "column_you_do_not_want").map(colname
 => new Column(colname)).toList : _* )



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Drop-a-column-from-the-DataFrame-tp22711p22737.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index:

2015-05-02 Thread Olivier Girardot
Can you post your code, otherwise there's not much we can do.

Regards,

Olivier.

Le sam. 2 mai 2015 à 21:15, shahab shahab.mok...@gmail.com a écrit :

 Hi,

 I am using spark-1.2.0 with Kryo serialization, but I get the
 following exception.

 java.io.IOException: com.esotericsoftware.kryo.KryoException:
 java.lang.IndexOutOfBoundsException: Index: 3448, Size: 1

 I would appreciate it if anyone could tell me how I can resolve this.

 best,
 /Shahab



Best strategy for Pandas - Spark

2015-04-30 Thread Olivier Girardot
Hi everyone,
Let's assume I have a complex workflow of more than 10 datasources as input
- 20 computations (some creating intermediary datasets and some merging
everything for the final computation) - some taking on average 1 minute to
complete and some taking more than 30 minutes.

What would be for you the best strategy to port this to Apache Spark ?

   - Transform the whole flow into a Spark Job (PySpark or Scala)
   - Transform only part of the flow (the heavy lifting ~30 min parts)
   using the same language (PySpark)
   - Transform only part of the flow and pipe the rest from Scala to Python

Regards,

Olivier.


Dataframe filter based on another Dataframe

2015-04-29 Thread Olivier Girardot
Hi everyone,
what is the most efficient way to filter a DataFrame on a column from
another Dataframe's column. The best idea I had, was to join the two
dataframes :

 val df1: DataFrame
 val df2: DataFrame
 df1.join(df2, df1("id") === df2("id"), "inner")

But I end up (obviously) with the id column twice.
Another approach would be to filter df1 but I can't seem to get this to
work using df2's column as a base
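
One option that avoids the duplicated id column is a left semi join, sketched here under the assumption that both DataFrames have an "id" column as in the snippet above; the helper name is illustrative.

```scala
import org.apache.spark.sql.DataFrame

// A left semi join keeps only df1's columns, for the rows whose id also exists in df2.
def keepMatching(df1: DataFrame, df2: DataFrame): DataFrame =
  df1.join(df2, df1("id") === df2("id"), "leftsemi")
```

Unlike an inner join, this also does not multiply rows of df1 when an id appears several times in df2.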

Any idea ?

Regards,

Olivier.


Re: Dataframe filter based on another Dataframe

2015-04-29 Thread Olivier Girardot
You mean after joining ? Sure, my question was more if there was any best
practice preferred to joining the other dataframe for filtering.

Regards,

Olivier.

Le mer. 29 avr. 2015 à 13:23, Olivier Girardot ssab...@gmail.com a écrit :

 Hi everyone,
 what is the most efficient way to filter a DataFrame on a column from
 another Dataframe's column. The best idea I had, was to join the two
 dataframes :

  val df1: DataFrame
  val df2: DataFrame
  df1.join(df2, df1("id") === df2("id"), "inner")

 But I end up (obviously) with the id column twice.
 Another approach would be to filter df1 but I can't seem to get this to
 work using df2's column as a base

 Any idea ?

 Regards,

 Olivier.



How to distribute Spark computation recipes

2015-04-27 Thread Olivier Girardot
Hi everyone,
I know that any RDD is related to its SparkContext and the associated
variables (broadcast, accumulators), but I'm looking for a way to
serialize/deserialize full RDD computations ?

@rxin Spark SQL is, in a way, already doing this but the parsers are
private[sql], is there any way to reuse this work to get Logical/Physical
Plans in & out of Spark ?

Regards,

Olivier.


Re: Spark Streaming updatyeStateByKey throws OutOfMemory Error

2015-04-21 Thread Olivier Girardot
Hi Sourav,
Can you post your updateFunc as well please ?

Regards,

Olivier.

Le mar. 21 avr. 2015 à 12:48, Sourav Chandra sourav.chan...@livestream.com
a écrit :

 Hi,

 We are building a spark streaming application which reads from kafka, does
 updateStateBykey based on the received message type and finally stores into
 redis.

 After running for a few seconds, the executor process gets killed with an
 OutOfMemory error.

 The code snippet is below:


 NoOfReceiverInstances = 1

 val kafkaStreams = (1 to NoOfReceiverInstances).map(
   _ => KafkaUtils.createStream(ssc, ZKQuorum, ConsumerGroup, TopicsMap)
 )
 val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {...}

 ssc.union(kafkaStreams).map(KafkaMessageMapper(_)).filter(...)..updateStateByKey(updateFunc).foreachRDD(_.foreachPartition(RedisHelper.update(_)))

 object RedisHelper {
   private val client = scredis.Redis(
     ConfigFactory.parseProperties(System.getProperties).getConfig(namespace)
   )

   def update(itr: Iterator[(String, (Long, Long))]) {
     // redis save operation
   }
 }


 Below is the spark configuration:

 spark.app.name = XXX
 spark.jars = .jar
 spark.home = /spark-1.1.1-bin-hadoop2.4
 spark.executor.memory = 1g
 spark.streaming.concurrentJobs = 1000
 spark.logConf = true
 spark.cleaner.ttl = 3600 //in milliseconds
 spark.default.parallelism = 12
 spark.executor.extraJavaOptions = -Xloggc:gc.log -XX:+PrintGCDetails -XX:+UseConcMarkSweepGC -XX:HeapDumpPath=1.hprof -XX:+HeapDumpOnOutOfMemoryError
 spark.executor.logs.rolling.strategy = size
 spark.executor.logs.rolling.size.maxBytes = 104857600 // 100 MB
 spark.executor.logs.rolling.maxRetainedFiles = 10
 spark.serializer = org.apache.spark.serializer.KryoSerializer
 spark.kryo.registrator = xxx.NoOpKryoRegistrator


 other configurations are below

 streaming {
   // All streaming context related configs should come here
   batch-duration = 1 second
   checkpoint-directory = /tmp
   checkpoint-duration = 10 seconds
   slide-duration = 1 second
   window-duration = 1 second
   partitions-for-shuffle-task = 32
 }
 kafka {
   no-of-receivers = 1
   zookeeper-quorum = :2181
   consumer-group = x
   topic = x:2
 }

 We tried different combinations like
  - with spark 1.1.0 and 1.1.1.
  - by increasing executor memory
  - by changing the serialization strategy (switching between kryo and
 normal java)
  - by changing broadcast strategy (switching between http and torrent
 broadcast)


 Can anyone give any insight what we are missing here? How can we fix this?

 Due to akka version mismatch with some other libraries we cannot upgrade
 the spark version.

 Thanks,
 --

 Sourav Chandra

 Senior Software Engineer

 · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

 sourav.chan...@livestream.com

 o: +91 80 4121 8723

 m: +91 988 699 3746

 skype: sourav.chandra

 Livestream

 Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
 Block, Koramangala Industrial Area,

 Bangalore 560034

 www.livestream.com



Re: Can a map function return null

2015-04-18 Thread Olivier Girardot
You can return an RDD with null values inside, and afterwards filter on
item != null
In scala (or even in Java 8) you'd rather use Option/Optional, and in Scala
they're directly usable from Spark.
Example:

 sc.parallelize(1 to 1000).flatMap(item => if (item % 2 == 0) Some(item)
else None).collect()

res0: Array[Int] = Array(2, 4, 6, ...)
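
To compare the two approaches side by side, here is a small sketch on plain Scala collections (so it runs without Spark); the same functions can be passed to RDD.flatMap, RDD.map and RDD.filter. The values are illustrative.

```scala
val items = 1 to 10

// Option-based version: None entries disappear when flatMap flattens the result.
val viaOption = items.flatMap(i => if (i % 2 == 0) Some(i * 10) else None)

// null-based version: box to java.lang.Integer so that null is a legal value,
// then filter the nulls out afterwards.
val viaNull = items
  .map(i => if (i % 2 == 0) Integer.valueOf(i * 10) else null)
  .filter(_ != null)
  .map(_.intValue)

println(viaOption == viaNull) // true
```

The Option version avoids ever having null elements in the intermediate collection, which is why it is usually preferred in Scala.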

Regards,

Olivier.

Le sam. 18 avr. 2015 à 20:44, Steve Lewis lordjoe2...@gmail.com a écrit :

 I find a number of cases where I have a JavaRDD and I wish to transform
 the data and depending on a test return 0 or one item (don't suggest a
 filter - the real case is more complex). So I currently do something like
 the following - perform a flatmap returning a list with 0 or 1 entry
 depending on the isUsed function.

 JavaRDD<Foo> original = ...
 JavaRDD<Foo> words = original.flatMap(new FlatMapFunction<Foo, Foo>() {
     @Override
     public Iterable<Foo> call(final Foo s) throws Exception {
         List<Foo> ret = new ArrayList<Foo>();
         if (isUsed(s))
             ret.add(transform(s));
         return ret; // contains 0 items if isUsed is false
     }
 });

 My question is: can I do a map returning the transformed data, and null if
 nothing is to be returned, as shown below? What does Spark do with a map
 function returning null?

 JavaRDD<Foo> words = original.map(new Function<Foo, Foo>() {
     @Override
     public Foo call(final Foo s) throws Exception {
         if (isUsed(s))
             return transform(s);
         return null; // not used - what happens now
     }
 });






Re: Build spark failed with maven

2015-02-14 Thread Olivier Girardot
Hi,
this was not reproduced for me, what kind of jdk are you using for the zinc
server ?

Regards,

Olivier.

2015-02-11 5:08 GMT+01:00 Yi Tian tianyi.asiai...@gmail.com:

  Hi, all

 I got an ERROR when I build spark master branch with maven (commit:
 2d1e916730492f5d61b97da6c483d3223ca44315)

 [INFO]
 [INFO] ------------------------------------------------------------------------
 [INFO] Building Spark Project Catalyst 1.3.0-SNAPSHOT
 [INFO] ------------------------------------------------------------------------
 [INFO]
 [INFO] --- maven-enforcer-plugin:1.3.1:enforce (enforce-versions) @ spark-catalyst_2.10 ---
 [INFO]
 [INFO] --- build-helper-maven-plugin:1.8:add-source (add-scala-sources) @ spark-catalyst_2.10 ---
 [INFO] Source directory: /Users/tianyi/github/community/apache-spark/sql/catalyst/src/main/scala added.
 [INFO]
 [INFO] --- maven-remote-resources-plugin:1.5:process (default) @ spark-catalyst_2.10 ---
 [INFO]
 [INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ spark-catalyst_2.10 ---
 [INFO] Using 'UTF-8' encoding to copy filtered resources.
 [INFO] skip non existing resourceDirectory /Users/tianyi/github/community/apache-spark/sql/catalyst/src/main/resources
 [INFO] Copying 3 resources
 [INFO]
 [INFO] --- scala-maven-plugin:3.2.0:compile (scala-compile-first) @ spark-catalyst_2.10 ---
 [INFO] Using zinc server for incremental compilation
 [INFO] compiler plugin: BasicArtifact(org.scalamacros,paradise_2.10.4,2.0.1,null)
 [info] Compiling 69 Scala sources and 3 Java sources to /Users/tianyi/github/community/apache-spark/sql/catalyst/target/scala-2.10/classes...
 [error] /Users/tianyi/github/community/apache-spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala:314: polymorphic expression cannot be instantiated to expected type;
 [error]  found   : [T(in method apply)]org.apache.spark.sql.catalyst.dsl.ScalaUdfBuilder[T(in method apply)]
 [error]  required: org.apache.spark.sql.catalyst.dsl.package.ScalaUdfBuilder[T(in method functionToUdfBuilder)]
 [error]   implicit def functionToUdfBuilder[T: TypeTag](func: Function1[_, T]): ScalaUdfBuilder[T] = ScalaUdfBuilder(func)

 Any suggestions?



Re: Opening Spark on IntelliJ IDEA

2014-11-29 Thread Olivier Girardot
Hi,
are you using spark for a java or scala project and can you post your pom
file please ?

Regards,

Olivier.

2014-11-27 7:07 GMT+01:00 Taeyun Kim taeyun@innowireless.com:

 Hi,



 Some information about the error.

 On File | Project Structure window, the following error message is
 displayed with pink background:



 Library ‘Maven: org.scala-lang:scala-compiler-bundle:2.10.4’ is not used



 Can it be a hint?



 *From:* Taeyun Kim [mailto:taeyun@innowireless.com]
 *Sent:* Thursday, November 27, 2014 3:00 PM
 *To:* 'user'
 *Subject:* Opening Spark on IntelliJ IDEA



 Hi,



 I’m trying to open the Spark source code with IntelliJ IDEA.

 I opened pom.xml on the Spark source code root directory.

 Project tree is displayed in the Project tool window.

 But, when I open a source file, say
 org.apache.spark.deploy.yarn.ClientBase.scala, a lot of red marks shows on
 the editor scroll bar.

 It is the ‘Cannot resolve symbol’ error. Even it cannot resolve
 StringOps.format.

 How can it be fixed?



 The versions I’m using are as follows:

 - OS: Windows 7

 - IntelliJ IDEA: 13.1.6

 - Scala plugin: 0.41.2

 - Spark source code: 1.1.1 (with a few file modified by me)



 I’ve tried to fix this and error state changed somewhat, but eventually I
 gave up fixing it on my own (with googling) and deleted .idea folder and
 started over. So now I’m seeing the errors described above.



 Thank you.





Re: Cannot access data after a join (error: value _1 is not a member of Product with Serializable)

2014-11-19 Thread Olivier Girardot
can you please post the full source of your code and some sample data to
run it on ?

2014-11-19 16:23 GMT+01:00 YaoPau jonrgr...@gmail.com:

 I joined two datasets together, and my resulting logs look like this:


 (975894369,((72364,20141112T170627,web,MEMPHIS,AR,US,Central),(Male,John,Smith)))

 (253142991,((30058,20141112T171246,web,ATLANTA16,GA,US,Southeast),(Male,Bob,Jones)))

 (295305425,((28110,20141112T170454,iph,CHARLOTTE2,NC,US,Southeast),(Female,Mary,Williams)))

 When I try to access the newly-joined data with JoinedInv.map(line =>
 line._2._2._1) I get the following error:

 [ERROR]
 error: value _1 is not a member of Product with Serializable
 [INFO]   val getOne = JoinedInv.map(line => line._2._2._1)
 [INFO] ^
 [ERROR] error: value foreach is not a member of Array[Nothing]
 [INFO]   getOne.take(10).foreach(println)
 [INFO]^

 It looks like there are some rows where a JOIN did not occur (no key match
 in the joined dataset), but because I can't access line._2._2._1 I don't
 know of a way to check for that.  I can access line._2._2 but line._2._2
 does not have the length attribute.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-access-data-after-a-join-error-value-1-is-not-a-member-of-Product-with-Serializable-tp19272.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: default parallelism bug?

2014-10-21 Thread Olivier Girardot
Hi,
what do you mean by pretty small ? How big is your file ?

Regards,

Olivier.

2014-10-21 6:01 GMT+02:00 Kevin Jung itsjb.j...@samsung.com:

 I use Spark 1.1.0 and set these options to spark-defaults.conf
 spark.scheduler.mode FAIR
 spark.cores.max 48
 spark.default.parallelism 72

 Thanks,
 Kevin



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/default-parallelism-bug-tp16787p16894.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Convert Iterable to RDD

2014-10-21 Thread Olivier Girardot
I don't think this is provided out of the box, but you can use toSeq on
your Iterable and if the Iterable is lazy, it should stay that way for the
Seq.
And then you can use sc.parallelize(myIterable.toSeq) so you'll have your
RDD.

For the Iterable[Iterable[T]] you can flatten it and then create your RDD
from the corresponding Iterable.
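
A minimal sketch of the nested case follows; the helper name `toRDD` is illustrative. It assumes the data fits in driver memory, because `parallelize` ships a local collection from the driver to the cluster.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Flatten the nested Iterable locally, then distribute the resulting Seq.
def toRDD[T: ClassTag](sc: SparkContext, nested: Iterable[Iterable[T]]): RDD[T] =
  sc.parallelize(nested.flatten.toSeq)
```

For data that is already distributed, as in an RDD[Iterable[T]], calling flatMap(identity) on the RDD avoids collecting anything on the driver.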

Regards,

Olivier.


2014-10-21 5:07 GMT+02:00 Dai, Kevin yun...@ebay.com:

  In addition, how to convert Iterable[Iterable[T]] to RDD[T]



 Thanks,

 Kevin.



 *From:* Dai, Kevin [mailto:yun...@ebay.com]
 *Sent:* October 21, 2014 10:58
 *To:* user@spark.apache.org
 *Subject:* Convert Iterable to RDD



 Hi, All



 Is there any way to convert iterable to RDD?



 Thanks,

 Kevin.



Re: RDD to Multiple Tables SparkSQL

2014-10-21 Thread Olivier Girardot
If you already know your keys the best way would be to extract
one RDD per key (it would not bring the content back to the master and you
can take advantage of the caching features) and then execute a
registerTempTable by Key.
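
For the case where the keys are known, the approach can be sketched as below. It assumes the Spark 1.1-era API used in the quoted code, with `import sqlContext.createSchemaRDD` in scope and `logsRDD` being an RDD[KV]; the key names and the table naming scheme are illustrative.

```scala
// Assumption: the list of keys is known in advance.
val knownKeys = Seq("keyA", "keyB")

logsRDD.cache() // the same RDD is scanned once per key

knownKeys.foreach { k =>
  // One filtered RDD per key, registered under its own table name.
  logsRDD.filter(_.key == k).registerTempTable(s"KVS_$k")
}
```

Each table can then be queried separately, for example with sqlContext.sql("select id, value from KVS_keyA").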

But I'm guessing you don't know the keys in advance, and in that case I
think putting everything in different tables becomes very confusing.
First of all, how would you query it afterwards?

Regards,

Olivier.

2014-10-20 13:02 GMT+02:00 critikaled isasmani@gmail.com:

 Hi I have a rdd which I want to register as multiple tables based on key

 
 val context = new SparkContext(conf)
 val sqlContext = new org.apache.spark.sql.hive.HiveContext(context)
 import sqlContext.createSchemaRDD

 case class KV(key:String,id:String,value:String)
 val logsRDD = context.textFile("logs", 10).map{line =>
   val Array(key,id,value) = line split ' '
   KV(key,id,value)
 }
 logsRDD.registerTempTable("KVS")

 I want to store the above information to multiple tables based on key
 without bringing the entire data to master

 Thanks in advance.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/RDD-to-Multiple-Tables-SparkSQL-tp16807.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
