Re: Spark 2.4.1 on Kubernetes - DNS resolution of driver fails
Hi, I did not try on another vendor, so I can't say whether it's only related to GKE, and no, I did not notice anything on the kubelet or kube-dns processes... Regards On Fri, May 3, 2019 at 03:05, Li Gao wrote: > hi Olivier, > > This seems like a GKE-specific issue? Have you tried other vendors? Also, on > the kubelet nodes, did you notice any pressure on the DNS side? > > Li > > > On Mon, Apr 29, 2019, 5:43 AM Olivier Girardot < > o.girar...@lateral-thoughts.com> wrote: > >> Hi everyone, >> I have ~300 Spark jobs on Kubernetes (GKE) using the cluster auto-scaler, >> and sometimes while running these jobs a pretty bad thing happens: the >> driver (in cluster mode) gets scheduled on Kubernetes and launches many >> executor pods. >> So far so good, but the k8s "Service" associated with the driver does not >> seem to be propagated in terms of DNS resolution, so all the executors fail >> with a "spark-application-..cluster.svc.local" does not exist error. >> >> With all executors failing, the driver should be failing too, but it considers >> that it's a "pending" initial allocation and stays stuck forever in a loop >> of "Initial job has not accepted any resources, please check Cluster UI". >> >> Has anyone else observed this kind of behaviour? >> We had it on 2.3.1, and I upgraded to 2.4.1, but the issue still seems to >> exist even after the "big refactoring" of the Kubernetes cluster scheduler >> backend. >> >> I can work on a fix / workaround, but I'd like to check with you the >> proper way forward: >> >>- Some processes (like the Airflow Helm recipe) rely on a "sleep 30s" >>before launching the dependent pods (that could be added to >>/opt/entrypoint.sh used in the Kubernetes packaging) >>- We could add a simple step to the init container trying to do the DNS >>resolution and failing after 60s if it did not work >> >> But these steps won't change the fact that the driver will stay stuck >> thinking we're still within the initial allocation delay. >> >> Thoughts ? 
>> >> -- >> *Olivier Girardot* >> o.girar...@lateral-thoughts.com >> >
Spark 2.4.1 on Kubernetes - DNS resolution of driver fails
Hi everyone, I have ~300 Spark jobs on Kubernetes (GKE) using the cluster auto-scaler, and sometimes while running these jobs a pretty bad thing happens: the driver (in cluster mode) gets scheduled on Kubernetes and launches many executor pods. So far so good, but the k8s "Service" associated with the driver does not seem to be propagated in terms of DNS resolution, so all the executors fail with a "spark-application-..cluster.svc.local" does not exist error. With all executors failing, the driver should be failing too, but it considers that it's a "pending" initial allocation and stays stuck forever in a loop of "Initial job has not accepted any resources, please check Cluster UI". Has anyone else observed this kind of behaviour? We had it on 2.3.1, and I upgraded to 2.4.1, but the issue still seems to exist even after the "big refactoring" of the Kubernetes cluster scheduler backend. I can work on a fix / workaround, but I'd like to check with you the proper way forward:
- Some processes (like the Airflow Helm recipe) rely on a "sleep 30s" before launching the dependent pods (that could be added to /opt/entrypoint.sh used in the Kubernetes packaging)
- We could add a simple step to the init container trying to do the DNS resolution and failing after 60s if it did not work
But these steps won't change the fact that the driver will stay stuck thinking we're still within the initial allocation delay. Thoughts ? -- *Olivier Girardot* o.girar...@lateral-thoughts.com
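The second option above (an init-container step that retries DNS resolution and gives up after 60s) could be sketched like this in plain Python; the helper name, timings, and hostname here are illustrative, not part of any Spark tooling:

```python
import socket
import time

def wait_for_dns(hostname, timeout_s=60, interval_s=2):
    """Poll DNS until `hostname` resolves or `timeout_s` elapses.
    Returns the resolved address, or re-raises the last resolution
    error once the deadline has passed."""
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            return socket.gethostbyname(hostname)
        except socket.gaierror:
            if time.monotonic() >= deadline:
                raise
            time.sleep(interval_s)

# A name that already resolves returns immediately; a not-yet-propagated
# Service name would be retried every `interval_s` until the deadline.
print(wait_for_dns("localhost", timeout_s=5))
```

The same check could equally be a shell loop around `nslookup` in the init container; either way it only delays the failure, it doesn't fix the driver staying stuck afterwards, as noted above.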
Back to SQL
Hi everyone, is there any known way to go from a Spark SQL Logical Plan (optimised?) back to a SQL query? Regards, Olivier.
Spark Structured Streaming and compacted topic in Kafka
Hi everyone, I'm aware of the issue regarding the direct stream 0.10 consumer in Spark and compacted topics (c.f. https://issues.apache.org/jira/browse/SPARK-17147). Is there any chance that the Spark Structured Streaming Kafka source is compatible with compacted topics? Regards, -- *Olivier Girardot*
Nested "struct" fonction call creates a compilation error in Spark SQL
Hi everyone, when we create recursive calls to "struct" (up to 5 levels) to extend a complex data structure, we end up with the following compilation error: org.codehaus.janino.JaninoRuntimeException: Code of method "(I[Lscala/collection/Iterator;)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" grows beyond 64 KB The CreateStruct code itself properly uses the ctx.splitExpression command, but the "end result" of the df.select(struct(struct(struct()))) ends up being too much. Should I open a JIRA, or is there a workaround? Regards, -- *Olivier Girardot* | Associé o.girar...@lateral-thoughts.com
Re: Pyspark 2.1.0 weird behavior with repartition
I kinda reproduced that, with pyspark 2.1 (also built for hadoop 2.6) and with python 3.x. I'll look into it a bit more after I've fixed a few other issues regarding the salting of strings on the cluster. 2017-01-30 20:19 GMT+01:00 Blaž Šnuderl <snud...@gmail.com>: > I am loading a simple text file using pyspark. Repartitioning it seems to > produce garbage data. > > I got these results using spark 2.1 prebuilt for hadoop 2.7, using the pyspark > shell. > > >>> sc.textFile("outc").collect() > [u'a', u'b', u'c', u'd', u'e', u'f', u'g', u'h', u'i', u'j', u'k', u'l'] > >>> sc.textFile("outc", use_unicode=False).collect() > ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l'] > > Repartitioning seems to produce garbage, and also only 2 records here: > >>> sc.textFile("outc", use_unicode=False).repartition(10).collect() > ['\x80\x02]q\x01(U\x01aU\x01bU\x01cU\x01dU\x01eU\x01fU\x01ge.', > '\x80\x02]q\x01(U\x01hU\x01iU\x01jU\x01kU\x01le.'] > >>> sc.textFile("outc", use_unicode=False).repartition(10).count() > 2 > > Without setting use_unicode=False we can't even repartition at all: > >>> sc.textFile("outc").repartition(19).collect() > Traceback (most recent call last): > File "", line 1, in > File "/home/snuderl/scrappstore/thirdparty/spark-2.1.0-bin-hadoop2.7/python/pyspark/rdd.py", line 810, in collect > return list(_load_from_socket(port, self._jrdd_deserializer)) > File "/home/snuderl/scrappstore/thirdparty/spark-2.1.0-bin-hadoop2.7/python/pyspark/rdd.py", line 140, in _load_from_socket > for item in serializer.load_stream(rf): > File "/home/snuderl/scrappstore/thirdparty/spark-2.1.0-bin-hadoop2.7/python/pyspark/serializers.py", line 529, in load_stream > yield self.loads(stream) > File "/home/snuderl/scrappstore/thirdparty/spark-2.1.0-bin-hadoop2.7/python/pyspark/serializers.py", line 524, in loads > return s.decode("utf-8") if self.use_unicode else s > File "/home/snuderl/scrappstore/virtualenv/lib/python2.7/encodings/utf_8.py", line 16, in decode > return codecs.utf_8_decode(input, errors, True) > UnicodeDecodeError: 'utf8' codec can't decode byte 0x80 in position 0: > invalid start byte > > Input file contents: > a > b > c > d > e > f > g > h > i > j > k > l > > -- > View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Pyspark-2-1-0-weird-behavior-with-repartition-tp28350.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > -- *Olivier Girardot* | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
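For what it's worth, the "garbage" records above are recognisably pickle streams: 0x80 is the pickle PROTO opcode, which is also the byte the UnicodeDecodeError trips over. Unpickling the first record recovers the original rows, which suggests the repartitioned output is PySpark's batched-pickle wire format being read back as if it were text. A quick check, in plain Python with no Spark needed:

```python
import pickle

# First "garbage" record from the repartition output above.
record = b'\x80\x02]q\x01(U\x01aU\x01bU\x01cU\x01dU\x01eU\x01fU\x01ge.'

# 0x80 0x02 is the protocol-2 pickle header; the rest is a pickled
# list of the original rows, i.e. one serialized batch of 7 elements.
rows = pickle.loads(record)
print(rows)  # ['a', 'b', 'c', 'd', 'e', 'f', 'g']
```

The second record unpickles the same way to ['h', 'i', 'j', 'k', 'l'], so the two "rows" are really the two serialized batches.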
Re: Nested ifs in sparksql
Are you using the "case when" functions ? what do you mean by slow ? can you share a snippet ? On Tue, Jan 10, 2017 8:15 PM, Georg Heiler georg.kf.hei...@gmail.com wrote: Maybe you can create an UDF? Raghavendra Pandey <raghavendra.pan...@gmail.com> schrieb am Di., 10. Jan. 2017 um 20:04 Uhr: I have of around 41 level of nested if else in spark sql. I have programmed it using apis on dataframe. But it takes too much time. Is there anything I can do to improve on time here? Olivier Girardot| Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
Re: Could not parse Master URL for Mesos on Spark 2.1.0
nop, there is no "distribution", no spark-submit at the start of my process.But I found the problem, the behavior when loading mesos native dependency changed, and the static initialization block inside org.apache.mesos.MesosSchedulerDriver needed the specific reference to libmesos-1.0.0.so. So just for the record, setting the env variable MESOS_NATIVE_JAVA_LIBRARY="//libmesos-1.0.0.so" fixed the whole thing. Thanks for the help ! @michael if you want to talk about the setup we're using, we can talk about it directly . On Tue, Jan 10, 2017 9:31 PM, Michael Gummelt mgumm...@mesosphere.io wrote: What do you mean your driver has all the dependencies packaged? What are "all the dependencies"? Is the distribution you use to launch your driver built with -Pmesos? On Tue, Jan 10, 2017 at 12:18 PM, Olivier Girardot < o.girar...@lateral-thoughts.com> wrote: Hi Michael,I did so, but it's not exactly the problem, you see my driver has all the dependencies packaged, and only the executors fetch via the spark.executor.uri the tgz,The strange thing is that I see in my classpath the org.apache.mesos:mesos-1.0.0-shaded-protobuf dependency packaged in the final dist of my app…So everything should work in theory. 
On Tue, Jan 10, 2017 7:22 PM, Michael Gummelt mgumm...@mesosphere.io wrote: Just build with -Pmesos http://spark.apache.org/docs/latest/building-spark.html#building-with-mesos-support On Tue, Jan 10, 2017 at 8:56 AM, Olivier Girardot < o.girar...@lateral-thoughts.com> wrote: I had the same problem, added spark-mesos as a dependency, and now I get:
[2017-01-10 17:45:16,575] {bash_operator.py:77} INFO - Exception in thread "main" java.lang.NoClassDefFoundError: Could not initialize class org.apache.mesos.MesosSchedulerDriver
[2017-01-10 17:45:16,576] {bash_operator.py:77} INFO - at org.apache.spark.scheduler.cluster.mesos.MesosSchedulerUtils$class.createSchedulerDriver(MesosSchedulerUtils.scala:105)
[2017-01-10 17:45:16,576] {bash_operator.py:77} INFO - at org.apache.spark.scheduler.cluster.mesos.MesosCoarseGrainedSchedulerBackend.createSchedulerDriver(MesosCoarseGrainedSchedulerBackend.scala:48)
[2017-01-10 17:45:16,576] {bash_operator.py:77} INFO - at org.apache.spark.scheduler.cluster.mesos.MesosCoarseGrainedSchedulerBackend.start(MesosCoarseGrainedSchedulerBackend.scala:155)
[2017-01-10 17:45:16,577] {bash_operator.py:77} INFO - at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:156)
[2017-01-10 17:45:16,577] {bash_operator.py:77} INFO - at org.apache.spark.SparkContext.<init>(SparkContext.scala:509)
[2017-01-10 17:45:16,577] {bash_operator.py:77} INFO - at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2313)
[2017-01-10 17:45:16,577] {bash_operator.py:77} INFO - at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:868)
[2017-01-10 17:45:16,577] {bash_operator.py:77} INFO - at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:860)
[2017-01-10 17:45:16,578] {bash_operator.py:77} INFO - at scala.Option.getOrElse(Option.scala:121)
[2017-01-10 17:45:16,578] {bash_operator.py:77} INFO - at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:860)
Is there any other dependency to add for spark 2.1.0? On Tue, Jan 10, 2017 1:26 AM, Abhishek Bhandari abhi10...@gmail.com wrote: Glad that you found it. On Mon, Jan 9, 2017 at 3:29 PM, Richard Siebeling <rsiebel...@gmail.com> wrote: Probably found it. It turns out that Mesos should be explicitly added while building Spark; I assumed I could use the old build command that I used for building Spark 2.0.0... Didn't see the two lines added in the documentation... Maybe these kinds of changes could be added in the changelog, under changes of behaviour or changes in the build process or something like that. Kind regards, Richard On 9 January 2017 at 22:55, Richard Siebeling <rsiebel...@gmail.com> wrote: Hi, I'm setting up Apache Spark 2.1.0 on Mesos and I am getting a "Could not parse Master URL: 'mesos://xx.xx.xxx.xxx:5050'" error. Mesos is running fine (both the master and the slave; it's a single machine configuration). I really don't understand why this is happening, since the same configuration, but using Spark 2.0.0, is running fine within Vagrant. Could someone please help? Thanks in advance, Richard -- Abhishek J Bhandari Mobile No. +1 510 493 6205 (USA) Mobile No. +91 96387 93021 (IND) R & D Department Valent Software Inc. CA Email: abhis...@valent-software.com Olivier Girardot | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94 -- Michael Gummelt Software Engineer Mesosphere Olivier Girardot | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94 -- Michael Gummelt Software Engineer Mesosphere Olivier Girardot | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
Re: Could not parse Master URL for Mesos on Spark 2.1.0
Hi Michael, I did so, but it's not exactly the problem. You see, my driver has all the dependencies packaged, and only the executors fetch the tgz via spark.executor.uri. The strange thing is that I see in my classpath the org.apache.mesos:mesos-1.0.0-shaded-protobuf dependency packaged in the final dist of my app… So everything should work in theory. On Tue, Jan 10, 2017 7:22 PM, Michael Gummelt mgumm...@mesosphere.io wrote: Just build with -Pmesos http://spark.apache.org/docs/latest/building-spark.html#building-with-mesos-support On Tue, Jan 10, 2017 at 8:56 AM, Olivier Girardot < o.girar...@lateral-thoughts.com> wrote: I had the same problem, added spark-mesos as a dependency, and now I get:
[2017-01-10 17:45:16,575] {bash_operator.py:77} INFO - Exception in thread "main" java.lang.NoClassDefFoundError: Could not initialize class org.apache.mesos.MesosSchedulerDriver
[2017-01-10 17:45:16,576] {bash_operator.py:77} INFO - at org.apache.spark.scheduler.cluster.mesos.MesosSchedulerUtils$class.createSchedulerDriver(MesosSchedulerUtils.scala:105)
[2017-01-10 17:45:16,576] {bash_operator.py:77} INFO - at org.apache.spark.scheduler.cluster.mesos.MesosCoarseGrainedSchedulerBackend.createSchedulerDriver(MesosCoarseGrainedSchedulerBackend.scala:48)
[2017-01-10 17:45:16,576] {bash_operator.py:77} INFO - at org.apache.spark.scheduler.cluster.mesos.MesosCoarseGrainedSchedulerBackend.start(MesosCoarseGrainedSchedulerBackend.scala:155)
[2017-01-10 17:45:16,577] {bash_operator.py:77} INFO - at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:156)
[2017-01-10 17:45:16,577] {bash_operator.py:77} INFO - at org.apache.spark.SparkContext.<init>(SparkContext.scala:509)
[2017-01-10 17:45:16,577] {bash_operator.py:77} INFO - at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2313)
[2017-01-10 17:45:16,577] {bash_operator.py:77} INFO - at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:868)
[2017-01-10 17:45:16,577] {bash_operator.py:77} INFO - at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:860)
[2017-01-10 17:45:16,578] {bash_operator.py:77} INFO - at scala.Option.getOrElse(Option.scala:121)
[2017-01-10 17:45:16,578] {bash_operator.py:77} INFO - at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:860)
Is there any other dependency to add for spark 2.1.0? On Tue, Jan 10, 2017 1:26 AM, Abhishek Bhandari abhi10...@gmail.com wrote: Glad that you found it. On Mon, Jan 9, 2017 at 3:29 PM, Richard Siebeling <rsiebel...@gmail.com> wrote: Probably found it. It turns out that Mesos should be explicitly added while building Spark; I assumed I could use the old build command that I used for building Spark 2.0.0... Didn't see the two lines added in the documentation... Maybe these kinds of changes could be added in the changelog, under changes of behaviour or changes in the build process or something like that. Kind regards, Richard On 9 January 2017 at 22:55, Richard Siebeling <rsiebel...@gmail.com> wrote: Hi, I'm setting up Apache Spark 2.1.0 on Mesos and I am getting a "Could not parse Master URL: 'mesos://xx.xx.xxx.xxx:5050'" error. Mesos is running fine (both the master and the slave; it's a single machine configuration). I really don't understand why this is happening, since the same configuration, but using Spark 2.0.0, is running fine within Vagrant. Could someone please help? Thanks in advance, Richard -- Abhishek J Bhandari Mobile No. +1 510 493 6205 (USA) Mobile No. +91 96387 93021 (IND) R & D Department Valent Software Inc. CA Email: abhis...@valent-software.com Olivier Girardot | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94 -- Michael Gummelt Software Engineer Mesosphere Olivier Girardot | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
Re: Could not parse Master URL for Mesos on Spark 2.1.0
I had the same problem, added spark-mesos as a dependency, and now I get:
[2017-01-10 17:45:16,575] {bash_operator.py:77} INFO - Exception in thread "main" java.lang.NoClassDefFoundError: Could not initialize class org.apache.mesos.MesosSchedulerDriver
[2017-01-10 17:45:16,576] {bash_operator.py:77} INFO - at org.apache.spark.scheduler.cluster.mesos.MesosSchedulerUtils$class.createSchedulerDriver(MesosSchedulerUtils.scala:105)
[2017-01-10 17:45:16,576] {bash_operator.py:77} INFO - at org.apache.spark.scheduler.cluster.mesos.MesosCoarseGrainedSchedulerBackend.createSchedulerDriver(MesosCoarseGrainedSchedulerBackend.scala:48)
[2017-01-10 17:45:16,576] {bash_operator.py:77} INFO - at org.apache.spark.scheduler.cluster.mesos.MesosCoarseGrainedSchedulerBackend.start(MesosCoarseGrainedSchedulerBackend.scala:155)
[2017-01-10 17:45:16,577] {bash_operator.py:77} INFO - at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:156)
[2017-01-10 17:45:16,577] {bash_operator.py:77} INFO - at org.apache.spark.SparkContext.<init>(SparkContext.scala:509)
[2017-01-10 17:45:16,577] {bash_operator.py:77} INFO - at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2313)
[2017-01-10 17:45:16,577] {bash_operator.py:77} INFO - at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:868)
[2017-01-10 17:45:16,577] {bash_operator.py:77} INFO - at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:860)
[2017-01-10 17:45:16,578] {bash_operator.py:77} INFO - at scala.Option.getOrElse(Option.scala:121)
[2017-01-10 17:45:16,578] {bash_operator.py:77} INFO - at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:860)
Is there any other dependency to add for spark 2.1.0? On Tue, Jan 10, 2017 1:26 AM, Abhishek Bhandari abhi10...@gmail.com wrote: Glad that you found it. On Mon, Jan 9, 2017 at 3:29 PM, Richard Siebeling <rsiebel...@gmail.com> wrote: Probably found it. It turns out that Mesos should be explicitly added while building Spark; I assumed I could use the old build command that I used for building Spark 2.0.0... Didn't see the two lines added in the documentation... Maybe these kinds of changes could be added in the changelog, under changes of behaviour or changes in the build process or something like that. Kind regards, Richard On 9 January 2017 at 22:55, Richard Siebeling <rsiebel...@gmail.com> wrote: Hi, I'm setting up Apache Spark 2.1.0 on Mesos and I am getting a "Could not parse Master URL: 'mesos://xx.xx.xxx.xxx:5050'" error. Mesos is running fine (both the master and the slave; it's a single machine configuration). I really don't understand why this is happening, since the same configuration, but using Spark 2.0.0, is running fine within Vagrant. Could someone please help? Thanks in advance, Richard -- Abhishek J Bhandari Mobile No. +1 510 493 6205 (USA) Mobile No. +91 96387 93021 (IND) R & D Department Valent Software Inc. CA Email: abhis...@valent-software.com Olivier Girardot | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
Re: Spark SQL - Applying transformation on a struct inside an array
So, it seems the only way I found for now is a recursive handling of the Row instances directly, but to do that I have to go back to RDDs. I've put together a simple test case demonstrating the problem:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.scalatest.{FlatSpec, Matchers}

class DFInPlaceTransform extends FlatSpec with Matchers {

  val spark = SparkSession.builder().appName("local").master("local[*]").getOrCreate()

  it should "access and mutate deeply nested arrays/structs" in {
    val df = spark.read.json(spark.sparkContext.parallelize(List(
      """{"a":[{"b" : "toto" }]}""".stripMargin)))
    df.show()
    df.printSchema()
    val result = transformInPlace("a.b", df)
    result.printSchema()
    result.show()
    result.schema should be (df.schema)
    val res = result.toJSON.take(1)
    res should be("""{"a":[{"b" : "TOTO" }]}""")
  }

  def transformInPlace(path: String, df: DataFrame): DataFrame = {
    val udf = spark.udf.register("transform", (s: String) => s.toUpperCase)
    val paths = path.split('.')
    val root = paths.head
    import org.apache.spark.sql.functions._
    df.withColumn(root, udf(df(path))) // does not work of course
  }
}

The three other solutions I see are:
* to create a dedicated Expression for in-place modifications of nested arrays and structs,
* to use heavy explode / lateral view / group by computations, but that's bound to be inefficient,
* or to generate bytecode using the schema to do the nested "getRow, getSeq…" and re-create the rows once the transformation is applied.

I'd like to open an issue regarding that use case because it's not the first or last time it comes up, and I still don't see any generic solution using DataFrames. Thanks for your time, Regards, Olivier On Fri, Sep 16, 2016 10:19 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi Michael, Well, for nested structs, I saw in the tests the behaviour defined by SPARK-12512 for the "a.b.c" handling in withColumn, and even if it's not ideal for me, I managed to make it work anyway like that: > df.withColumn("a", 
struct(struct(myUDF(df("a.b.c." // I didn't put back the aliases but you see what I mean. What I'd like to make work in essence is something like that> val someFunc : String => String = ???> val myUDF = udf(someFunc)> df.withColumn("a.b[*].c", myUDF(df("a.b[*].c"))) // the fact is that in order to be consistent with the previous API, maybe I'd have to put something like a struct(array(struct(… which would be troublesome because I'd have to parse the arbitrary input string and create something like "a.b[*].c" => struct(array(struct( I realise the ambiguity implied in the kind of column expression, but it doesn't seem for now available to cleanly update data inplace at an arbitrary depth. I'll try to work on a PR that would make this possible, but any pointers would be appreciated. Regards, Olivier. On Fri, Sep 16, 2016 12:42 AM, Michael Armbrust mich...@databricks.com wrote: Is what you are looking for a withColumn that support in place modification of nested columns? or is it some other problem? On Wed, Sep 14, 2016 at 11:07 PM, Olivier Girardot < o.girar...@lateral-thoughts.com> wrote: I tried to use the RowEncoder but got stuck along the way :The main issue really is that even if it's possible (however tedious) to pattern match generically Row(s) and target the nested field that you need to modify, Rows being immutable data structure without a method like a case class's copy or any kind of lens to create a brand new object, I ended up stuck at the step "target and extract the field to update" without any way to update the original Row with the new value. 
To sum up, I tried: * using only the DataFrame API itself + my udf, which works for nested structs as long as no arrays are along the way, * trying to create a udf that can apply on Row and pattern match recursively the path I needed to explore/modify, * trying to create a UDT, but we seem to be stuck in a strange middle-ground with 2.0 because some parts of the API ended up private while some stayed public, making it impossible to use for now (I'd be glad if I'm mistaken). All of these failed for me, and I ended up converting the rows to JSON and updating using JSONPath, which is… something I'd like to avoid 'pretty please'. On Thu, Sep 15, 2016 5:20 AM, Michael Allman mich...@videoamp.com wrote: Hi Guys, Have you tried org.apache.spark.sql.catalyst.encoders.RowEncoder? It's not a public API, but it is publicly accessible. I used it recently to correct some bad data in a few nested columns in a dataframe. It wasn't an easy job, but it made it possible. In my particular case I was not working with arrays. Olivier, I'm interested in seeing what you come up with. Thanks, Michael
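Outside Spark, the row-by-row fallback the thread converges on (drop to RDDs or JSON and rewrite the nested value recursively) is just a path-driven transformation over nested records. A plain-Python sketch of that shape, with the function name and path convention mine, descending into lists element-wise the way "a.b[*].c" would:

```python
def transform_at_path(value, path, fn):
    """Recursively apply `fn` to every leaf reached by walking `path`
    (a list of field names) through nested dicts, descending into
    lists element-wise. Everything off the path is kept unchanged."""
    if not path:
        return fn(value)
    if isinstance(value, list):
        # Arrays are transparent: apply the remaining path to each element.
        return [transform_at_path(v, path, fn) for v in value]
    head, rest = path[0], path[1:]
    return {k: transform_at_path(v, rest, fn) if k == head else v
            for k, v in value.items()}

row = {"a": [{"b": "toto"}]}
print(transform_at_path(row, ["a", "b"], str.upper))
# {'a': [{'b': 'TOTO'}]}
```

This is essentially what the JSONPath workaround does; the open question in the thread is how to get the same semantics natively on a DataFrame without leaving Catalyst.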
Re: Help in generating unique Id in spark row
There is a way: you can use org.apache.spark.sql.functions.monotonicallyIncreasingId, it will give each row of your dataframe a unique id. On Tue, Oct 18, 2016 10:36 AM, ayan guha guha.a...@gmail.com wrote: Do you have any primary key or unique identifier in your data? Even if multiple columns can make a composite key? In other words, can your data have exactly the same 2 rows with different unique IDs? Also, do you have to have a numeric ID? You may want to pursue a hashing algorithm such as the sha group to convert single or composite unique columns to an ID. On 18 Oct 2016 15:32, "Saurav Sinha" <sauravsinh...@gmail.com> wrote: Can anyone help me out? On Mon, Oct 17, 2016 at 7:27 PM, Saurav Sinha <sauravsinh...@gmail.com> wrote: Hi, I am in a situation where I want to generate a unique Id for each row. I have used monotonicallyIncreasingId, but it gives increasing values and starts generating from the start if it fails. I have two questions here: Q1. Does this method give me a unique id even in a failure situation? Because I want to use that ID as my solr id. Q2. If the answer to the previous question is NO, then is there a way to generate a UUID for each row which is unique and not updatable? As I have come up with a situation where the UUID is updated:

val idUDF = udf(() => UUID.randomUUID().toString)
val a = rawDataDf.withColumn("alarmUUID", lit(idUDF()))
a.persist(StorageLevel.MEMORY_AND_DISK)
rawDataDf.registerTempTable("rawAlarms")

// I do some joins, but as I reach further below
// I do something like (b is a transformation of a):
sqlContext.sql("""Select a.alarmUUID, b.alarmUUID
                  from a right outer join b
                  on a.alarmUUID = b.alarmUUID""")

it gives output as:

+--------------------+--------------------+
|           alarmUUID|           alarmUUID|
+--------------------+--------------------+
|7d33a516-5532-410...|                null|
|                null|2439d6db-16a2-44b...|
+--------------------+--------------------+

-- Thanks and Regards, Saurav Sinha Contact: 9742879062 -- Thanks and Regards, Saurav Sinha Contact: 9742879062 Olivier Girardot | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
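The hashing idea suggested above (derive the ID from the columns that form the unique key, so it survives recomputation and failures, unlike a random UUID or a monotonically increasing counter) can be sketched in plain Python; the function name and separator are mine, and in Spark the equivalent would typically use the built-in sha2 over concat_ws, or a UDF:

```python
import hashlib

def stable_id(*key_columns):
    """Derive a deterministic ID from the columns forming a (possibly
    composite) unique key: the same key always yields the same ID,
    even if the job is re-run after a failure."""
    # A separator byte guards against ("ab", "c") colliding with ("a", "bc").
    joined = "\x1f".join(str(c) for c in key_columns)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()

# Re-running the computation reproduces the same ID for the same key.
print(stable_id("alarm-42", "2016-10-18"))
```

The trade-off versus UUID.randomUUID is that two rows with identical key columns get identical IDs, which is exactly ayan's question about whether the data can contain duplicate keys.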
Re: Using Spark as a Maven dependency but with Hadoop 2.6
I know that the code itself would not be the same, but it would be useful to at least have the pom/build.sbt transitive dependencies differ when fetching the artifact with a specific classifier, don't you think? For now I've overridden them myself using the dependency versions defined in the pom.xml of Spark. So it's not a blocker issue; it may be useful to document it, but a blog post would be sufficient I think. On Wed, Sep 28, 2016 7:21 PM, Sean Owen so...@cloudera.com wrote: I guess I'm claiming the artifacts wouldn't even be different in the first place, because the Hadoop APIs that are used are all the same across these versions. That would be the thing that makes you need multiple versions of the artifact under multiple classifiers. On Wed, Sep 28, 2016 at 1:16 PM, Olivier Girardot < o.girar...@lateral-thoughts.com> wrote: ok, don't you think it could be published with just different classifiers: hadoop-2.6, hadoop-2.4, hadoop-2.2 being the current default. So for now, I should just override spark 2.0.0's dependencies with the ones defined in the pom profile? On Thu, Sep 22, 2016 11:17 AM, Sean Owen so...@cloudera.com wrote: There can be just one published version of the Spark artifacts and they have to depend on something, though in truth they'd be binary-compatible with anything 2.2+. So you merely manage the dependency versions up to the desired version in your <dependencyManagement>. On Thu, Sep 22, 2016 at 7:05 AM, Olivier Girardot < o.girar...@lateral-thoughts.com> wrote: Hi, when we fetch Spark 2.0.0 as a Maven dependency we automatically end up with hadoop 2.2 as a transitive dependency. I know multiple profiles are used to generate the different tar.gz bundles that we can download; are there by any chance publications of Spark 2.0.0 with different classifiers according to different versions of Hadoop available? Thanks for your time! Olivier Girardot Olivier Girardot | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94 Olivier Girardot | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
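The override discussed in this thread (pin the Hadoop artifacts yourself instead of taking Spark's transitive hadoop 2.2) can be sketched in an application pom.xml like this; the artifact names follow Spark 2.0.0's published coordinates, and the version numbers are illustrative:

```xml
<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.0</version>
    <!-- Drop the transitive hadoop 2.2 client... -->
    <exclusions>
      <exclusion>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <!-- ...and pin the Hadoop version you actually run against. -->
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0</version>
  </dependency>
</dependencies>
```

As Sean notes, the Spark classes themselves are binary-compatible across these Hadoop versions, so overriding the dependency versions is sufficient; no alternative Spark artifact is needed.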
Re: Using Spark as a Maven dependency but with Hadoop 2.6
ok, don't you think it could be published with just different classifiers: hadoop-2.6, hadoop-2.4, hadoop-2.2 being the current default. So for now, I should just override spark 2.0.0's dependencies with the ones defined in the pom profile? On Thu, Sep 22, 2016 11:17 AM, Sean Owen so...@cloudera.com wrote: There can be just one published version of the Spark artifacts and they have to depend on something, though in truth they'd be binary-compatible with anything 2.2+. So you merely manage the dependency versions up to the desired version in your <dependencyManagement>. On Thu, Sep 22, 2016 at 7:05 AM, Olivier Girardot < o.girar...@lateral-thoughts.com> wrote: Hi, when we fetch Spark 2.0.0 as a Maven dependency we automatically end up with hadoop 2.2 as a transitive dependency. I know multiple profiles are used to generate the different tar.gz bundles that we can download; are there by any chance publications of Spark 2.0.0 with different classifiers according to different versions of Hadoop available? Thanks for your time! Olivier Girardot Olivier Girardot | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
Using Spark as a Maven dependency but with Hadoop 2.6
Hi, when we fetch Spark 2.0.0 as a Maven dependency we automatically end up with hadoop 2.2 as a transitive dependency. I know multiple profiles are used to generate the different tar.gz bundles that we can download; are there by any chance publications of Spark 2.0.0 with different classifiers according to different versions of Hadoop available? Thanks for your time! Olivier Girardot
Re: Spark SQL - Applying transformation on a struct inside an array
Hi Michael, Well, for nested structs, I saw in the tests the behaviour defined by SPARK-12512 for the "a.b.c" handling in withColumn, and even if it's not ideal for me, I managed to make it work anyway like that: > df.withColumn("a", struct(struct(myUDF(df("a.b.c." // I didn't put back the aliases, but you see what I mean. What I'd like to make work, in essence, is something like this: > val someFunc : String => String = ??? > val myUDF = udf(someFunc) > df.withColumn("a.b[*].c", myUDF(df("a.b[*].c"))) // The fact is that, in order to be consistent with the previous API, maybe I'd have to put something like a struct(array(struct(…, which would be troublesome because I'd have to parse the arbitrary input string and create something like "a.b[*].c" => struct(array(struct(. I realise the ambiguity implied in this kind of column expression, but there doesn't seem to be, for now, a way to cleanly update data in place at an arbitrary depth. I'll try to work on a PR that would make this possible, but any pointers would be appreciated. Regards, Olivier. On Fri, Sep 16, 2016 12:42 AM, Michael Armbrust mich...@databricks.com wrote: Is what you are looking for a withColumn that supports in-place modification of nested columns? Or is it some other problem? On Wed, Sep 14, 2016 at 11:07 PM, Olivier Girardot < o.girar...@lateral-thoughts.com> wrote: I tried to use the RowEncoder but got stuck along the way: The main issue really is that even if it's possible (however tedious) to generically pattern match Row(s) and target the nested field that you need to modify, Rows being an immutable data structure without a method like a case class's copy or any kind of lens to create a brand new object, I ended up stuck at the step "target and extract the field to update" without any way to update the original Row with the new value. 
To sum up, I tried: * using only the dataframe API itself + my udf - which works for nested structs as long as no arrays are along the way * trying to create a udf that can apply on Row and pattern match recursively the path I needed to explore/modify * trying to create a UDT - but we seem to be stuck in a strange middle-ground with 2.0 because some parts of the API ended up private while some stayed public, making it impossible to use for now (I'd be glad to be proven mistaken) All of these failed for me and I ended up converting the rows to JSON and updating them using JSONPath, which is… something I'd like to avoid, pretty please. On Thu, Sep 15, 2016 5:20 AM, Michael Allman mich...@videoamp.com wrote: Hi Guys, Have you tried org.apache.spark.sql.catalyst.encoders.RowEncoder? It's not a public API, but it is publicly accessible. I used it recently to correct some bad data in a few nested columns in a dataframe. It wasn't an easy job, but it made it possible. In my particular case I was not working with arrays. Olivier, I'm interested in seeing what you come up with. Thanks, Michael On Sep 14, 2016, at 10:44 AM, Fred Reiss <freiss@gmail.com> wrote: +1 to this request. I talked last week with a product group within IBM that is struggling with the same issue. It's pretty common in data cleaning applications for data in the early stages to have nested lists or sets with inconsistent or incomplete schema information. Fred On Tue, Sep 13, 2016 at 8:08 AM, Olivier Girardot < o.girar...@lateral-thoughts.com> wrote: Hi everyone, I'm currently trying to create a generic transformation mechanism on a Dataframe to modify an arbitrary column regardless of the underlying schema. It's "relatively" straightforward for complex types like struct<struct<…>> to apply an arbitrary UDF on the column and replace the data "inside" the struct, however I'm struggling to make it work for complex types containing arrays along the way like struct<array<struct<…>>>.
Michael Armbrust seemed to allude on the mailing list/forum to a way of using Encoders to do that; I'd be interested in any pointers, especially considering that it's not possible to output any Row or GenericRowWithSchema from a UDF (thanks to https://github.com/apache/spark/blob/v2.0.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala#L657 it seems). To sum up, I'd like to find a way to apply a transformation on complex nested datatypes (arrays and structs) on a Dataframe, updating the value itself. Regards, Olivier Girardot Olivier Girardot | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
Re: Spark SQL - Applying transformation on a struct inside an array
I tried to use the RowEncoder but got stuck along the way: the main issue really is that even if it's possible (however tedious) to pattern match generically Row(s) and target the nested field that you need to modify, Rows being immutable data structures without a method like a case class's copy or any kind of lens to create a brand new object, I ended up stuck at the step "target and extract the field to update" without any way to update the original Row with the new value. To sum up, I tried: * using only the dataframe API itself + my udf - which works for nested structs as long as no arrays are along the way * trying to create a udf that can apply on Row and pattern match recursively the path I needed to explore/modify * trying to create a UDT - but we seem to be stuck in a strange middle-ground with 2.0 because some parts of the API ended up private while some stayed public, making it impossible to use for now (I'd be glad to be proven mistaken) All of these failed for me and I ended up converting the rows to JSON and updating them using JSONPath, which is… something I'd like to avoid, pretty please. On Thu, Sep 15, 2016 5:20 AM, Michael Allman mich...@videoamp.com wrote: Hi Guys, Have you tried org.apache.spark.sql.catalyst.encoders.RowEncoder? It's not a public API, but it is publicly accessible. I used it recently to correct some bad data in a few nested columns in a dataframe. It wasn't an easy job, but it made it possible. In my particular case I was not working with arrays. Olivier, I'm interested in seeing what you come up with. Thanks, Michael On Sep 14, 2016, at 10:44 AM, Fred Reiss <freiss@gmail.com> wrote: +1 to this request. I talked last week with a product group within IBM that is struggling with the same issue. It's pretty common in data cleaning applications for data in the early stages to have nested lists or sets with inconsistent or incomplete schema information.
Fred On Tue, Sep 13, 2016 at 8:08 AM, Olivier Girardot < o.girar...@lateral-thoughts.com> wrote: Hi everyone, I'm currently trying to create a generic transformation mechanism on a Dataframe to modify an arbitrary column regardless of the underlying schema. It's "relatively" straightforward for complex types like struct<struct<…>> to apply an arbitrary UDF on the column and replace the data "inside" the struct, however I'm struggling to make it work for complex types containing arrays along the way like struct<array<struct<…>>>. Michael Armbrust seemed to allude on the mailing list/forum to a way of using Encoders to do that; I'd be interested in any pointers, especially considering that it's not possible to output any Row or GenericRowWithSchema from a UDF (thanks to https://github.com/apache/spark/blob/v2.0.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala#L657 it seems). To sum up, I'd like to find a way to apply a transformation on complex nested datatypes (arrays and structs) on a Dataframe, updating the value itself. Regards, Olivier Girardot Olivier Girardot | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
Spark SQL - Applying transformation on a struct inside an array
Hi everyone, I'm currently trying to create a generic transformation mechanism on a Dataframe to modify an arbitrary column regardless of the underlying schema. It's "relatively" straightforward for complex types like struct<struct<…>> to apply an arbitrary UDF on the column and replace the data "inside" the struct, however I'm struggling to make it work for complex types containing arrays along the way like struct<array<struct<…>>>. Michael Armbrust seemed to allude on the mailing list/forum to a way of using Encoders to do that; I'd be interested in any pointers, especially considering that it's not possible to output any Row or GenericRowWithSchema from a UDF (thanks to https://github.com/apache/spark/blob/v2.0.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala#L657 it seems). To sum up, I'd like to find a way to apply a transformation on complex nested datatypes (arrays and structs) on a Dataframe, updating the value itself. Regards, Olivier Girardot
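The JSON workaround mentioned upthread (serialize each row, update a value by path, read it back) can be sketched outside Spark in plain Python. Note that `set_nested` and the `["a", "b", "[*]", "c"]` path encoding are hypothetical illustrations of traversing structs (dicts) and arrays (lists), not a Spark or JSONPath API:

```python
# Spark-free sketch of updating a value at an arbitrary depth inside
# nested structs (dicts) and arrays (lists), mirroring the "a.b[*].c"
# path discussed in the thread. set_nested is a hypothetical helper.
def set_nested(value, path, fn):
    if not path:                      # end of path: apply the transformation
        return fn(value)
    head, *rest = path
    if head == "[*]":                 # descend into every array element
        return [set_nested(v, rest, fn) for v in value]
    # descend into a struct field, rebuilding the dict (immutability-style,
    # like rebuilding a Row instead of mutating it)
    return {**value, head: set_nested(value[head], rest, fn)}

row = {"a": {"b": [{"c": "x"}, {"c": "y"}]}}
updated = set_nested(row, ["a", "b", "[*]", "c"], str.upper)
print(updated)  # {'a': {'b': [{'c': 'X'}, {'c': 'Y'}]}}
print(row)      # the original structure is left untouched
```

A real Spark version would have to run such a function over the JSON representation of each row inside a UDF, which is exactly the round-trip the thread is trying to avoid.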
Re: Aggregations with scala pairs
CC'ing the dev list; you should open a Jira and a PR related to it to discuss it, c.f. https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-ContributingCodeChanges On Wed, Aug 17, 2016 4:01 PM, Andrés Ivaldi iaiva...@gmail.com wrote: Hello, I'd like to report a wrong behavior of the Dataset API, but I don't know how to do that: my Jira account doesn't allow me to add an issue. I'm using Apache Spark 2.0.0, but the problem has existed since at least version 1.4 (given the doc, since 1.3). The problem is simple to reproduce, as is the workaround: if we apply agg over a Dataset with Scala pairs referring to the same column, only one agg over that column is actually used. This is because of the toMap, which reduces the pair values of the same key to one, overwriting the value: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = { agg((aggExpr +: aggExprs).toMap) } Rewritten as something like this, it should work: def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = { toDF((aggExpr +: aggExprs).map { pairExpr => strToExpr(pairExpr._2)(df(pairExpr._1).expr) }.toSeq) } regards -- Ing. Ivaldi Andres Olivier Girardot | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
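The root cause Andrés points at is generic to any pair-to-map conversion and easy to reproduce outside Spark; here is a plain-Python stand-in (dict plays the role of Scala's toMap, and the column/function names are made up):

```python
# Converting a sequence of (key, value) pairs to a map keeps only the LAST
# value for a duplicated key -- the same silent loss the agg(pairs) overload
# suffers from via toMap.
agg_exprs = [("price", "max"), ("price", "min"), ("qty", "sum")]

as_map = dict(agg_exprs)     # the duplicate "price" key collapses to one entry
print(as_map)                # {'price': 'min', 'qty': 'sum'} -- "max" is gone

# The fix suggested in the thread keeps the sequence instead of a map,
# so both aggregations on "price" survive:
kept = [(col, fn) for col, fn in agg_exprs]
print(len(kept))             # 3
```

This is why the pair-based agg overload silently keeps only one aggregation per column, while the Seq-based rewrite preserves all of them.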
Re: Spark DF CacheTable method. Will it save data to disk?
that's another "pipeline" step to add, whereas persist is just relevant during the lifetime of your job and stores the data not in HDFS but on the local disks of your executors. On Wed, Aug 17, 2016 5:56 PM, neil90 neilp1...@icloud.com wrote: From the Spark documentation (http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence), yes you can use persist on a dataframe instead of cache. All cache is, is shorthand for the default persist storage level "MEMORY_ONLY". If you want to persist the dataframe to disk you should do dataframe.persist(StorageLevel.DISK_ONLY). IMO if reads are expensive against the DB and you're afraid of failure, why not just save the data as parquet on your cluster in Hive and read from there? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-DF-CacheTable-method-Will-it-save-data-to-disk-tp27533p27551.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe e-mail: user-unsubscr...@spark.apache.org Olivier Girardot | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
Re: error when running spark from oozie launcher
this is not the full stacktrace; please post the full stacktrace if you want some help. On Wed, Aug 17, 2016 7:24 PM, tkg_cangkul yuza.ras...@gmail.com wrote: hi, I try to submit a Spark job with Oozie, but I've got one problem here: when I submit the same job, sometimes my job succeeds but sometimes it fails. I've got this error message when the job failed: org.apache.spark.launcher.CommandBuilderUtils.addPermGenSizeOpt(Ljava/util/List;)V Can anyone help me to solve this? I've tried to set -XX:MaxPermSize=512m -XX:PermSize=256m in the spark.driver.extraJavaOptions property but this doesn't help enough for me. Olivier Girardot | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
Re: Spark SQL 1.6.1 issue
your executors/driver must not have multiple versions of Spark in the classpath; it may come from the Cassandra connector - check the pom dependencies of the version you fetched and whether it's compatible with your Spark version. On Thu, Aug 18, 2016 6:05 AM, thbeh th...@thbeh.com wrote: Running the query below I have been hitting a "local class incompatible" exception, anyone know the cause? val rdd = csc.cassandraSql("""select *, concat('Q', d_qoy) as qoy from store_sales join date_dim on ss_sold_date_sk = d_date_sk join item on ss_item_sk = i_item_sk""").groupBy("i_category").pivot("qoy").agg(round(sum("ss_sales_price")/100,2)) The source data is from TPCDS test data and I am running in Zeppelin. /INFO [2016-08-18 03:15:58,429] ({task-result-getter-2} Logging.scala[logInfo]:58) - Lost task 3.0 in stage 3.0 (TID 52) on executor ceph5.example.my: java.io.InvalidClassException (org.apache.spark.sql.catalyst.expressions.Literal; local class incompatible: stream classdesc serialVersionUID = 3305180847846277455, local class serialVersionUID = -4259705229845269663) [duplicate 1] INFO [2016-08-18 03:15:58,429] ({task-result-getter-3} Logging.scala[logInfo]:58) - Lost task 2.0 in stage 3.0 (TID 51) on executor ceph5.example.my: java.io.InvalidClassException (org.apache.spark.sql.catalyst.expressions.Literal; local class incompatible: stream classdesc serialVersionUID = 3305180847846277455, local class serialVersionUID = -4259705229845269663) [duplicate 2] INFO [2016-08-18 03:15:58,430] ({task-result-getter-3} Logging.scala[logInfo]:58) - Lost task 6.0 in stage 3.0 (TID 55) on executor ceph5.example.my: java.io.InvalidClassException (org.apache.spark.sql.catalyst.expressions.Literal; local class incompatible: stream classdesc serialVersionUID = 3305180847846277455, local class serialVersionUID = -4259705229845269663) [duplicate 3]/ Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-1-6-1-issue-tp27554.html Sent from the Apache
Spark User List mailing list archive at Nabble.com. - To unsubscribe e-mail: user-unsubscr...@spark.apache.org Olivier Girardot | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
Re: tpcds for spark2.0
I have the same kind of issue (not using spark-sql-perf), just trying to deploy 2.0.0 on Mesos. I'll keep you posted as I investigate. On Wed, Jul 27, 2016 1:06 PM, kevin kiss.kevin...@gmail.com wrote: hi, all: I want to run a test of the TPC-DS 99 SQL queries on Spark 2.0. I used the master version of https://github.com/databricks/spark-sql-perf. When I run: val tpcds = new TPCDS (sqlContext = sqlContext) I got an error: scala> val tpcds = new TPCDS (sqlContext = sqlContext) error: missing or invalid dependency detected while loading class file 'Benchmarkable.class'. Could not access term typesafe in package com, because it (or its dependencies) are missing. Check your build definition for missing or conflicting dependencies. (Re-run with -Ylog-classpath to see the problematic classpath.) A full rebuild may help if 'Benchmarkable.class' was compiled against an incompatible version of com. error: missing or invalid dependency detected while loading class file 'Benchmarkable.class'. Could not access term scalalogging in value com.typesafe, because it (or its dependencies) are missing. Check your build definition for missing or conflicting dependencies. (Re-run with -Ylog-classpath to see the problematic classpath.) A full rebuild may help if 'Benchmarkable.class' was compiled against an incompatible version of com.typesafe. With spark-sql-perf-0.4.3, when I run: tables.genData("hdfs://master1:9000/tpctest", "parquet", true, false, false, false, false) I got an error: Generating table catalog_sales in database to hdfs://master1:9000/tpctest/catalog_sales with save mode Overwrite.
16/07/27 18:59:59 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, slave1): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org $apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD Olivier Girardot | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
Re: OOM exception during Broadcast
Stream.java:370) >>>> at >>>> scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516) >>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>>> at >>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) >>>> at >>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>>> at java.lang.reflect.Method.invoke(Method.java:606) >>>> at >>>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) >>>> at >>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897) >>>> at >>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) >>>> at >>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) >>>> at >>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997) >>>> at >>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921) >>>> at >>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) >>>> at >>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) >>>> at >>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997) >>>> at >>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921) >>>> at >>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) >>>> at >>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) >>>> at >>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997) >>>> at >>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921) >>>> at >>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) >>>> at >>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) >>>> at >>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997) >>>> at >>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921) >>>> at >>>> 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) >>>> at >>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) >>>> at >>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997) >>>> at >>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921) >>>> >>>> >>>> I'm using spark 1.5.2. Cluster nodes are amazon r3.2xlarge. The spark >>>> property maximizeResourceAllocation is set to true (executor.memory = 48G >>>> according to spark ui environment). We're also using kryo serialization and >>>> Yarn is the resource manager. >>>> >>>> Any ideas as what might be going wrong and how to debug this? >>>> >>>> Thanks, >>>> Arash >>>> >>>> >>>> >>> >>> >> >> > -- *Olivier Girardot* | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
Re: Spark Certification
It does not contain (as of yet) anything > 1.3 (for example in depth knowledge of the Dataframe API) but you need to know about all the modules (Core, Streaming, SQL, MLLib, GraphX) Regards, Olivier. 2016-02-11 19:31 GMT+01:00 Prem Sure <premsure...@gmail.com>: > I did recently. it includes MLib & Graphx too and I felt like exam content > covered all topics till 1.3 and not the > 1.3 versions of spark. > > > On Thu, Feb 11, 2016 at 9:39 AM, Janardhan Karri <jkarri@gmail.com> > wrote: > >> I am planning to do that with databricks >> http://go.databricks.com/spark-certified-developer >> >> Regards, >> Janardhan >> >> On Thu, Feb 11, 2016 at 2:00 PM, Timothy Spann <tim.sp...@airisdata.com> >> wrote: >> >>> I was wondering that as well. >>> >>> Also is it fully updated for 1.6? >>> >>> Tim >>> http://airisdata.com/ >>> http://sparkdeveloper.com/ >>> >>> >>> From: naga sharathrayapati <sharathrayap...@gmail.com> >>> Date: Wednesday, February 10, 2016 at 11:36 PM >>> To: "user@spark.apache.org" <user@spark.apache.org> >>> Subject: Spark Certification >>> >>> Hello All, >>> >>> I am planning on taking Spark Certification and I was wondering If one >>> has to be well equipped with MLib & GraphX as well or not ? >>> >>> Please advise >>> >>> Thanks >>> >> >> > -- *Olivier Girardot* | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
Re: Spark Application Master on Yarn client mode - Virtual memory limit
you can also activate detailed GC prints to get more info 2016-02-11 7:43 GMT+01:00 Shiva Ramagopal <tr.s...@gmail.com>: > How are you submitting/running the job - via spark-submit or as a plain > old Java program? > > If you are using spark-submit, you can control the memory setting via the > configuration parameter spark.executor.memory in spark-defaults.conf. > > If you are running it as a Java program, use -Xmx to set the maximum heap > size. > > On Thu, Feb 11, 2016 at 5:46 AM, Nirav Patel <npa...@xactlycorp.com> > wrote: > >> In Yarn we have the following settings enabled so that a job can use virtual >> memory to have a capacity beyond physical memory, of course: >> >> yarn.nodemanager.vmem-check-enabled >> false >> >> yarn.nodemanager.pmem-check-enabled >> false >> >> The vmem-to-pmem ratio is 2:1. However, Spark doesn't seem to be able to >> utilize these vmem limits; we are getting the following heap space error, which seems to be contained >> within the Spark executor: >> >> 16/02/09 23:08:06 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED >> SIGNAL 15: SIGTERM >> 16/02/09 23:08:06 ERROR executor.Executor: Exception in task 4.0 in stage >> 7.6 (TID 22363) >> java.lang.OutOfMemoryError: Java heap space >> at java.util.IdentityHashMap.resize(IdentityHashMap.java:469) >> at java.util.IdentityHashMap.put(IdentityHashMap.java:445) >> at >> org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:159) >> at >> org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:203) >> at >> org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:202) >> at scala.collection.immutable.List.foreach(List.scala:318) >> at >> org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:202) >> at >> org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:186) >> at
org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:54) >> at >> org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78) >> at >> org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70) >> at >> org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31) >> at >> org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278) >> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171) >> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262) >> at >> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) >> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) >> at org.apache.spark.scheduler.Task.run(Task.scala:88) >> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) >> at java.lang.Thread.run(Thread.java:744) >> >> The Yarn resource manager doesn't give any indication of whether the container >> ran out of physical or virtual memory limits. >> >> Also, how can we profile this container's memory usage? We know our data is >> skewed, so some of the executors will have large data (~2M RDD objects) to >> process. I used the following as executorJavaOpts but it doesn't seem to work.
>> -XX:-HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError='kill -3 %p' >> -XX:HeapDumpPath=/opt/cores/spark -- *Olivier Girardot* | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
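Combining Olivier's GC-print suggestion with Nirav's heap-dump options, one hedged way to pass them is shown below. The -XX flags are real HotSpot options of that era, but the master, paths and application name are purely illustrative:

```shell
# Illustrative spark-submit invocation: verbose GC logging plus a heap dump
# on OutOfMemoryError for the executors. Note the leading '+' in
# -XX:+HeapDumpOnOutOfMemoryError -- a leading '-' *disables* the feature,
# which may be why the snippet quoted above produced no dump.
spark-submit \
  --master yarn \
  --conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails \
-XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError \
-XX:HeapDumpPath=/opt/cores/spark" \
  my_job.py
```

The resulting GC lines appear in each executor's stderr log in the YARN container logs, which helps distinguish heap pressure from YARN's vmem/pmem accounting.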
Spark 1.6 - Datasets and Avro Encoders
Hi everyone, considering the new Datasets API, will there be Encoders defined for reading and writing Avro files ? Will it be possible to use already generated Avro classes ? Regards, -- *Olivier Girardot*
Re: Spark 1.6 - Datasets and Avro Encoders
I will, but if you want my two cents, creating a dedicated "optimised" encoder for Avro would be great (especially if it's possible to do better than plain AvroKeyValueOutputFormat with saveAsNewAPIHadoopFile :) ). Thanks for your time Michael, and happy new year :-) Regards, Olivier. 2016-01-05 19:01 GMT+01:00 Michael Armbrust <mich...@databricks.com>: > You could try with the `Encoders.bean` method. It detects classes that > have getters and setters. Please report back! > > On Tue, Jan 5, 2016 at 9:45 AM, Olivier Girardot < > o.girar...@lateral-thoughts.com> wrote: > >> Hi everyone, >> considering the new Datasets API, will there be Encoders defined for >> reading and writing Avro files ? Will it be possible to use already >> generated Avro classes ? >> >> Regards, >> >> -- >> *Olivier Girardot* >> > > -- *Olivier Girardot* | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
Re: Lookup / Access of master data in spark streaming
That's great ! Thanks ! So to sum up, to do some kind of "always up-to-date" lookup we can use broadcast variables and re-broadcast when the data has changed, using either the "transform" RDD-to-RDD transformation, "foreachRDD" or "transformWith". Thank you for your time Regards, 2015-10-05 23:49 GMT+02:00 Tathagata Das <t...@databricks.com>: > Yes, when old broadcast objects are not referenced any more in the driver, > then associated data in the driver AND the executors will get cleared. > > On Mon, Oct 5, 2015 at 1:40 PM, Olivier Girardot < > o.girar...@lateral-thoughts.com> wrote: > >> @td does that mean that the "old" broadcasted data will in any way be >> "garbage collected" at some point if no RDD or transformation is using it >> anymore ? >> >> Regards, >> >> Olivier. >> >> 2015-04-09 21:49 GMT+02:00 Amit Assudani <aassud...@impetus.com>: >> >>> Thanks a lot TD for the detailed answers. The answers lead to a few more >>> questions, >>> >>> >>>1. "the transform RDD-to-RDD function runs on the driver" - I >>>didn't understand this, does it mean when I use the transform function on >>>DStream, it is not parallelized? surely I'm missing something here. >>>2. updateStateByKey I think won't work in this use case, I have >>>three separate attribute streams ( with different frequencies ) that make up >>> the >>>combined state ( i.e. Entity ) at a point in time on which I want to do >>> some >>>processing. Do you think otherwise ? >>>3. transform+join seems the only option so far, but any guesstimate how >>>would this perform / react on a cluster ? Assuming master data in 100s of >>>GBs, and the join is based on some row key. We are talking about a slice of >>>stream data to be joined with 100s of GBs of master data continuously. Is >>>it something that can be done but should not be done ?
>>> >>> Regards, >>> Amit >>> >>> From: Tathagata Das <t...@databricks.com> >>> Date: Thursday, April 9, 2015 at 3:13 PM >>> To: amit assudani <aassud...@impetus.com> >>> Cc: "user@spark.apache.org" <user@spark.apache.org> >>> Subject: Re: Lookup / Access of master data in spark streaming >>> >>> Responses inline. Hope they help. >>> >>> On Thu, Apr 9, 2015 at 8:20 AM, Amit Assudani <aassud...@impetus.com> >>> wrote: >>> >>>> Hi Friends, >>>> >>>> I am trying to solve a use case in spark streaming, I need help on >>>> getting to right approach on lookup / update the master data. >>>> >>>> Use case ( simplified ) >>>> I’ve a dataset of entity with three attributes and identifier/row key >>>> in a persistent store. >>>> >>>> Each attribute along with row key come from a different stream let’s >>>> say, effectively 3 source streams. >>>> >>>> Now whenever any attribute comes up, I want to update/sync the >>>> persistent store and do some processing, but the processing would require >>>> the latest state of entity with latest values of three attributes. >>>> >>>> I wish if I have the all the entities cached in some sort of >>>> centralized cache ( like we have data in hdfs ) within spark streaming >>>> which may be used for data local processing. But I assume there is no such >>>> thing. >>>> >>>> potential approaches I m thinking of, I suspect first two are not >>>> feasible, but I want to confirm, >>>> 1. Is Broadcast Variables mutable ? If yes, can I use it as >>>> cache for all entities sizing around 100s of GBs provided i have a cluster >>>> with enough RAM. >>>> >>> >>> Broadcast variables are not mutable. But you can always create a new >>> broadcast variable when you want and use the "latest" broadcast variable in >>> your computation. >>> >>> dstream.transform { rdd => >>> >>>val latestBroacast = getLatestBroadcastVariable() // fetch existing >>> or update+create new and return >>>val transformedRDD = rdd. .. 
// use latestBroacast in RDD >>> transformations >>>transformedRDD >>> } >>> >>> Since the transform RDD-to-RDD function runs on the driver every batch >>> interval, it will always use the latest broadcast variable that
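Spark aside, the "fetch existing or update+create new" helper TD sketches boils down to a version-checked cache. Here is a minimal, Spark-free Python stand-in; `make_latest_fetcher` and its arguments are hypothetical names, and a real implementation would wrap `sc.broadcast(...)` (and unpersist the stale broadcast) around the reload:

```python
# Spark-free sketch of the re-broadcast pattern from this thread: cache a
# snapshot of reference data and rebuild it only when the source version
# changes. In real Spark Streaming code "snapshot" would be a Broadcast
# created with sc.broadcast(load_fn()).
def make_latest_fetcher(version_fn, load_fn):
    state = {"version": None, "snapshot": None, "loads": 0}

    def get_latest():
        v = version_fn()
        if v != state["version"]:      # master data changed: "re-broadcast"
            state["snapshot"] = load_fn()
            state["version"] = v
            state["loads"] += 1
        return state["snapshot"]       # otherwise reuse the cached snapshot

    return get_latest, state

version = {"v": 1}
get_latest, state = make_latest_fetcher(lambda: version["v"],
                                        lambda: {"ref": version["v"]})
get_latest(); get_latest()             # second call reuses the cached snapshot
version["v"] = 2                       # simulate an update to the master data
print(get_latest(), state["loads"])    # {'ref': 2} 2
```

Calling such a helper at the top of the transform closure gives every batch the freshest reference data while avoiding a reload (or re-broadcast) on every interval.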
Re: ClassCastException using DataFrame only when num-executors > 2 ...
Tested now against Spark 1.5.0 rc2, and the same exceptions happen when num-executors > 2: 15/08/25 10:31:10 WARN scheduler.TaskSetManager: Lost task 0.1 in stage 5.0 (TID 501, xxx): java.lang.ClassCastException: java.lang.Double cannot be cast to java.lang.Long at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110) at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getLong(rows.scala:41) at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getLong(rows.scala:220) at org.apache.spark.sql.catalyst.expressions.JoinedRow.getLong(JoinedRow.scala:85) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source) at org.apache.spark.sql.execution.Window$$anonfun$8$$anon$1.next(Window.scala:325) at org.apache.spark.sql.execution.Window$$anonfun$8$$anon$1.next(Window.scala:252) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 2015-08-26 11:47 GMT+02:00 Olivier Girardot <ssab...@gmail.com>: > Hi everyone, > I know this "post title" doesn't seem very logical, and I agree. > We have a very complex computation using "only" pyspark dataframes, and we launch the > computation on a CDH 5.3 cluster using Spark 1.5.0 rc1 > (the problem is reproduced with 1.4.x). > If the number of executors is the default 2, the computation is very long > but doesn't fail.
> If the number of executors is 3 or more (tested up to 20), then the > computation fails very quickly with the following error : > > *Caused by: java.lang.ClassCastException: java.lang.Double cannot be cast > to java.lang.Long* > > The complete stracktrace being : > > Driver stacktrace: > at org.apache.spark.scheduler.DAGScheduler.org > $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1267) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1255) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1254) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1254) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:684) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:684) > at scala.Option.foreach(Option.scala:236) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:684) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1480) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1442) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1431) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:554) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1805) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1818) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1831) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1902) > at 
org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:905) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:306) > at org.apache.spark.rdd.RDD.collect(RDD.scala:904) > at org.apache.spark.RangePartitioner$.sketch(Partitioner.scala:264) > at org.apache.spark.RangePartitioner.<init>(Partitioner.scala:126) > at > org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:156) > at > org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:141) > at > org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48) > ... 138 more > *Caused by: java.lang.ClassCastException: java.lang.Double cannot be cast > to java.lang.Long* > at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110) > at > org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getLong(rows.scala:41) > at > org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getLong(rows.scala:220) > at > org.apache.spark.sql.catalyst.expressions.JoinedRow.getLong(JoinedRow.scala:85) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown > Source) > at > org.apache.spark.sql.execution.Window$
Re: Spark stages very slow to complete
I have pretty much the same symptoms - the computation itself is pretty fast, but most of my time is spent in JavaToPython steps (~15 min). I'm using Spark 1.5.0-rc1 with DataFrames and ML Pipelines. Any insights into what these steps are, exactly?

2015-06-02 9:18 GMT+02:00 Karlson ksonsp...@siberie.de: Hi, the code is some hundreds of lines of Python. I can try to compose a minimal example as soon as I find the time, though. Any ideas until then?

Would you mind posting the code?

On 2 Jun 2015 00:53, Karlson ksonsp...@siberie.de wrote: Hi, in all (PySpark) Spark jobs that become somewhat more involved, I am experiencing the issue that some stages take a very long time to complete, and sometimes don't at all. This clearly correlates with the size of my input data. Looking at the stage details for one such stage, I am wondering where Spark spends all this time. Take this table of the stage's task metrics for example:

Metric                      Min          25th pct     Median       75th pct     Max
Duration                    1.4 min      1.5 min      1.7 min      1.9 min      2.3 min
Scheduler Delay             1 ms         3 ms         4 ms         5 ms         23 ms
Task Deserialization Time   1 ms         2 ms         3 ms         8 ms         22 ms
GC Time                     0 ms         0 ms         0 ms         0 ms         0 ms
Result Serialization Time   0 ms         0 ms         0 ms         0 ms         1 ms
Getting Result Time         0 ms         0 ms         0 ms         0 ms         0 ms
Input Size / Records        23.9 KB / 1  24.0 KB / 1  24.1 KB / 1  24.1 KB / 1  24.3 KB / 1

Why is the overall duration almost 2 min? Where is all this time spent, when no progress of the stages is visible? The progress bar simply displays 0 succeeded tasks for a very long time before sometimes slowly progressing. Also, the name of the stage displayed above is `javaToPython at null:-1`, which I find very uninformative. I don't even know which action exactly is responsible for this stage. Does anyone experience similar issues or have any advice for me? Thanks!
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- *Olivier Girardot* | Associé o.girar...@lateral-thoughts.com +33 6 24 09 17 94
Re: Classifier for Big Data Mining
depends on your data and, I guess, the time/performance goals you have for both training and prediction, but for a quick answer: yes :) 2015-07-21 11:22 GMT+02:00 Chintan Bhatt chintanbhatt...@charusat.ac.in: Which classifier can be useful for mining massive datasets in Spark? Can a decision tree be a good choice in terms of scalability? -- CHINTAN BHATT http://in.linkedin.com/pub/chintan-bhatt/22/b31/336/ Assistant Professor, U P U Patel Department of Computer Engineering, Chandubhai S. Patel Institute of Technology, Charotar University of Science And Technology (CHARUSAT), Changa-388421, Gujarat, INDIA. http://www.charusat.ac.in *Personal Website*: https://sites.google.com/a/ecchanga.ac.in/chintan/
Re: coalesce on dataFrame
PySpark or Spark (Scala)? When you use coalesce with anything other than a column, you must wrap the value in a literal, like this in PySpark:

from pyspark.sql import functions as F
F.coalesce(df.a, F.lit(True))

On Wed, 1 Jul 2015 at 12:03, Ewan Leith ewan.le...@realitymine.com wrote: It's in Spark 1.4.0, or should be at least: https://issues.apache.org/jira/browse/SPARK-6972 Ewan -Original Message- From: Hafiz Mujadid [mailto:hafizmujadi...@gmail.com] Sent: 01 July 2015 08:23 To: user@spark.apache.org Subject: coalesce on dataFrame How can we use coalesce(1, true) on a DataFrame? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/coalesce-on-dataFrame-tp23564.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Check for null in PySpark DataFrame
I must admit I've been using the same back-to-SQL strategy for now :p So I'd be glad to have insights into that too.

On Tue, 30 Jun 2015 at 23:28, pedro ski.rodrig...@gmail.com wrote: I am trying to find the correct way to programmatically check for null values in the rows of a DataFrame. For example, below is the code using pyspark and SQL:

df = sqlContext.createDataFrame(sc.parallelize([(1, None), (2, 'a'), (3, 'b'), (4, None)]))
df.where('_2 is not null').count()

However, this won't work:

df.where(df._2 != None).count()

It seems there is no native Python way with DataFrames to do this, but I find that difficult to believe and more likely that I am missing the right way to do this. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Check-for-null-in-PySpark-DataFrame-tp23553.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark Shell Hive Context and Kerberos ticket
Nope, I have not, but I'm glad I'm not the only one :p

On Fri, 26 Jun 2015 at 07:54, Tao Li litao.bupt...@gmail.com wrote: Hi Olivier, have you fixed this problem now? I still have this fasterxml NoSuchMethodError.

2015-06-18 3:08 GMT+08:00 Olivier Girardot o.girar...@lateral-thoughts.com: OK, what was wrong was that spark-env did not contain HADOOP_CONF_DIR properly set to /etc/hadoop/conf/. With that fixed, this issue is gone, but I can't seem to get Spark SQL 1.4.0 with Hive working on CDH 5.3 or 5.4. Using this command line:

IPYTHON=1 /.../spark-1.4.0-bin-hadoop2.4/bin/pyspark --master yarn-client --driver-class-path `hadoop classpath`

I end up with this issue:

: java.lang.NoSuchMethodError: com.fasterxml.jackson.module.scala.deser.BigDecimalDeserializer$.handledType()Ljava/lang/Class;
at com.fasterxml.jackson.module.scala.deser.NumberDeserializers$.<init>(ScalaNumberDeserializersModule.scala:49)
at com.fasterxml.jackson.module.scala.deser.NumberDeserializers$.<clinit>(ScalaNumberDeserializersModule.scala)
at com.fasterxml.jackson.module.scala.deser.ScalaNumberDeserializersModule$class.$init$(ScalaNumberDeserializersModule.scala:61)
at com.fasterxml.jackson.module.scala.DefaultScalaModule.<init>(DefaultScalaModule.scala:19)
at com.fasterxml.jackson.module.scala.DefaultScalaModule$.<init>(DefaultScalaModule.scala:35)
at com.fasterxml.jackson.module.scala.DefaultScalaModule$.<clinit>(DefaultScalaModule.scala)
at org.apache.spark.rdd.RDDOperationScope$.<init>(RDDOperationScope.scala:78)
at org.apache.spark.rdd.RDDOperationScope$.<clinit>(RDDOperationScope.scala)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:118)
at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:125)
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1255)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1189)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1248)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:176)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)

This seems to be related to this Jira issue: https://issues.apache.org/jira/browse/SPARK-8332 This is a blocker for me to deploy a Spark DataFrame-based app on an existing cluster; any input regarding how to create a proper classpath would be great. Regards, Olivier.

On Wed, 17 Jun 2015 at 11:37, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi everyone, after copying the hive-site.xml from a CDH5 cluster, I can't seem to connect to the Hive metastore using spark-shell. Here's part of the stack trace I get:

15/06/17 04:41:57 ERROR TSaslTransport: SASL negotiation failure
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
at org.apache.thrift.transport.TSaslClientTransport.handleSaslStartMessage(TSaslClientTransport.java:94)
at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:253)
at org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:37)
at org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:52)
at org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:49)
at java.security.AccessController.doPrivileged(Native Method)

The user has a non-expired ticket and I can execute hadoop fs -ls; all in all, I should have access to this. I am stuck with this issue on Spark 1.4.0, did not try a version before... Any guess regarding what might be wrong? Regards, Olivier.
Re: Accessing Kerberos Secured HDFS Resources from Spark on Mesos
I would pretty much need exactly this kind of feature too.

On Fri, 26 Jun 2015 at 21:17, Dave Ariens dari...@blackberry.com wrote: Hi Timothy, because I'm running Spark on Mesos alongside a secured Hadoop cluster, I need to ensure that my tasks running on the slaves perform a Kerberos login before accessing any HDFS resources. To log in, they just need the name of the principal (username) and a keytab file. Then they just need to invoke the following Java:

import org.apache.hadoop.security.UserGroupInformation
UserGroupInformation.loginUserFromKeytab(adminPrincipal, adminKeytab)

This is done in the driver in my Gist below, but I don't know how to run it within each executor on the slaves as tasks are run. Any help would be appreciated!

*From:* Timothy Chen [mailto:t...@mesosphere.io] *Sent:* Friday, June 26, 2015 12:50 PM *To:* Dave Ariens *Cc:* user@spark.apache.org *Subject:* Re: Accessing Kerberos Secured HDFS Resources from Spark on Mesos Hi Dave, I don't understand Kerberos much, but if you know the exact steps that need to happen, I can see how we can make that happen with the Spark framework. Tim

On Jun 26, 2015, at 8:49 AM, Dave Ariens dari...@blackberry.com wrote: I understand that Kerberos support for accessing Hadoop resources in Spark only works when running Spark on YARN. However, I'd really like to hack something together for Spark on Mesos running alongside a secured Hadoop cluster. My simplified application (gist: https://gist.github.com/ariens/2c44c30e064b1790146a) receives a Kerberos principal and keytab when submitted. The static main method called then performs a UserGroupInformation.loginUserFromKeytab(userPrincipal, userKeytab) and authenticates to Hadoop. This works on YARN (curiously, even without having to kinit first), but not on Mesos. Is there a way to have the slaves running the tasks perform the same Kerberos login before they attempt to access HDFS?
Putting aside the security of Spark/Mesos and how that keytab would get distributed, I'm just looking for a working POC. Is there a way to leverage the Broadcast capability to send a function that performs this? https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.broadcast.Broadcast Ideally, I'd love for this to not incur much overhead and just simply allow me to work around the absent Kerberos support... Thanks, Dave
Re: GSSException when submitting Spark job in yarn-cluster mode with HiveContext APIs on Kerberos cluster
Hi, I can't get this to work using CDH 5.4, Spark 1.4.0 in yarn-cluster mode. @andrew did you manage to get it to work with the latest version?

On Tue, 21 Apr 2015 at 00:02, Andrew Lee alee...@hotmail.com wrote: Hi Marcelo, exactly what I need to track, thanks for the JIRA pointer. Date: Mon, 20 Apr 2015 14:03:55 -0700 Subject: Re: GSSException when submitting Spark job in yarn-cluster mode with HiveContext APIs on Kerberos cluster From: van...@cloudera.com To: alee...@hotmail.com CC: user@spark.apache.org I think you want to take a look at: https://issues.apache.org/jira/browse/SPARK-6207 On Mon, Apr 20, 2015 at 1:58 PM, Andrew Lee alee...@hotmail.com wrote: Hi all, affected versions: Spark 1.2.1 / 1.2.2 / 1.3-rc1. Posting this problem to the user group first to see if someone is encountering the same problem. When submitting Spark jobs that invoke HiveContext APIs on a Kerberos Hadoop + YARN (2.4.1) cluster, I'm getting this error:

javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]

Apparently, the Kerberos ticket is on neither the remote data node nor the computing node, since we don't deploy Kerberos tickets, and that is not a good practice either. On the other hand, we can't just SSH to every machine and run kinit for those users. This is not practical and it is insecure. The point here is: shouldn't there be a delegation token during the doAs, so the token is used instead of the ticket? I'm trying to understand what is missing in Spark's HiveContext API, since a normal MapReduce job that invokes Hive APIs will work, but not Spark SQL. Any insights or feedback are appreciated. Anyone got this running without pre-deploying (pre-initializing) all tickets node by node? Is this worth filing a JIRA?
15/03/25 18:59:08 INFO hive.metastore: Trying to connect to metastore with URI thrift://alee-cluster.test.testserver.com:9083
15/03/25 18:59:08 ERROR transport.TSaslTransport: SASL negotiation failure
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
at org.apache.thrift.transport.TSaslClientTransport.handleSaslStartMessage(TSaslClientTransport.java:94)
at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:253)
at org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:37)
at org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:52)
at org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:49)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
at org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport.open(TUGIAssumingTransport.java:49)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:336)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:214)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1410)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:62)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:72)
at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2453)
at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465)
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:340)
at org.apache.spark.sql.hive.HiveContext$$anonfun$4.apply(HiveContext.scala:235)
at org.apache.spark.sql.hive.HiveContext$$anonfun$4.apply(HiveContext.scala:231)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.sql.hive.HiveContext.x$3$lzycompute(HiveContext.scala:231)
at org.apache.spark.sql.hive.HiveContext.x$3(HiveContext.scala:229)
at org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:229)
at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:229)
at
Re: Spark Shell Hive Context and Kerberos ticket
OK, what was wrong was that spark-env did not contain HADOOP_CONF_DIR properly set to /etc/hadoop/conf/. With that fixed, this issue is gone, but I can't seem to get Spark SQL 1.4.0 with Hive working on CDH 5.3 or 5.4. Using this command line:

IPYTHON=1 /.../spark-1.4.0-bin-hadoop2.4/bin/pyspark --master yarn-client --driver-class-path `hadoop classpath`

I end up with this issue:

: java.lang.NoSuchMethodError: com.fasterxml.jackson.module.scala.deser.BigDecimalDeserializer$.handledType()Ljava/lang/Class;
at com.fasterxml.jackson.module.scala.deser.NumberDeserializers$.<init>(ScalaNumberDeserializersModule.scala:49)
at com.fasterxml.jackson.module.scala.deser.NumberDeserializers$.<clinit>(ScalaNumberDeserializersModule.scala)
at com.fasterxml.jackson.module.scala.deser.ScalaNumberDeserializersModule$class.$init$(ScalaNumberDeserializersModule.scala:61)
at com.fasterxml.jackson.module.scala.DefaultScalaModule.<init>(DefaultScalaModule.scala:19)
at com.fasterxml.jackson.module.scala.DefaultScalaModule$.<init>(DefaultScalaModule.scala:35)
at com.fasterxml.jackson.module.scala.DefaultScalaModule$.<clinit>(DefaultScalaModule.scala)
at org.apache.spark.rdd.RDDOperationScope$.<init>(RDDOperationScope.scala:78)
at org.apache.spark.rdd.RDDOperationScope$.<clinit>(RDDOperationScope.scala)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:118)
at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:125)
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1255)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1189)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1248)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:176)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)

This seems to be related to this Jira issue: https://issues.apache.org/jira/browse/SPARK-8332 This is a blocker for me to deploy a Spark DataFrame-based app on an existing cluster; any input regarding how to create a proper classpath would be great. Regards, Olivier.

On Wed, 17 Jun 2015 at 11:37, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi everyone, after copying the hive-site.xml from a CDH5 cluster, I can't seem to connect to the Hive metastore using spark-shell. Here's part of the stack trace I get:

15/06/17 04:41:57 ERROR TSaslTransport: SASL negotiation failure
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
at org.apache.thrift.transport.TSaslClientTransport.handleSaslStartMessage(TSaslClientTransport.java:94)
at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:253)
at org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:37)
at org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:52)
at org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:49)
at java.security.AccessController.doPrivileged(Native Method)

The user has a non-expired ticket and I can execute hadoop fs -ls; all in all, I should have access to this. I am stuck with this issue on Spark 1.4.0, did not try a version before... Any guess regarding what might be wrong? Regards, Olivier.
Spark Shell Hive Context and Kerberos ticket
Hi everyone, after copying the hive-site.xml from a CDH5 cluster, I can't seem to connect to the Hive metastore using spark-shell. Here's part of the stack trace I get:

15/06/17 04:41:57 ERROR TSaslTransport: SASL negotiation failure
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
at org.apache.thrift.transport.TSaslClientTransport.handleSaslStartMessage(TSaslClientTransport.java:94)
at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:253)
at org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:37)
at org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:52)
at org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:49)
at java.security.AccessController.doPrivileged(Native Method)

The user has a non-expired ticket and I can execute hadoop fs -ls; all in all, I should have access to this. I am stuck with this issue on Spark 1.4.0, did not try a version before... Any guess regarding what might be wrong? Regards, Olivier.
Re: How to share large resources like dictionaries while processing data with Spark ?
You can use it as a broadcast variable, but if it's too large (more than 1 GB, I guess), you may need to share it by joining it, via some kind of key, with the other RDDs. But this is the kind of thing broadcast variables were designed for. Regards, Olivier.

On Thu, 4 Jun 2015 at 23:50, dgoldenberg dgoldenberg...@gmail.com wrote: We have some pipelines defined where sometimes we need to load potentially large resources such as dictionaries. What would be the best strategy for sharing such resources among the transformations/actions within a consumer? Can they be shared somehow across the RDDs? I'm looking for a way to load such a resource once into the cluster memory and have it be available throughout the lifecycle of a consumer... Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-large-resources-like-dictionaries-while-processing-data-with-Spark-tp23162.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Compute Median in Spark Dataframe
Nice to hear from you, Holden! I ended up trying exactly that (Column), but I may have done it wrong:

In [5]: g.agg(Column("percentile(value, 0.5)"))
Py4JError: An error occurred while calling o97.agg. Trace:
py4j.Py4JException: Method agg([class java.lang.String, class scala.collection.immutable.Nil$]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)

Any idea? Olivier.

On Tue, 2 Jun 2015 at 18:02, Holden Karau hol...@pigscanfly.ca wrote: Not super easily; the GroupedData class uses a strToExpr function which has a pretty limited set of functions, so we can't pass in the name of an arbitrary Hive UDAF (unless I'm missing something). We can instead construct a column with the expression you want and then pass it in to agg() that way (although then you need to call the Hive UDAF there). There are some private classes in hiveUdfs.scala which expose Hive UDAFs as Spark SQL AggregateExpressions, but they are private. On Tue, Jun 2, 2015 at 8:28 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: I've finally come to the same conclusion, but isn't there any way to call these Hive UDAFs from the agg, e.g. agg("percentile(key, 0.5)")? On Tue, 2 Jun 2015 at 15:37, Yana Kadiyska yana.kadiy...@gmail.com wrote: Like this... sqlContext should be a HiveContext instance:

case class KeyValue(key: Int, value: String)
val df = sc.parallelize(1 to 50).map(i => KeyValue(i, i.toString)).toDF
df.registerTempTable("table")
sqlContext.sql("select percentile(key, 0.5) from table").show()

On Tue, Jun 2, 2015 at 8:07 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi everyone, is there any way to compute a median on a column using Spark's DataFrame? I know you can use stats in an RDD, but I'd rather stay within a DataFrame. Hive seems to imply that using ntile one can compute percentiles, quartiles and therefore a median. Does anyone have experience with this? Regards, Olivier.
-- Cell : 425-233-8271 Twitter: https://twitter.com/holdenkarau Linked In: https://www.linkedin.com/in/holdenkarau
Re: Best strategy for Pandas - Spark
Thanks for the answer, I'm currently doing exactly that. I'll try to sum up the usual Pandas -> Spark DataFrame caveats soon. Regards, Olivier.

On Tue, 2 Jun 2015 at 02:38, Davies Liu dav...@databricks.com wrote: The second one sounds reasonable, I think. On Thu, Apr 30, 2015 at 1:42 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi everyone, let's assume I have a complex workflow with more than 10 data sources as input and ~20 computations (some creating intermediary datasets and some merging everything for the final computation), some taking on average 1 minute to complete and some taking more than 30 minutes. What would be for you the best strategy to port this to Apache Spark?

- Transform the whole flow into a Spark job (PySpark or Scala)
- Transform only part of the flow (the heavy-lifting, ~30 min parts) using the same language (PySpark)
- Transform only part of the flow and pipe the rest from Scala to Python

Regards, Olivier.
Re: Compute Median in Spark Dataframe
I've finally come to the same conclusion, but isn't there any way to call these Hive UDAFs from the agg, e.g. agg("percentile(key, 0.5)")?

On Tue, 2 Jun 2015 at 15:37, Yana Kadiyska yana.kadiy...@gmail.com wrote: Like this... sqlContext should be a HiveContext instance:

case class KeyValue(key: Int, value: String)
val df = sc.parallelize(1 to 50).map(i => KeyValue(i, i.toString)).toDF
df.registerTempTable("table")
sqlContext.sql("select percentile(key, 0.5) from table").show()

On Tue, Jun 2, 2015 at 8:07 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi everyone, is there any way to compute a median on a column using Spark's DataFrame? I know you can use stats in an RDD, but I'd rather stay within a DataFrame. Hive seems to imply that using ntile one can compute percentiles, quartiles and therefore a median. Does anyone have experience with this? Regards, Olivier.
Re: RandomSplit with Spark-ML and Dataframe
Thank you!

On Tue, 19 May 2015 at 21:08, Xiangrui Meng men...@gmail.com wrote: In 1.4, we added RAND as a DataFrame expression, which can be used for random split. Please check the example here: https://github.com/apache/spark/blob/master/python/pyspark/ml/tuning.py#L214 -Xiangrui

On Thu, May 7, 2015 at 8:39 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi, is there any best practice to do, like in MLlib, a randomSplit of training/cross-validation sets with DataFrames and the pipeline API? Regards, Olivier.
Re: [SparkSQL 1.4.0] groupBy columns are always nullable?
PR is opened: https://github.com/apache/spark/pull/6237

On Fri, 15 May 2015 at 17:55, Olivier Girardot ssab...@gmail.com wrote: yes, please do and send me the link. @rxin I have trouble building master, but the code is done...

On Fri, 15 May 2015 at 01:27, Haopu Wang hw...@qilinsoft.com wrote: Thank you, should I open a JIRA for this issue? -- *From:* Olivier Girardot [mailto:ssab...@gmail.com] *Sent:* Tuesday, May 12, 2015 5:12 AM *To:* Reynold Xin *Cc:* Haopu Wang; user *Subject:* Re: [SparkSQL 1.4.0] groupBy columns are always nullable? I'll look into it - not sure yet what I can get out of exprs :p

On Mon, 11 May 2015 at 22:35, Reynold Xin r...@databricks.com wrote: Thanks for catching this. I didn't read carefully enough. It'd make sense to have the UDAF result be non-nullable, if the exprs are indeed non-nullable. On Mon, May 11, 2015 at 1:32 PM, Olivier Girardot ssab...@gmail.com wrote: Hi Haopu, actually here `key` is nullable because this is your input's schema:

scala> result.printSchema
root
 |-- key: string (nullable = true)
 |-- SUM(value): long (nullable = true)

scala> df.printSchema
root
 |-- key: string (nullable = true)
 |-- value: long (nullable = false)

I tried it with a schema where the key is not flagged as nullable, and the schema is actually respected. What you can argue, however, is that SUM(value) should also be not nullable, since value is not nullable. @rxin do you think it would be reasonable to flag the Sum aggregation function as nullable (or not) depending on the input expression's schema? Regards, Olivier.

On Mon, 11 May 2015 at 22:07, Reynold Xin r...@databricks.com wrote: Not by design. Would you be interested in submitting a pull request? On Mon, May 11, 2015 at 1:48 AM, Haopu Wang hw...@qilinsoft.com wrote: I try to get the result schema of aggregate functions using the DataFrame API. However, I find the result field of groupBy columns is always nullable, even when the source field is not nullable. I want to know if this is by design, thank you! Below is the simple code to show the issue.
==

import sqlContext.implicits._
import org.apache.spark.sql.functions._

case class Test(key: String, value: Long)
val df = sc.makeRDD(Seq(Test("k1", 2), Test("k1", 1))).toDF
val result = df.groupBy("key").agg($"key", sum("value"))

// From the output, you can see the key column is nullable, why??
result.printSchema
//root
// |-- key: string (nullable = true)
// |-- SUM(value): long (nullable = true)
Re: Why so slow
can you post the explain too?

On Tue, 12 May 2015 at 12:11, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I have a SQL query on tables containing big Map columns (thousands of keys). I found it to be very slow.

select meta['is_bad'] as is_bad, count(*) as count, avg(nvar['var1']) as avg
from test
where date between '2014-04-01' and '2014-04-30'
group by meta['is_bad']

+---------+-----------+-----------------------+
| is_bad  | count     | avg                   |
+---------+-----------+-----------------------+
| 0       | 17024396  | 0.16257395850742645   |
| 1       | 179729    | -0.37626256661125485  |
| 2       | 28128     | 0.11674427263203344   |
| 3       | 116327    | -0.6398689187187386   |
| 4       | 87715     | -0.5349632960030563   |
| 5       | 169771    | 0.40812641191854626   |
| 6       | 542447    | 0.5238256418341465    |
| 7       | 160324    | 0.29442847034840386   |
| 8       | 2099      | -0.9165701665162977   |
| 9       | 3104      | 0.3845685004598235    |
+---------+-----------+-----------------------+
10 rows selected (130.5 seconds)

The total number of rows is less than 20M. Why so slow? I'm running on Spark 1.4.0-SNAPSHOT with 100 executors, each having 4 GB RAM and 2 CPU cores. Looks like https://issues.apache.org/jira/browse/SPARK-5446 is still open; when can we have it fixed? :) -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: value toDF is not a member of RDD object
you need to instantiate a SQLContext:

val sc: SparkContext = ...
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

On Tue, 12 May 2015 at 12:29, SLiZn Liu sliznmail...@gmail.com wrote: I added `libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "1.3.1"` to `build.sbt` but the error remains. Do I need to import modules other than `import org.apache.spark.sql.{ Row, SQLContext }`? On Tue, May 12, 2015 at 5:56 PM Olivier Girardot ssab...@gmail.com wrote: toDF is part of Spark SQL, so you need the Spark SQL dependency + import sqlContext.implicits._ to get the toDF method. Regards, Olivier. On Tue, 12 May 2015 at 11:36, SLiZn Liu sliznmail...@gmail.com wrote: Hi User Group, I'm trying to reproduce the example in the Spark SQL Programming Guide https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection, and got a compile error when packaging with sbt:

[error] myfile.scala:30: value toDF is not a member of org.apache.spark.rdd.RDD[Person]
[error] val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
[error] ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 3 s, completed May 12, 2015 4:11:53 PM

I double-checked that my code includes import sqlContext.implicits._ after reading this post https://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3c1426522113299-22083.p...@n3.nabble.com%3E on the spark mailing list, and even tried to use toDF("col1", "col2") as suggested by Xiangrui Meng in that post, and got the same error. The Spark version is specified in the build.sbt file as follows:

scalaVersion := "2.11.6"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.3.1" % "provided"
libraryDependencies += "org.apache.spark" % "spark-mllib_2.11" % "1.3.1"

Anyone have ideas on the cause of this error? REGARDS, Todd Leo
Re: [SparkSQL 1.4.0] groupBy columns are always nullable?
Hi Haopu, actually here `key` is nullable because this is your input's schema :

scala> result.printSchema
root
 |-- key: string (nullable = true)
 |-- SUM(value): long (nullable = true)

scala> df.printSchema
root
 |-- key: string (nullable = true)
 |-- value: long (nullable = false)

I tried it with a schema where the key is not flagged as nullable, and the schema is actually respected. What you can argue, however, is that SUM(value) should also be not nullable since value is not nullable. @rxin do you think it would be reasonable to flag the Sum aggregation function as nullable (or not) depending on the input expression's schema ? Regards, Olivier. On Mon, May 11, 2015 at 22:07, Reynold Xin r...@databricks.com wrote: Not by design. Would you be interested in submitting a pull request? On Mon, May 11, 2015 at 1:48 AM, Haopu Wang hw...@qilinsoft.com wrote: I try to get the result schema of aggregate functions using the DataFrame API. However, I find the result field of groupBy columns is always nullable even when the source field is not nullable. I want to know if this is by design, thank you! Below is the simple code to show the issue. ==

import sqlContext.implicits._
import org.apache.spark.sql.functions._

case class Test(key: String, value: Long)
val df = sc.makeRDD(Seq(Test("k1", 2), Test("k1", 1))).toDF
val result = df.groupBy("key").agg($"key", sum("value"))
// From the output, you can see the key column is nullable, why??
result.printSchema
// root
//  |-- key: string (nullable = true)
//  |-- SUM(value): long (nullable = true)

- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RandomSplit with Spark-ML and Dataframe
Hi, is there any best practice to do, like MLlib's randomSplit, a split into training/cross-validation sets with dataframes and the pipeline API ? Regards Olivier.
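As a point of reference: DataFrame gained its own randomSplit in Spark 1.4; on 1.3 one workaround is to split the underlying RDD[Row] and rebuild DataFrames with the original schema. A sketch, assuming an existing df and sqlContext:

```scala
// Spark 1.4+: split directly on the DataFrame
val Array(training, validation) = df.randomSplit(Array(0.8, 0.2), seed = 42L)

// Spark 1.3 workaround: split the underlying RDD[Row], then re-attach the schema
val Array(trainRows, cvRows) = df.rdd.randomSplit(Array(0.8, 0.2), seed = 42L)
val trainingDF = sqlContext.createDataFrame(trainRows, df.schema)
val validationDF = sqlContext.createDataFrame(cvRows, df.schema)
```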
Re: Spark 1.3.1 and Parquet Partitions
hdfs://some ip:8029/dataset/*/*.parquet doesn't work for you ? On Thu, May 7, 2015 at 03:32, vasuki vax...@gmail.com wrote: Spark 1.3.1 - i have a parquet file on hdfs partitioned by some string looking like this /dataset/city=London/data.parquet /dataset/city=NewYork/data.parquet /dataset/city=Paris/data.parquet …. I am trying to load it using sqlContext, using sqlcontext.parquetFile( hdfs://some ip:8029/dataset/ - what do i put here? No leads so far. Is there a way I can load the partitions ? I am running on a cluster and not local.. -V -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-1-and-Parquet-Partitions-tp22792.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
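Since Spark 1.3, pointing parquetFile at the dataset root triggers automatic partition discovery, so the city=... directories surface as a regular column. A sketch (the namenode host and port are placeholders):

```scala
// Load the root directory, not the individual partition directories
val df = sqlContext.parquetFile("hdfs://namenode:8020/dataset")
df.printSchema()  // the schema now includes |-- city: string, inferred from the paths
// Partition pruning: only the London directory is read
df.filter(df("city") === "London").show()
```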
Re: Re: sparksql running slow while joining 2 tables.
Can you activate your eventLogs and send them to us ? Thank you ! On Tue, May 5, 2015 at 04:56, luohui20001 luohui20...@sina.com wrote: Yes, just by default 1 executor. Thanks (Sent from my Xiaomi phone) On May 4, 2015 at 10:01 PM, ayan guha guha.a...@gmail.com wrote: Are you using only 1 executor? On Mon, May 4, 2015 at 11:07 PM, luohui20...@sina.com wrote: hi Olivier spark 1.3.1, with java 1.8.0.45, and I added 2 pics. It seems like a GC issue. I also tried with different parameters like memory size of driver & executor, memory fraction, java opts... but this issue still happens. Thanks & Best regards! 罗辉 San.Luo ----- Original message ----- From: Olivier Girardot ssab...@gmail.com To: luohui20...@sina.com, user user@spark.apache.org Subject: Re: sparksql running slow while joining 2 tables. Date: May 4, 2015, 8:46 PM Hi, What is your Spark version ? Regards, Olivier. On Mon, May 4, 2015 at 11:03, luohui20...@sina.com wrote: hi guys when i am running a sql like select a.name,a.startpoint,a.endpoint, a.piece from db a join sample b on (a.name = b.name) where (b.startpoint a.startpoint + 25); I found sparksql running slow, in the order of minutes, which may be caused by very long GC and shuffle time. Table db is created from a txt file sized at 56MB while table sample is sized at 26MB, both small. My spark cluster is a standalone pseudo-distributed spark cluster with 8g executor and 4g driver memory. Any advice? thank you guys. Thanks & Best regards! 罗辉 San.Luo -- Best Regards, Ayan Guha
Re: sparksql running slow while joining 2 tables.
Hi, What is your Spark version ? Regards, Olivier. On Mon, May 4, 2015 at 11:03, luohui20...@sina.com wrote: hi guys when i am running a sql like select a.name,a.startpoint,a.endpoint, a.piece from db a join sample b on (a.name = b.name) where (b.startpoint a.startpoint + 25); I found sparksql running slow, in the order of minutes, which may be caused by very long GC and shuffle time. Table db is created from a txt file sized at 56MB while table sample is sized at 26MB, both small. My spark cluster is a standalone pseudo-distributed spark cluster with 8g executor and 4g driver memory. Any advice? thank you guys. Thanks & Best regards! 罗辉 San.Luo
Re: AJAX with Apache Spark
Hi Sergio, you shouldn't architect it this way; rather, update a storage layer with Spark Streaming that your Play app will then query: for example a Cassandra table, or Redis, or anything able to answer you in milliseconds, instead of querying the Spark Streaming program directly. Regards, Olivier. On Mon, May 4, 2015 at 20:08, Sergio Jiménez Barrio drarse.a...@gmail.com wrote: Hi, I am trying to create a dashboard for an Apache Spark job. I need to run Spark Streaming 24/7 and, when it receives an AJAX request, answer with the actual state of the job. I have created the client, and the program in Spark. I tried to create the response service with Play, but this runs the program on each request. I want to send the accumulator of the spark program with a request. Sorry for my explanation. Any idea? Maybe with Play? Thanks
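A sketch of the suggested architecture, with Spark Streaming pushing the latest state into a key-value store that the Play controller reads. RedisClient here is a stand-in trait, not a real library API; the actual calls depend on the client you pick (Jedis, scredis, ...):

```scala
import org.apache.spark.streaming.dstream.DStream

// Hypothetical minimal client interface; substitute your real Redis/Cassandra client
trait RedisClient { def set(key: String, value: String): Unit }

def publishState(stream: DStream[(String, Long)], mkClient: () => RedisClient): Unit = {
  stream.foreachRDD { rdd =>
    rdd.foreachPartition { partition =>
      val client = mkClient() // one connection per partition, created on the executor
      partition.foreach { case (metric, value) =>
        client.set(s"dashboard:$metric", value.toString)
      }
    }
  }
}
```

The Play endpoint then answers each AJAX request with a plain Redis read, in milliseconds, without ever touching the streaming job.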
Re: Drop a column from the DataFrame.
great thx On Sat, May 2, 2015 at 23:58, Ted Yu yuzhih...@gmail.com wrote: This is coming in 1.4.0 https://issues.apache.org/jira/browse/SPARK-7280 On May 2, 2015, at 2:27 PM, Olivier Girardot ssab...@gmail.com wrote: Sounds like a patch for a drop method... On Sat, May 2, 2015 at 21:03, dsgriffin dsgrif...@gmail.com wrote: Just use select() to create a new DataFrame with only the columns you want. Sort of the opposite of what you want -- you select all the columns minus the one you don't want. You could even use a filter to remove just the one column on the fly: myDF.select(myDF.columns.filter(_ != "column_you_do_not_want").map(colname => new Column(colname)).toList : _*) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Drop-a-column-from-the-DataFrame-tp22711p22737.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Can I group elements in RDD into different groups and let each group share some elements?
Did you look at the cogroup transformation or the cartesian transformation ? Regards, Olivier. On Sat, May 2, 2015 at 22:01, Franz Chien franzj...@gmail.com wrote: Hi all, Can I group elements in an RDD into different groups and let each group share elements? For example, I have 10,000 elements in an RDD, from e1 to e10000, and I want to group and aggregate them by another mapping of size 2000, e.g.: ( (e1,e42), (e1,e554), (e3, e554)…… (2000th group)). My first approach was to filter the RDD with mapping rules 2000 times and then union them together. However, it ran forever. Does Spark provide a way to group elements in an RDD like this? Thanks, Franz
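Assuming the 2000-group mapping can itself be expressed as an RDD of (elementId, groupId) pairs, a single join replaces the 2000 filter passes, and elements shared by several groups simply fan out during the join (all names here are illustrative):

```scala
import org.apache.spark.SparkContext._ // pair-RDD functions on older Spark versions
import org.apache.spark.rdd.RDD

// elements: ("e1", value); mapping: ("e1", groupId) - e1 may map to several groups
def groupShared(elements: RDD[(String, Double)],
                mapping: RDD[(String, Int)]): RDD[(Int, Iterable[Double])] =
  elements.join(mapping)                              // (elementId, (value, groupId))
          .map { case (_, (value, group)) => (group, value) }
          .groupByKey()                               // one record per group
```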
Re: to split an RDD to multiple ones?
I guess :

val srdd_s1 = srdd.filter(_.startsWith("s1_")).sortBy(identity)
val srdd_s2 = srdd.filter(_.startsWith("s2_")).sortBy(identity)
val srdd_s3 = srdd.filter(_.startsWith("s3_")).sortBy(identity)

Regards, Olivier. On Sat, May 2, 2015 at 22:53, Yifan LI iamyifa...@gmail.com wrote: Hi, I have an RDD *srdd* containing (unordered) data like this: s1_0, s3_0, s2_1, s2_2, s3_1, s1_3, s1_2, … What I want is (it will be much better if they could be in ascending order): *srdd_s1*: s1_0, s1_1, s1_2, …, s1_n *srdd_s2*: s2_0, s2_1, s2_2, …, s2_n *srdd_s3*: s3_0, s3_1, s3_2, …, s3_n … … Have any idea? Thanks in advance! :) Best, Yifan LI
Re: Drop a column from the DataFrame.
Sounds like a patch for a drop method... On Sat, May 2, 2015 at 21:03, dsgriffin dsgrif...@gmail.com wrote: Just use select() to create a new DataFrame with only the columns you want. Sort of the opposite of what you want -- you select all the columns minus the one you don't want. You could even use a filter to remove just the one column on the fly: myDF.select(myDF.columns.filter(_ != "column_you_do_not_want").map(colname => new Column(colname)).toList : _*) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Drop-a-column-from-the-DataFrame-tp22711p22737.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
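Cleaned up, the select-based workaround reads as below; and since the thread mentions SPARK-7280, from Spark 1.4 on the same thing is a one-liner with drop (the column name is illustrative):

```scala
// Spark 1.3: keep every column except the unwanted one
val trimmed = myDF.select(myDF.columns.filter(_ != "unwanted").map(myDF(_)): _*)

// Spark 1.4+
val trimmed14 = myDF.drop("unwanted")
```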
Re: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index:
Can you post your code, otherwise there's not much we can do. Regards, Olivier. On Sat, May 2, 2015 at 21:15, shahab shahab.mok...@gmail.com wrote: Hi, I am using spark-1.2.0 and I use Kryo serialization, but I get the following exception. java.io.IOException: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 3448, Size: 1 I would appreciate it if anyone could tell me how I can resolve this. best, /Shahab
Best strategy for Pandas - Spark
Hi everyone, Let's assume I have a complex workflow: more than 10 datasources as input, ~20 computations (some creating intermediary datasets and some merging everything for the final computation), some taking on average 1 minute to complete and some taking more than 30 minutes. What would be, for you, the best strategy to port this to Apache Spark ?
- Transform the whole flow into a Spark job (PySpark or Scala)
- Transform only part of the flow (the heavy-lifting ~30 min parts) using the same language (PySpark)
- Transform only part of the flow and pipe the rest from Scala to Python
Regards, Olivier.
Dataframe filter based on another Dataframe
Hi everyone, what is the most efficient way to filter a DataFrame on a column from another DataFrame's column? The best idea I had was to join the two dataframes :

val df1: DataFrame
val df2: DataFrame
df1.join(df2, df1("id") === df2("id"), "inner")

But I end up (obviously) with the id column twice. Another approach would be to filter df1, but I can't seem to get this to work using df2's column as a base. Any idea ? Regards, Olivier.
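One way to keep the join but lose the duplicated column is to select only df1's side afterwards, which makes the join behave like the filter described above (sketch, assuming the join key is id):

```scala
val joined = df1.join(df2, df1("id") === df2("id"), "inner")
// Project back to df1's columns only, dropping df2's duplicate id
val filtered = joined.select(df1.columns.map(df1(_)): _*)
// Note: if df2 contains repeated ids the join duplicates rows; add .distinct() if needed
```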
Re: Dataframe filter based on another Dataframe
You mean after joining ? Sure, my question was more whether there is any best practice preferred to joining the other dataframe for filtering. Regards, Olivier. On Wed, Apr 29, 2015 at 13:23, Olivier Girardot ssab...@gmail.com wrote: Hi everyone, what is the most efficient way to filter a DataFrame on a column from another DataFrame's column? The best idea I had was to join the two dataframes : val df1: DataFrame val df2: DataFrame df1.join(df2, df1("id") === df2("id"), "inner") But I end up (obviously) with the id column twice. Another approach would be to filter df1, but I can't seem to get this to work using df2's column as a base. Any idea ? Regards, Olivier.
How to distribute Spark computation recipes
Hi everyone, I know that any RDD is related to its SparkContext and the associated variables (broadcast, accumulators), but I'm looking for a way to serialize/deserialize full RDD computations. @rxin Spark SQL is, in a way, already doing this, but the parsers are private[sql]; is there any way to reuse this work to get Logical/Physical Plans in and out of Spark ? Regards, Olivier.
Re: Spark Streaming updateStateByKey throws OutOfMemory Error
Hi Sourav, Can you post your updateFunc as well please ? Regards, Olivier. On Tue, Apr 21, 2015 at 12:48, Sourav Chandra sourav.chan...@livestream.com wrote: Hi, We are building a spark streaming application which reads from kafka, does updateStateByKey based on the received message type and finally stores into redis. After running for a few seconds the executor process gets killed by throwing an OutOfMemory error. The code snippet is below:

NoOfReceiverInstances = 1

val kafkaStreams = (1 to NoOfReceiverInstances).map(
  _ => KafkaUtils.createStream(ssc, ZKQuorum, ConsumerGroup, TopicsMap)
)
val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long, Long)]) => {...}

ssc.union(kafkaStreams).map(KafkaMessageMapper(_)).filter(...)
  .updateStateByKey(updateFunc)
  .foreachRDD(_.foreachPartition(RedisHelper.update(_)))

object RedisHelper {
  private val client = scredis.Redis(
    ConfigFactory.parseProperties(System.getProperties).getConfig(namespace)
  )

  def update(itr: Iterator[(String, (Long, Long))]) {
    // redis save operation
  }
}

Below is the spark configuration:

spark.app.name = XXX
spark.jars = .jar
spark.home = /spark-1.1.1-bin-hadoop2.4
spark.executor.memory = 1g
spark.streaming.concurrentJobs = 1000
spark.logConf = true
spark.cleaner.ttl = 3600 //in milliseconds
spark.default.parallelism = 12
spark.executor.extraJavaOptions = -Xloggc:gc.log -XX:+PrintGCDetails -XX:+UseConcMarkSweepGC -XX:HeapDumpPath=1.hprof -XX:+HeapDumpOnOutOfMemoryError
spark.executor.logs.rolling.strategy = size
spark.executor.logs.rolling.size.maxBytes = 104857600 // 100 MB
spark.executor.logs.rolling.maxRetainedFiles = 10
spark.serializer = org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator = xxx.NoOpKryoRegistrator

other configurations are below

streaming {
  // All streaming context related configs should come here
  batch-duration = 1 second
  checkpoint-directory = /tmp
  checkpoint-duration = 10 seconds
  slide-duration = 1 second
  window-duration = 1 second
  partitions-for-shuffle-task = 32
}
kafka {
  no-of-receivers = 1
  zookeeper-quorum = :2181
  consumer-group = x
  topic = x:2
}

We tried different combinations like:
- with spark 1.1.0 and 1.1.1
- by increasing executor memory
- by changing the serialization strategy (switching between kryo and normal java)
- by changing broadcast strategy (switching between http and torrent broadcast)
Can anyone give any insight into what we are missing here? How can we fix this? Due to an akka version mismatch with some other libraries we cannot upgrade the spark version. Thanks, -- Sourav Chandra Senior Software Engineer sourav.chan...@livestream.com o: +91 80 4121 8723 m: +91 988 699 3746 skype: sourav.chandra Livestream Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area, Bangalore 560034 www.livestream.com
Re: Can a map function return null
You can return an RDD with null values inside, and afterwards filter on item != null. In Scala (or even in Java 8) you'd rather use Option/Optional, and in Scala they're directly usable from Spark. Example :

sc.parallelize(1 to 1000).flatMap(item => if (item % 2 == 0) Some(item) else None).collect()
res0: Array[Int] = Array(2, 4, 6, …)

Regards, Olivier. On Sat, Apr 18, 2015 at 20:44, Steve Lewis lordjoe2...@gmail.com wrote: I find a number of cases where I have a JavaRDD and I wish to transform the data and, depending on a test, return 0 or one item (don't suggest a filter - the real case is more complex). So I currently do something like the following - perform a flatMap returning a list with 0 or 1 entry depending on the isUsed function.

JavaRDD<Foo> original = ...
JavaRDD<Foo> words = original.flatMap(new FlatMapFunction<Foo, Foo>() {
    @Override
    public Iterable<Foo> call(final Foo s) throws Exception {
        List<Foo> ret = new ArrayList<Foo>();
        if (isUsed(s))
            ret.add(transform(s));
        return ret; // contains 0 items if isUsed is false
    }
});

My question is can I do a map returning the transformed data and null if nothing is to be returned, as shown below - what does Spark do with a map function returning null?

JavaRDD<Foo> words = original.map(new MapFunction<String, String>() {
    @Override
    Foo call(final Foo s) throws Exception {
        List<Foo> ret = new ArrayList<Foo>();
        if (isUsed(s))
            return transform(s);
        return null; // not used - what happens now
    }
});
Re: Build spark failed with maven
Hi, this was not reproduced for me, what kind of jdk are you using for the zinc server ? Regards, Olivier. 2015-02-11 5:08 GMT+01:00 Yi Tian tianyi.asiai...@gmail.com: Hi, all I got an ERROR when I build spark master branch with maven (commit: 2d1e916730492f5d61b97da6c483d3223ca44315) [INFO] [INFO] [INFO] Building Spark Project Catalyst 1.3.0-SNAPSHOT [INFO] [INFO] [INFO] --- maven-enforcer-plugin:1.3.1:enforce (enforce-versions) @ spark-catalyst_2.10 --- [INFO] [INFO] --- build-helper-maven-plugin:1.8:add-source (add-scala-sources) @ spark-catalyst_2.10 --- [INFO] Source directory: /Users/tianyi/github/community/apache-spark/sql/catalyst/src/main/scala added. [INFO] [INFO] --- maven-remote-resources-plugin:1.5:process (default) @ spark-catalyst_2.10 --- [INFO] [INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ spark-catalyst_2.10 --- [INFO] Using 'UTF-8' encoding to copy filtered resources. [INFO] skip non existing resourceDirectory /Users/tianyi/github/community/apache-spark/sql/catalyst/src/main/resources [INFO] Copying 3 resources [INFO] [INFO] --- scala-maven-plugin:3.2.0:compile (scala-compile-first) @ spark-catalyst_2.10 --- [INFO] Using zinc server for incremental compilation [INFO] compiler plugin: BasicArtifact(org.scalamacros,paradise_2.10.4,2.0.1,null) [info] Compiling 69 Scala sources and 3 Java sources to /Users/tianyi/github/community/apache-spark/sql/catalyst/target/scala-2.10/classes...[error] /Users/tianyi/github/community/apache-spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala:314: polymorphic expression cannot be instantiated to expected type; [error] found : [T(in method apply)]org.apache.spark.sql.catalyst.dsl.ScalaUdfBuilder[T(in method apply)] [error] required: org.apache.spark.sql.catalyst.dsl.package.ScalaUdfBuilder[T(in method functionToUdfBuilder)] [error] implicit def functionToUdfBuilder[T: TypeTag](func: Function1[_, T]): ScalaUdfBuilder[T] = ScalaUdfBuilder(func) Any suggestion?
Re: Opening Spark on IntelliJ IDEA
Hi, are you using spark for a java or scala project and can you post your pom file please ? Regards, Olivier. 2014-11-27 7:07 GMT+01:00 Taeyun Kim taeyun@innowireless.com: Hi, An information about the error. On File | Project Structure window, the following error message is displayed with pink background: Library ‘Maven: org.scala-lang:scala-compiler-bundle:2.10.4’ is not used Can it be a hint? *From:* Taeyun Kim [mailto:taeyun@innowireless.com] *Sent:* Thursday, November 27, 2014 3:00 PM *To:* 'user' *Subject:* Opening Spark on IntelliJ IDEA Hi, I’m trying to open the Spark source code with IntelliJ IDEA. I opened pom.xml on the Spark source code root directory. Project tree is displayed in the Project tool window. But, when I open a source file, say org.apache.spark.deploy.yarn.ClientBase.scala, a lot of red marks shows on the editor scroll bar. It is the ‘Cannot resolve symbol’ error. Even it cannot resolve StringOps.format. How can it be fixed? The versions I’m using are as follows: - OS: Windows 7 - IntelliJ IDEA: 13.1.6 - Scala plugin: 0.41.2 - Spark source code: 1.1.1 (with a few file modified by me) I’ve tried to fix this and error state changed somewhat, but eventually I gave up fixing it on my own (with googling) and deleted .idea folder and started over. So now I’m seeing the errors described above. Thank you.
Re: Cannot access data after a join (error: value _1 is not a member of Product with Serializable)
can you please post the full source of your code and some sample data to run it on ? 2014-11-19 16:23 GMT+01:00 YaoPau jonrgr...@gmail.com: I joined two datasets together, and my resulting logs look like this: (975894369,((72364,20141112T170627,web,MEMPHIS,AR,US,Central),(Male,John,Smith))) (253142991,((30058,20141112T171246,web,ATLANTA16,GA,US,Southeast),(Male,Bob,Jones))) (295305425,((28110,20141112T170454,iph,CHARLOTTE2,NC,US,Southeast),(Female,Mary,Williams))) When I try to access the newly-joined data with JoinedInv.map(line => line._2._2._1) I get the following error:

[ERROR] error: value _1 is not a member of Product with Serializable
[INFO] val getOne = JoinedInv.map(line => line._2._2._1)
[INFO] ^
[ERROR] error: value foreach is not a member of Array[Nothing]
[INFO] getOne.take(10).foreach(println)
[INFO] ^

It looks like there are some rows where a JOIN did not occur (no key match in the joined dataset), but because I can't access line._2._2._1 I don't know of a way to check for that. I can access line._2._2, but line._2._2 does not have the length attribute. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-access-data-after-a-join-error-value-1-is-not-a-member-of-Product-with-Serializable-tp19272.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: default parallelism bug?
Hi, what do you mean by pretty small ? How big is your file ? Regards, Olivier. 2014-10-21 6:01 GMT+02:00 Kevin Jung itsjb.j...@samsung.com: I use Spark 1.1.0 and set these options to spark-defaults.conf spark.scheduler.mode FAIR spark.cores.max 48 spark.default.parallelism 72 Thanks, Kevin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/default-parallelism-bug-tp16787p16894.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Convert Iterable to RDD
I don't think this is provided out of the box, but you can use toSeq on your Iterable, and if the Iterable is lazy it should stay that way for the Seq. Then you can use sc.parallelize(myIterable.toSeq) to get your RDD. For the Iterable[Iterable[T]] you can flatten it and then create your RDD from the resulting Iterable. Regards, Olivier. 2014-10-21 5:07 GMT+02:00 Dai, Kevin yun...@ebay.com: In addition, how to convert Iterable[Iterable[T]] to RDD[T]? Thanks, Kevin. From: Dai, Kevin [mailto:yun...@ebay.com] Sent: October 21, 2014 10:58 AM To: user@spark.apache.org Subject: Convert Iterable to RDD Hi, All Is there any way to convert an Iterable to an RDD? Thanks, Kevin.
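Both conversions in one sketch; note that parallelize materializes the whole Seq on the driver, so this only suits iterables that fit in driver memory:

```scala
import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Iterable[T] -> RDD[T]
def iterableToRdd[T: ClassTag](sc: SparkContext, it: Iterable[T]): RDD[T] =
  sc.parallelize(it.toSeq)

// Iterable[Iterable[T]] -> RDD[T]: flatten on the driver, then parallelize
def nestedIterableToRdd[T: ClassTag](sc: SparkContext, it: Iterable[Iterable[T]]): RDD[T] =
  sc.parallelize(it.flatten.toSeq)
```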
Re: RDD to Multiple Tables SparkSQL
If you already know your keys, the best way would be to extract one RDD per key (it would not bring the content back to the master and you can take advantage of the caching features) and then execute a registerTempTable per key. But I'm guessing you don't know the keys in advance, and in that case I think it becomes a very confusing point to put everything in different tables. First of all, how would you query it afterwards ? Regards, Olivier. 2014-10-20 13:02 GMT+02:00 critikaled isasmani@gmail.com: Hi, I have an rdd which I want to register as multiple tables based on key:

val context = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.hive.HiveContext(context)
import sqlContext.createSchemaRDD

case class KV(key: String, id: String, value: String)

val logsRDD = context.textFile("logs", 10).map { line =>
  val Array(key, id, value) = line split ' '
  (key, id, value)
}.registerTempTable("KVS")

I want to store the above information into multiple tables based on key without bringing the entire data to the master. Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-to-Multiple-Tables-SparkSQL-tp16807.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
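A sketch of the per-key registration suggested above, assuming logs is an RDD of the question's KV case class and that the set of distinct keys is small enough to collect (only the keys travel to the driver, not the rows):

```scala
// Cache so the per-key filters don't rescan the source file each time
val logs = context.textFile("logs", 10).map { line =>
  val Array(key, id, value) = line split ' '
  KV(key, id, value)
}.cache()

val keys = logs.map(_.key).distinct().collect()

keys.foreach { key =>
  // createSchemaRDD implicits (imported above) turn RDD[KV] into a queryable table
  logs.filter(_.key == key).registerTempTable(s"KVS_$key")
}
```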