Re: Check if dataframe is empty

2017-03-06 Thread Deepak Sharma
If the df is empty, the .take would throw a
java.util.NoSuchElementException.
This can be done as below:
df.rdd.isEmpty
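
For reference, a minimal Scala sketch of the variants discussed in this thread, assuming df is an existing DataFrame on Spark 2.x (which one is cheapest depends on the query plan):

// Collects at most one row through the Dataset's limit-aware collect path:
val emptyViaTake = df.take(1).isEmpty

// Keeps the limit inside the query plan before converting to an RDD:
val emptyViaLimit = df.limit(1).rdd.isEmpty()

// Converts the full plan to an RDD of Rows first, then takes one element:
val emptyViaRdd = df.rdd.isEmpty()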


On Tue, Mar 7, 2017 at 9:33 AM,  wrote:

> Dataframe.take(1) is faster.
>
>
>
> From: ashaita...@nz.imshealth.com
> Sent: Tuesday, March 07, 2017 9:22 AM
> To: user@spark.apache.org
> Subject: Check if dataframe is empty
>
>
>
> Hello!
>
>
>
> I am pretty sure that I am asking something which has been already asked
> lots of times. However, I cannot find the question in the mailing list
> archive.
>
>
>
> The question is – I need to check whether dataframe is empty or not. I
> receive a dataframe from 3rd party library and this dataframe can be
> potentially empty, but also can be really huge – millions of rows. Thus, I
> want to avoid of doing some logic in case the dataframe is empty. How can I
> efficiently check it?
>
>
>
> Right now I am doing it in the following way:
>
>
>
> private def isEmpty(df: Option[DataFrame]): Boolean = {
>   df.isEmpty || (df.isDefined && df.get.limit(1).rdd.isEmpty())
> }
>
>
>
> But the performance is really slow for big dataframes. I would be grateful
> for any suggestions.
>
>
>
> Thank you in advance.
>
>
>
>
> Best regards,
>
>
>
> Artem
>
>



-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


RE: Check if dataframe is empty

2017-03-06 Thread AShaitarov
Thank you for the prompt response. But why is it faster? There is an 
implementation of isEmpty for rdd:

def isEmpty(): Boolean = withScope {
  partitions.length == 0 || take(1).length == 0
}


Basically, the same take(1). Is it because of limit?


Regards,

Artem Shaitarov

From: jasbir.s...@accenture.com [mailto:jasbir.s...@accenture.com]
Sent: Tuesday, 7 March 2017 5:04 p.m.
To: Artem Shaitarov ; user@spark.apache.org
Subject: RE: Check if dataframe is empty

Dataframe.take(1) is faster.

From: ashaita...@nz.imshealth.com 
[mailto:ashaita...@nz.imshealth.com]
Sent: Tuesday, March 07, 2017 9:22 AM
To: user@spark.apache.org
Subject: Check if dataframe is empty

Hello!

I am pretty sure that I am asking something which has already been asked lots 
of times. However, I cannot find the question in the mailing list archive.

The question is: I need to check whether a dataframe is empty or not. I receive 
a dataframe from a 3rd-party library, and this dataframe can be potentially empty 
but can also be really huge - millions of rows. Thus, I want to avoid doing 
any logic in case the dataframe is empty. How can I efficiently check it?

Right now I am doing it in the following way:

private def isEmpty(df: Option[DataFrame]): Boolean = {
  df.isEmpty || (df.isDefined && df.get.limit(1).rdd.isEmpty())
}

But the performance is really slow for big dataframes. I would be grateful for 
any suggestions.

Thank you in advance.


Best regards,

Artem




RE: Check if dataframe is empty

2017-03-06 Thread jasbir.sing
Dataframe.take(1) is faster.

From: ashaita...@nz.imshealth.com [mailto:ashaita...@nz.imshealth.com]
Sent: Tuesday, March 07, 2017 9:22 AM
To: user@spark.apache.org
Subject: Check if dataframe is empty

Hello!

I am pretty sure that I am asking something which has already been asked lots 
of times. However, I cannot find the question in the mailing list archive.

The question is: I need to check whether a dataframe is empty or not. I receive 
a dataframe from a 3rd-party library, and this dataframe can be potentially empty 
but can also be really huge - millions of rows. Thus, I want to avoid doing 
any logic in case the dataframe is empty. How can I efficiently check it?

Right now I am doing it in the following way:

private def isEmpty(df: Option[DataFrame]): Boolean = {
  df.isEmpty || (df.isDefined && df.get.limit(1).rdd.isEmpty())
}

But the performance is really slow for big dataframes. I would be grateful for 
any suggestions.

Thank you in advance.


Best regards,

Artem




Check if dataframe is empty

2017-03-06 Thread AShaitarov
Hello!

I am pretty sure that I am asking something which has already been asked lots 
of times. However, I cannot find the question in the mailing list archive.

The question is: I need to check whether a dataframe is empty or not. I receive 
a dataframe from a 3rd-party library, and this dataframe can be potentially empty 
but can also be really huge - millions of rows. Thus, I want to avoid doing 
any logic in case the dataframe is empty. How can I efficiently check it?

Right now I am doing it in the following way:

private def isEmpty(df: Option[DataFrame]): Boolean = {
  df.isEmpty || (df.isDefined && df.get.limit(1).rdd.isEmpty())
}

But the performance is really slow for big dataframes. I would be grateful for 
any suggestions.

Thank you in advance.


Best regards,

Artem





How does Spark provide Hive style bucketing support?

2017-03-06 Thread SRK
Hi,

How does Spark provide Hive style bucketing support in Spark 2.x?

Thanks,
Swetha
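
A minimal Scala sketch of the Spark 2.x API, with a made-up table and column name: bucketing is exposed on DataFrameWriter and currently only takes effect together with saveAsTable.

df.write
  .bucketBy(8, "user_id")        // hash rows into 8 buckets by user_id
  .sortBy("user_id")             // optionally sort within each bucket
  .saveAsTable("events_bucketed")

When the table is read back, Spark can use the bucketing metadata to avoid shuffles for joins and aggregations on the bucketing column.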



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-does-Spark-provide-Hive-style-bucketing-support-tp28462.html



Re: FPGrowth Model is taking too long to generate frequent item sets

2017-03-06 Thread Raju Bairishetti
@Eli, thanks for the suggestion. If you do not mind, could you please
elaborate on those approaches?
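
In the meantime, a minimal Scala sketch of the two FPGrowth knobs that usually dominate runtime, minSupport and numPartitions (the transactions name and the numbers are only placeholders):

import org.apache.spark.mllib.fpm.FPGrowth

// transactions: RDD[Array[String]], one array of items per transaction
val model = new FPGrowth()
  .setMinSupport(0.01)     // an itemset must appear in at least 1% of the transactions
  .setNumPartitions(64)    // spread the conditional FP-tree work over more tasks
  .run(transactions)
val frequent = model.freqItemsets.collect()

Setting minSupport close to 1/(number of transactions) keeps essentially every item, so the number of candidate itemsets explodes, which matches the long runtimes described below.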

On Mon, Mar 6, 2017 at 7:29 PM, Eli Super  wrote:

> Hi
>
> Try to implement binning and/or feature engineering (smart feature
> selection for example)
>
> Good luck
>
> On Mon, Mar 6, 2017 at 6:56 AM, Raju Bairishetti  wrote:
>
>> Hi,
>>   I am new to Spark ML Lib. I am using FPGrowth model for finding related
>> items.
>>
>> Number of transactions are 63K and the total number of items in all
>> transactions are 200K.
>>
>> I am running FPGrowth model to generate frequent items sets. It is taking
>> huge amount of time to generate frequent itemsets.* I am setting
>> min-support value such that each item appears in at least ~(number of
>> items)/(number of transactions).*
>>
>> It is taking lots of time in case If I say item can appear at least once
>> in the database.
>>
>> If I give higher value to min-support then output is very smaller.
>>
>> Could anyone please guide me how to reduce the execution time for
>> generating frequent items?
>>
>> --
>> Thanks,
>> Raju Bairishetti,
>> www.lazada.com
>>
>
>


-- 

--
Thanks,
Raju Bairishetti,
www.lazada.com


Trouble with Thriftserver with hsqldb (Spark 2.1.0)

2017-03-06 Thread Yana Kadiyska
Hi folks, trying to run Spark 2.1.0 thrift server against an hsqldb file
and it seems to...hang.

I am starting thrift server with:

sbin/start-thriftserver.sh --driver-class-path ./conf/hsqldb-2.3.4.jar
(completely local setup)


hive-site.xml is like this:

<configuration>

  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/tmp</value>
  </property>

  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:hsqldb:file:/tmp/hive-metastore</value>
    <description>JDBC connect string for a JDBC metastore</description>
  </property>

  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>org.hsqldb.jdbc.JDBCDriver</value>
  </property>

  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>SA</value>
  </property>

  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value></value>
  </property>

  <property>
    <name>datanucleus.autoCreateSchema</name>
    <value>true</value>
  </property>

</configuration>


I have turned on logging with

log4j.category.DataNucleus=ALL

log4j.logger.org.apache.spark.sql.hive.thriftserver=ALL

and my spark log seems stuck at:

17/03/06 16:42:46 DEBUG Schema: Schema Transaction started with connection
"com.jolbox.bonecp.ConnectionHandle@44fdce3c" with isolation "serializable"

17/03/06 16:42:46 DEBUG Schema: Check of existence of CDS returned no table

17/03/06 16:42:46 DEBUG Schema: Creating table CDS

17/03/06 16:42:46 DEBUG Schema: CREATE TABLE CDS

(

CD_ID BIGINT NOT NULL,

CONSTRAINT CDS_PK PRIMARY KEY (CD_ID)

)


Hoping someone can suggest what other logging I can turn on or what a
possible issue could be (nothing is listening on port 1 at this point;
the java process did open 4040).


Re: org.apache.spark.SparkException: Task not serializable

2017-03-06 Thread Mina Aslani
Thank you Ankur for the quick response, really appreciate it! Making the
class serializable resolved the exception!

Best regards,
Mina

On Mon, Mar 6, 2017 at 4:20 PM, Ankur Srivastava  wrote:

> The fix for this make your class Serializable. The reason being the
> closures you have defined in the class need to be serialized and copied
> over to all executor nodes.
>
> Hope this helps.
>
> Thanks
> Ankur
>
> On Mon, Mar 6, 2017 at 1:06 PM, Mina Aslani  wrote:
>
>> Hi,
>>
>> I am trying to start with spark and get number of lines of a text file in my 
>> mac, however I get
>>
>> org.apache.spark.SparkException: Task not serializable error on
>>
>> JavaRDD logData = javaCtx.textFile(file);
>>
>> Please see below for the sample of code and the stackTrace.
>>
>> Any idea why this error is thrown?
>>
>> Best regards,
>>
>> Mina
>>
>> System.out.println("Creating Spark Configuration");
>> SparkConf javaConf = new SparkConf();
>> javaConf.setAppName("My First Spark Java Application");
>> javaConf.setMaster("PATH to my spark");
>> System.out.println("Creating Spark Context");
>> JavaSparkContext javaCtx = new JavaSparkContext(javaConf);
>> System.out.println("Loading the Dataset and will further process it");
>> String file = "file:///file.txt";
>> JavaRDD logData = javaCtx.textFile(file);
>>
>> long numLines = logData.filter(new Function() {
>>public Boolean call(String s) {
>>   return true;
>>}
>> }).count();
>>
>> System.out.println("Number of Lines in the Dataset "+numLines);
>>
>> javaCtx.close();
>>
>> Exception in thread "main" org.apache.spark.SparkException: Task not 
>> serializable
>>  at 
>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
>>  at 
>> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
>>  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>>  at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
>>  at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:387)
>>  at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:386)
>>  at 
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>  at 
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>>  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>>  at org.apache.spark.rdd.RDD.filter(RDD.scala:386)
>>  at org.apache.spark.api.java.JavaRDD.filter(JavaRDD.scala:78)
>>
>>
>


Re: org.apache.spark.SparkException: Task not serializable

2017-03-06 Thread Ankur Srivastava
The fix for this is to make your class Serializable. The reason is that the
closures you have defined in the class need to be serialized and copied
over to all executor nodes.

Hope this helps.

Thanks
Ankur

On Mon, Mar 6, 2017 at 1:06 PM, Mina Aslani  wrote:

> Hi,
>
> I am trying to start with spark and get number of lines of a text file in my 
> mac, however I get
>
> org.apache.spark.SparkException: Task not serializable error on
>
> JavaRDD logData = javaCtx.textFile(file);
>
> Please see below for the sample of code and the stackTrace.
>
> Any idea why this error is thrown?
>
> Best regards,
>
> Mina
>
> System.out.println("Creating Spark Configuration");
> SparkConf javaConf = new SparkConf();
> javaConf.setAppName("My First Spark Java Application");
> javaConf.setMaster("PATH to my spark");
> System.out.println("Creating Spark Context");
> JavaSparkContext javaCtx = new JavaSparkContext(javaConf);
> System.out.println("Loading the Dataset and will further process it");
> String file = "file:///file.txt";
> JavaRDD logData = javaCtx.textFile(file);
>
> long numLines = logData.filter(new Function() {
>public Boolean call(String s) {
>   return true;
>}
> }).count();
>
> System.out.println("Number of Lines in the Dataset "+numLines);
>
> javaCtx.close();
>
> Exception in thread "main" org.apache.spark.SparkException: Task not 
> serializable
>   at 
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
>   at 
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>   at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
>   at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:387)
>   at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:386)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>   at org.apache.spark.rdd.RDD.filter(RDD.scala:386)
>   at org.apache.spark.api.java.JavaRDD.filter(JavaRDD.scala:78)
>
>


org.apache.spark.SparkException: Task not serializable

2017-03-06 Thread Mina Aslani
Hi,

I am trying to get started with Spark and get the number of lines of a text file
on my Mac; however, I get

org.apache.spark.SparkException: Task not serializable error on

JavaRDD<String> logData = javaCtx.textFile(file);

Please see below for the sample of code and the stackTrace.

Any idea why this error is thrown?

Best regards,

Mina

System.out.println("Creating Spark Configuration");
SparkConf javaConf = new SparkConf();
javaConf.setAppName("My First Spark Java Application");
javaConf.setMaster("PATH to my spark");
System.out.println("Creating Spark Context");
JavaSparkContext javaCtx = new JavaSparkContext(javaConf);
System.out.println("Loading the Dataset and will further process it");
String file = "file:///file.txt";
JavaRDD<String> logData = javaCtx.textFile(file);

long numLines = logData.filter(new Function<String, Boolean>() {
   public Boolean call(String s) {
  return true;
   }
}).count();

System.out.println("Number of Lines in the Dataset "+numLines);

javaCtx.close();

Exception in thread "main" org.apache.spark.SparkException: Task not
serializable
at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at 
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:387)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:386)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.filter(RDD.scala:386)
at org.apache.spark.api.java.JavaRDD.filter(JavaRDD.scala:78)


Fwd: Spark application does not work with only one core

2017-03-06 Thread Maximilien Belinga
I am currently working to deploy two spark applications  and I want to
restrict cores and executors per application. My config is as follows:

spark.executor.cores=1
spark.driver.cores=1
spark.cores.max=1
spark.executor.instances=1

Now the issue is that with this exact configuration, one streaming
application works while the other doesn't. The application that doesn't
work remains in the RUNNING state and continuously prints the following message
in the logs:

17/03/06 10:31:50 INFO JobScheduler: Added jobs for time 148881431 ms
17/03/06 10:31:55 INFO JobScheduler: Added jobs for time 1488814315000 ms

Surprisingly, if I change the configuration to the following, the same
application that was not working now proceeds without problems.

spark.executor.cores=3
spark.driver.cores=1
spark.cores.max=3
spark.executor.instances=3

Note: The application does not work with the value 2; this is why I took 3.

It thus appears that some streaming applications need more cores than
others. My question is: what determines how many resources an application
needs? Why is one application not able to run with one single core while
it can run with 3 cores?
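
One possible explanation, offered only as a guess since the input sources are not shown: with receiver-based streaming inputs, each receiver permanently occupies one core, so an application whose total core count equals its number of receivers keeps queuing batches ("Added jobs for time ...") without ever having a free core to process them. Under that assumption, the minimum configuration would look something like:

# assuming N receiver-based input streams, allow at least N + 1 cores in total
spark.executor.cores=2
spark.cores.max=2
spark.executor.instances=1

If the failing application happens to use two receivers, that would also explain why 2 cores were not enough but 3 were; direct (receiver-less) sources such as the Kafka direct stream do not have this constraint.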


Regards,
Maximilien.


Spark application does not work with only one core

2017-03-06 Thread Maximilien Belinga
I am currently working to deploy two spark applications  and I want to
restrict cores and executors per application. My config is as follows:

spark.executor.cores=1
spark.driver.cores=1
spark.cores.max=1
spark.executor.instances=1

Now the issue is that with this exact configuration, one streaming
application works while the other doesn't. The application that doesn't
work remains in the RUNNING state and continuously prints the following message
in the logs:

17/03/06 10:31:50 INFO JobScheduler: Added jobs for time 148881431 ms
17/03/06 10:31:55 INFO JobScheduler: Added jobs for time 1488814315000 ms

Surprisingly, if I change the configuration to the following, the same
application that was not working now proceeds without problems.

spark.executor.cores=3
spark.driver.cores=1
spark.cores.max=3
spark.executor.instances=3

Note: The application does not work with the value 2; this is why I took 3.

It thus appears that some streaming applications need more cores than
others. My question is: what determines how many resources an application
needs? Why is one application not able to run with one single core while
it can run with 3 cores?


Regards,
Maximilien.


Re: Wrong runtime type when using newAPIHadoopFile in Java

2017-03-06 Thread Steve Loughran

On 6 Mar 2017, at 12:30, Nira Amit wrote:

> And it's very difficult if it's doing unexpected things.

All serialisations do unexpected things. Nobody understands them. Sorry



Re: Wrong runtime type when using newAPIHadoopFile in Java

2017-03-06 Thread Nira Amit
And by the way - I don't want the Avro details to be hidden away from me.
The whole purpose of the work I'm doing is to benchmark different
serialization tools and strategies. If I want to use Kryo serialization for
example, then I need to understand how the API works. And it's very
difficult if it's doing unexpected things.

On Mon, Mar 6, 2017 at 1:25 PM, Nira Amit  wrote:

> Hi Sean,
> Yes, we discussed this in Jira and you suggested I take this discussion to
> the mailing list, so I did.
> I don't have the option to migrate the code I'm working on to Datasets at
> the moment (or to Scala, as another developer suggested in the Jira
> discussion), so I have to work with the the Java RDD API.
> I've been working with Java for many years and understand that not all
> type errors can be caught in compile time. What I don't understand is how
> you manage to create an object of type AvroKey with the
> actual datum it encloses being GenericData$Record. If my code threw a
> RuntimeException in the line `MyCustomAvroKey customKey = first._1;` for
> example, saying it has a AvroKey - then there would
> be no confusion. But what happens in practice is that somehow my customKey
> is of type AvroKey and only when I try to retrieve the
> MyCustomType datum I get the exception. There must be some hackish things
> going on under the hood here, because this is just not how Java is supposed
> to work.
> Which is why I still think that this should be considered a bug.
>
> On Mon, Mar 6, 2017 at 1:02 PM, Sean Owen  wrote:
>
>> I think this is the same thing we already discussed extensively on your
>> JIRA.
>>
>> The type of the key/value class argument to newAPIHadoopFile are not the
>> type of your custom class, but of the Writable describing encoding of keys
>> and values in the file. I think that's the start of part of the problem.
>> This is how all Hadoop-related APIs would work, because Hadoop uses
>> Writables for encoding.
>>
>> You're asking again why it isn't caught at compile time, and that stems
>> from two basic causes. First is the way the underlying Hadoop API works,
>> needing Class parameters because of it's Java roots. Second is the
>> Scala/Java difference; the Scala API will accept, for instance,
>> non-Writable arguments if you can supply implicit conversion to Writable
>> (if I recall correctly). This isn't available in Java, leaving its API
>> expressing flexibility that isn't there. This isn't the exact issue here;
>> it's that you're using raw class literals in Java which have no generic
>> types -- they are Class. The InputFormat arg expresses nothing about the
>> key/value types; there's nothing to 'contradict' your declaration, which is
>> doesn't represent the actual types correctly. (You can cast class literals
>> to (Class<..>) to express this if you want. It's a little mess in Java.)
>> That's why it compiles just as any Java code with an invalid cast compiles
>> but fails at runtime.
>>
>> It is a bit weird if you're not familiar with the Hadoop APIs, Writables,
>> or how Class arguments shake out in the context of generics. It does take
>> the research you did. It does work as you've found. The reason you were
>> steered several times to the DataFrame API is that it can hide a lot of
>> this from you, including details of Avro and Writables. You're directly
>> accessing Hadoop APIs that are foreign to you.
>>
>> This and the JIRA do not describe a bug.
>>
>>
>>
>> On Mon, Mar 6, 2017 at 11:29 AM Nira  wrote:
>>
>>> I tried to load a custom type from avro files into a RDD using the
>>> newAPIHadoopFile. I started with the following naive code:
>>>
>>> JavaPairRDD events =
>>> sc.newAPIHadoopFile("file:/path/to/data.avro",
>>> AvroKeyInputFormat.class, MyCustomClass.class,
>>> NullWritable.class,
>>> sc.hadoopConfiguration());
>>> Tuple2 first = events.first();
>>>
>>> This doesn't work and shouldn't work, because the AvroKeyInputFormat
>>> returns
>>> a GenericData$Record. The thing is it compiles, and you can even assign
>>> the
>>> first tuple to the variable "first". You will get a runtime error only
>>> when
>>> you try to access a field of MyCustomClass from the tuple (e.g
>>> first._1.getSomeField()).
>>> This behavior sent me on a wild goose chase that took many hours over
>>> many
>>> weeks to figure out, because I never expected the method to return a
>>> wrong
>>> type at runtime. If there's a mismatch between what the InputFormat
>>> returns
>>> and the class I'm trying to load - shouldn't this be a compilation
>>> error? Or
>>> at least the runtime error should occur already when I try to assign the
>>> tuple to a variable of the wrong type. This is very unexpected behavior.
>>>
>>> Moreover, I actually fixed my code and implemented all the required
>>> wrapper
>>> and custom classes:
>>> 

Re: Wrong runtime type when using newAPIHadoopFile in Java

2017-03-06 Thread Nira Amit
Hi Sean,
Yes, we discussed this in Jira and you suggested I take this discussion to
the mailing list, so I did.
I don't have the option to migrate the code I'm working on to Datasets at
the moment (or to Scala, as another developer suggested in the Jira
discussion), so I have to work with the Java RDD API.
I've been working with Java for many years and understand that not all type
errors can be caught in compile time. What I don't understand is how you
manage to create an object of type AvroKey with the actual
datum it encloses being GenericData$Record. If my code threw a
RuntimeException in the line `MyCustomAvroKey customKey = first._1;` for
example, saying it has an AvroKey - then there would be
no confusion. But what happens in practice is that somehow my customKey is
of type AvroKey and only when I try to retrieve the
MyCustomType datum I get the exception. There must be some hackish things
going on under the hood here, because this is just not how Java is supposed
to work.
Which is why I still think that this should be considered a bug.

On Mon, Mar 6, 2017 at 1:02 PM, Sean Owen  wrote:

> I think this is the same thing we already discussed extensively on your
> JIRA.
>
> The type of the key/value class argument to newAPIHadoopFile are not the
> type of your custom class, but of the Writable describing encoding of keys
> and values in the file. I think that's the start of part of the problem.
> This is how all Hadoop-related APIs would work, because Hadoop uses
> Writables for encoding.
>
> You're asking again why it isn't caught at compile time, and that stems
> from two basic causes. First is the way the underlying Hadoop API works,
> needing Class parameters because of it's Java roots. Second is the
> Scala/Java difference; the Scala API will accept, for instance,
> non-Writable arguments if you can supply implicit conversion to Writable
> (if I recall correctly). This isn't available in Java, leaving its API
> expressing flexibility that isn't there. This isn't the exact issue here;
> it's that you're using raw class literals in Java which have no generic
> types -- they are Class. The InputFormat arg expresses nothing about the
> key/value types; there's nothing to 'contradict' your declaration, which is
> doesn't represent the actual types correctly. (You can cast class literals
> to (Class<..>) to express this if you want. It's a little mess in Java.)
> That's why it compiles just as any Java code with an invalid cast compiles
> but fails at runtime.
>
> It is a bit weird if you're not familiar with the Hadoop APIs, Writables,
> or how Class arguments shake out in the context of generics. It does take
> the research you did. It does work as you've found. The reason you were
> steered several times to the DataFrame API is that it can hide a lot of
> this from you, including details of Avro and Writables. You're directly
> accessing Hadoop APIs that are foreign to you.
>
> This and the JIRA do not describe a bug.
>
>
>
> On Mon, Mar 6, 2017 at 11:29 AM Nira  wrote:
>
>> I tried to load a custom type from avro files into a RDD using the
>> newAPIHadoopFile. I started with the following naive code:
>>
>> JavaPairRDD events =
>> sc.newAPIHadoopFile("file:/path/to/data.avro",
>> AvroKeyInputFormat.class, MyCustomClass.class,
>> NullWritable.class,
>> sc.hadoopConfiguration());
>> Tuple2 first = events.first();
>>
>> This doesn't work and shouldn't work, because the AvroKeyInputFormat
>> returns
>> a GenericData$Record. The thing is it compiles, and you can even assign
>> the
>> first tuple to the variable "first". You will get a runtime error only
>> when
>> you try to access a field of MyCustomClass from the tuple (e.g
>> first._1.getSomeField()).
>> This behavior sent me on a wild goose chase that took many hours over many
>> weeks to figure out, because I never expected the method to return a wrong
>> type at runtime. If there's a mismatch between what the InputFormat
>> returns
>> and the class I'm trying to load - shouldn't this be a compilation error?
>> Or
>> at least the runtime error should occur already when I try to assign the
>> tuple to a variable of the wrong type. This is very unexpected behavior.
>>
>> Moreover, I actually fixed my code and implemented all the required
>> wrapper
>> and custom classes:
>> JavaPairRDD records =
>> sc.newAPIHadoopFile("file:/path/to/data.avro",
>> MyCustomInputFormat.class, MyCustomAvroKey.class,
>> NullWritable.class,
>> sc.hadoopConfiguration());
>> Tuple2 first = records.first();
>> MyCustomAvroKey customKey = first._1;
>>
>> But this time I forgot that I moved the class to another package so the
>> namespace in the schema file was 

Re: Wrong runtime type when using newAPIHadoopFile in Java

2017-03-06 Thread Sean Owen
I think this is the same thing we already discussed extensively on your
JIRA.

The type of the key/value class argument to newAPIHadoopFile are not the
type of your custom class, but of the Writable describing encoding of keys
and values in the file. I think that's the start of part of the problem.
This is how all Hadoop-related APIs would work, because Hadoop uses
Writables for encoding.

You're asking again why it isn't caught at compile time, and that stems
from two basic causes. First is the way the underlying Hadoop API works,
needing Class parameters because of its Java roots. Second is the
Scala/Java difference; the Scala API will accept, for instance,
non-Writable arguments if you can supply implicit conversion to Writable
(if I recall correctly). This isn't available in Java, leaving its API
expressing flexibility that isn't there. This isn't the exact issue here;
it's that you're using raw class literals in Java which have no generic
types -- they are Class. The InputFormat arg expresses nothing about the
key/value types; there's nothing to 'contradict' your declaration, which
doesn't represent the actual types correctly. (You can cast class literals
to (Class<..>) to express this if you want. It's a little mess in Java.)
That's why it compiles just as any Java code with an invalid cast compiles
but fails at runtime.

It is a bit weird if you're not familiar with the Hadoop APIs, Writables,
or how Class arguments shake out in the context of generics. It does take
the research you did. It does work as you've found. The reason you were
steered several times to the DataFrame API is that it can hide a lot of
this from you, including details of Avro and Writables. You're directly
accessing Hadoop APIs that are foreign to you.

This and the JIRA do not describe a bug.
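
For completeness, a hedged sketch of the DataFrame route mentioned above, using the external spark-avro package that Spark 2.1 needs for Avro (shown in Scala for brevity; the path and package version are illustrative):

// e.g. started with --packages com.databricks:spark-avro_2.11:3.2.0
val df = spark.read
  .format("com.databricks.spark.avro")
  .load("file:/path/to/data.avro")
df.printSchema()  // the Avro schema is mapped to DataFrame columns; no Writables or class literals involved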



On Mon, Mar 6, 2017 at 11:29 AM Nira  wrote:

> I tried to load a custom type from avro files into a RDD using the
> newAPIHadoopFile. I started with the following naive code:
>
> JavaPairRDD events =
> sc.newAPIHadoopFile("file:/path/to/data.avro",
> AvroKeyInputFormat.class, MyCustomClass.class,
> NullWritable.class,
> sc.hadoopConfiguration());
> Tuple2 first = events.first();
>
> This doesn't work and shouldn't work, because the AvroKeyInputFormat
> returns
> a GenericData$Record. The thing is it compiles, and you can even assign the
> first tuple to the variable "first". You will get a runtime error only when
> you try to access a field of MyCustomClass from the tuple (e.g
> first._1.getSomeField()).
> This behavior sent me on a wild goose chase that took many hours over many
> weeks to figure out, because I never expected the method to return a wrong
> type at runtime. If there's a mismatch between what the InputFormat returns
> and the class I'm trying to load - shouldn't this be a compilation error?
> Or
> at least the runtime error should occur already when I try to assign the
> tuple to a variable of the wrong type. This is very unexpected behavior.
>
> Moreover, I actually fixed my code and implemented all the required wrapper
> and custom classes:
> JavaPairRDD records =
> sc.newAPIHadoopFile("file:/path/to/data.avro",
> MyCustomInputFormat.class, MyCustomAvroKey.class,
> NullWritable.class,
> sc.hadoopConfiguration());
> Tuple2 first = records.first();
> MyCustomAvroKey customKey = first._1;
>
> But this time I forgot that I moved the class to another package so the
> namespace in the schema file was wrong. And again, in runtime the method
> datum() of customKey returned a GenericData$Record instead of a
> MyCustomClass.
>
> Now, I understand that this has to do with the avro library (the
> GenericDatumReader class has an "expected" and "actual" schema, and it
> defaults to a GenericData$Record if something is wrong with my schema). But
> does it really make sense to return a different class from this API, which
> is not even assignable to my class, when this happens? Why would I ever get
> a class U from a wrapper class declared to be a Wrapper? It's just
> confusing and makes it so much harder to pinpoint the real problem.
>
> As I said, this weird behavior cost me a lot of time, and I've been
> googling
> this for weeks and am getting the impression that very few Java developers
> figured this API out. I posted  a question
> <
> http://stackoverflow.com/questions/41836851/wrong-runtime-type-in-rdd-when-reading-from-avro-with-custom-serializer
> >
> about it in StackOverflow and got several views and upvotes but no replies
> (a  similar question
> <
> http://stackoverflow.com/questions/41834120/override-avroio-default-coder-in-dataflow/
> >
> about loading custom types in Google Dataflow got answered within a couple
> of days).
>
> I think this 

Wrong runtime type when using newAPIHadoopFile in Java

2017-03-06 Thread Nira
I tried to load a custom type from Avro files into an RDD using the
newAPIHadoopFile. I started with the following naive code:

JavaPairRDD<MyCustomClass, NullWritable> events =
sc.newAPIHadoopFile("file:/path/to/data.avro",
AvroKeyInputFormat.class, MyCustomClass.class,
NullWritable.class,
sc.hadoopConfiguration());
Tuple2<MyCustomClass, NullWritable> first = events.first();

This doesn't work and shouldn't work, because the AvroKeyInputFormat returns
a GenericData$Record. The thing is it compiles, and you can even assign the
first tuple to the variable "first". You will get a runtime error only when
you try to access a field of MyCustomClass from the tuple (e.g
first._1.getSomeField()).
This behavior sent me on a wild goose chase that took many hours over many
weeks to figure out, because I never expected the method to return a wrong
type at runtime. If there's a mismatch between what the InputFormat returns
and the class I'm trying to load - shouldn't this be a compilation error? Or
at least the runtime error should occur already when I try to assign the
tuple to a variable of the wrong type. This is very unexpected behavior.

Moreover, I actually fixed my code and implemented all the required wrapper
and custom classes:
JavaPairRDD records =
sc.newAPIHadoopFile("file:/path/to/data.avro",
MyCustomInputFormat.class, MyCustomAvroKey.class,
NullWritable.class,
sc.hadoopConfiguration());
Tuple2 first = records.first();
MyCustomAvroKey customKey = first._1;

But this time I forgot that I moved the class to another package so the
namespace in the schema file was wrong. And again, in runtime the method
datum() of customKey returned a GenericData$Record instead of a
MyCustomClass.

Now, I understand that this has to do with the avro library (the
GenericDatumReader class has an "expected" and "actual" schema, and it
defaults to a GenericData$Record if something is wrong with my schema). But
does it really make sense to return a different class from this API, which
is not even assignable to my class, when this happens? Why would I ever get
a class U from a wrapper class declared to be a Wrapper? It's just
confusing and makes it so much harder to pinpoint the real problem.

As I said, this weird behavior cost me a lot of time, and I've been googling
this for weeks and am getting the impression that very few Java developers
figured this API out. I posted a question
(http://stackoverflow.com/questions/41836851/wrong-runtime-type-in-rdd-when-reading-from-avro-with-custom-serializer)
about it on StackOverflow and got several views and upvotes but no replies
(a similar question
(http://stackoverflow.com/questions/41834120/override-avroio-default-coder-in-dataflow/)
about loading custom types in Google Dataflow got answered within a couple
of days).

I think this behavior should be considered a bug.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Wrong-runtime-type-when-using-newAPIHadoopFile-in-Java-tp28459.html



Re: FPGrowth Model is taking too long to generate frequent item sets

2017-03-06 Thread Eli Super
Hi

Try to implement binning and/or feature engineering (smart feature
selection for example)

Good luck
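
One way to read the binning suggestion, as a hedged Scala sketch with spark.ml (the DataFrame df and the column name are assumptions): discretize continuous attributes into a handful of buckets before building transactions, which shrinks the item vocabulary FPGrowth has to mine.

import org.apache.spark.ml.feature.QuantileDiscretizer

val discretizer = new QuantileDiscretizer()
  .setInputCol("price")          // assumed continuous column
  .setOutputCol("price_bucket")
  .setNumBuckets(10)
val binned = discretizer.fit(df).transform(df)
// items such as "price_bucket=3" then replace raw values in each transaction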

On Mon, Mar 6, 2017 at 6:56 AM, Raju Bairishetti  wrote:

> Hi,
>   I am new to Spark ML Lib. I am using FPGrowth model for finding related
> items.
>
> Number of transactions are 63K and the total number of items in all
> transactions are 200K.
>
> I am running FPGrowth model to generate frequent items sets. It is taking
> huge amount of time to generate frequent itemsets.* I am setting
> min-support value such that each item appears in at least ~(number of
> items)/(number of transactions).*
>
> It is taking lots of time in case If I say item can appear at least once
> in the database.
>
> If I give higher value to min-support then output is very smaller.
>
> Could anyone please guide me how to reduce the execution time for
> generating frequent items?
>
> --
> Thanks,
> Raju Bairishetti,
> www.lazada.com
>


Re: LinearRegressionModel - Negative Predicted Value

2017-03-06 Thread Manish Maheshwari
Thanks Sean. Our training MSE is really large. We definitely need better
predictor variables.

Training Mean Squared Error = 7.72E8

Thanks,
Manish
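
A very large training MSE with LinearRegressionWithSGD can also come from the SGD step size rather than from the features alone. As a hedged alternative sketch, the spark.ml estimator solves the same least-squares problem without a hand-tuned step size (trainingDF is assumed to have the usual "label" and "features" columns, e.g. built with VectorAssembler):

import org.apache.spark.ml.regression.LinearRegression

val lr = new LinearRegression()
  .setFitIntercept(true)
  .setMaxIter(100)
  .setRegParam(0.1)          // small L2 penalty by default; tune as needed
val lrModel = lr.fit(trainingDF)
println(s"RMSE = ${lrModel.summary.rootMeanSquaredError}, R2 = ${lrModel.summary.r2}")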


On Mon, Mar 6, 2017 at 4:45 PM, Sean Owen  wrote:

> There's nothing unusual about negative values from a linear regression.
> If, generally, your predicted values are far from your actual values, then
> your model hasn't fit well. You may have a bug somewhere in your pipeline
> or you may have data without much linear relationship. Most of this isn't a
> Spark problem.
>
> On Mon, Mar 6, 2017 at 8:05 AM Manish Maheshwari 
> wrote:
>
>> Hi All,
>>
>> We are using a LinearRegressionModel in Scala. We are using a standard
>> StandardScaler to normalize the data before modelling.. the Code snippet
>> looks like this -
>>
>> *Modellng - *
>> val labeledPointsRDD = tableRecords.map(row =>
>> {
>> val filtered = row.toSeq.filter({ case s: String => false case _ => true
>> })
>> val converted = filtered.map({ case i: Int => i.toDouble case l: Long =>
>> l.toDouble case d: Double => d case _ => 0.0 })
>> val features = Vectors.dense(converted.slice(1,
>> converted.length).toArray)
>> LabeledPoint(converted(0), features)
>> })
>> val scaler1 = new StandardScaler().fit(labeledPointsRDD.map(x =>
>> x.features))
>> save(sc, scalarModelOutputPath, scaler1)
>> val normalizedData = labeledPointsRDD.map(lp => {LabeledPoint(lp.label,
>> scaler1.transform(lp.features))})
>> val splits = normalizedData.randomSplit(Array(0.8, 0.2))
>> val trainingData = splits(0)
>> val testingData = splits(1)
>> trainingData.cache()
>> var regression = new LinearRegressionWithSGD().setIntercept(true)
>> regression.optimizer.setStepSize(0.01)
>> val model = regression.run(trainingData)
>> model.save(sc, modelOutputPath)
>>
>> Post that when we score the model on the same data that it was trained on
>> using the below snippet we see this -
>>
>> *Scoring - *
>> val labeledPointsRDD = tableRecords.map(row =>
>> {val filtered = row.toSeq.filter({ case s: String => false case _ => true
>> })
>> val converted = filtered.map({ case i: Int => i.toDouble case l: Long =>
>> l.toDouble case d: Double => d case _ => 0.0 })
>> val features = Vectors.dense(converted.toArray)
>> (row(0), features)
>> })
>> val scaler1 = read(sc,scalarModelOutputPath)
>> val normalizedData = labeledPointsRDD.map(p => (p._1,
>> scaler1.transform(p._2)))
>> normalizedData.cache()
>> val model = LinearRegressionModel.load(sc,modelOutputPath)
>> val valuesAndPreds = normalizedData.map(p => (p._1.toString(),
>> model.predict(p._2)))
>>
>> However, a lot of predicted values are negative. The input data has no
>> negative values we we are unable to understand this behaviour.
>> Further the order and sequence of all the variables remains the same in
>> the modelling and testing data frames.
>>
>> Any ideas?
>>
>> Thanks,
>> Manish
>>
>>


Re: LinearRegressionModel - Negative Predicted Value

2017-03-06 Thread Sean Owen
There's nothing unusual about negative values from a linear regression. If,
generally, your predicted values are far from your actual values, then your
model hasn't fit well. You may have a bug somewhere in your pipeline or you
may have data without much linear relationship. Most of this isn't a Spark
problem.

On Mon, Mar 6, 2017 at 8:05 AM Manish Maheshwari 
wrote:

> Hi All,
>
> We are using a LinearRegressionModel in Scala. We are using a standard
> StandardScaler to normalize the data before modelling.. the Code snippet
> looks like this -
>
> *Modellng - *
> val labeledPointsRDD = tableRecords.map(row =>
> {
> val filtered = row.toSeq.filter({ case s: String => false case _ => true })
> val converted = filtered.map({ case i: Int => i.toDouble case l: Long =>
> l.toDouble case d: Double => d case _ => 0.0 })
> val features = Vectors.dense(converted.slice(1, converted.length).toArray)
> LabeledPoint(converted(0), features)
> })
> val scaler1 = new StandardScaler().fit(labeledPointsRDD.map(x =>
> x.features))
> save(sc, scalarModelOutputPath, scaler1)
> val normalizedData = labeledPointsRDD.map(lp => {LabeledPoint(lp.label,
> scaler1.transform(lp.features))})
> val splits = normalizedData.randomSplit(Array(0.8, 0.2))
> val trainingData = splits(0)
> val testingData = splits(1)
> trainingData.cache()
> var regression = new LinearRegressionWithSGD().setIntercept(true)
> regression.optimizer.setStepSize(0.01)
> val model = regression.run(trainingData)
> model.save(sc, modelOutputPath)
>
> Post that when we score the model on the same data that it was trained on
> using the below snippet we see this -
>
> *Scoring - *
> val labeledPointsRDD = tableRecords.map(row =>
> {val filtered = row.toSeq.filter({ case s: String => false case _ => true
> })
> val converted = filtered.map({ case i: Int => i.toDouble case l: Long =>
> l.toDouble case d: Double => d case _ => 0.0 })
> val features = Vectors.dense(converted.toArray)
> (row(0), features)
> })
> val scaler1 = read(sc,scalarModelOutputPath)
> val normalizedData = labeledPointsRDD.map(p => (p._1,
> scaler1.transform(p._2)))
> normalizedData.cache()
> val model = LinearRegressionModel.load(sc,modelOutputPath)
> val valuesAndPreds = normalizedData.map(p => (p._1.toString(),
> model.predict(p._2)))
>
> However, a lot of predicted values are negative. The input data has no
> negative values we we are unable to understand this behaviour.
> Further the order and sequence of all the variables remains the same in
> the modelling and testing data frames.
>
> Any ideas?
>
> Thanks,
> Manish
>
>


LinearRegressionModel - Negative Predicted Value

2017-03-06 Thread Manish Maheshwari
Hi All,

We are using a LinearRegressionModel in Scala. We are using a standard
StandardScaler to normalize the data before modelling. The code snippet
looks like this -

Modelling -
val labeledPointsRDD = tableRecords.map(row =>
{
val filtered = row.toSeq.filter({ case s: String => false case _ => true })
val converted = filtered.map({ case i: Int => i.toDouble case l: Long =>
l.toDouble case d: Double => d case _ => 0.0 })
val features = Vectors.dense(converted.slice(1, converted.length).toArray)
LabeledPoint(converted(0), features)
})
val scaler1 = new StandardScaler().fit(labeledPointsRDD.map(x =>
x.features))
save(sc, scalarModelOutputPath, scaler1)
val normalizedData = labeledPointsRDD.map(lp => {LabeledPoint(lp.label,
scaler1.transform(lp.features))})
val splits = normalizedData.randomSplit(Array(0.8, 0.2))
val trainingData = splits(0)
val testingData = splits(1)
trainingData.cache()
var regression = new LinearRegressionWithSGD().setIntercept(true)
regression.optimizer.setStepSize(0.01)
val model = regression.run(trainingData)
model.save(sc, modelOutputPath)

After that, when we score the model on the same data that it was trained on
using the snippet below, we see this -

Scoring -
val labeledPointsRDD = tableRecords.map(row =>
{val filtered = row.toSeq.filter({ case s: String => false case _ => true })
val converted = filtered.map({ case i: Int => i.toDouble case l: Long =>
l.toDouble case d: Double => d case _ => 0.0 })
val features = Vectors.dense(converted.toArray)
(row(0), features)
})
val scaler1 = read(sc,scalarModelOutputPath)
val normalizedData = labeledPointsRDD.map(p => (p._1,
scaler1.transform(p._2)))
normalizedData.cache()
val model = LinearRegressionModel.load(sc,modelOutputPath)
val valuesAndPreds = normalizedData.map(p => (p._1.toString(),
model.predict(p._2)))

However, a lot of the predicted values are negative. The input data has no
negative values, so we are unable to understand this behaviour.
Further, the order and sequence of all the variables remains the same in the
modelling and testing data frames.

Any ideas?

Thanks,
Manish