BinaryFiles to ZipInputStream

2016-03-23 Thread Benjamin Kim
I need a little help. I am loading zipped CSV files stored in S3 into Spark 1.6.

First of all, I am able to get the List of file keys that have a modified date 
within a range of time by using the AWS SDK Objects (AmazonS3Client, 
ObjectListing, S3ObjectSummary, ListObjectsRequest, GetObjectRequest). Then, by 
setting up the HadoopConfiguration object with the S3 access and secret keys, I 
parallelize, partition, and iterate through the List to load each file's 
contents into an RDD[(String, org.apache.spark.input.PortableDataStream)].

val hadoopConf = sc.hadoopConfiguration;
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", accessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey", secretKey)

val filesRdd = sc.parallelize(lFiles)
filesRdd.foreachPartition(files => {
  val lZipFiles = files.map(x => sc.binaryFiles("s3://" + s3Bucket + "/" + x))
  val lZipStream = lZipFiles.map(x => new ZipInputStream(x))  // make them all zip input streams
  val lStrContent = lZipStream.map(x => readZipStream(x))     // read contents into string

})

This is where I need help. I get this error.

<console>:196: error: type mismatch;
 found   : org.apache.spark.rdd.RDD[(String, org.apache.spark.input.PortableDataStream)]
 required: java.io.InputStream
       val lZipStream = lZipFiles.map(x => new ZipInputStream(x)) // make them all zip input streams
                                                               ^

Does anyone know how to take the PortableDataStream returned in an RDD and 
convert it into a ZipInputStream?

Thanks,
Ben
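
For reference, a minimal sketch of the direction the type error above points to, not a reply from the thread: keep binaryFiles on the driver (it cannot be called inside foreachPartition) and wrap each PortableDataStream, whose open() returns an InputStream, in a ZipInputStream. The wildcard path and the readZipStream helper are assumptions.

import java.util.zip.ZipInputStream
import org.apache.spark.input.PortableDataStream

val zipped = sc.binaryFiles("s3://" + s3Bucket + "/*.zip")   // RDD[(String, PortableDataStream)]
val contents = zipped.map { case (path, pds: PortableDataStream) =>
  val zis = new ZipInputStream(pds.open())                   // open() returns an InputStream
  try readZipStream(zis) finally zis.close()                 // readZipStream: the poster's own helper
}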



Re: spark 1.6.0 connect to hive metastore

2016-03-23 Thread Koert Kuipers
can someone provide the correct settings for spark 1.6.1 to work with cdh 5
(hive 1.1.0)?

in particular the settings for:
spark.sql.hive.version
spark.sql.hive.metastore.jars

also it would be helpful to know if your spark jar includes hadoop
dependencies or not.

i realize it works (or at least seems to work) if you simply set the
spark.sql.hive.version to 1.2.1 and spark.sql.hive.metastore.jars to
builtin, but i find it somewhat unsatisfactory to rely on that happy
coincidence.
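
For reference, a hedged sketch of the kind of settings being asked about, not a confirmed recipe: the metastore client version is controlled by spark.sql.hive.metastore.version, and the jar paths below are only an assumption that has to match the actual CDH 5 layout.

spark.sql.hive.metastore.version  1.1.0
spark.sql.hive.metastore.jars     /opt/cloudera/parcels/CDH/lib/hive/lib/*:/opt/cloudera/parcels/CDH/lib/hadoop/client/*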


On Sat, Mar 12, 2016 at 7:09 PM, Timur Shenkao  wrote:

> I had similar issue with CDH 5.5.3.
> Not only with Spark 1.6 but with beeline as well.
> I resolved it by installing and running a hiveserver2 role instance on the
> same server where the metastore is. 
>
> On Tue, Feb 9, 2016 at 10:58 PM, Koert Kuipers  wrote:
>
>> has anyone successfully connected to hive metastore using spark 1.6.0? i
>> am having no luck. worked fine with spark 1.5.1 for me. i am on cdh 5.5 and
>> launching spark with yarn.
>>
>> this is what i see in logs:
>> 16/02/09 14:49:12 INFO hive.metastore: Trying to connect to metastore
>> with URI thrift://metastore.mycompany.com:9083
>> 16/02/09 14:49:12 INFO hive.metastore: Connected to metastore.
>>
>> and then a little later:
>>
>> 16/02/09 14:49:34 INFO hive.HiveContext: Initializing execution hive,
>> version 1.2.1
>> 16/02/09 14:49:34 INFO client.ClientWrapper: Inspected Hadoop version:
>> 2.6.0-cdh5.4.4
>> 16/02/09 14:49:34 INFO client.ClientWrapper: Loaded
>> org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.6.0-cdh5.4.4
>> 16/02/09 14:49:34 WARN conf.HiveConf: HiveConf of name
>> hive.server2.enable.impersonation does not exist
>> 16/02/09 14:49:35 INFO metastore.HiveMetaStore: 0: Opening raw store with
>> implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
>> 16/02/09 14:49:35 INFO metastore.ObjectStore: ObjectStore, initialize
>> called
>> 16/02/09 14:49:35 INFO DataNucleus.Persistence: Property
>> hive.metastore.integral.jdo.pushdown unknown - will be ignored
>> 16/02/09 14:49:35 INFO DataNucleus.Persistence: Property
>> datanucleus.cache.level2 unknown - will be ignored
>> 16/02/09 14:49:35 WARN DataNucleus.Connection: BoneCP specified but not
>> present in CLASSPATH (or one of dependencies)
>> 16/02/09 14:49:35 WARN DataNucleus.Connection: BoneCP specified but not
>> present in CLASSPATH (or one of dependencies)
>> 16/02/09 14:49:37 WARN conf.HiveConf: HiveConf of name
>> hive.server2.enable.impersonation does not exist
>> 16/02/09 14:49:37 INFO metastore.ObjectStore: Setting MetaStore object
>> pin classes with
>> hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"
>> 16/02/09 14:49:38 INFO DataNucleus.Datastore: The class
>> "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as
>> "embedded-only" so does not have its own datastore table.
>> 16/02/09 14:49:38 INFO DataNucleus.Datastore: The class
>> "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as
>> "embedded-only" so does not have its own datastore table.
>> 16/02/09 14:49:40 INFO DataNucleus.Datastore: The class
>> "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as
>> "embedded-only" so does not have its own datastore table.
>> 16/02/09 14:49:40 INFO DataNucleus.Datastore: The class
>> "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as
>> "embedded-only" so does not have its own datastore table.
>> 16/02/09 14:49:40 INFO metastore.MetaStoreDirectSql: Using direct SQL,
>> underlying DB is DERBY
>> 16/02/09 14:49:40 INFO metastore.ObjectStore: Initialized ObjectStore
>> java.lang.RuntimeException: java.lang.RuntimeException: Unable to
>> instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
>>   at
>> org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
>>   at
>> org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.scala:194)
>>   at
>> org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:238)
>>   at
>> org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveContext.scala:218)
>>   at
>> org.apache.spark.sql.hive.HiveContext.executionHive(HiveContext.scala:208)
>>   at org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:440)
>>   at
>> org.apache.spark.sql.SQLContext$$anonfun$4.apply(SQLContext.scala:272)
>>   at
>> org.apache.spark.sql.SQLContext$$anonfun$4.apply(SQLContext.scala:271)
>>   at scala.collection.Iterator$class.foreach(Iterator.scala:742)
>>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
>>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>   at org.apache.spark.sql.SQLContext.(SQLContext.scala:271)
>>   at org.apache.spark.sql.hive.HiveContext.(HiveContext.scala:97)
>>   at 

Re: What's the benefit of RDD checkpoint against RDD save

2016-03-23 Thread Mark Hamstra
Yes, the terminology is being used sloppily/non-standardly in this thread
-- "the last RDD" after a series of transformation is the RDD at the
beginning of the chain, just now with an attached chain of "to be done"
transformations when an action is eventually run.  If the saveXXX action is
the only action being performed on the RDD, the rest of the chain being
purely transformations, then checkpointing instead of saving still wouldn't
execute any action on the RDD -- it would just mark the point at which
checkpointing should be done when an action is eventually run.

On Wed, Mar 23, 2016 at 7:38 PM, Ted Yu  wrote:

> bq. when I get the last RDD
> If I read Todd's first email correctly, the computation has been done.
> I could be wrong.
>
> On Wed, Mar 23, 2016 at 7:34 PM, Mark Hamstra 
> wrote:
>
>> Neither of you is making any sense to me.  If you just have an RDD for
>> which you have specified a series of transformations but you haven't run
>> any actions, then neither checkpointing nor saving makes sense -- you
>> haven't computed anything yet, you've only written out the recipe for how
>> the computation should be done when it is needed.  Neither does the "called
>> before any job" comment pose any restriction in this case since no jobs
>> have yet been executed on the RDD.
>>
>> On Wed, Mar 23, 2016 at 7:18 PM, Ted Yu  wrote:
>>
>>> See the doc for checkpoint:
>>>
>>>* Mark this RDD for checkpointing. It will be saved to a file inside
>>> the checkpoint
>>>* directory set with `SparkContext#setCheckpointDir` and all
>>> references to its parent
>>>* RDDs will be removed. *This function must be called before any job
>>> has been*
>>> *   * executed on this RDD*. It is strongly recommended that this RDD
>>> is persisted in
>>>* memory, otherwise saving it on a file will require recomputation.
>>>
>>> From the above description, you should not call it at the end of
>>> transformations.
>>>
>>> Cheers
>>>
>>> On Wed, Mar 23, 2016 at 7:14 PM, Todd  wrote:
>>>
 Hi,

 I have a long computing chain, when I get the last RDD after a series
 of transformation. I have two choices to do with this last RDD

 1. Call checkpoint on RDD to materialize it to disk
 2. Call RDD.saveXXX to save it to HDFS, and read it back for further
 processing

 I would ask which choice is better? It looks to me that is not much
 difference between the two choices.
 Thanks!



>>>
>>
>


Re: What's the benefit of RDD checkpoint against RDD save

2016-03-23 Thread Ted Yu
bq. when I get the last RDD
If I read Todd's first email correctly, the computation has been done.
I could be wrong.

On Wed, Mar 23, 2016 at 7:34 PM, Mark Hamstra 
wrote:

> Neither of you is making any sense to me.  If you just have an RDD for
> which you have specified a series of transformations but you haven't run
> any actions, then neither checkpointing nor saving makes sense -- you
> haven't computed anything yet, you've only written out the recipe for how
> the computation should be done when it is needed.  Neither does the "called
> before any job" comment pose any restriction in this case since no jobs
> have yet been executed on the RDD.
>
> On Wed, Mar 23, 2016 at 7:18 PM, Ted Yu  wrote:
>
>> See the doc for checkpoint:
>>
>>* Mark this RDD for checkpointing. It will be saved to a file inside
>> the checkpoint
>>* directory set with `SparkContext#setCheckpointDir` and all
>> references to its parent
>>* RDDs will be removed. *This function must be called before any job
>> has been*
>> *   * executed on this RDD*. It is strongly recommended that this RDD is
>> persisted in
>>* memory, otherwise saving it on a file will require recomputation.
>>
>> From the above description, you should not call it at the end of
>> transformations.
>>
>> Cheers
>>
>> On Wed, Mar 23, 2016 at 7:14 PM, Todd  wrote:
>>
>>> Hi,
>>>
>>> I have a long computing chain, when I get the last RDD after a series of
>>> transformation. I have two choices to do with this last RDD
>>>
>>> 1. Call checkpoint on RDD to materialize it to disk
>>> 2. Call RDD.saveXXX to save it to HDFS, and read it back for further
>>> processing
>>>
>>> I would ask which choice is better? It looks to me that is not much
>>> difference between the two choices.
>>> Thanks!
>>>
>>>
>>>
>>
>


Re: What's the benefit of RDD checkpoint against RDD save

2016-03-23 Thread Mark Hamstra
Neither of you is making any sense to me.  If you just have an RDD for
which you have specified a series of transformations but you haven't run
any actions, then neither checkpointing nor saving makes sense -- you
haven't computed anything yet, you've only written out the recipe for how
the computation should be done when it is needed.  Neither does the "called
before any job" comment pose any restriction in this case since no jobs
have yet been executed on the RDD.

On Wed, Mar 23, 2016 at 7:18 PM, Ted Yu  wrote:

> See the doc for checkpoint:
>
>* Mark this RDD for checkpointing. It will be saved to a file inside
> the checkpoint
>* directory set with `SparkContext#setCheckpointDir` and all references
> to its parent
>* RDDs will be removed. *This function must be called before any job
> has been*
> *   * executed on this RDD*. It is strongly recommended that this RDD is
> persisted in
>* memory, otherwise saving it on a file will require recomputation.
>
> From the above description, you should not call it at the end of
> transformations.
>
> Cheers
>
> On Wed, Mar 23, 2016 at 7:14 PM, Todd  wrote:
>
>> Hi,
>>
>> I have a long computing chain, when I get the last RDD after a series of
>> transformation. I have two choices to do with this last RDD
>>
>> 1. Call checkpoint on RDD to materialize it to disk
>> 2. Call RDD.saveXXX to save it to HDFS, and read it back for further
>> processing
>>
>> I would ask which choice is better? It looks to me that is not much
>> difference between the two choices.
>> Thanks!
>>
>>
>>
>


Re: What's the benefit of RDD checkpoint against RDD save

2016-03-23 Thread Ted Yu
See the doc for checkpoint:

   * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
   * directory set with `SparkContext#setCheckpointDir` and all references to its parent
   * RDDs will be removed. This function must be called before any job has been
   * executed on this RDD. It is strongly recommended that this RDD is persisted in
   * memory, otherwise saving it on a file will require recomputation.

From the above description, you should not call it at the end of
transformations.

Cheers

On Wed, Mar 23, 2016 at 7:14 PM, Todd  wrote:

> Hi,
>
> I have a long computing chain, when I get the last RDD after a series of
> transformation. I have two choices to do with this last RDD
>
> 1. Call checkpoint on RDD to materialize it to disk
> 2. Call RDD.saveXXX to save it to HDFS, and read it back for further
> processing
>
> I would ask which choice is better? It looks to me that is not much
> difference between the two choices.
> Thanks!
>
>
>


What's the benefit of RDD checkpoint against RDD save

2016-03-23 Thread Todd
Hi,

I have a long computing chain; at the end I get the last RDD after a series of 
transformations. I have two choices for what to do with this last RDD:

1. Call checkpoint on RDD to materialize it to disk
2. Call RDD.saveXXX to save it to HDFS, and read it back for further processing

I would ask which choice is better? It looks to me that there is not much difference 
between the two choices.
Thanks!
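
For reference, a minimal sketch contrasting the two choices discussed above; the paths and the computation chain are placeholders.

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("checkpoint-vs-save"))
val last = sc.textFile("hdfs:///input").map(_.length)      // stand-in for a long transformation chain

// Choice 1: checkpoint -- mark it before any job has run on this RDD, and persist it
// first so writing the checkpoint does not recompute the whole chain.
sc.setCheckpointDir("hdfs:///tmp/checkpoints")
last.persist()
last.checkpoint()
last.count()                                               // first action materializes and checkpoints it

// Choice 2: save explicitly and read it back for further processing.
last.saveAsObjectFile("hdfs:///tmp/last-rdd")
val reloaded = sc.objectFile[Int]("hdfs:///tmp/last-rdd")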




Re: calling individual columns from spark temporary table

2016-03-23 Thread Ashok Kumar
Thank you again
For
val r = df.filter(col("paid") > "").map(x => 
(x.getString(0),x.getString(1).)

Can you give an example of column expression please like
df.filter(col("paid") > "").col("firstcolumn").getString   ?

 

On Thursday, 24 March 2016, 0:45, Michael Armbrust  
wrote:
 

 You can only use as on a Column expression, not inside of a lambda function.  
The reason is the lambda function is compiled into opaque bytecode that Spark 
SQL is not able to see.  We just blindly execute it.
However, there are a couple of ways to name the columns that come out of a map. 
 Either use a case class instead of a tuple.  Or use .toDF("name1", 
"name2") after the map.
From a performance perspective, it's even better though if you can avoid maps 
and stick to Column expressions.  The reason is that for maps, we have to 
actually materialize an object to pass to your function.  However, if you 
stick to column expressions we can actually work directly on serialized data.
On Wed, Mar 23, 2016 at 5:27 PM, Ashok Kumar  wrote:

thank you sir
sql("select `_1` as firstcolumn from items")

is there anyway one can keep the csv column names using databricks when mapping
val r = df.filter(col("paid") > "").map(x => 
(x.getString(0),x.getString(1).)

can I call example  x.getString(0).as.(firstcolumn) in above when mapping if 
possible so columns will have labels


 

On Thursday, 24 March 2016, 0:18, Michael Armbrust  
wrote:
 

 You probably need to use `backticks` to escape `_1` since I don't think that 
its a valid SQL identifier.
On Wed, Mar 23, 2016 at 5:10 PM, Ashok Kumar  
wrote:

Gurus,
If I register a temporary table as below
r.toDF
res58: org.apache.spark.sql.DataFrame = [_1: string, _2: string, _3: double, _4: double, _5: double]

r.toDF.registerTempTable("items")

sql("select * from items")
res60: org.apache.spark.sql.DataFrame = [_1: string, _2: string, _3: double, _4: double, _5: double]
Is there any way I can do a select on the first column only?
sql("select _1 from items") throws an error
Thanking you



   



  

Re: calling individual columns from spark temporary table

2016-03-23 Thread Michael Armbrust
You can only use as on a Column expression, not inside of a lambda
function.  The reason is the lambda function is compiled into opaque
bytecode that Spark SQL is not able to see.  We just blindly execute it.

However, there are a couple of ways to name the columns that come out of a
map.  Either use a case class instead of a tuple.  Or use .toDF("name1",
"name2") after the map.

From a performance perspective, it's even better though if you can avoid
maps and stick to Column expressions.  The reason is that for maps, we have
to actually materialize an object to pass to your function.  However, if
you stick to column expressions we can actually work directly on serialized
data.
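
For reference, a minimal sketch of the two naming options described above, assuming the DataFrame from earlier in the thread; the column names are placeholders.

import org.apache.spark.sql.functions.col
import sqlContext.implicits._                 // assumes the existing SQLContext is called sqlContext

case class Payment(firstcolumn: String, paid: String)

// Option A: map to a case class, and toDF() picks up the field names.
val named1 = df.filter(col("paid") > "").map(x => Payment(x.getString(0), x.getString(1))).toDF()

// Option B: keep the tuple and name the columns after the map.
val named2 = df.filter(col("paid") > "").map(x => (x.getString(0), x.getString(1))).toDF("firstcolumn", "paid")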

On Wed, Mar 23, 2016 at 5:27 PM, Ashok Kumar  wrote:

> thank you sir
>
> sql("select `_1` as firstcolumn from items")
>
> is there anyway one can keep the csv column names using databricks when
> mapping
>
> val r = df.filter(col("paid") > "").map(x =>
> (x.getString(0),x.getString(1).)
>
> can I call example  x.getString(0).as.(firstcolumn) in above when mapping
> if possible so columns will have labels
>
>
>
>
>
> On Thursday, 24 March 2016, 0:18, Michael Armbrust 
> wrote:
>
>
> You probably need to use `backticks` to escape `_1` since I don't think
> that its a valid SQL identifier.
>
> On Wed, Mar 23, 2016 at 5:10 PM, Ashok Kumar  > wrote:
>
> Gurus,
>
> If I register a temporary table as below
>
>  r.toDF
> res58: org.apache.spark.sql.DataFrame = [_1: string, _2: string, _3:
> double, _4: double, _5: double]
>
> r.toDF.registerTempTable("items")
>
> sql("select * from items")
> res60: org.apache.spark.sql.DataFrame = [_1: string, _2: string, _3:
> double, _4: double, _5: double]
>
> Is there anyway I can do a select on the first column only
>
> sql("select _1 from items" throws error
>
> Thanking you
>
>
>
>
>


RE: Spark 1.5.2, why the broadcast join shuffle so much data in the last step

2016-03-23 Thread Yong Zhang
Sounds good.
I will manually merge this patch onto 1.6.1, test my case again tomorrow in 
my environment, and update later.
Thanks
Yong

> Date: Wed, 23 Mar 2016 16:20:23 -0700
> Subject: Re: Spark 1.5.2, why the broadcast join shuffle so much data in the 
> last step
> From: dav...@databricks.com
> To: java8...@hotmail.com
> CC: user@spark.apache.org
> 
> On Wed, Mar 23, 2016 at 10:35 AM, Yong Zhang  wrote:
> > Here is the output:
> >
> > == Parsed Logical Plan ==
> > Project [400+ columns]
> > +- Project [400+ columns]
> >+- Project [400+ columns]
> >   +- Project [400+ columns]
> >  +- Join Inner, Somevisid_high#460L = visid_high#948L) &&
> > (visid_low#461L = visid_low#949L)) && (date_time#25L > date_time#513L)))
> > :- Relation[400+ columns] ParquetRelation
> > +- BroadcastHint
> >+- Project [soid_e1#30 AS
> > account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
> >   +- Filter (instr(event_list#105,202) > 0)
> >  +- Relation[400+ columns] ParquetRelation
> >
> > == Analyzed Logical Plan ==
> > 400+ columns
> > Project [400+ columns]
> > +- Project [400+ columns]
> >+- Project [400+ columns]
> >   +- Project [400+ columns]
> >  +- Join Inner, Somevisid_high#460L = visid_high#948L) &&
> > (visid_low#461L = visid_low#949L)) && (date_time#25L > date_time#513L)))
> > :- Relation[400+ columns] ParquetRelation
> > +- BroadcastHint
> >+- Project [soid_e1#30 AS
> > account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
> >   +- Filter (instr(event_list#105,202) > 0)
> >  +- Relation[400+ columns] ParquetRelation
> >
> > == Optimized Logical Plan ==
> > Project [400+ columns]
> > +- Join Inner, Somevisid_high#460L = visid_high#948L) && (visid_low#461L
> > = visid_low#949L)) && (date_time#25L > date_time#513L)))
> >:- Relation[400+ columns] ParquetRelation
> >+- Project [date_time#25L,visid_low#461L,visid_high#460L,account_id#976]
> >   +- BroadcastHint
> >  +- Project [soid_e1#30 AS
> > account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
> > +- Filter (instr(event_list#105,202) > 0)
> >+- Relation[400+ columns] ParquetRelation
> 
> There is a Project on top of the BroadcastHint, inserted by the column
> pruning rule, which makes SparkStrategies unable to recognize the
> BroadcastHint anymore; it was fixed recently in master [1]
> 
> https://github.com/apache/spark/pull/11260
> 
> Your join should run as expected in master.
> 
> > == Physical Plan ==
> > Project [400+ columns]
> > +- Filter (date_time#25L > date_time#513L)
> >+- SortMergeJoin [visid_high#948L,visid_low#949L],
> > [visid_high#460L,visid_low#461L]
> >   :- Sort [visid_high#948L ASC,visid_low#949L ASC], false, 0
> >   :  +- TungstenExchange
> > hashpartitioning(visid_high#948L,visid_low#949L,200), None
> >   : +- Scan ParquetRelation[400+ columns] InputPaths:
> > hdfs://xxx/2015/12/17, hdfs://xxx/2015/12/18, hdfs://xxx/2015/12/19,
> > hdfs://xxx/2015/12/20, hdfs://xxx/2015/12/21, hdfs://xxx/2015/12/22,
> > hdfs://xxx/2015/12/23, hdfs://xxx/2015/12/24, hdfs://xxx/2015/12/25,
> > hdfs://xxx/2015/12/26, hdfs://xxx/2015/12/27, hdfs://xxx/2015/12/28,
> > hdfs://xxx/2015/12/29, hdfs://xxx/2015/12/30, hdfs://xxx/2015/12/31,
> > hdfs://xxx/2016/01/01, hdfs://xxx/2016/01/02, hdfs://xxx/2016/01/03,
> > hdfs://xxx/2016/01/04, hdfs://xxx/2016/01/05, hdfs://xxx/2016/01/06,
> > hdfs://xxx/2016/01/07, hdfs://xxx/2016/01/08, hdfs://xxx/2016/01/09,
> > hdfs://xxx/2016/01/10, hdfs://xxx/2016/01/11, hdfs://xxx/2016/01/12,
> > hdfs://xxx/2016/01/13, hdfs://xxx/2016/01/14, hdfs://xxx/2016/01/15,
> > hdfs://xxx/2016/01/16, hdfs://xxx/2016/01/17, hdfs://xxx/2016/01/18,
> > hdfs://xxx/2016/01/19, hdfs://xxx/2016/01/20, hdfs://xxx/2016/01/21,
> > hdfs://xxx/2016/01/22, hdfs://xxx/2016/01/23, hdfs://xxx/2016/01/24,
> > hdfs://xxx/2016/01/25, hdfs://xxx/2016/01/26, hdfs://xxx/2016/01/27,
> > hdfs://xxx/2016/01/28, hdfs://xxx/2016/01/29, hdfs://xxx/2016/01/30,
> > hdfs://xxx/2016/01/31, hdfs://xxx/2016/02/01, hdfs://xxx/2016/02/02,
> > hdfs://xxx/2016/02/03, hdfs://xxx/2016/02/04, hdfs://xxx/2016/02/05,
> > hdfs://xxx/2016/02/06, hdfs://xxx/2016/02/07, hdfs://xxx/2016/02/08,
> > hdfs://xxx/2016/02/09, hdfs://xxx/2016/02/10, hdfs://xxx/2016/02/11,
> > hdfs://xxx/2016/02/12, hdfs://xxx/2016/02/13, hdfs://xxx/2016/02/14,
> > hdfs://xxx/2016/02/15, hdfs://xxx/2016/02/16, hdfs://xxx/2016/02/17,
> > hdfs://xxx/2016/02/18, hdfs://xxx/2016/02/19, hdfs://xxx/2016/02/20,
> > hdfs://xxx/2016/02/21, hdfs://xxx/2016/02/22, hdfs://xxx/2016/02/23,
> > hdfs://xxx/2016/02/24, hdfs://xxx/2016/02/25, hdfs://xxx/2016/02/26,
> > hdfs://xxx/2016/02/27, hdfs://xxx/2016/02/28, hdfs://xxx/2016/02/29,
> > hdfs://xxx/2016/03/01, hdfs://xxx/2016/03/02, hdfs://xxx/2016/03/03,

Re: calling individual columns from spark temporary table

2016-03-23 Thread Ashok Kumar
thank you sir
sql("select `_1` as firstcolumn from items")

is there anyway one can keep the csv column names using databricks when mapping
val r = df.filter(col("paid") > "").map(x => 
(x.getString(0),x.getString(1).)

can I call example  x.getString(0).as.(firstcolumn) in above when mapping if 
possible so columns will have labels


 

On Thursday, 24 March 2016, 0:18, Michael Armbrust  
wrote:
 

 You probably need to use `backticks` to escape `_1` since I don't think that 
its a valid SQL identifier.
On Wed, Mar 23, 2016 at 5:10 PM, Ashok Kumar  
wrote:

Gurus,
If I register a temporary table as below
r.toDF
res58: org.apache.spark.sql.DataFrame = [_1: string, _2: string, _3: double, _4: double, _5: double]

r.toDF.registerTempTable("items")

sql("select * from items")
res60: org.apache.spark.sql.DataFrame = [_1: string, _2: string, _3: double, _4: double, _5: double]
Is there any way I can do a select on the first column only?
sql("select _1 from items") throws an error
Thanking you



  

Re: calling individual columns from spark temporary table

2016-03-23 Thread Michael Armbrust
You probably need to use `backticks` to escape `_1` since I don't think
that its a valid SQL identifier.

On Wed, Mar 23, 2016 at 5:10 PM, Ashok Kumar 
wrote:

> Gurus,
>
> If I register a temporary table as below
>
>  r.toDF
> res58: org.apache.spark.sql.DataFrame = [_1: string, _2: string, _3:
> double, _4: double, _5: double]
>
> r.toDF.registerTempTable("items")
>
> sql("select * from items")
> res60: org.apache.spark.sql.DataFrame = [_1: string, _2: string, _3:
> double, _4: double, _5: double]
>
> Is there anyway I can do a select on the first column only
>
> sql("select _1 from items" throws error
>
> Thanking you
>


calling individual columns from spark temporary table

2016-03-23 Thread Ashok Kumar
Gurus,
If I register a temporary table as below
r.toDF
res58: org.apache.spark.sql.DataFrame = [_1: string, _2: string, _3: double, _4: double, _5: double]

r.toDF.registerTempTable("items")

sql("select * from items")
res60: org.apache.spark.sql.DataFrame = [_1: string, _2: string, _3: double, _4: double, _5: double]
Is there any way I can do a select on the first column only?
sql("select _1 from items") throws an error
Thanking you

Re: Spark 1.5.2, why the broadcast join shuffle so much data in the last step

2016-03-23 Thread Ted Yu
SPARK-13383 is fixed in 2.0 only, as of this moment.

Any chance of backporting to branch-1.6 ?

Thanks

On Wed, Mar 23, 2016 at 4:20 PM, Davies Liu  wrote:

> On Wed, Mar 23, 2016 at 10:35 AM, Yong Zhang  wrote:
> > Here is the output:
> >
> > == Parsed Logical Plan ==
> > Project [400+ columns]
> > +- Project [400+ columns]
> >+- Project [400+ columns]
> >   +- Project [400+ columns]
> >  +- Join Inner, Somevisid_high#460L = visid_high#948L) &&
> > (visid_low#461L = visid_low#949L)) && (date_time#25L > date_time#513L)))
> > :- Relation[400+ columns] ParquetRelation
> > +- BroadcastHint
> >+- Project [soid_e1#30 AS
> > account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
> >   +- Filter (instr(event_list#105,202) > 0)
> >  +- Relation[400+ columns] ParquetRelation
> >
> > == Analyzed Logical Plan ==
> > 400+ columns
> > Project [400+ columns]
> > +- Project [400+ columns]
> >+- Project [400+ columns]
> >   +- Project [400+ columns]
> >  +- Join Inner, Somevisid_high#460L = visid_high#948L) &&
> > (visid_low#461L = visid_low#949L)) && (date_time#25L > date_time#513L)))
> > :- Relation[400+ columns] ParquetRelation
> > +- BroadcastHint
> >+- Project [soid_e1#30 AS
> > account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
> >   +- Filter (instr(event_list#105,202) > 0)
> >  +- Relation[400+ columns] ParquetRelation
> >
> > == Optimized Logical Plan ==
> > Project [400+ columns]
> > +- Join Inner, Somevisid_high#460L = visid_high#948L) &&
> (visid_low#461L
> > = visid_low#949L)) && (date_time#25L > date_time#513L)))
> >:- Relation[400+ columns] ParquetRelation
> >+- Project
> [date_time#25L,visid_low#461L,visid_high#460L,account_id#976]
> >   +- BroadcastHint
> >  +- Project [soid_e1#30 AS
> > account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
> > +- Filter (instr(event_list#105,202) > 0)
> >+- Relation[400+ columns] ParquetRelation
>
> There is a Project on top of the BroadcastHint, inserted by the column
> pruning rule, which makes SparkStrategies unable to recognize the
> BroadcastHint anymore; it was fixed recently in master [1]
>
> https://github.com/apache/spark/pull/11260
>
> Your join should run as expected in master.
>
> > == Physical Plan ==
> > Project [400+ columns]
> > +- Filter (date_time#25L > date_time#513L)
> >+- SortMergeJoin [visid_high#948L,visid_low#949L],
> > [visid_high#460L,visid_low#461L]
> >   :- Sort [visid_high#948L ASC,visid_low#949L ASC], false, 0
> >   :  +- TungstenExchange
> > hashpartitioning(visid_high#948L,visid_low#949L,200), None
> >   : +- Scan ParquetRelation[400+ columns] InputPaths:
> > hdfs://xxx/2015/12/17, hdfs://xxx/2015/12/18, hdfs://xxx/2015/12/19,
> > hdfs://xxx/2015/12/20, hdfs://xxx/2015/12/21, hdfs://xxx/2015/12/22,
> > hdfs://xxx/2015/12/23, hdfs://xxx/2015/12/24, hdfs://xxx/2015/12/25,
> > hdfs://xxx/2015/12/26, hdfs://xxx/2015/12/27, hdfs://xxx/2015/12/28,
> > hdfs://xxx/2015/12/29, hdfs://xxx/2015/12/30, hdfs://xxx/2015/12/31,
> > hdfs://xxx/2016/01/01, hdfs://xxx/2016/01/02, hdfs://xxx/2016/01/03,
> > hdfs://xxx/2016/01/04, hdfs://xxx/2016/01/05, hdfs://xxx/2016/01/06,
> > hdfs://xxx/2016/01/07, hdfs://xxx/2016/01/08, hdfs://xxx/2016/01/09,
> > hdfs://xxx/2016/01/10, hdfs://xxx/2016/01/11, hdfs://xxx/2016/01/12,
> > hdfs://xxx/2016/01/13, hdfs://xxx/2016/01/14, hdfs://xxx/2016/01/15,
> > hdfs://xxx/2016/01/16, hdfs://xxx/2016/01/17, hdfs://xxx/2016/01/18,
> > hdfs://xxx/2016/01/19, hdfs://xxx/2016/01/20, hdfs://xxx/2016/01/21,
> > hdfs://xxx/2016/01/22, hdfs://xxx/2016/01/23, hdfs://xxx/2016/01/24,
> > hdfs://xxx/2016/01/25, hdfs://xxx/2016/01/26, hdfs://xxx/2016/01/27,
> > hdfs://xxx/2016/01/28, hdfs://xxx/2016/01/29, hdfs://xxx/2016/01/30,
> > hdfs://xxx/2016/01/31, hdfs://xxx/2016/02/01, hdfs://xxx/2016/02/02,
> > hdfs://xxx/2016/02/03, hdfs://xxx/2016/02/04, hdfs://xxx/2016/02/05,
> > hdfs://xxx/2016/02/06, hdfs://xxx/2016/02/07, hdfs://xxx/2016/02/08,
> > hdfs://xxx/2016/02/09, hdfs://xxx/2016/02/10, hdfs://xxx/2016/02/11,
> > hdfs://xxx/2016/02/12, hdfs://xxx/2016/02/13, hdfs://xxx/2016/02/14,
> > hdfs://xxx/2016/02/15, hdfs://xxx/2016/02/16, hdfs://xxx/2016/02/17,
> > hdfs://xxx/2016/02/18, hdfs://xxx/2016/02/19, hdfs://xxx/2016/02/20,
> > hdfs://xxx/2016/02/21, hdfs://xxx/2016/02/22, hdfs://xxx/2016/02/23,
> > hdfs://xxx/2016/02/24, hdfs://xxx/2016/02/25, hdfs://xxx/2016/02/26,
> > hdfs://xxx/2016/02/27, hdfs://xxx/2016/02/28, hdfs://xxx/2016/02/29,
> > hdfs://xxx/2016/03/01, hdfs://xxx/2016/03/02, hdfs://xxx/2016/03/03,
> > hdfs://xxx/2016/03/04, hdfs://xxx/2016/03/05, hdfs://xxx/2016/03/06,
> > hdfs://xxx/2016/03/07, hdfs://xxx/2016/03/08, hdfs://xxx/2016/03/09,
> > hdfs://xxx/2016/03/10, 

Best way to determine # of workers

2016-03-23 Thread Ajaxx
I'm building some elasticity into my model and I'd like to know when my
workers have come online.  It appears at present that the API only supports
getting information about applications.  Is there a good way to determine
how many workers are available?
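
For reference, a hedged sketch of two common workarounds rather than an official worker API:

// Count the executors currently registered with the driver. The map includes the
// driver's own entry, hence the "- 1" (an assumption that does not hold in local mode).
val executorCount = sc.getExecutorMemoryStatus.size - 1
println(s"Registered executors: $executorCount")

// In standalone mode the master web UI also serves a JSON view that lists workers, e.g.
//   http://<master-host>:8080/json          (default port assumed)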



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-determine-of-workers-tp26586.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Forcing data from disk to memory

2016-03-23 Thread Daniel Imberman
Hi all,

So I have a question about persistence. Let's say I have an RDD that's
persisted MEMORY_AND_DISK, and I know that I now have enough memory space
cleared up that I can force the data on disk into memory. Is it possible to
tell spark to re-evaluate the open RDD memory and move that information?

Thank you
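
For reference, a hedged sketch of one workaround: there is no single call that promotes existing disk blocks to memory, so the usual approach is to re-persist at a memory-only level and touch the data again (note this recomputes the lineage rather than copying the blocks); rdd stands for the already-persisted RDD.

import org.apache.spark.storage.StorageLevel

rdd.unpersist(blocking = true)          // drop the MEMORY_AND_DISK blocks
rdd.persist(StorageLevel.MEMORY_ONLY)   // ask for memory-only caching this time
rdd.count()                             // an action recaches it, now fully in memory if it fits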



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Forcing-data-from-disk-to-memory-tp26585.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark 1.5.2, why the broadcast join shuffle so much data in the last step

2016-03-23 Thread Davies Liu
On Wed, Mar 23, 2016 at 10:35 AM, Yong Zhang  wrote:
> Here is the output:
>
> == Parsed Logical Plan ==
> Project [400+ columns]
> +- Project [400+ columns]
>+- Project [400+ columns]
>   +- Project [400+ columns]
>  +- Join Inner, Somevisid_high#460L = visid_high#948L) &&
> (visid_low#461L = visid_low#949L)) && (date_time#25L > date_time#513L)))
> :- Relation[400+ columns] ParquetRelation
> +- BroadcastHint
>+- Project [soid_e1#30 AS
> account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
>   +- Filter (instr(event_list#105,202) > 0)
>  +- Relation[400+ columns] ParquetRelation
>
> == Analyzed Logical Plan ==
> 400+ columns
> Project [400+ columns]
> +- Project [400+ columns]
>+- Project [400+ columns]
>   +- Project [400+ columns]
>  +- Join Inner, Somevisid_high#460L = visid_high#948L) &&
> (visid_low#461L = visid_low#949L)) && (date_time#25L > date_time#513L)))
> :- Relation[400+ columns] ParquetRelation
> +- BroadcastHint
>+- Project [soid_e1#30 AS
> account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
>   +- Filter (instr(event_list#105,202) > 0)
>  +- Relation[400+ columns] ParquetRelation
>
> == Optimized Logical Plan ==
> Project [400+ columns]
> +- Join Inner, Somevisid_high#460L = visid_high#948L) && (visid_low#461L
> = visid_low#949L)) && (date_time#25L > date_time#513L)))
>:- Relation[400+ columns] ParquetRelation
>+- Project [date_time#25L,visid_low#461L,visid_high#460L,account_id#976]
>   +- BroadcastHint
>  +- Project [soid_e1#30 AS
> account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
> +- Filter (instr(event_list#105,202) > 0)
>+- Relation[400+ columns] ParquetRelation

There is a Project on top of the BroadcastHint, inserted by the column
pruning rule, which makes SparkStrategies unable to recognize the
BroadcastHint anymore; it was fixed recently in master [1]

https://github.com/apache/spark/pull/11260

Your join should run as expected in master.
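
For reference, a hedged sketch of how the hint is requested and then verified once the fix is in; the DataFrame names follow the thread, everything else is illustrative.

import org.apache.spark.sql.functions.broadcast

val joined = historyRaw.join(
  broadcast(trialRaw),
  trialRaw("visid_high") === historyRaw("visid_high") &&
  trialRaw("visid_low")  === historyRaw("visid_low")  &&
  trialRaw("date_time")  >   historyRaw("date_time"))

// When the hint is honored, the physical plan shows a BroadcastHashJoin instead of a SortMergeJoin.
joined.explain(true)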

> == Physical Plan ==
> Project [400+ columns]
> +- Filter (date_time#25L > date_time#513L)
>+- SortMergeJoin [visid_high#948L,visid_low#949L],
> [visid_high#460L,visid_low#461L]
>   :- Sort [visid_high#948L ASC,visid_low#949L ASC], false, 0
>   :  +- TungstenExchange
> hashpartitioning(visid_high#948L,visid_low#949L,200), None
>   : +- Scan ParquetRelation[400+ columns] InputPaths:
> hdfs://xxx/2015/12/17, hdfs://xxx/2015/12/18, hdfs://xxx/2015/12/19,
> hdfs://xxx/2015/12/20, hdfs://xxx/2015/12/21, hdfs://xxx/2015/12/22,
> hdfs://xxx/2015/12/23, hdfs://xxx/2015/12/24, hdfs://xxx/2015/12/25,
> hdfs://xxx/2015/12/26, hdfs://xxx/2015/12/27, hdfs://xxx/2015/12/28,
> hdfs://xxx/2015/12/29, hdfs://xxx/2015/12/30, hdfs://xxx/2015/12/31,
> hdfs://xxx/2016/01/01, hdfs://xxx/2016/01/02, hdfs://xxx/2016/01/03,
> hdfs://xxx/2016/01/04, hdfs://xxx/2016/01/05, hdfs://xxx/2016/01/06,
> hdfs://xxx/2016/01/07, hdfs://xxx/2016/01/08, hdfs://xxx/2016/01/09,
> hdfs://xxx/2016/01/10, hdfs://xxx/2016/01/11, hdfs://xxx/2016/01/12,
> hdfs://xxx/2016/01/13, hdfs://xxx/2016/01/14, hdfs://xxx/2016/01/15,
> hdfs://xxx/2016/01/16, hdfs://xxx/2016/01/17, hdfs://xxx/2016/01/18,
> hdfs://xxx/2016/01/19, hdfs://xxx/2016/01/20, hdfs://xxx/2016/01/21,
> hdfs://xxx/2016/01/22, hdfs://xxx/2016/01/23, hdfs://xxx/2016/01/24,
> hdfs://xxx/2016/01/25, hdfs://xxx/2016/01/26, hdfs://xxx/2016/01/27,
> hdfs://xxx/2016/01/28, hdfs://xxx/2016/01/29, hdfs://xxx/2016/01/30,
> hdfs://xxx/2016/01/31, hdfs://xxx/2016/02/01, hdfs://xxx/2016/02/02,
> hdfs://xxx/2016/02/03, hdfs://xxx/2016/02/04, hdfs://xxx/2016/02/05,
> hdfs://xxx/2016/02/06, hdfs://xxx/2016/02/07, hdfs://xxx/2016/02/08,
> hdfs://xxx/2016/02/09, hdfs://xxx/2016/02/10, hdfs://xxx/2016/02/11,
> hdfs://xxx/2016/02/12, hdfs://xxx/2016/02/13, hdfs://xxx/2016/02/14,
> hdfs://xxx/2016/02/15, hdfs://xxx/2016/02/16, hdfs://xxx/2016/02/17,
> hdfs://xxx/2016/02/18, hdfs://xxx/2016/02/19, hdfs://xxx/2016/02/20,
> hdfs://xxx/2016/02/21, hdfs://xxx/2016/02/22, hdfs://xxx/2016/02/23,
> hdfs://xxx/2016/02/24, hdfs://xxx/2016/02/25, hdfs://xxx/2016/02/26,
> hdfs://xxx/2016/02/27, hdfs://xxx/2016/02/28, hdfs://xxx/2016/02/29,
> hdfs://xxx/2016/03/01, hdfs://xxx/2016/03/02, hdfs://xxx/2016/03/03,
> hdfs://xxx/2016/03/04, hdfs://xxx/2016/03/05, hdfs://xxx/2016/03/06,
> hdfs://xxx/2016/03/07, hdfs://xxx/2016/03/08, hdfs://xxx/2016/03/09,
> hdfs://xxx/2016/03/10, hdfs://xxx/2016/03/11, hdfs://xxx/2016/03/12,
> hdfs://xxx/2016/03/13, hdfs://xxx/2016/03/14, hdfs://xxx/2016/03/15,
> hdfs://xxx/2016/03/16, hdfs://xxx/2016/03/17
>   +- Sort [visid_high#460L ASC,visid_low#461L ASC], false, 0
>  +- TungstenExchange
> hashpartitioning(visid_high#460L,visid_low#461L,200), None
> +- Project
> 

Re: Spark with Druid

2016-03-23 Thread Michael Malak
Will Spark 2.0 Structured Streaming obviate some of the Druid/Spark use cases?

  From: Raymond Honderdors 
 To: "yuzhih...@gmail.com"  
Cc: "user@spark.apache.org" 
 Sent: Wednesday, March 23, 2016 8:43 AM
 Subject: Re: Spark with Druid
   
I saw these, but I fail to understand how to direct the code to use the index 
JSON.
Sent from Outlook Mobile



On Wed, Mar 23, 2016 at 7:19 AM -0700, "Ted Yu"  wrote:

Please see:
https://www.linkedin.com/pulse/combining-druid-spark-interactive-flexible-analytics-scale-butani

which references https://github.com/SparklineData/spark-druid-olap
On Wed, Mar 23, 2016 at 5:59 AM, Raymond Honderdors 
 wrote:

Does anyone have a good overview on how to integrate Spark and Druid? I am now 
struggling with the creation of a Druid data source in Spark.

Raymond Honderdors
Team Lead Analytics BI
Business Intelligence developer
raymond.honderd...@sizmek.com
t +972.7325.3569
Herzliya



  

Re: Converting array of string type to datetime

2016-03-23 Thread Mich Talebzadeh
Hi Jacek,

I was wondering if I could use this approach itself.

It is basically a CSV read in as follows:

val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
val df = sqlContext.read.format("com.databricks.spark.csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("/data/stg/table2")
val current_date = sqlContext.sql(
  "SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy')").collect.apply(0).getString(0)
// convert "dd/MM/yyyy" to "yyyy-MM-dd"
def ChangeDate(word: String): String = {
  return word.substring(6, 10) + "-" + word.substring(3, 5) + "-" + word.substring(0, 2)
}
//
// Register it as a custom UDF
//
sqlContext.udf.register("ChangeDate", ChangeDate(_: String))

The DF has the following schema

scala> df.printSchema
root
 |-- Invoice Number: string (nullable = true)
 |-- Payment date: string (nullable = true)
 |-- Net: string (nullable = true)
 |-- VAT: string (nullable = true)
 |-- Total: string (nullable = true)


Now logically I want to filter out all "Payment date" values more than 6
months old,

I.e.

current_date - "Payment date" > 6 months

For example use months_difference (current, "Payment date") > 6

However, I need to convert "Payment date" from format "dd/MM/yyyy" to
"yyyy-MM-dd" first, hence the UDF.

The question is will this approach work?

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com
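
For reference, a hedged sketch of a pure column-expression route (no UDF), using functions that exist since Spark 1.5; the derived column name is a placeholder.

import org.apache.spark.sql.functions._

val withDate = df.withColumn("payment_dt",
  to_date(unix_timestamp(col("Payment date"), "dd/MM/yyyy").cast("timestamp")))

// Keep only rows whose payment date is at most 6 months old.
val recent = withDate.filter(months_between(current_date(), col("payment_dt")) <= 6)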



On 23 March 2016 at 21:26, Jacek Laskowski  wrote:

> Hi,
>
> Why don't you use Datasets? You'd cut the number of getStrings and
> it'd read nicer to your eyes. Also, doing such transformations would
> *likely* be easier.
>
> p.s. Please gist your example to fix it.
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Wed, Mar 23, 2016 at 10:20 PM, Mich Talebzadeh
>  wrote:
> >
> > How can I convert the following from String to datetime
> >
> > scala> df.map(x => (x.getString(1), ChangeDate(x.getString(1.take(1)
> > res60: Array[(String, String)] = Array((10/02/2014,2014-02-10))
> >
> > Please note that the custom UDF ChangeDate() has revered the string value
> > from "dd/MM/" to "-MM-dd"
> >
> > Now I need to convert ChangeDate(x.getString(1)) from String to datetime?
> >
> > scala> df.map(x => (x.getString(1),
> > ChangeDate(x.getString(1)).toDate)).take(1)
> > :25: error: value toDate is not a member of String
> >   df.map(x => (x.getString(1),
> > ChangeDate(x.getString(1)).toDate)).take(1)
> >
> > Or
> >
> > scala> df.map(x => (x.getString(1),
> > ChangeDate(x.getString(1)).cast("date"))).take(1)
> > :25: error: value cast is not a member of String
> >   df.map(x => (x.getString(1),
> > ChangeDate(x.getString(1)).cast("date"))).take(1)
> >
> >
> > Thanks,
> >
> >
> > Dr Mich Talebzadeh
> >
> >
> >
> > LinkedIn
> >
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >
> >
> >
> > http://talebzadehmich.wordpress.com
> >
> >
>
>
>


Re: Converting array of string type to datetime

2016-03-23 Thread Jacek Laskowski
Hi,

Why don't you use Datasets? You'd cut the number of getStrings and
it'd read nicer to your eyes. Also, doing such transformations would
*likely* be easier.

p.s. Please gist your example to fix it.

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Wed, Mar 23, 2016 at 10:20 PM, Mich Talebzadeh
 wrote:
>
> How can I convert the following from String to datetime
>
> scala> df.map(x => (x.getString(1), ChangeDate(x.getString(1.take(1)
> res60: Array[(String, String)] = Array((10/02/2014,2014-02-10))
>
> Please note that the custom UDF ChangeDate() has revered the string value
> from "dd/MM/" to "-MM-dd"
>
> Now I need to convert ChangeDate(x.getString(1)) from String to datetime?
>
> scala> df.map(x => (x.getString(1),
> ChangeDate(x.getString(1)).toDate)).take(1)
> :25: error: value toDate is not a member of String
>   df.map(x => (x.getString(1),
> ChangeDate(x.getString(1)).toDate)).take(1)
>
> Or
>
> scala> df.map(x => (x.getString(1),
> ChangeDate(x.getString(1)).cast("date"))).take(1)
> :25: error: value cast is not a member of String
>   df.map(x => (x.getString(1),
> ChangeDate(x.getString(1)).cast("date"))).take(1)
>
>
> Thanks,
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>




Converting array of string type to datetime

2016-03-23 Thread Mich Talebzadeh
How can I convert the following from String to datetime

scala> df.map(x => (x.getString(1), ChangeDate(x.getString(1)))).take(1)
res60: Array[(String, String)] = Array((10/02/2014,2014-02-10))

Please note that the custom UDF ChangeDate() has reversed the string value
from "dd/MM/yyyy" to "yyyy-MM-dd"

Now I need to convert ChangeDate(x.getString(1)) from String to datetime?

scala> df.map(x => (x.getString(1), ChangeDate(x.getString(1)).toDate)).take(1)
<console>:25: error: value toDate is not a member of String
       df.map(x => (x.getString(1), ChangeDate(x.getString(1)).toDate)).take(1)

Or

scala> df.map(x => (x.getString(1), ChangeDate(x.getString(1)).cast("date"))).take(1)
<console>:25: error: value cast is not a member of String
       df.map(x => (x.getString(1), ChangeDate(x.getString(1)).cast("date"))).take(1)


Thanks,


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


Is streaming bisecting k-means possible?

2016-03-23 Thread dustind
Is it technically possible to use bisecting k-means on streaming data and
allow for model decay+updating like streaming k-means? If so, could you
provide some guidance on how to implement this?

If not, what is the best approach to using bisecting k-means on streaming
data?
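
For reference, a hedged sketch of the closest built-in alternative as of Spark 1.6: MLlib's bisecting k-means is batch-only, so decay plus online updates come from StreamingKMeans instead; the DStreams are assumed to be built elsewhere.

import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.streaming.dstream.DStream

def clusterStream(training: DStream[Vector], test: DStream[(Long, Vector)]): Unit = {
  val model = new StreamingKMeans()
    .setK(5)                      // number of clusters (placeholder)
    .setDecayFactor(0.9)          // gradually forget old data
    .setRandomCenters(10, 0.0)    // 10-dimensional features (placeholder)

  model.trainOn(training)         // centers are updated on every micro-batch
  model.predictOnValues(test).print()
}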




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-streaming-bisecting-k-means-possible-tp26581.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Spark thrift issue 8659 (changing subject)

2016-03-23 Thread ayan guha
>
> Hi All
>
> I found this issue listed in Spark Jira -
> https://issues.apache.org/jira/browse/SPARK-8659
>
> I would love to know if there are any roadmap for this? Maybe someone from
> dev group can confirm?
>
> Thank you in advance
>
> Best
> Ayan
>
>


Re: Spark Streaming UI duration numbers mismatch

2016-03-23 Thread Jean-Baptiste Onofré

Hi Jatin,

I will reproduce tomorrow and take a look.

Did you already create a Jira about that (I don't think so) ? If I 
reproduce the problem (and it's really a problem), then I will create 
one for you.


Thanks,
Regards
JB

On 03/23/2016 08:20 PM, Jatin Kumar wrote:

Hello,

Can someone please provide some help on the below issue?

--
Thanks
Jatin

On Tue, Mar 22, 2016 at 3:30 PM, Jatin Kumar > wrote:

Hello all,

I am running spark streaming application and the duration numbers on
batch page and job page don't match. Please find attached
screenshots of the same.

IMO processing time on batch page at top should be sum of durations
of all jobs and similarly the duration of a job reported on batch
page should be sum of durations of stages of that job.

--
Thanks
Jatin




--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com




Re: Spark Streaming UI duration numbers mismatch

2016-03-23 Thread Jatin Kumar
Hello,

Can someone please provide some help on the below issue?

--
Thanks
Jatin

On Tue, Mar 22, 2016 at 3:30 PM, Jatin Kumar 
wrote:

> Hello all,
>
> I am running spark streaming application and the duration numbers on batch
> page and job page don't match. Please find attached screenshots of the same.
>
> IMO processing time on batch page at top should be sum of durations of all
> jobs and similarly the duration of a job reported on batch page should be
> sum of durations of stages of that job.
>
> --
> Thanks
> Jatin
>


Re: ClassNotFoundException in RDD.map

2016-03-23 Thread Dirceu Semighini Filho
Thanks Jakob,
I've looked into the source code and found that I was missing this property:
spark.repl.class.uri

Setting it solved the problem.

Cheers
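
For reference, a hedged sketch of where such a property is typically set; the URI value is a placeholder that has to come from your own interpreter (in the Spark 1.x shell it is the REPL class server's address).

import org.apache.spark.SparkConf

def buildConf(replClassServerUri: String): SparkConf =
  new SparkConf()
    .setAppName("custom-repl-driver")
    .set("spark.repl.class.uri", replClassServerUri)   // lets executors fetch classes compiled at runtime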

2016-03-17 18:14 GMT-03:00 Jakob Odersky :

> The error is very strange indeed, however without code that reproduces
> it, we can't really provide much help beyond speculation.
>
> One thing that stood out to me immediately is that you say you have an
> RDD of Any where every Any should be a BigDecimal, so why not specify
> that type information?
> When using Any, a whole class of errors, that normally the typechecker
> could catch, can slip through.
>
> On Thu, Mar 17, 2016 at 10:25 AM, Dirceu Semighini Filho
>  wrote:
> > Hi Ted, thanks for answering.
> > The map is just that, whenever I try inside the map it throws this
> > ClassNotFoundException, even if I do map(f => f) it throws the exception.
> > What is bothering me is that when I do a take or a first it returns the
> > result, which make me conclude that the previous code isn't wrong.
> >
> > Kind Regards,
> > Dirceu
> >
> >
> > 2016-03-17 12:50 GMT-03:00 Ted Yu :
> >>
> >> bq. $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1
> >>
> >> Do you mind showing more of your code involving the map() ?
> >>
> >> On Thu, Mar 17, 2016 at 8:32 AM, Dirceu Semighini Filho
> >>  wrote:
> >>>
> >>> Hello,
> >>> I found a strange behavior after executing a prediction with MLIB.
> >>> My code return an RDD[(Any,Double)] where Any is the id of my dataset,
> >>> which is BigDecimal, and Double is the prediction for that line.
> >>> When I run
> >>> myRdd.take(10) it returns ok
> >>> res16: Array[_ >: (Double, Double) <: (Any, Double)] =
> >>> Array((1921821857196754403.00,0.1690292052496703),
> >>> (454575632374427.00,0.16902820241892452),
> >>> (989198096568001939.00,0.16903432789699502),
> >>> (14284129652106187990.00,0.16903517653451386),
> >>> (17980228074225252497.00,0.16903151028332508),
> >>> (3861345958263692781.00,0.16903056986183976),
> >>> (17558198701997383205.00,0.1690295450319745),
> >>> (10651576092054552310.00,0.1690286445174418),
> >>> (4534494349035056215.00,0.16903303401862327),
> >>> (5551671513234217935.00,0.16902303368995966))
> >>> But when I try to run some map on it:
> >>> myRdd.map(_._1).take(10)
> >>> It throws a ClassCastException:
> >>> org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 0
> >>> in stage 72.0 failed 4 times, most recent failure: Lost task 0.3 in
> stage
> >>> 72.0 (TID 1774, 172.31.23.208): java.lang.ClassNotFoundException:
> >>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1
> >>> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> >>> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> >>> at java.security.AccessController.doPrivileged(Native Method)
> >>> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> >>> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> >>> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> >>> at java.lang.Class.forName0(Native Method)
> >>> at java.lang.Class.forName(Class.java:278)
> >>> at
> >>>
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
> >>> at
> >>> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
> >>> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
> >>> at
> >>>
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
> >>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> >>> at
> >>>
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
> >>> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
> >>> at
> >>>
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> >>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> >>> at
> >>>
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
> >>> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
> >>> at
> >>>
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> >>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> >>> at
> >>>
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
> >>> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
> >>> at
> >>>
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> >>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> >>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> >>> at
> >>>
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
> >>> at
> >>>
> 

RE: Spark 1.5.2, why the broadcast join shuffle so much data in the last step

2016-03-23 Thread Yong Zhang
Here is the output:
== Parsed Logical Plan ==
Project [400+ columns]
+- Project [400+ columns]
   +- Project [400+ columns]
      +- Project [400+ columns]
         +- Join Inner, Somevisid_high#460L = visid_high#948L) && (visid_low#461L = visid_low#949L)) && (date_time#25L > date_time#513L)))
            :- Relation[400+ columns] ParquetRelation
            +- BroadcastHint
               +- Project [soid_e1#30 AS account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
                  +- Filter (instr(event_list#105,202) > 0)
                     +- Relation[400+ columns] ParquetRelation

== Analyzed Logical Plan ==
400+ columns
Project [400+ columns]
+- Project [400+ columns]
   +- Project [400+ columns]
      +- Project [400+ columns]
         +- Join Inner, Somevisid_high#460L = visid_high#948L) && (visid_low#461L = visid_low#949L)) && (date_time#25L > date_time#513L)))
            :- Relation[400+ columns] ParquetRelation
            +- BroadcastHint
               +- Project [soid_e1#30 AS account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
                  +- Filter (instr(event_list#105,202) > 0)
                     +- Relation[400+ columns] ParquetRelation

== Optimized Logical Plan ==
Project [400+ columns]
+- Join Inner, Somevisid_high#460L = visid_high#948L) && (visid_low#461L = visid_low#949L)) && (date_time#25L > date_time#513L)))
   :- Relation[400+ columns] ParquetRelation
   +- Project [date_time#25L,visid_low#461L,visid_high#460L,account_id#976]
      +- BroadcastHint
         +- Project [soid_e1#30 AS account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
            +- Filter (instr(event_list#105,202) > 0)
               +- Relation[400+ columns] ParquetRelation

== Physical Plan ==
Project [400+ columns]
+- Filter (date_time#25L > date_time#513L)
   +- SortMergeJoin [visid_high#948L,visid_low#949L], [visid_high#460L,visid_low#461L]
      :- Sort [visid_high#948L ASC,visid_low#949L ASC], false, 0
      :  +- TungstenExchange hashpartitioning(visid_high#948L,visid_low#949L,200), None
      :     +- Scan ParquetRelation[400+ columns] InputPaths: hdfs://xxx/2015/12/17,
               hdfs://xxx/2015/12/18, hdfs://xxx/2015/12/19, hdfs://xxx/2015/12/20, hdfs://xxx/2015/12/21,
               hdfs://xxx/2015/12/22, hdfs://xxx/2015/12/23, hdfs://xxx/2015/12/24, hdfs://xxx/2015/12/25,
               hdfs://xxx/2015/12/26, hdfs://xxx/2015/12/27, hdfs://xxx/2015/12/28, hdfs://xxx/2015/12/29,
               hdfs://xxx/2015/12/30, hdfs://xxx/2015/12/31, hdfs://xxx/2016/01/01, hdfs://xxx/2016/01/02,
               hdfs://xxx/2016/01/03, hdfs://xxx/2016/01/04, hdfs://xxx/2016/01/05, hdfs://xxx/2016/01/06,
               hdfs://xxx/2016/01/07, hdfs://xxx/2016/01/08, hdfs://xxx/2016/01/09, hdfs://xxx/2016/01/10,
               hdfs://xxx/2016/01/11, hdfs://xxx/2016/01/12, hdfs://xxx/2016/01/13, hdfs://xxx/2016/01/14,
               hdfs://xxx/2016/01/15, hdfs://xxx/2016/01/16, hdfs://xxx/2016/01/17, hdfs://xxx/2016/01/18,
               hdfs://xxx/2016/01/19, hdfs://xxx/2016/01/20, hdfs://xxx/2016/01/21, hdfs://xxx/2016/01/22,
               hdfs://xxx/2016/01/23, hdfs://xxx/2016/01/24, hdfs://xxx/2016/01/25, hdfs://xxx/2016/01/26,
               hdfs://xxx/2016/01/27, hdfs://xxx/2016/01/28, hdfs://xxx/2016/01/29, hdfs://xxx/2016/01/30,
               hdfs://xxx/2016/01/31, hdfs://xxx/2016/02/01, hdfs://xxx/2016/02/02, hdfs://xxx/2016/02/03,
               hdfs://xxx/2016/02/04, hdfs://xxx/2016/02/05, hdfs://xxx/2016/02/06, hdfs://xxx/2016/02/07,
               hdfs://xxx/2016/02/08, hdfs://xxx/2016/02/09, hdfs://xxx/2016/02/10, hdfs://xxx/2016/02/11,
               hdfs://xxx/2016/02/12, hdfs://xxx/2016/02/13, hdfs://xxx/2016/02/14, hdfs://xxx/2016/02/15,
               hdfs://xxx/2016/02/16, hdfs://xxx/2016/02/17, hdfs://xxx/2016/02/18, hdfs://xxx/2016/02/19,
               hdfs://xxx/2016/02/20, hdfs://xxx/2016/02/21, hdfs://xxx/2016/02/22, hdfs://xxx/2016/02/23,
               hdfs://xxx/2016/02/24, hdfs://xxx/2016/02/25, hdfs://xxx/2016/02/26, hdfs://xxx/2016/02/27,
               hdfs://xxx/2016/02/28, hdfs://xxx/2016/02/29, hdfs://xxx/2016/03/01, hdfs://xxx/2016/03/02,
               hdfs://xxx/2016/03/03, hdfs://xxx/2016/03/04, hdfs://xxx/2016/03/05, hdfs://xxx/2016/03/06,
               hdfs://xxx/2016/03/07, hdfs://xxx/2016/03/08, hdfs://xxx/2016/03/09, hdfs://xxx/2016/03/10,
               hdfs://xxx/2016/03/11, hdfs://xxx/2016/03/12, hdfs://xxx/2016/03/13, hdfs://xxx/2016/03/14,
               hdfs://xxx/2016/03/15, hdfs://xxx/2016/03/16, hdfs://xxx/2016/03/17
      +- Sort [visid_high#460L ASC,visid_low#461L ASC], false, 0
         +- TungstenExchange hashpartitioning(visid_high#460L,visid_low#461L,200), None
            +- Project [date_time#25L,visid_low#461L,visid_high#460L,account_id#976]
               +- Project [soid_e1#30 AS account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
                  +- Filter (instr(event_list#105,202) > 0)
                     +- Scan ParquetRelation[visid_low#461L,ip#127,soid_e1#30,event_list#105,visid_high#460L,date_time#25L] InputPaths: hdfs://xxx/2016/03/17
This dataset has more than 480 columns in the parquet file, so I replaced them with 
"400+ columns" to avoid blowing out the email, but I don't 

Re: Serialization issue with Spark

2016-03-23 Thread Dirceu Semighini Filho
Hello Hafsa,
A "Task not serializable" exception usually means that you are trying to use an
object defined in the driver in code that runs on the workers.
Can you post the code that is generating this error here, so we can better
advise you?

Cheers.
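
For reference, a hedged, generic illustration of the usual cause and one common fix; every name in it is made up.

import org.apache.spark.rdd.RDD

class Repo { def lookup(id: Int): String = id.toString }   // imagine this class is not Serializable

// Failing pattern: `val repo = new Repo(); rdd.map(repo.lookup)` drags the driver-side
// instance (and anything it references) into the task closure.
// One fix: build the dependency on the executors, e.g. once per partition.
def withoutError(rdd: RDD[Int]): RDD[String] =
  rdd.mapPartitions { it =>
    val repo = new Repo()
    it.map(repo.lookup)
  }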

2016-03-23 14:14 GMT-03:00 Hafsa Asif :

> Can anyone please help me in this issue?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-with-Spark-tp26565p26579.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: Spark 1.5.2, why the broadcast join shuffle so much data in the last step

2016-03-23 Thread Davies Liu
The broadcast hint does not work as expected in this case; could you
also show the logical plan via 'explain(true)'?

On Wed, Mar 23, 2016 at 8:39 AM, Yong Zhang  wrote:
>
> So I am testing this code to understand the "broadcast" feature of DataFrames on Spark 
> 1.6.1.
> This time I did not disable "tungsten". Everything is at its default value, except 
> the memory and core settings of my job on 1.6.1.
>
> I am testing the join2 case
>
> val join2 = historyRaw.join(broadcast(trialRaw), trialRaw("visid_high") === 
> historyRaw("visid_high") &&  trialRaw("visid_low") === 
> historyRaw("visid_low") && trialRaw("date_time") > historyRaw("date_time"))
>
> and here is the DAG visualization in the runtime of my testing job:
>
>
>
>
>
> So now, I don't understand how "broadcast" works on a DataFrame in Spark. I
> originally thought it would be the same as a "mapjoin" in Hive, but can
> someone explain the DAG above to me?
>
> I have one day of data, about a 1.5G compressed parquet file, filtered by
> "instr(loadRaw("event_list"), "202") > 0", which will only output about 1494
> rows (very small); that is the "trialRaw" DF in my example.
> Stage 3 has a filter, which I thought was for the trialRaw data, but the stage
> statistics don't match the data. I don't know why the input is only 78M
> and the shuffle write is about 97.6KB.
>
>
>
>
> The historyRaw will be about 90 days of history data, which should be about
> 100G, so it looks like stage 4 is scanning it.
>
>
>
>
> Now, my original thought was that the small data would be broadcast to all the
> nodes, and most of the history data would be filtered out by the join keys; at
> least that is what a "mapjoin" in Hive would do, but from the DAG above, I
> didn't see it working this way.
> It looks more like Spark uses a SortMerge join to shuffle both datasets across
> the network, and filters on the "reducer" side by the join keys to get the final
> output. But that is not what a "broadcast" join is supposed to do, correct?
> The last stage is very slow, and it does not finish until it has read and
> processed all the history data, shown below as "shuffle read" reaching 720G.
>
>
>
>
> One thing I noticed is that if tungsten is enabled, the shuffle write volume on
> stage 4 is larger (720G) than when tungsten is disabled (506G) in my
> original run, for exactly the same input. It is an interesting point; does
> anyone have some idea about this?
>
>
> Overall, for my test case, the "broadcast" join is exactly the optimization
> I should use; but somehow, I cannot make it behave the same way as a "mapjoin"
> in Hive, even in Spark 1.6.1.
>
> As I said, this is just a test case. We have some business cases where it makes
> sense to use a "broadcast" join, but until I understand exactly how to make it
> work as I expect in Spark, I don't know what to do.
>
> Yong
>
> 
> From: java8...@hotmail.com
> To: user@spark.apache.org
> Subject: RE: Spark 1.5.2, why the broadcast join shuffle so much data in the 
> last step
> Date: Tue, 22 Mar 2016 13:08:31 -0400
>
>
> Please help me understand how the "broadcast" will work on DF in Spark 1.5.2.
>
> Below are the 2 joins I tested and the physical plan I dumped:
>
> val join1 = historyRaw.join(trialRaw, trialRaw("visid_high") === 
> historyRaw("visid_high") &&  trialRaw("visid_low") === 
> historyRaw("visid_low") && trialRaw("date_time") > historyRaw("date_time"))
> val join2 = historyRaw.join(broadcast(trialRaw), trialRaw("visid_high") === 
> historyRaw("visid_high") &&  trialRaw("visid_low") === 
> historyRaw("visid_low") && trialRaw("date_time") > historyRaw("date_time"))
>
> join1.explain(true)
> == Physical Plan ==
> Filter (date_time#25L > date_time#513L)
>  SortMergeJoin [visid_high#948L,visid_low#949L], 
> [visid_high#460L,visid_low#461L]
>   ExternalSort [visid_high#948L ASC,visid_low#949L ASC], false
>Exchange hashpartitioning(visid_high#948L,visid_low#949L)
> Scan ParquetRelation[hdfs://]
>   ExternalSort [visid_high#460L ASC,visid_low#461L ASC], false
>Exchange hashpartitioning(visid_high#460L,visid_low#461L)
> Project [soid_e1#30,visid_high#460L,visid_low#461L,date_time#25L]
>  Filter (instr(event_list#105,202) > 0)
>   Scan 
> ParquetRelation[hdfs://xxx/2016/03/17][visid_high#460L,visid_low#461L,date_time#25L,event_list#105,soid_e1#30]
>
> join2.explain(true)
> == Physical Plan ==
> Filter (date_time#25L > date_time#513L)
>  BroadcastHashJoin [visid_high#948L,visid_low#949L], 
> [visid_high#460L,visid_low#461L], BuildRight
> Scan ParquetRelation[hdfs://]
> Project [soid_e1#30,visid_high#460L,visid_low#461L,date_time#25L]
>Filter (instr(event_list#105,202) > 0)
> Scan 
> ParquetRelation[hdfs://xxx/2016/03/17][visid_high#460L,visid_low#461L,date_time#25L,event_list#105,soid_e1#30]
>
> Obviously, the explain plans are different, but the performance and the job
> execution steps are almost exactly the same, as shown in the original picture
> in the email below.
> Keep in

Re: Serialization issue with Spark

2016-03-23 Thread Hafsa Asif
Can anyone please help me in this issue?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-with-Spark-tp26565p26579.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



running fpgrowth -- run method

2016-03-23 Thread Bauer, Robert
I get warnings:

SparkContext: Requesting executors is only supported in coarse-grained mode
ExecutorAllocationManager: Unable to reach the cluster manager to request 2 
total executors

I get info messages:
INFO ContextCleaner: Cleaned accumulator 4

Then my "job" just seems to hang - I don't know how to see if it is running.
When I look using Ganglia, I see cluster CPU utilization go up to about 50% from
10% when the job started.

I expect this to be mostly IO bound, but I don't have any way to estimate how 
long the job should run.

Also, since I am running the spark-shell, I don't know how to run it in nohup.
I am fairly certain that the PuTTY window to the cluster times out and kills my
job.

Any help appreciated.





This message (including any attachments) contains confidential and/or 
privileged information. It is intended for a specific individual and purpose 
and is protected by law. If you are not the intended recipient, please notify 
the sender immediately and delete this message. Any disclosure, copying, or 
distribution of this message, or the taking of any action based on it, is 
strictly prohibited.


Re: Spark and DB connection pool

2016-03-23 Thread Takeshi Yamamuro
Hi,

Currently, Spark itself doesn't pool JDBC connections.
If you run into performance problems, all you can do is cache the data loaded
from the RDB and Cassandra in Spark, along the lines of the sketch below.
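
For example, a minimal sketch of that caching approach (the JDBC URL, table and
credentials below are hypothetical, and the driver jar must be on the classpath):

val jdbcDF = sqlContext.read.format("jdbc")
  .option("url", "jdbc:postgresql://dbhost:5432/mydb")   // hypothetical connection string
  .option("dbtable", "public.orders")                    // hypothetical table
  .option("user", "spark")
  .option("password", "secret")
  .load()

jdbcDF.cache()                       // keep the loaded rows in Spark so repeated joins
jdbcDF.registerTempTable("orders")   // against Cassandra data don't re-hit the database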

thanks,
maropu

On Wed, Mar 23, 2016 at 11:56 PM, rjtokenring 
wrote:

> Hi all, is there a way in spark to setup a connection pool?
> As example: I'm going to use a relational DB and Cassandra to join data
> between them.
> How can I control and cache DB connections?
>
> Thanks all!
>
> Mark
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-DB-connection-pool-tp26577.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
---
Takeshi Yamamuro


Streaming bisecting k-means possible?

2016-03-23 Thread Dustin Decker
Is it technically possible to use bisecting k-means on streaming data and
allow for model decay/updating like streaming k-means?

If not, what is the best approach to using bisecting k-means on streaming
data?


Spark and DB connection pool

2016-03-23 Thread rjtokenring
Hi all, is there a way in spark to setup a connection pool?
As example: I'm going to use a relational DB and Cassandra to join data
between them.
How can I control and cache DB connections?

Thanks all!

Mark





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-DB-connection-pool-tp26577.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark with Druid

2016-03-23 Thread Raymond Honderdors
I saw these, but I fail to understand how to direct the code to use the index
JSON.

Sent from Outlook Mobile



On Wed, Mar 23, 2016 at 7:19 AM -0700, "Ted Yu" 
> wrote:

Please see:
https://www.linkedin.com/pulse/combining-druid-spark-interactive-flexible-analytics-scale-butani

which references https://github.com/SparklineData/spark-druid-olap

On Wed, Mar 23, 2016 at 5:59 AM, Raymond Honderdors 
> wrote:
Does anyone have a good overview on how to integrate Spark and Druid?

I am now struggling with the creation of a druid data source in spark.


Raymond Honderdors
Team Lead Analytics BI
Business Intelligence Developer
raymond.honderd...@sizmek.com
T +972.7325.3569
Herzliya





Re: Spark with Druid

2016-03-23 Thread Ted Yu
Please see:
https://www.linkedin.com/pulse/combining-druid-spark-interactive-flexible-analytics-scale-butani

which references https://github.com/SparklineData/spark-druid-olap

On Wed, Mar 23, 2016 at 5:59 AM, Raymond Honderdors <
raymond.honderd...@sizmek.com> wrote:

> Does anyone have a good overview on how to integrate Spark and Druid?
>
>
>
> I am now struggling with the creation of a druid data source in spark.
>
>
>
>
>
> *Raymond Honderdors *
>
> *Team Lead Analytics BI*
>
> *Business Intelligence Developer *
>
> *raymond.honderd...@sizmek.com  *
>
> *T +972.7325.3569*
>
> *Herzliya*
>
>


Re: Spark Metrics Framework?

2016-03-23 Thread Mike Sukmanowsky
Thanks Ted and Silvio. I think I'll need a bit more hand holding here,
sorry. The way we use ES Hadoop is in pyspark via
org.elasticsearch.hadoop.mr.EsOutputFormat in a saveAsNewAPIHadoopFile call.
Given the Hadoop interop, I wouldn't assume that the EsOutputFormat class
could be modified to define a new Source and register it via
MetricsSystem.createMetricsSystem. This feels like a good feature request
for Spark actually: "Support Hadoop Counters in Input/OutputFormats as
Spark metrics" but I wanted some feedback first to see if that makes sense.

That said, some of the custom RDD classes could probably be modified to
register a new Source when they perform reading/writing from/to
Elasticsearch.
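
For context, a rough sketch of what Silvio's suggestion might look like (the
class and metric names are made up, this is untested, and it assumes the class
is compiled into the org.apache.spark package tree so that the private[spark]
Source trait is visible):

package org.apache.spark.metrics.source

import com.codahale.metrics.{Counter, MetricRegistry}

// Hypothetical source exposing elasticsearch-hadoop-style counters through Spark's metrics system.
class EsHadoopSource extends Source {
  override val sourceName: String = "es.hadoop"
  override val metricRegistry: MetricRegistry = new MetricRegistry
  val docsWritten: Counter = metricRegistry.counter(MetricRegistry.name("docsWritten"))
}

// Registration would then happen from code that can also see the spark-internal API:
// SparkEnv.get.metricsSystem.registerSource(new EsHadoopSource)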

On Tue, 22 Mar 2016 at 15:17 Silvio Fiorito 
wrote:

> Hi Mike,
>
> It’s been a while since I worked on a custom Source but I think all you
> need to do is make your Source in the org.apache.spark package.
>
> Thanks,
> Silvio
>
> From: Mike Sukmanowsky 
> Date: Tuesday, March 22, 2016 at 3:13 PM
> To: Silvio Fiorito , "user@spark.apache.org"
> 
> Subject: Re: Spark Metrics Framework?
>
> The Source class is private to the spark package and any new Sources added
> to the metrics registry must be of type Source. So unless I'm mistaken, we
> can't define a custom source. I linked to 1.4.1 code, but the same is true
> in 1.6.1.
>
> On Mon, 21 Mar 2016 at 12:05 Silvio Fiorito 
> wrote:
>
>> You could use the metric sources and sinks described here:
>> http://spark.apache.org/docs/latest/monitoring.html#metrics
>>
>> If you want to push the metrics to another system you can define a custom
>> sink. You can also extend the metrics by defining a custom source.
>>
>> From: Mike Sukmanowsky 
>> Date: Monday, March 21, 2016 at 11:54 AM
>> To: "user@spark.apache.org" 
>> Subject: Spark Metrics Framework?
>>
>> We make extensive use of the elasticsearch-hadoop library for
>> Hadoop/Spark. In trying to troubleshoot our Spark applications, it'd be
>> very handy to have access to some of the many metrics that the library
>> makes available when running in map reduce mode. The library's author noted
>> that Spark doesn't offer any kind of a similar metrics API whereby these
>> metrics could be reported or aggregated on.
>>
>> Are there any plans to bring a metrics framework similar to Hadoop's
>> Counter system to Spark or is there an alternative means for us to grab
>> metrics exposed when using Hadoop APIs to load/save RDDs?
>>
>> Thanks,
>> Mike
>>
>


Spark with Druid

2016-03-23 Thread Raymond Honderdors
Does anyone have a good overview on how to integrate Spark and Druid?

I am now struggling with the creation of a druid data source in spark.


Raymond Honderdors
Team Lead Analytics BI
Business Intelligence Developer
raymond.honderd...@sizmek.com
T +972.7325.3569
Herzliya




RE: Steps to Run Spark Scala job from Oozie on EC2 Hadoop clsuter

2016-03-23 Thread Joshua Dickerson




Hi everyone,


We have been successfully deploying Spark jobs through Oozie using the spark-action for over a year; however, we are deploying to our on-premises Hadoop infrastructure, not EC2.

Our process is to build a fat jar with our job and its dependencies, upload that jar to HDFS (accessible by Oozie and, in our case, YARN), and then reference that HDFS location in the <jar> block of our spark-action.


Here's our spark-action: 




    <job-tracker>${jobTracker}</job-tracker>
    <name-node>${nameNode}</name-node>
    <master>yarn-cluster</master>
    <name>SparkJob</name>
    <class>com.dealer.rtb.spark.SparkJob</class>
    <jar>*** YOUR HDFS LOCATION HERE ***</jar>
    <spark-opts>--conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.initialExecutors=3 --conf spark.executor.memory=8g --conf spark.shuffle.service.enabled=true --conf spark.eventLog.overwrite=true --conf spark.dynamicAllocation.maxExecutors=6 --conf spark.dynamicAllocation.executorIdleTimeout=900 --conf spark.metrics.conf=/etc/spark.metrics.properties</spark-opts>




I should also note that we have built a custom maven plugin (not open-source, sorry) for publishing our job, coordinator and workflow, and its jar to the cluster. In our case, it publishes the jar to the following location: 
hdfs://nameservice1/user//workflowslib/


My understanding is that the contents of the lib directory are, by default, added to the classpath of the job. 


Hope that helps.

Joshua Dickerson
Senior Developer
Advertising - Real-Time Bidding
p: 888.894.8989
A Cox Automotive Brand

From: Chandeep Singh [c...@chandeep.com]
Sent: Monday, March 07, 2016 11:02 AM
To: Neelesh Salian
Cc: Benjamin Kim; Deepak Sharma; Divya Gehlot; user @spark; u...@hadoop.apache.org
Subject: Re: Steps to Run Spark Scala job from Oozie on EC2 Hadoop clsuter



As a work around you could put your spark-submit statement in a shell script and then use Oozie’s SSH action to execute that script.



On Mar 7, 2016, at 3:58 PM, Neelesh Salian  wrote:


Hi Divya,

This link should have the details that you need to begin using the Spark Action on Oozie:
https://oozie.apache.org/docs/4.2.0/DG_SparkActionExtension.html



Thanks.


On Mon, Mar 7, 2016 at 7:52 AM, Benjamin Kim 
 wrote:

To comment…


At my company, we have not gotten it to work in any other mode than local. If we try any of the yarn modes, it fails with a “file does not exist” error when trying to locate the executable jar. I mentioned this to the Hue users group, which we
 used for this, and they replied that the Spark Action is a very basic implementation and that they will be writing their own for production use.


That’s all I know...



On Mar 7, 2016, at 1:18 AM, Deepak Sharma  wrote:



There is a Spark action defined for Oozie workflows,
though I am not sure whether it supports only Java Spark jobs or Scala jobs as well.
https://oozie.apache.org/docs/4.2.0/DG_SparkActionExtension.html
Thanks
Deepak




On Mon, Mar 7, 2016 at 2:44 PM, Divya Gehlot 
 wrote:

Hi,

Could somebody help me by providing the steps /redirect me  to blog/documentation on how to run Spark job written in scala through Oozie.


Would really appreciate the help.






Thanks,
Divya

-- 

Thanks
Deepak
www.bigdatabig.com
www.keosha.net

-- 
Neelesh Srinivas Salian
Customer Operations Engineer

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



DataFrames UDAF with array and struct

2016-03-23 Thread Matthias Niehoff
Hello Everybody,

I want to write a UDAF for DataFrames where the buffer schema is an

ArrayType(StructType(List(StructField(String),
StructField(String), StructField(String))))

When I want to access the buffer in the update() or merge() method, the
ArrayType gets returned as a List. But what is the type of the List? Or, in
other words: what is the mapping of a StructType with StructFields onto Scala
collection/data types?

Thanks!

-- 
Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
172.1702676
www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
www.more4fi.de

Registered office: Solingen | HRB 25917 | Wuppertal Local Court (Amtsgericht)
Executive board: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
Supervisory board: Patric Fedlmeier (chairman) . Klaus Jäger . Jürgen Schütz

This e-mail, including any attached files, contains confidential and/or
legally protected information. If you are not the intended recipient or have
received this e-mail in error, please inform the sender immediately and
delete this e-mail and any attached files at once. The unauthorized copying,
use or opening of attached files, as well as the unauthorized forwarding of
this e-mail, is not permitted


Re: Plot DataFrame with matplotlib

2016-03-23 Thread Teng Qiu
e... then this sounds like a feature request for matplotlib: you would
need to make matplotlib's APIs support RDD or Spark DataFrame objects.
I checked the API of mplot3d
(http://matplotlib.org/mpl_toolkits/mplot3d/tutorial.html#mpl_toolkits.mplot3d.Axes3D.scatter),
and it only supports "array-like" input data.

So yes, to use matplotlib, you need to take the elements out of the RDD
and send them to the plot API as a list object.

2016-03-23 12:20 GMT+01:00 Yavuz Nuzumlalı :
> Thanks for help, but the example that you referenced gets the values from
> RDD as list and plots that list.
>
> What I am specifically asking was that is there a convenient way to plot a
> DataFrame object directly?(like pandas DataFrame objects)
>
>
> On Wed, Mar 23, 2016 at 11:47 AM Teng Qiu  wrote:
>>
>> not sure about 3d plot, but there is a nice example:
>>
>> https://github.com/zalando/spark-appliance/blob/master/examples/notebooks/PySpark_sklearn_matplotlib.ipynb
>>
>> for plotting rdd or dataframe using matplotlib.
>>
>> Am Mittwoch, 23. März 2016 schrieb Yavuz Nuzumlalı :
>> > Hi all,
>> > I'm trying to plot the result of a simple PCA operation, but couldn't
>> > find a clear documentation about plotting data frames.
>> > Here is the output of my data frame:
>> > ++
>> > |pca_features|
>> > ++
>> > |[-255.4681508918886,2.9340031372956155,-0.5357914079267039] |
>> > |[-477.03566189308367,-6.170290817861212,-5.280827588464785] |
>> > |[-163.13388125540507,-4.571443623272966,-1.2349427928939671]|
>> > |[-53.721252166903255,0.6162589419996329,-0.39569546286098245]   |
>> > [-27.97717473880869,0.30883567826481106,-0.11159555340377557]   |
>> > |[-118.27508063853554,1.3484584740407748,-0.8088790388907207]|
>> > Values of `pca_features` column is DenseVector s created using
>> > VectorAssembler.
>> > How can I draw a simple 3d scatter plot from this data frame?
>> > Thanks

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



[Spark -1.5.2]Dynamically creation of caseWhen expression

2016-03-23 Thread Divya Gehlot
Hi,
I have a map collection.
I am trying to build a when condition based on the key values,
like df.withColumn("ID", when( condition with map keys, values of map )).

How can I do that dynamically?
Currently I am iterating over the keysIterator and getting the values:
val keys = myMap.keysIterator.toArray
Like below
df.withColumn("ID",when(condition on keys(0),lit(myMap get
keys(0)).when(condition on keys(1),lit(myMap get keys(1)).
when(condition on keys(2),lit(myMap get keys(3)).otherwise("value not
found"))

How can I build the above expression dynamically?
Like for (key <-keys){
when(condition on key ,lit(myMap get key)
}
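
To be concrete, something along these lines is what I am after (assuming, just
for illustration, that the condition is equality of a hypothetical "code"
column with the map key, and that df is the DataFrame from above):

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit, when}

val myMap = Map("A" -> "Alpha", "B" -> "Beta", "C" -> "Gamma")   // made-up example values

// Fold the map into one chained when(...).otherwise(...) expression.
val idExpr: Column = myMap.foldLeft(lit("value not found"): Column) {
  case (acc, (key, value)) => when(col("code") === key, lit(value)).otherwise(acc)
}

val result = df.withColumn("ID", idExpr)
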
Would really appreciate the help.

Thanks,
Divya


Using Spark to retrieve a HDFS file protected by Kerberos

2016-03-23 Thread Nkechi Achara
I am having issues setting up my spark environment to read from a
kerberized HDFS file location.

At the moment I have tried to do the following:

def ugiDoAs[T](ugi:   Option[UserGroupInformation])(code: => T) = ugi match
{
case None => code
case Some(u) => u.doAs(new PrivilegedExceptionAction[T] {
  override def run(): T = code }) }

val sparkConf =
defaultSparkConf.setAppName("file-test").setMaster("yarn-client")

val sc = ugiDoAs(ugi) {new SparkContext(sparkConf)}

val file = sc.textFile("path")

It fails at the point of creating the Spark Context, with the following
error:

Exception in thread "main"
org.apache.hadoop.security.AccessControlException: SIMPLE authentication is
not enabled. Available:[TOKEN, KERBEROS]

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)

at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)

at org.apache.hadoop.yarn.ipc.RPCUtil.instantiateException(RPCUtil.java:53)
at
org.apache.hadoop.yarn.ipc.RPCUtil.unwrapAndThrowException(RPCUtil.java:104)

at
org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.getClusterMetrics(ApplicationClientProtocolPBClientImpl.java:155)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:497)

at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)


Has anyone got a simple example on how to allow spark to connect to a
kerberized HDFS location?

I know that spark needs to be in Yarn mode to be able to make it work, but
the login method does not seem to be working in this respect. Although I
know that the User Group Information (ugi) object is valid as I have used
it to connect to ZK in the same object and HBase.


Thanks!


Re: Plot DataFrame with matplotlib

2016-03-23 Thread Yavuz Nuzumlalı
Thanks for the help, but the example that you referenced gets the values from
the RDD as a list and plots that list.

What I am specifically asking is: is there a convenient way to plot a
DataFrame object directly (like pandas DataFrame objects)?


On Wed, Mar 23, 2016 at 11:47 AM Teng Qiu  wrote:

> not sure about 3d plot, but there is a nice example:
>
> https://github.com/zalando/spark-appliance/blob/master/examples/notebooks/PySpark_sklearn_matplotlib.ipynb
>
> for plotting rdd or dataframe using matplotlib.
>
> Am Mittwoch, 23. März 2016 schrieb Yavuz Nuzumlalı :
> > Hi all,
> > I'm trying to plot the result of a simple PCA operation, but couldn't
> find a clear documentation about plotting data frames.
> > Here is the output of my data frame:
> > ++
> > |pca_features|
> > ++
> > |[-255.4681508918886,2.9340031372956155,-0.5357914079267039] |
> > |[-477.03566189308367,-6.170290817861212,-5.280827588464785] |
> > |[-163.13388125540507,-4.571443623272966,-1.2349427928939671]|
> > |[-53.721252166903255,0.6162589419996329,-0.39569546286098245]   |
> > [-27.97717473880869,0.30883567826481106,-0.11159555340377557]   |
> > |[-118.27508063853554,1.3484584740407748,-0.8088790388907207]|
> > Values of `pca_features` column is DenseVector s created using
> VectorAssembler.
> > How can I draw a simple 3d scatter plot from this data frame?
> > Thanks


Re: Zeppelin Integration

2016-03-23 Thread ayan guha
Hi All

After spending a few more days on the issue, I finally found it
listed in the Spark JIRA - https://issues.apache.org/jira/browse/SPARK-8659

I would love to know if there is any roadmap for this. Maybe someone from
the dev group can confirm?

Thank you in advance

Best
Ayan

On Thu, Mar 10, 2016 at 10:32 PM, ayan guha  wrote:

> Thanks guys for reply. Yes, Zeppelin with Spark is pretty compelling
> choice, for single user. Any pointers for using Zeppelin for multi user
> scenario? In essence, can we either (a) Use Zeppelin to connect to a long
> running Spark Application which has some pre-cached Dataframes? (b) Can
> Zeppelin user be passed down and use Ranger to implement Hive RBAC?
>
> I know I am sounding a little vague, but such is the problem state in my
> mind :) Any help will be appreciated.
>
> Best
> Ayan
>
> On Thu, Mar 10, 2016 at 9:51 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Zeppelin is a pretty good choice for Spark. It has a UI that allows you
>> to run your code, and an Interpreter page where you change the connection
>> configuration. I made mine run on port 21999 (a daemon process on the Linux
>> host where your Spark master is running). It is pretty easy to set up and
>> run.
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 10 March 2016 at 10:26, Sabarish Sasidharan <
>> sabarish.sasidha...@manthan.com> wrote:
>>
>>> I believe you need to co-locate your Zeppelin on the same node where
>>> Spark is installed. You need to specify the SPARK HOME. The master I used
>>> was YARN.
>>>
>>> Zeppelin exposes a notebook interface. A notebook can have many
>>> paragraphs. You run the paragraphs. You can mix multiple contexts in the
>>> same notebook. So first paragraph can be scala, second can be sql that uses
>>> DF from first paragraph etc. If you use a select query, the output is
>>> automatically displayed as a chart.
>>>
>>> As RDDs are bound to the context that creates them, I don't think
>>> Zeppelin can use those RDDs.
>>>
>>> I don't know if notebooks can be reused within other notebooks. It would
>>> be a nice way of doing some common preparatory work (like building these
>>> RDDs).
>>>
>>> Regards
>>> Sab
>>>
>>> On Thu, Mar 10, 2016 at 2:28 PM, ayan guha  wrote:
>>>
 Hi All

 I am writing this in order to get a fair understanding of how zeppelin
 can be integrated with Spark.

 Our use case is to load few tables from a DB to Spark, run some
 transformation. Once done, we want to expose data through Zeppelin for
 analytics. I have few question around that to sound off any gross
 architectural flaws.

 Questions:

 1. How Zeppelin connects to Spark? Thriftserver? Thrift JDBC?

 2. What is the scope of Spark application when it is used from
 Zeppelin? For example, if I have few subsequent actions in zeppelin like
 map,filter,reduceByKey, filter,collect. I assume this will translate to an
 application and get submitted to Spark. However, If I want to use reuse
 some part of the data (for example) after first map transformation in
 earlier application. Can I do it? Or will it be another application and
 another spark submit?

  In our use case data will already be loaded in RDDs. So how Zeppelin
 can access it?

 3. How can I control access on specific rdds to specific users in
 Zeppelin (assuming we have implemented some way of login mechanism in
 Zeppelin and we have a mapping between Zeppelin users and their LDAP
 accounts). Is it even possible?

 4. If Zeppelin is not a good choice, yet, for the use case, what are
 the other alternatives?

 appreciate any help/pointers/guidance.


 --
 Best Regards,
 Ayan Guha

>>>
>>>
>>>
>>> --
>>>
>>> Architect - Big Data
>>> Ph: +91 99805 99458
>>>
>>> Manthan Systems | *Company of the year - Analytics (2014 Frost and
>>> Sullivan India ICT)*
>>> +++
>>>
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>



-- 
Best Regards,
Ayan Guha


Re: Spark SQL Optimization

2016-03-23 Thread Takeshi Yamamuro
Hi, all

What's the size of the three tables? Also, what's the performance difference
between the two queries?

On Tue, Mar 22, 2016 at 3:53 PM, Rishi Mishra  wrote:

> What we have observed so far is that Spark picks the join order in the same
> order as the tables are specified in the from clause.  Sometimes reordering
> benefits the join query.
> This could be a built-in optimization in Spark. But again, it's not going to
> be straightforward, since the selectivity of the join matters more than the
> table size.
> Probably some kind of heuristic might help.
>
> Regards,
> Rishitesh Mishra,
> SnappyData . (http://www.snappydata.io/)
>
> https://in.linkedin.com/in/rishiteshmishra
>
> On Mon, Mar 21, 2016 at 11:18 PM, gtinside  wrote:
>
>> More details :
>>
>> Execution plan for Original query
>> select distinct pge.portfolio_code
>> from table1 pge join table2 p
>> on p.perm_group = pge.anc_port_group
>> join table3 uge
>> on p.user_group=uge.anc_user_group
>> where uge.user_name = 'user' and p.perm_type = 'TEST'
>>
>> == Physical Plan ==
>> TungstenAggregate(key=[portfolio_code#14119], functions=[],
>> output=[portfolio_code#14119])
>>  TungstenExchange hashpartitioning(portfolio_code#14119)
>>   TungstenAggregate(key=[portfolio_code#14119], functions=[],
>> output=[portfolio_code#14119])
>>TungstenProject [portfolio_code#14119]
>> BroadcastHashJoin [user_group#13665], [anc_user_group#13658],
>> BuildRight
>>  TungstenProject [portfolio_code#14119,user_group#13665]
>>   BroadcastHashJoin [anc_port_group#14117], [perm_group#13667],
>> BuildRight
>>ConvertToUnsafe
>> Scan
>>
>> ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table1.parquet][portfolio_code#14119,anc_port_group#14117]
>>ConvertToUnsafe
>> Project [user_group#13665,perm_group#13667]
>>  Filter (perm_type#13666 = TEST)
>>   Scan
>>
>> ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table2.parquet][user_group#13665,perm_group#13667,perm_type#13666]
>>  ConvertToUnsafe
>>   Project [anc_user_group#13658]
>>Filter (user_name#13659 = user)
>> Scan
>>
>> ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table3.parquet][anc_user_group#13658,user_name#13659]
>>
>>
>>
>> Execution plan for optimized query
>> select distinct pge.portfolio_code
>> from table1 uge, table2 p, table3 pge
>> where uge.user_name = 'user' and p.perm_type = 'TEST'
>> and p.perm_group = pge.anc_port_group
>> and p.user_group=uge.anc_user_group
>>
>> == Physical Plan ==
>> TungstenAggregate(key=[portfolio_code#14119], functions=[],
>> output=[portfolio_code#14119])
>>  TungstenExchange hashpartitioning(portfolio_code#14119)
>>   TungstenAggregate(key=[portfolio_code#14119], functions=[],
>> output=[portfolio_code#14119])
>>TungstenProject [portfolio_code#14119]
>> BroadcastHashJoin [perm_group#13667], [anc_port_group#14117],
>> BuildRight
>>  TungstenProject [perm_group#13667]
>>   BroadcastHashJoin [anc_user_group#13658], [user_group#13665],
>> BuildRight
>>ConvertToUnsafe
>> Project [anc_user_group#13658]
>>  Filter (user_name#13659 = user)
>>   Scan
>>
>> ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table3.parquet][anc_user_group#13658,user_name#13659]
>>ConvertToUnsafe
>> Project [perm_group#13667,user_group#13665]
>>  Filter (perm_type#13666 = TEST)
>>   Scan
>>
>> ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table2.parquet][perm_group#13667,user_group#13665,perm_type#13666]
>>  ConvertToUnsafe
>>   Scan
>>
>> ParquetRelation[snackfs://shared:9042/aladdin_data_beta/table1.parquet][portfolio_code#14119,anc_port_group#14117]
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Optimization-tp26548p26553.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


-- 
---
Takeshi Yamamuro


Applying filter to a date column

2016-03-23 Thread Mich Talebzadeh
Hi,

I have a UDF created and registered as below

val df =
sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
"true").option("header", "true").load("/data/stg/table2")
val current_date = sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(),
'dd/MM/') ").collect.apply(0).getString(0)
def ChangeDate(word : String) : String = {
   return
word.substring(6,10)+"-"+word.substring(3,5)+"-"+word.substring(0,2)
}
sqlContext.udf.register("ChangeDate", ChangeDate(_:String))

Now I want to filter on all Payment dates older than 6 months. I do this:


scala> df.filter(months_between(lit(current_date),ChangeDate(col("Payment
date")).toDate) > 6)
:29: error: type mismatch;
 found   : org.apache.spark.sql.Column
 required: String

df.filter(months_between(lit(current_date),ChangeDate(col("Payment
date")).toDate) > 6)


It comes back with the above error. Any ideas?
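
One workaround I am considering (an untested sketch: it wraps ChangeDate with
udf() so it can be applied to a Column, and uses the built-in current_date()
instead of the formatted string):

import org.apache.spark.sql.functions.{udf, col, to_date, months_between, current_date}

// Column-friendly wrapper around the Scala function.
val changeDateUDF = udf(ChangeDate _)

// ChangeDate rewrites dd/MM/yyyy into yyyy-MM-dd, which to_date() can parse,
// so months_between() receives two date columns.
val oldPayments = df.filter(
  months_between(current_date(), to_date(changeDateUDF(col("Payment date")))) > 6)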


Thanks



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


Cached Parquet file paths problem

2016-03-23 Thread psmolinski
Hi,

After migrating from Spark 1.5.2 to 1.6.1 I faced a strange issue. I have a
Parquet directory
with partitions. Each partition (month) is subject to an incremental ETL that
takes the current
Avro files and replaces the corresponding Parquet files.

Now there is a problem that appeared in 1.6.x:
I have a couple of derived data frames. After the ETL finishes, all RDDs and
DataFrames are
properly recreated, but for some reason the originally captured file paths
are retained.
Of course, due to the overwrite, some paths are gone.

As a result I am getting exceptions as shown below. As I mentioned, it all
worked flawlessly
in Spark 1.5.x, i.e. after the ETL the engine nicely read the new directory
structure.

Is there any setting to restore the previous behaviour?

Regards,
Piotr

org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in
stage 32.0 failed 1 times, most recent failure: Lost task 7.0 in stage 32.0
(TID 386, localhost): java.io.FileNotFoundException: File does not exist:
hdfs://demo.sample/apps/demo/transactions/month=2015-09-01/part-r-00026-792365f9-d1f5-4a70-a3d4-e0b87f6ee087.gz.parquet
at
org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1122)
at
org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
at
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:385)
at
org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:157)
at
org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
at
org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.(SqlNewHadoopRDD.scala:180)
at
org.apache.spark.rdd.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:126)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cached-Parquet-file-paths-problem-tp26576.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Plot DataFrame with matplotlib

2016-03-23 Thread Teng Qiu
not sure about 3d plot, but there is a nice example:
https://github.com/zalando/spark-appliance/blob/master/examples/notebooks/PySpark_sklearn_matplotlib.ipynb

for plotting rdd or dataframe using matplotlib.

Am Mittwoch, 23. März 2016 schrieb Yavuz Nuzumlalı :
> Hi all,
> I'm trying to plot the result of a simple PCA operation, but couldn't
find a clear documentation about plotting data frames.
> Here is the output of my data frame:
> ++
> |pca_features|
> ++
> |[-255.4681508918886,2.9340031372956155,-0.5357914079267039] |
> |[-477.03566189308367,-6.170290817861212,-5.280827588464785] |
> |[-163.13388125540507,-4.571443623272966,-1.2349427928939671]|
> |[-53.721252166903255,0.6162589419996329,-0.39569546286098245]   |
> [-27.97717473880869,0.30883567826481106,-0.11159555340377557]   |
> |[-118.27508063853554,1.3484584740407748,-0.8088790388907207]|
> Values of `pca_features` column is DenseVector s created using
VectorAssembler.
> How can I draw a simple 3d scatter plot from this data frame?
> Thanks


Plot DataFrame with matplotlib

2016-03-23 Thread Yavuz Nuzumlalı
Hi all,

I'm trying to plot the result of a simple PCA operation, but couldn't find
a clear documentation about plotting data frames.

Here is the output of my data frame:

++
|pca_features|
++
|[-255.4681508918886,2.9340031372956155,-0.5357914079267039] |
|[-477.03566189308367,-6.170290817861212,-5.280827588464785] |
|[-163.13388125540507,-4.571443623272966,-1.2349427928939671]|
|[-53.721252166903255,0.6162589419996329,-0.39569546286098245]   |
|[-27.97717473880869,0.30883567826481106,-0.11159555340377557]  |
|[-118.27508063853554,1.3484584740407748,-0.8088790388907207]|

Values of `pca_features` column is DenseVector s created using
VectorAssembler.

How can I draw a simple 3d scatter plot from this data frame?

Thanks


Re: Problem using saveAsNewAPIHadoopFile API

2016-03-23 Thread Surendra , Manchikanti
Hi Vetal,

You may try MultipleOutputFormat instead of TextOutputFormat in
saveAsNewAPIHadoopFile().
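
For what it's worth, a rough Scala sketch of that idea, using the old-API
MultipleTextOutputFormat with saveAsHadoopFile (class names, paths and the
sample data are made up; untested):

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.{SparkConf, SparkContext}

// Routes each (minuteFolder, record) pair into a sub-directory named after the key,
// so records for "1200", "1201", ... land in separate folders with their own part files.
class MinuteFolderOutputFormat extends MultipleTextOutputFormat[String, String] {
  override def generateFileNameForKeyValue(key: String, value: String, name: String): String =
    key + "/" + name                     // e.g. 1200/part-00000
  override def generateActualKey(key: String, value: String): String =
    null                                 // keep the folder name out of the file contents
}

object KeyedOutputDemo {
  def main(args: Array[String]): Unit = {
    val sc  = new SparkContext(new SparkConf().setAppName("keyed-output-demo").setMaster("local[2]"))
    val rdd = sc.parallelize(Seq("1200" -> "rec1", "1200" -> "rec2", "1201" -> "rec3"))
    rdd.saveAsHadoopFile("/tmp/keyed-output", classOf[String], classOf[String],
      classOf[MinuteFolderOutputFormat])
    sc.stop()
  }
}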

Regards,
Surendra M

-- Surendra Manchikanti

On Tue, Mar 22, 2016 at 10:26 AM, vetal king  wrote:

> We are using Spark 1.4 for Spark Streaming. Kafka is data source for the
> Spark Stream.
>
> Records are published on Kafka every second. Our requirement is to store
> records published on Kafka in a single folder per minute. The stream will
> read records every five seconds. For instance records published during 1200
> PM and 1201PM are stored in folder "1200"; between 1201PM and 1202PM in
> folder "1201" and so on.
>
> The code I wrote is as follows
>
> //First Group records in RDD by date
> stream.foreachRDD (rddWithinStream -> {
> JavaPairRDD rddGroupedByDirectory = 
> rddWithinStream.mapToPair(t -> {
> return new Tuple2 (targetHadoopFolder, t._2());
> }).groupByKey();
> // All records grouped by folders they will be stored in
>
>
> // Create RDD for each target folder.
> for (String hadoopFolder : rddGroupedByDirectory.keys().collect()) {
> JavaPairRDD  rddByKey = 
> rddGroupedByDirectory.filter(groupedTuples -> {
> return groupedTuples._1().equals(hadoopFolder);
> });
>
> // And store it in Hadoop
> rddByKey.saveAsNewAPIHadoopFile(directory, String.class, String.class, 
> TextOutputFormat.class);
> }
>
> Since the Stream processes data every five seconds, saveAsNewAPIHadoopFile
> gets invoked multiple times in a minute. This causes "Part-0" file to
> be overwritten every time.
>
> I was expecting that in the directory specified by "directory" parameter,
> saveAsNewAPIHadoopFile will keep creating part-N file even when I've a
> sinlge worker node.
>
> Any help/alternatives are greatly appreciated.
>
> Thanks.
>