Re: How to flatten a row in PySpark

2017-10-13 Thread Debabrata Ghosh
Thanks Ayan and Nicholas for your jet-fast reply! Appreciate it a lot. Cheers, Debu On Fri, Oct 13, 2017 at 9:27 AM, ayan guha wrote: > Quick pyspark code: > > >>> s = "ABZ|ABZ|AF|2,3,7,8,B,C,D,E,J,K,L,M,P,Q,T,U,X,Y|1,2,3,4,5|730" > >>> base =
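For reference, a minimal spark-shell (Scala) sketch of the same split-and-explode idea; the column names and the exact flattening rule are assumptions, since the original pyspark snippet is truncated:

import org.apache.spark.sql.functions.{col, explode, split}
import spark.implicits._   // spark is the SparkSession, as in spark-shell

val s = "ABZ|ABZ|AF|2,3,7,8,B,C,D,E,J,K,L,M,P,Q,T,U,X,Y|1,2,3,4,5|730"

// split the pipe-delimited record into its fields
val base = Seq(s).toDF("raw")
  .select(split(col("raw"), "\\|").as("p"))
  .select(
    col("p")(0).as("origin"),
    col("p")(1).as("dest"),
    col("p")(2).as("carrier"),
    split(col("p")(3), ",").as("classes"),
    split(col("p")(4), ",").as("days"),
    col("p")(5).as("code"))

// explode the comma-separated lists so each class/day pair becomes its own row
val flattened = base
  .withColumn("class", explode(col("classes")))
  .withColumn("day", explode(col("days")))
  .drop("classes", "days")

flattened.show()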

Re: Is there a difference between df.cache() vs df.rdd.cache()

2017-10-13 Thread Weichen Xu
You should use `df.cache()`. `df.rdd.cache()` won't work, because `df.rdd` generates a new RDD from the original `df` and then caches the new RDD. On Fri, Oct 13, 2017 at 3:35 PM, Supun Nakandala wrote: > Hi all, > > I have been experimenting with
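A small spark-shell sketch of the difference (the example DataFrame is made up):

val df = spark.range(1000).toDF("id")

// caches the DataFrame itself (columnar in-memory format)
df.cache()
df.count()          // materializes the DataFrame cache

// this does NOT cache df: df.rdd derives a new RDD[Row] from df,
// and only that derived RDD ends up cached
val rowRdd = df.rdd
rowRdd.cache()
rowRdd.count()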

Re: Kafka 010 Spark 2.2.0 Streaming / Custom checkpoint strategy

2017-10-13 Thread Jörn Franke
HDFS can be replaced by other filesystem plugins (e.g. ignitefs, s3, etc.), so the easiest approach is to write a filesystem plugin. This is not a plug-in for Spark but part of the Hadoop functionality used by Spark. > On 13. Oct 2017, at 17:41, Anand Chandrashekar wrote: > >
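As an illustration only, a hedged sketch of pointing Spark Streaming checkpoints at an alternative Hadoop filesystem; the bucket name and credential values are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("checkpoint-on-alternative-fs")
val ssc = new StreamingContext(conf, Seconds(10))

// any Hadoop FileSystem implementation can back the checkpoint directory;
// the s3a keys below come from the hadoop-aws module and the values are placeholders
ssc.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "<access-key>")
ssc.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "<secret-key>")
ssc.checkpoint("s3a://my-bucket/spark-checkpoints")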

Issue Storing offset in Kafka for Spark Streaming Application

2017-10-13 Thread Arpan Rajani
Hi all, In our cluster we have Kafka 0.10.1 and Spark 2.1.0. We are trying to store the offsets in Kafka in order to achieve restartability of the streaming application. (I already implemented this using checkpoints, but we will need to change code in production, hence checkpointing won't work.) Checking

Bug Report: Spark Config Defaults not Loading with python code/spark-submit

2017-10-13 Thread Nathan McLean
Here is an example pyspark program which illustrates this problem. If run using spark-submit, the default configurations for Spark do not seem to be loaded when a new SparkConf class is instantiated (contrary to what the loadDefaults=True keyword arg implies). When using the interactive shell for

Re: Is there a difference between df.cache() vs df.rdd.cache()

2017-10-13 Thread Weichen Xu
Hi Supun, The Dataframe API is NOT using the old RDD implementation under the covers; dataframes have their own implementation. (Dataframes use a binary row format, and columnar storage when cached.) So a dataframe has no relationship with the `RDD[Row]` you want to get. When calling `df.rdd` and then caching, it

Spark Kafka API tries to connect to the dead node for every batch, which increases the processing time

2017-10-13 Thread supritht
Hi guys, I have a 3-node cluster and I am running a Spark streaming job. Consider the below example /*spark-submit* --master yarn-cluster --class com.huawei.bigdata.spark.examples.FemaleInfoCollectionPrint --jars

Re: Issue Storing offset in Kafka for Spark Streaming Application

2017-10-13 Thread Gerard Maas
Hi Arpan, The error suggests that the streaming context has been started with streamingContext.start() and that, after that statement, some other dstream operations have been attempted. A suggested pattern to manage the offsets is the following: var offsetRanges: Array[OffsetRange] = _ //create
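For context, a minimal sketch of that pattern, assuming spark-streaming-kafka-0-10 and an existing StreamingContext `ssc`; the broker, group id and topic are placeholders:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-streaming-app",
  "enable.auto.commit" -> (false: java.lang.Boolean))

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("my-topic"), kafkaParams))

stream.foreachRDD { rdd =>
  // capture the offsets before any transformation that loses the Kafka RDD type
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process rdd ...
  // commit back to Kafka only after this batch's work has succeeded
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

All of this has to be wired up before streamingContext.start() is called, which is what the error above is complaining about.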

Re: Apache Spark-Subtract two datasets

2017-10-13 Thread Nathan Kronenfeld
I think you want a join of type "left_anti"... See the log below: scala> import spark.implicits._ import spark.implicits._ scala> case class Foo (a: String, b: Int) defined class Foo scala> case class Bar (a: String, d: Double) defined class Bar scala> var fooDs = Seq(Foo("a", 1), Foo("b", 2),
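A compact version of that spark-shell session; the sample Bar rows and the join key are assumptions, since the log is truncated:

import spark.implicits._

case class Foo(a: String, b: Int)
case class Bar(a: String, d: Double)

val fooDs = Seq(Foo("a", 1), Foo("b", 2), Foo("c", 3)).toDS
val barDs = Seq(Bar("a", 1.0), Bar("c", 3.0)).toDS

// keep only the Foo rows whose key "a" has no match in barDs
val subtracted = fooDs.join(barDs, Seq("a"), "left_anti")
subtracted.show()
// expected: the single row (b, 2)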

Kafka 010 Spark 2.2.0 Streaming / Custom checkpoint strategy

2017-10-13 Thread Anand Chandrashekar
Greetings! I would like to accomplish a custom Kafka checkpoint strategy (instead of HDFS, I would like to use Redis). Is there a strategy I can use to change this behavior? Any advice will help. Thanks! Regards, Anand.

Re: Is there a difference between df.cache() vs df.rdd.cache()

2017-10-13 Thread Stephen Boesch
@Vadim Would it be true to say the `.rdd` *may* be creating a new job - depending on whether the DataFrame/DataSet had already been materialized via an action or checkpoint? If the only prior operations on the DataFrame had been transformations then the dataframe would still not have been

Re: Is there a difference between df.cache() vs df.rdd.cache()

2017-10-13 Thread Supun Nakandala
Hi Weichen, Thank you for the reply. My understanding was that the Dataframe API is using the old RDD implementation under the covers, though it presents a different API, and that calling df.rdd will simply give access to the underlying RDD. Is this assumption wrong? I would appreciate it if you could shed more

Re: Is there a difference between df.cache() vs df.rdd.cache()

2017-10-13 Thread Vadim Semenov
When you do `Dataset.rdd` you actually create a new job. Here you can see what it does internally: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2816-L2828 On Fri, Oct 13, 2017 at 5:24 PM, Supun Nakandala

Re: Hive From Spark: Jdbc VS sparkContext

2017-10-13 Thread Kabeer Ahmed
My take on this might sound a bit different. Here are a few points to consider below: 1. Going through Hive JDBC means that the application is restricted by the # of queries that can be compiled. HS2 can only compile one SQL statement at a time, and if users have bad SQL, it can take a long time just to
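To make the comparison concrete, a hedged sketch of the two access paths (host, database and table names are placeholders; whether the Hive JDBC driver plays nicely with Spark's JDBC source depends on the versions involved):

// 1) Hive through SparkSession: Spark plans the query and its executors read
//    the table data directly, using the metastore only for metadata
val spark = org.apache.spark.sql.SparkSession.builder()
  .appName("hive-access")
  .enableHiveSupport()
  .getOrCreate()
val viaSpark = spark.sql("SELECT * FROM my_db.my_table")

// 2) Hive through HiveServer2 JDBC: HS2 compiles and runs the SQL, and the
//    result set comes back over the JDBC connection
val viaJdbc = spark.read
  .format("jdbc")
  .option("url", "jdbc:hive2://hs2-host:10000/my_db")
  .option("dbtable", "my_table")
  .option("driver", "org.apache.hive.jdbc.HiveDriver")
  .load()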

Re: Hive From Spark: Jdbc VS sparkContext

2017-10-13 Thread Nicolas Paris
> In case a table has a few > million records, it all goes through the driver. This sounds clear: in JDBC mode, the driver gets all the rows and then spreads the RDD over the executors. I'd say that most use cases deal with SQL to aggregate huge datasets and retrieve a small number of rows to be

Is there a difference between df.cache() vs df.rdd.cache()

2017-10-13 Thread Supun Nakandala
Hi all, I have been experimenting with the cache/persist/unpersist methods with respect to both the Dataframe and RDD APIs. However, I am seeing different behaviors from the Dataframe API compared to the RDD API, such as Dataframes not getting cached when count() is called. Is there a difference between how

Re: Issue Storing offset in Kafka for Spark Streaming Application

2017-10-13 Thread Arpan Rajani
Hi Gerard, Excellent, indeed your inputs helped. Thank you for the quick reply. I modified the code based on your inputs. Now the application starts and it reads from the topic. Then we stream about 50,000 messages onto the Kafka topic. After a while we terminate the application using YARN kill and

Re: Spark - Partitions

2017-10-13 Thread Tushar Adeshara
You can also try coalesce as it will avoid a full shuffle. Regards, Tushar Adeshara Technical Specialist – Analytics Practice Cell: +91-81490 04192 Persistent Systems Ltd. | Partners in Innovation | www.persistentsys.com
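A tiny sketch of the difference (the example DataFrame and partition counts are made up):

val df = spark.range(1000000L).toDF("id")

// repartition(n) does a full shuffle and can increase or decrease the partition count
val reshuffled = df.repartition(200)

// coalesce(n) merges existing partitions without a full shuffle,
// so it is cheaper but can only reduce the partition count
val merged = df.coalesce(10)

println(merged.rdd.getNumPartitions)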

[Spark on YARN] Asynchronously launching containers in YARN

2017-10-13 Thread Craig Ingram
I was recently doing some research into Spark on YARN's startup time and observed slow, synchronous allocation of containers/executors. I am testing on a 4-node bare-metal cluster with 48 cores and 128GB of memory per node. YARN was only allocating about 3 containers per second. Moreover, when starting 3