Re: How can we control CPU and Memory per Spark job operation..

2016-07-16 Thread Pedro Rodriguez
You could call map on an RDD which has “many” partitions, then call
repartition/coalesce to drastically reduce the number of partitions so that
your second map job has fewer tasks running.
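
A minimal PySpark-style sketch of this idea, under stated assumptions:
cpu_heavy and memory_heavy are hypothetical stand-ins for the two map
functions, and the default of 3 partitions is only illustrative (taken from
the 3 nodes mentioned in the question). One caveat: coalesce without a
shuffle is a narrow dependency, so it would also shrink the parallelism of
the first map; repartition always shuffles and so puts a stage boundary
between the two maps. This only limits how many tasks run at once; it does
not change how much memory each executor has.

```python
def two_phase_map(rdd, cpu_heavy, memory_heavy, narrow=3):
    """Run cpu_heavy at the RDD's current parallelism, then memory_heavy on few partitions.

    `rdd` is expected to be a pyspark RDD that already has many partitions;
    only its `map` and `repartition` methods are used here.
    """
    stage_one = rdd.map(cpu_heavy)            # CPU-bound work, one task per input partition
    narrowed = stage_one.repartition(narrow)  # shuffle: stage boundary, `narrow` partitions
    return narrowed.map(memory_heavy)         # memory-bound work, at most `narrow` tasks
```

Whether this helps depends on the cluster manager releasing idle executors
between the stages, which is not something the sketch controls.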

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 16, 2016 at 4:46:04 PM, Jacek Laskowski (ja...@japila.pl) wrote:

Hi,  

My understanding is that these two map functions will end up as a job
with one stage (as if you wrote the two maps as a single map), so you
really need as many vcores and as much memory as possible for both map1
and map2. I initially thought that dynamic allocation of executors might
help with your case, but since there's just one stage I
don't think you can do much.

Pozdrawiam,  
Jacek Laskowski  
  
https://medium.com/@jaceklaskowski/  
Mastering Apache Spark http://bit.ly/mastering-apache-spark  
Follow me at https://twitter.com/jaceklaskowski  


On Fri, Jul 15, 2016 at 9:54 PM, Pavan Achanta  wrote:  
> Hi All,  
>  
> Here is my use case:  
>  
> I have a pipeline job consisting of 2 map functions:  
>  
> CPU intensive map operation that does not require a lot of memory.
> Memory intensive map operation that requires up to 4 GB of memory. And this
> 4 GB of memory cannot be distributed since it is an NLP model.
>
> Ideally what I would like to do is to use 20 nodes with 4 cores each and minimal
> memory for the first map operation and then use only 3 nodes with minimal CPU
> but each having 4 GB of memory for the 2nd operation.
>
> While it is possible to control the parallelism for each map operation in
> Spark, I am not sure how to control the resources for each operation.
> Obviously I don’t want to start off the job with 20 nodes with 4 cores and
> 4 GB of memory since I cannot afford that much memory.
>
> We use YARN with Spark. Any suggestions?
>  
> Thanks and regards,  
> Pavan  
>  
>  

-  
To unsubscribe e-mail: user-unsubscr...@spark.apache.org  



Re: How can we control CPU and Memory per Spark job operation..

2016-07-16 Thread Jacek Laskowski
Hi,

My understanding is that these two map functions will end up as a job
with one stage (as if you wrote the two maps as a single map), so you
really need as many vcores and as much memory as possible for both map1
and map2. I initially thought that dynamic allocation of executors might
help with your case, but since there's just one stage I
don't think you can do much.

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Fri, Jul 15, 2016 at 9:54 PM, Pavan Achanta  wrote:
> Hi All,
>
> Here is my use case:
>
> I have a pipeline job consisting of 2 map functions:
>
> CPU intensive map operation that does not require a lot of memory.
> Memory intensive map operation that requires up to 4 GB of memory. And this
> 4 GB of memory cannot be distributed since it is an NLP model.
>
> Ideally what I would like to do is to use 20 nodes with 4 cores each and minimal
> memory for the first map operation and then use only 3 nodes with minimal CPU
> but each having 4 GB of memory for the 2nd operation.
>
> While it is possible to control the parallelism for each map operation in
> Spark, I am not sure how to control the resources for each operation.
> Obviously I don’t want to start off the job with 20 nodes with 4 cores and
> 4 GB of memory since I cannot afford that much memory.
>
> We use YARN with Spark. Any suggestions?
>
> Thanks and regards,
> Pavan
>
>




Re: Filtering RDD Using Spark.mllib's ChiSqSelector

2016-07-16 Thread Tobi Bosede
Hi Yanbo,

Appreciate the response. I might not have phrased this correctly, but I
really wanted to know how to convert the pipeline RDD into a DataFrame. I
have seen the example you posted. However, I need to transform all my data,
not just 1 line. So I did successfully use map with the ChiSqSelector to
filter the chosen features of my data. I just want to convert it to a
DataFrame so I can apply a logistic regression model from spark.ml.

Trust me, I would use the DataFrames API if I could, but the chisq
functionality is not available to me in the Python Spark 1.4 API.

Regards,
Tobi

On Jul 16, 2016 4:53 AM, "Yanbo Liang"  wrote:

> Hi Tobi,
>
> The MLlib RDD-based API does support to apply transformation on both
> Vector and RDD, but you did not use the appropriate way to do.
> Suppose you have a RDD with LabeledPoint in each line, you can refer the
> following code snippets to train a ChiSqSelectorModel model and do
> transformation:
>
> from pyspark.mllib.regression import LabeledPoint
>
> from pyspark.mllib.feature import ChiSqSelector
>
> data = [LabeledPoint(0.0, SparseVector(3, {0: 8.0, 1: 7.0})), 
> LabeledPoint(1.0, SparseVector(3, {1: 9.0, 2: 6.0})), LabeledPoint(1.0, [0.0, 
> 9.0, 8.0]), LabeledPoint(2.0, [8.0, 9.0, 5.0])]
>
> rdd = sc.parallelize(data)
>
> model = ChiSqSelector(1).fit(rdd)
>
> filteredRDD = model.transform(rdd.map(lambda lp: lp.features))
>
> filteredRDD.collect()
>
> However, we strongly recommend you to migrate to DataFrame-based API since
> the RDD-based API is switched to maintain mode.
>
> Thanks
> Yanbo
>
> 2016-07-14 13:23 GMT-07:00 Tobi Bosede :
>
>> Hi everyone,
>>
>> I am trying to filter my features based on the spark.mllib ChiSqSelector.
>>
>> filteredData = vectorizedTestPar.map(lambda lp: LabeledPoint(lp.label,
>> model.transform(lp.features)))
>>
>> However when I do the following I get the error below. Is there any other
>> way to filter my data to avoid this error?
>>
>> filteredDataDF=filteredData.toDF()
>>
>> Exception: It appears that you are attempting to reference SparkContext from 
>> a broadcast variable, action, or transforamtion. SparkContext can only be 
>> used on the driver, not in code that it run on workers. For more 
>> information, see SPARK-5063.
>>
>>
>> I would directly use the spark.ml ChiSqSelector and work with dataframes, 
>> but I am on spark 1.4 and using pyspark. So spark.ml's ChiSqSelector is not 
>> available to me. filteredData is of type piplelineRDD, if that helps. It is 
>> not a regular RDD. I think that may part of why calling toDF() is not 
>> working.
>>
>>
>> Thanks,
>>
>> Tobi
>>
>>
>


Fwd: File to read sharded (2 levels) parquet files

2016-07-16 Thread Pei Sun
 Hi Spark experts,
 spark version: 2.0.0-preview,
  hadoop version: 2.4, 2.7 (all tried, none works)

 The data is in parquet format and stored in hdfs:
/root/file/partition1/file-xxx.parquet
/root/file/partition2/file-xxx.parquet

Then I did:
sqlContext.read.format("parquet").load("hdfs://Master:port/root/file")

It failed with:

val a = sqlContext.read.parquet("hdfs://ec2-54-191-19-229.us-west-2.compute.amazonaws.com:9000/alluxio_storage/tpcds1_r2/catalog_sales")

java.io.FileNotFoundException: Path is not a file: /alluxio_storage/tpcds1_r2/catalog_sales/cs_sold_date_sk=2450815
  at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:75)
  at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
  at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
  at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
  at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
  at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:587)
  at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
  at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
  at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
  at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
  at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
  at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:415)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
  at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)

  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
  at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
  at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
  at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1239)
  at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1224)
  at org.apache.hadoop.hdfs.DFSClient.getBlockLocations(DFSClient.java:1282)
  at org.apache.hadoop.hdfs.DistributedFileSystem$1.doCall(DistributedFileSystem.java:221)
  at org.apache.hadoop.hdfs.DistributedFileSystem$1.doCall(DistributedFileSystem.java:217)
  at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
  at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:228)
  at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:209)
  at org.apache.spark.sql.execution.datasources.ListingFileCatalog$$anonfun$1$$anonfun$apply$2.apply(ListingFileCatalog.scala:104)
  at org.apache.spark.sql.execution.datasources.ListingFileCatalog$$anonfun$1$$anonfun$apply$2.apply(ListingFileCatalog.scala:92)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at org.apache.spark.sql.execution.datasources.ListingFileCatalog$$anonfun$1.apply(ListingFileCatalog.scala:92)
  at org.apache.spark.sql.execution.datasources.ListingFileCatalog$$anonfun$1.apply(ListingFileCatalog.scala:80)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
  at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
  at org.apache.spark.sql.execution.datasources.ListingFileCatalog.listLeafFiles(ListingFileCatalog.scala:80)
  at org.apache.spark.sql.execution.datasources.ListingFileCatalog.refresh(ListingFileCatalog.scala:69)

  at

Re: standalone mode only supports FIFO scheduler across applications ? still in spark 2.0 time ?

2016-07-16 Thread Teng Qiu
Hi Mark, thanks. We just want to keep our system as simple as
possible. Using YARN means we need to maintain a full-size Hadoop
cluster; we are using S3 as the storage layer, so HDFS is not needed and
a Hadoop cluster is a bit of overkill. Mesos is an option, but
it still brings extra operational costs.

So... any suggestion from you?

Thanks


2016-07-15 18:51 GMT+02:00 Mark Hamstra :
> Nothing has changed in that regard, nor is there likely to be "progress",
> since more sophisticated or capable resource scheduling at the Application
> level is really beyond the design goals for standalone mode.  If you want
> more in the way of multi-Application resource scheduling, then you should be
> looking at Yarn or Mesos.  Is there some reason why neither of those options
> can work for you?
>
> On Fri, Jul 15, 2016 at 9:15 AM, Teng Qiu  wrote:
>>
>> Hi,
>>
>>
>> http://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/spark-standalone.html#resource-scheduling
>> The standalone cluster mode currently only supports a simple FIFO
>> scheduler across applications.
>>
>> is this sentence still true? any progress on this? it will really
>> helpful. some roadmap?
>>
>> Thanks
>>
>> Teng
>>
>>
>




High availability with Spark

2016-07-16 Thread KhajaAsmath Mohammed
Hi,

Could you please share your thoughts, if anyone has ideas on the topics
below?

   - How can high availability be achieved with a Spark cluster? I have
   referred to this link:
   https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/exercises/spark-exercise-standalone-master-ha.html
   Is there any other way to do it in cluster mode?
   - How can high availability of the Spark driver be achieved? From the
   documentation, I understand that it is achieved through a checkpointing
   directory. Is there any other way?
   - What is the procedure to find out the number of messages that have been
   consumed by the consumer? Is there any way to track the number of messages
   consumed in Spark Streaming?
   - I also want to save data from Spark Streaming periodically and do
   aggregations on it. Let's say, save data for every hour/day etc. and
   aggregate over that.


Thanks,
Asmath.


Re: Latest 200 messages per topic

2016-07-16 Thread Rabin Banerjee
Just to add,

  I want to read the MAX_OFFSET of a topic, then read from MAX_OFFSET-200,
every time.

Also, I want to know: if I want to fetch a specific offset range for batch
processing, is there any option for doing that?
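
For the batch case, one possible sketch follows. It assumes the Spark 1.x/2.x
Python Kafka integration (pyspark.streaming.kafka), and that the earliest and
latest offsets of each topic partition have already been looked up by some
other means (for example with a Kafka client). The function names and the
`offsets` mapping are hypothetical. Note that the window is computed per
partition, so a topic with several partitions yields up to n messages from
each of them.

```python
def last_n_window(earliest, latest, n=200):
    """Return (from_offset, until_offset) covering at most the last n messages.

    `latest` is assumed to be one past the newest message, so the range is
    half-open: [from_offset, until_offset).
    """
    return max(earliest, latest - n), latest


def read_last_n(sc, kafka_params, offsets, n=200):
    """Build a batch RDD with the last n messages of each topic partition.

    `offsets` maps (topic, partition) -> (earliest, latest).
    """
    # Imported lazily: requires the Spark Kafka integration on the classpath.
    from pyspark.streaming.kafka import KafkaUtils, OffsetRange

    ranges = []
    for (topic, partition), (earliest, latest) in sorted(offsets.items()):
        start, end = last_n_window(earliest, latest, n)
        ranges.append(OffsetRange(topic, partition, start, end))
    return KafkaUtils.createRDD(sc, kafka_params, ranges)
```

Clamping to the earliest offset avoids asking for messages that have already
been removed by retention, or that never existed on a short topic.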



On Sat, Jul 16, 2016 at 9:08 PM, Rabin Banerjee <
dev.rabin.baner...@gmail.com> wrote:

> HI All,
>
>I have 1000 kafka topics each storing messages for different devices .
> I want to use the direct approach for connecting kafka from Spark , in
> which I am only interested in latest 200 messages in the Kafka .
>
> How do I do that ?
>
> Thanks.
>


Latest 200 messages per topic

2016-07-16 Thread Rabin Banerjee
Hi All,

   I have 1000 Kafka topics, each storing messages for different devices. I
want to use the direct approach for connecting to Kafka from Spark, and
I am only interested in the latest 200 messages in each Kafka topic.

How do I do that?

Thanks.


Re: Feature importance IN random forest

2016-07-16 Thread Yanbo Liang
Spark 1.5 only supports getting feature importances for
RandomForestClassificationModel and RandomForestRegressionModel from Scala.
PySpark does not support this feature until 2.0.0.

It's very straightforward, with a few lines of code:

from pyspark.ml.classification import RandomForestClassifier

# td is the training DataFrame; "indexed" is its label column.
rf = RandomForestClassifier(numTrees=3, maxDepth=2, labelCol="indexed", seed=42)
model = rf.fit(td)
model.featureImportances

Then you get the feature importances as a Vector.
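
As a small follow-on sketch, the importances can be paired with feature names
and ranked in plain Python. The helper name and the example names are
assumptions for illustration; `importances` can be any sequence of floats,
for example model.featureImportances.toArray().

```python
def rank_features(importances, names):
    """Return (name, importance) pairs sorted from most to least important."""
    pairs = [(name, float(value)) for name, value in zip(names, importances)]
    return sorted(pairs, key=lambda pair: pair[1], reverse=True)


# Made-up importances, just to show the shape of the result.
print(rank_features([0.1, 0.7, 0.2], ["age", "income", "height"]))
# [('income', 0.7), ('height', 0.2), ('age', 0.1)]
```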

Thanks
Yanbo

2016-07-12 10:30 GMT-07:00 pseudo oduesp :

> Hi,
>  i use pyspark 1.5.0
> can i  ask you how i can get feature imprtance for a randomforest
> algorithme in pyspark and please give me example
> thanks for advance.
>


Re: bisecting kmeans model tree

2016-07-16 Thread Yanbo Liang
Currently we do not expose the APIs to get the Bisecting KMeans tree
structure; they are private in the ml.clustering package scope.
But I think we should plan to expose these APIs, as we did for
Decision Tree.

Thanks
Yanbo

2016-07-12 11:45 GMT-07:00 roni :

> Hi Spark,Mlib experts,
> Anyone who can shine light on this?
> Thanks
> _R
>
> On Thu, Apr 21, 2016 at 12:46 PM, roni  wrote:
>
>> Hi ,
>>  I want to get the bisecting kmeans tree structure to show a dendogram
>>  on the heatmap I am generating based on the hierarchical clustering of
>> data.
>>  How do I get that using mlib .
>> Thanks
>> -Roni
>>
>
>


Re: Dense Vectors outputs in feature engineering

2016-07-16 Thread Yanbo Liang
Since you use two steps (StringIndexer and OneHotEncoder) to encode
categories into a Vector, I guess you want to decode the resulting vector
back into the original categories.
Suppose you have a DataFrame with a single column named "name" that contains
three categories: "b", "a", "c" (ranked by frequency). You can refer to the
following code snippet to encode and decode:

import org.apache.spark.ml.attribute.AttributeGroup
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

// Assumes `spark` is an active SparkSession (Spark 2.0).
val df = spark.createDataFrame(Seq("a", "b", "c", "b", "a", "b").map(Tuple1.apply)).toDF("name")

// Index categories by frequency: the most frequent value gets index 0.
val si = new StringIndexer().setInputCol("name").setOutputCol("indexedName")
val siModel = si.fit(df)
val df2 = siModel.transform(df)

// One-hot encode the index; keep the last category so every value has a slot.
val encoder = new OneHotEncoder()
  .setDropLast(false)
  .setInputCol("indexedName")
  .setOutputCol("encodedName")
val df3 = encoder.transform(df2)
df3.show()

// Decode to get the original categories.
val group = AttributeGroup.fromStructField(df3.schema("encodedName"))
val categories = group.attributes.get.map(_.name.get)
println(categories.mkString(","))
// Output: b,a,c


Thanks
Yanbo

2016-07-14 6:46 GMT-07:00 rachmaninovquartet :

> or would it be common practice to just retain the original categories in
> another df?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Dense-Vectors-outputs-in-feature-engineering-tp27331p27337.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: Filtering RDD Using Spark.mllib's ChiSqSelector

2016-07-16 Thread Yanbo Liang
Hi Tobi,

The MLlib RDD-based API does support applying a transformation to both a
Vector and an RDD, but you did not use it in the appropriate way.
Suppose you have an RDD with a LabeledPoint in each record; you can refer to
the following code snippet to train a ChiSqSelectorModel and do the
transformation:

from pyspark.mllib.feature import ChiSqSelector
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint

data = [
    LabeledPoint(0.0, SparseVector(3, {0: 8.0, 1: 7.0})),
    LabeledPoint(1.0, SparseVector(3, {1: 9.0, 2: 6.0})),
    LabeledPoint(1.0, [0.0, 9.0, 8.0]),
    LabeledPoint(2.0, [8.0, 9.0, 5.0]),
]
rdd = sc.parallelize(data)

# Fit the selector on the labeled data (keep the top 1 feature).
model = ChiSqSelector(1).fit(rdd)

# Apply the model to the whole RDD of feature vectors, from the driver.
filteredRDD = model.transform(rdd.map(lambda lp: lp.features))
filteredRDD.collect()

However, we strongly recommend that you migrate to the DataFrame-based API,
since the RDD-based API has been switched to maintenance mode.
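
If the labels need to be kept and the result turned into a DataFrame, one
possible sketch is below. It assumes `model` is the fitted ChiSqSelectorModel
and `rdd` the RDD of LabeledPoint from the snippet above; the function name
and the column names are illustrative. The model is applied to the whole
features RDD from the driver rather than inside a map lambda (the pattern
that triggers the SPARK-5063 error), and the labels are zipped back on
afterwards.

```python
def select_features_keep_labels(model, rdd):
    """Return an RDD of (label, selected_features) pairs.

    `model.transform` is called on an RDD from the driver, never inside a
    function that runs on the workers.
    """
    labels = rdd.map(lambda lp: lp.label)
    features = model.transform(rdd.map(lambda lp: lp.features))
    # zip pairs elements by position; both RDDs are derived from `rdd`
    # without changing the number or order of records.
    return labels.zip(features)


# Assumed usage on the driver, with a SQLContext already created so that
# toDF is available on RDDs:
# pairs = select_features_keep_labels(model, rdd)
# df = pairs.toDF(["label", "features"])
```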

Thanks
Yanbo

2016-07-14 13:23 GMT-07:00 Tobi Bosede :

> Hi everyone,
>
> I am trying to filter my features based on the spark.mllib ChiSqSelector.
>
> filteredData = vectorizedTestPar.map(lambda lp: LabeledPoint(lp.label,
> model.transform(lp.features)))
>
> However when I do the following I get the error below. Is there any other
> way to filter my data to avoid this error?
>
> filteredDataDF=filteredData.toDF()
>
> Exception: It appears that you are attempting to reference SparkContext from 
> a broadcast variable, action, or transforamtion. SparkContext can only be 
> used on the driver, not in code that it run on workers. For more information, 
> see SPARK-5063.
>
>
> I would directly use the spark.ml ChiSqSelector and work with dataframes, but 
> I am on spark 1.4 and using pyspark. So spark.ml's ChiSqSelector is not 
> available to me. filteredData is of type piplelineRDD, if that helps. It is 
> not a regular RDD. I think that may part of why calling toDF() is not working.
>
>
> Thanks,
>
> Tobi
>
>


Re: QuantileDiscretizer not working properly with big dataframes

2016-07-16 Thread Yanbo Liang
Could you tell us the Spark version you used?
We fixed this bug in Spark 1.6.2 and Spark 2.0; please upgrade to
one of these versions and retry.
If the issue still exists, please let us know.
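
For readers wondering why turning an integer literal into a floating-point
one matters, here is a small plain-Python illustration. It assumes, as the
fix quoted below suggests, that the sampling fraction is obtained by dividing
the required sample count by the total row count. `//` is used to mimic
integer division on the JVM; the function names and the required sample
count are made up for illustration.

```python
def fraction_int(required_samples, total_rows):
    """Sampling fraction when both operands are integers (truncating division)."""
    return min(required_samples // total_rows, 1.0)


def fraction_float(required_samples, total_rows):
    """Sampling fraction when the numerator is a floating-point number."""
    return min(float(required_samples) / total_rows, 1.0)


rows = 2500000     # roughly the DataFrame size mentioned in the thread
required = 10000   # hypothetical required sample count

print(fraction_int(required, rows))    # 0
print(fraction_float(required, rows))  # 0.004
```

With a fraction of zero the sample would be empty, which would be consistent
with the degenerate splits described in the thread once the row count
exceeds the required sample count.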

Thanks
Yanbo

2016-07-12 11:03 GMT-07:00 Pasquinell Urbani <
pasquinell.urb...@exalitica.com>:

> In the forum mentioned above the following solution is suggested:
>
> Problem is in line 113 and 114 of QuantileDiscretizer.scala and can be
> fixed by changing line 113 like so:
> before: val requiredSamples = math.max(numBins * numBins, 1)
> after: val requiredSamples = math.max(numBins * numBins, 1.0)
>
> Is there another way?
>
>
> 2016-07-11 18:28 GMT-04:00 Pasquinell Urbani <
> pasquinell.urb...@exalitica.com>:
>
>> Hi all,
>>
>> We have a dataframe with 2.5 million records and 13 features. We want
>> to perform a logistic regression with this data, but first we need to divide
>> each column into discrete values using QuantileDiscretizer. This will
>> improve the performance of the model by avoiding outliers.
>>
>> For small dataframes QuantileDiscretizer works perfectly (see the ml
>> example:
>> https://spark.apache.org/docs/1.6.0/ml-features.html#quantilediscretizer),
>> but for large dataframes it tends to split the column into only the values 0
>> and 1 (even though the number of buckets is set to 5). Here is my
>> code:
>>
>> val discretizer = new QuantileDiscretizer()
>>   .setInputCol("C4")
>>   .setOutputCol("C4_Q")
>>   .setNumBuckets(5)
>>
>> val result = discretizer.fit(df3).transform(df3)
>> result.show()
>>
>> I found the same problem presented here:
>> https://issues.apache.org/jira/browse/SPARK-13444 . But there is no
>> solution yet.
>>
>> Am I configuring the function incorrectly? Should I pre-process the
>> data (e.g. with z-scores)? Can somebody help me deal with this?
>>
>> Regards
>>
>
>


Re: Spark streaming takes longer time to read json into dataframes

2016-07-16 Thread Martin Eden
Hi,

I would just do a repartition on the initial direct DStream, since otherwise
each RDD in the stream has exactly as many partitions as you have
partitions in the Kafka topic (in your case 1). That way, receiving is
still done in only 1 thread, but at least the processing further down is
done in parallel.

If you want to parallelize the receiving as well, I would partition the
Kafka topic; the RDDs in the initial DStream will then have as many
partitions as you set in Kafka.
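
A rough PySpark sketch of the first suggestion, under assumptions: `stream`
is the direct DStream of (key, value) pairs, `sql_context` is an existing
SQLContext, and the function names, the partition count and the `process`
callback in the comment are illustrative only.

```python
def spread_values(stream, num_partitions=8):
    """Drop the Kafka keys and spread each batch over num_partitions partitions."""
    return stream.map(lambda kv: kv[1]).repartition(num_partitions)


def batch_to_dataframe(sql_context, rdd):
    """Parse one batch of JSON strings into a DataFrame, or None for an empty batch."""
    if rdd.isEmpty():
        return None
    return sql_context.read.json(rdd)


# Assumed wiring inside the streaming job (process is a hypothetical callback):
# spread_values(stream).foreachRDD(
#     lambda rdd: process(batch_to_dataframe(sql_context, rdd)))
```

The repartition adds a shuffle per batch, so it only pays off when the JSON
parsing is the dominant cost, as it appears to be here.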

Have you seen this?
http://spark.apache.org/docs/latest/streaming-kafka-integration.html

M

On Sat, Jul 16, 2016 at 5:26 AM, Diwakar Dhanuskodi <
diwakar.dhanusk...@gmail.com> wrote:

>
> -- Forwarded message --
> From: Diwakar Dhanuskodi 
> Date: Sat, Jul 16, 2016 at 9:30 AM
> Subject: Re: Spark streaming takes longer time to read json into dataframes
> To: Jean Georges Perrin 
>
>
> Hello,
>
> I need it in memory. I increased executor memory to 25G and executor cores
> to 3 and got the same result. There is always one task running under the
> executor for rdd.read.json() because the RDD has only 1 partition. Is doing
> hash partitioning inside foreachRDD a good approach?
>
> Regards,
> Diwakar.
>
> On Sat, Jul 16, 2016 at 9:20 AM, Jean Georges Perrin  wrote:
>
>> Do you need it on disk or just push it to memory? Can you try to increase
>> memory or # of cores (I know it sounds basic)
>>
>> > On Jul 15, 2016, at 11:43 PM, Diwakar Dhanuskodi <
>> diwakar.dhanusk...@gmail.com> wrote:
>> >
>> > Hello,
>> >
>> > I have 400K json messages pulled from Kafka into spark streaming using
>> DirectStream approach. Size of 400K messages is around 5G.  Kafka topic is
>> single partitioned. I am using rdd.read.json(_._2) inside foreachRDD to
>> convert  rdd into dataframe. It takes almost 2.3 minutes to convert into
>> dataframe.
>> >
>> > I am running in Yarn client mode with executor memory as 15G and
>> executor cores as 2.
>> >
>> > Caching rdd before converting into dataframe  doesn't change processing
>> time. Whether introducing hash partitions inside foreachRDD  will help?
>> (or) Will partitioning topic and have more than one DirectStream help?. How
>> can I approach this situation to reduce time in converting to dataframe..
>> >
>> > Regards,
>> > Diwakar.
>>
>>
>
>


Re: How to convert from DataFrame to Dataset[Row]?

2016-07-16 Thread Sun Rui
For Spark 1.6.x, a DataFrame can't be directly converted to a Dataset[Row], but
it can be done indirectly as follows:

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}

// assume df is a DataFrame
implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema)
val ds = df.as[Row]

However, it may be more convenient to convert a DataFrame to a Dataset of Tuple 
or case class corresponding to the row schema. 

> On Jul 16, 2016, at 03:21, Daniel Barclay  wrote:
> 
> In Spark 1.6.1, how can I convert a DataFrame to a Dataset[Row]?
> 
> Is there a direct conversion?  (Trying .as[Row] doesn't work,
> even after importing  .implicits._ .)
> 
> Is there some way to map the Rows from the Dataframe into the Dataset[Row]?
> (DataFrame.map would just make another Dataframe, right?)
> 
> 
> Thanks,
> Daniel
> 
> 



Re: Spark Streaming - Best Practices to handle multiple datapoints arriving at different time interval

2016-07-16 Thread Daniel Haviv
Or use mapWithState

Thank you.
Daniel

> On 16 Jul 2016, at 03:02, RK Aduri  wrote:
> 
> You can probably define sliding windows and set larger batch intervals. 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Best-Practices-to-handle-multiple-datapoints-arriving-at-different-time-interval-tp27315p27348.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
>