Spark SQL 1.6.3 ORDER BY and partitions

2017-01-06 Thread Joseph Naegele
I have two separate but similar issues that I've narrowed down to a pretty good 
level of detail. I'm using Spark 1.6.3, particularly Spark SQL.

I'm concerned with a single dataset for now, although the details apply to 
other, larger datasets. I'll call it "table". It's around 160 M records, 
average of 78 bytes each, so about 12 GB uncompressed. It's 2 GB compressed in 
HDFS.

First issue:
The following query works if "table" comprises 200 partitions (on disk), but fails when "table" has 1200 partitions, with the error "Total size of serialized results of 1031 tasks (6.0 GB) is bigger than spark.driver.maxResultSize (6.0 GB)":

SELECT * FROM orc.`table` ORDER BY field DESC LIMIT 10;

This is possibly related to the TakeOrderedAndProject step in the execution 
plan, because the following queries do not give me problems:

SELECT * FROM orc.`table`;
SELECT * FROM orc.`table` ORDER BY field DESC;
SELECT * FROM orc.`table` LIMIT 10;

All of these have different execution plans.

My "table" has 1200 partitions because I must use a large value for 
spark.sql.shuffle.partitions to handle joins and window functions on much 
larger DataFrames in my application. Too many partitions may be suboptimal, but 
that shouldn't lead to large serialized results, correct?

Any ideas? I've seen https://issues.apache.org/jira/browse/SPARK-12837, but I 
think my issue is a bit more specific.
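
For reference, a workaround I'm considering (untested, and not a root-cause fix; the ORC path below is a placeholder) is to shrink the number of partitions feeding the ordered LIMIT. Raising spark.driver.maxResultSize further is the other option, but that has to go into SparkConf / spark-defaults before the context is created.

import sqlContext.implicits._

// fewer tasks => fewer serialized per-task results shipped to the driver
val top10 = sqlContext.read.format("orc").load("/path/to/table")
  .coalesce(200)
  .orderBy($"field".desc)
  .limit(10)
top10.show()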


Second issue:
This one concerns the difference in execution when I call .cache() and then .count() on 
the following two DataFrames:

A: sqlContext.sql("SELECT * FROM table")
B: sqlContext.sql("SELECT * FROM table ORDER BY field DESC")

Counting the rows of A works as expected: a single Spark job with two stages 
that loads from HDFS, maps, aggregates, and reduces to a number.

The same can't be said for B, however. The .cache() call spawns a Spark job 
before I even call .count(), loading from HDFS and performing ConvertToSafe and 
Exchange. The .count() call spawns another job, the first task of which appears 
to re-load from HDFS and again perform ConvertToSafe and Exchange, writing 1200 
shuffle partitions. The next stage then proceeds to read the shuffle data 
across only 2 tasks. One of these tasks completes immediately and the other 
runs indefinitely, failing because the partition is too large (the 
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE error).

Does this behavior make sense at all? Obviously it doesn't make sense to sort 
rows if I'm just counting them, but this is a simplified example of a more 
complex application in which caching makes sense. My executors have more than 
enough memory to cache this entire DataFrame.
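
For what it's worth, the shape I'd expect to work (a sketch against the 1.6 API, untested) is to materialize the cache on the unsorted plan and apply the ORDER BY on top of it:

import sqlContext.implicits._

val base = sqlContext.sql("SELECT * FROM table").cache()
base.count()                             // fills the cache from the unsorted scan, no sort shuffle

val sorted = base.orderBy($"field".desc) // the sort then runs against the cached blocks when needed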

Thanks for reading

---
Joe Naegele
Grier Forensics






Re: Spark GraphFrame ConnectedComponents

2017-01-06 Thread Steve Loughran

On 5 Jan 2017, at 21:10, Ankur Srivastava <ankur.srivast...@gmail.com> wrote:

Yes, I did try it out and it chooses the local file system even though my checkpoint 
location starts with s3n://

I am not sure how I can make it load the S3 FileSystem.

Set fs.default.name to s3n://whatever, or, in the Spark context, set 
spark.hadoop.fs.default.name.

However:

1. You should really use s3a if you have the Hadoop 2.7 JARs on your classpath.
2. Neither s3n nor s3a is a real filesystem, and certain assumptions that 
checkpointing code tends to make (renames being O(1) atomic calls) do not hold. 
It may be that checkpointing to S3 isn't as robust as you'd like.
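
For example, a minimal sketch of setting that on the Spark conf (the bucket name is illustrative; fs.defaultFS is the newer name for the same setting):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .set("spark.hadoop.fs.default.name", "s3n://my-bucket")  // spark.hadoop.* is copied into hadoopConfiguration
  .set("spark.hadoop.fs.defaultFS",    "s3n://my-bucket")
val sc = new SparkContext(conf)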




On Thu, Jan 5, 2017 at 12:12 PM, Felix Cheung <felixcheun...@hotmail.com> wrote:
Right, I'd agree, it seems to be only with delete.

Could you by chance run just the delete to see if it fails?

FileSystem.get(sc.hadoopConfiguration)
  .delete(new Path(somepath), true)
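
Or, to rule out the default-filesystem problem entirely, a variant that resolves the FileSystem from the path itself (the path is illustrative):

import org.apache.hadoop.fs.Path

// getFileSystem honours the s3n:// scheme instead of falling back to file:///
val p = new Path("s3n://my-bucket/connected-components-checkpoint/3")
p.getFileSystem(sc.hadoopConfiguration).delete(p, true)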

From: Ankur Srivastava <ankur.srivast...@gmail.com>
Sent: Thursday, January 5, 2017 10:05:03 AM
To: Felix Cheung
Cc: user@spark.apache.org

Subject: Re: Spark GraphFrame ConnectedComponents

Yes, it is able to read the vertices and edges data from the S3 location and is also 
able to write the checkpoint files to S3. It only fails when deleting the data, 
and that is because it tries to use the default file system. I tried looking up 
how to update the default file system but could not find anything in that 
regard.

Thanks
Ankur

On Thu, Jan 5, 2017 at 12:55 AM, Felix Cheung <felixcheun...@hotmail.com> wrote:
From the stack it looks to be an error from the explicit call to 
hadoop.fs.FileSystem.

Is the URL scheme for s3n registered?
Does it work when you try to read from s3 from Spark?

_
From: Ankur Srivastava <ankur.srivast...@gmail.com>
Sent: Wednesday, January 4, 2017 9:23 PM
Subject: Re: Spark GraphFrame ConnectedComponents
To: Felix Cheung <felixcheun...@hotmail.com>
Cc: <user@spark.apache.org>



This is the exact trace from the driver logs

Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: 
s3n:///8ac233e4-10f9-4eb3-aa53-df6d9d7ea7be/connected-components-c1dbc2b0/3,
 expected: file:///
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:645)
at 
org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:80)
at 
org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:529)
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:747)
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:524)
at org.apache.hadoop.fs.ChecksumFileSystem.delete(ChecksumFileSystem.java:534)
at 
org.graphframes.lib.ConnectedComponents$.org$graphframes$lib$ConnectedComponents$$run(ConnectedComponents.scala:340)
at org.graphframes.lib.ConnectedComponents.run(ConnectedComponents.scala:139)
at GraphTest.main(GraphTest.java:31) --- Application Class
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

And I am running Spark v1.6.2 and GraphFrames v0.3.0-spark1.6-s_2.10.

Thanks
Ankur

On Wed, Jan 4, 2017 at 8:03 PM, Ankur Srivastava <ankur.srivast...@gmail.com> wrote:
Hi

I am rerunning the pipeline to generate the exact trace; below is part of the 
trace from the last run:

Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: 
s3n://, expected: file:///
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:642)
at 
org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:69)
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:516)
at org.apache.hadoop.fs.ChecksumFileSystem.delete(ChecksumFileSystem.java:528)

Also, I think the error is happening in this part of the code 
(ConnectedComponents.scala:339). I am referring to the code at 
https://github.com/graphframes/graphframes/blob/master/src/main/scala/org/graphframes/lib/ConnectedComponents.scala

if (shouldCheckpoint && (iteration % checkpointInterval == 0)) {
  // TODO: remove this after DataFrame.checkpoint is implemented
  val out = s"${checkpointDir.get}/$iteration"
  ee.write.parquet(out)
  // may hit S3 eventually consistent issue
  ee = sqlContext.read.parquet(out)


Re: Spark Read from Google store and save in AWS s3

2017-01-06 Thread Steve Loughran

On 5 Jan 2017, at 20:07, Manohar Reddy <manohar.re...@happiestminds.com> wrote:

Hi Steve,
Thanks for the reply; below is the follow-up help I need from you.
Do you mean we can set up two native file systems on a single SparkContext, so 
that based on the URL prefixes (gs://bucket/path and dest s3a://bucket-on-s3/path2) 
it will identify and read/write against the appropriate cloud?

Is that understanding right?


I wouldn't use the term "native FS", as they are all just client libraries to 
talk to the relevant object stores. You'd still have to have the cluster 
"default" FS.

But yes, you can use them: get your classpath right and they are all just URLs 
you use in your code.
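
For example, something along these lines (a sketch only, assuming the GCS connector and the hadoop-aws/S3A JARs plus credentials are on the classpath; the source bucket name is illustrative):

val df = sqlContext.read.parquet("gs://source-bucket/path")
df.write.parquet("s3a://bucket-on-s3/path2")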


Re: Kafka 0.8 + Spark 2.0 Partition Issue

2017-01-06 Thread Cody Koeninger
Kafka is designed to only allow reads from leaders. You need to fix
this at the Kafka level, not the Spark level.

On Fri, Jan 6, 2017 at 7:33 AM, Raghu Vadapalli  wrote:
>
> My spark 2.0 +  kafka 0.8 streaming job fails with error partition leaderset
> exception. When I check the kafka topic the partition, it is indeed in error
> with Leader = -1 and empty ISR.  I did lot of google and all of them point
> to either restarting or deleting the topic.  To do any of those two in
> production system while other topics are in heavy use is next to impossible.
> Now my question, is there way to force spark to read from leaderless
> partition accepting some dataloss or inconsistency ? Or force the immediate
> sync followed by election ( a kafka users group question but I am pushing my
> luck:) here )
>
> Topic: vzecsapplog Partition: 8 Leader: -1 Replicas: 5,4 Isr:
>
> --
> Regards,
> Raghu
>




Kafka 0.8 + Spark 2.0 Partition Issue

2017-01-06 Thread Raghu Vadapalli



My Spark 2.0 + Kafka 0.8 streaming job fails with a partition leader-set 
exception. When I check the Kafka topic partition, it is indeed in error, 
with Leader = -1 and an empty ISR. I did a lot of googling and everything points to 
either restarting or deleting the topic. Doing either of those in a production 
system while other topics are in heavy use is next to impossible. Now my 
question: is there a way to force Spark to read from a leaderless partition, 
accepting some data loss or inconsistency? Or to force an immediate sync followed 
by an election (a Kafka users group question, but I am pushing my luck :) here)?

Topic: vzecsapplog  Partition: 8Leader: -1  Replicas: 5,4   Isr:



--
Regards,
Raghu 





Re: Approach: Incremental data load from HBASE

2017-01-06 Thread Chetan Khatri
Ayan, thanks.
Correct, I am not thinking in RDBMS terms; I am wearing NoSQL glasses!


On Fri, Jan 6, 2017 at 3:23 PM, ayan guha  wrote:

> IMHO you should not "think" HBase in RDMBS terms, but you can use
> ColumnFilters to filter out new records
>
> On Fri, Jan 6, 2017 at 7:22 PM, Chetan Khatri  > wrote:
>
>> Hi Ayan,
>>
>> I mean by Incremental load from HBase, weekly running batch jobs takes
>> rows from HBase table and dump it out to Hive. Now when next i run Job it
>> only takes newly arrived jobs.
>>
>> Same as if we use Sqoop for incremental load from RDBMS to Hive with
>> below command,
>>
>> sqoop job --create myssb1 -- import --connect
>> jdbc:mysql://:/sakila --username admin --password admin
>> --driver=com.mysql.jdbc.Driver --query "SELECT address_id, address,
>> district, city_id, postal_code, alast_update, cityid, city, country_id,
>> clast_update FROM(SELECT a.address_id as address_id, a.address as address,
>> a.district as district, a.city_id as city_id, a.postal_code as postal_code,
>> a.last_update as alast_update, c.city_id as cityid, c.city as city,
>> c.country_id as country_id, c.last_update as clast_update FROM
>> sakila.address a INNER JOIN sakila.city c ON a.city_id=c.city_id) as sub
>> WHERE $CONDITIONS" --incremental lastmodified --check-column alast_update
>> --last-value 1900-01-01 --target-dir /user/cloudera/ssb7 --hive-import
>> --hive-table test.sakila -m 1 --hive-drop-import-delims --map-column-java
>> address=String
>>
>> Probably i am looking for any tool from HBase incubator family which does
>> the job for me, or other alternative approaches can be done through reading
>> Hbase tables in RDD and saving RDD to Hive.
>>
>> Thanks.
>>
>>
>> On Thu, Jan 5, 2017 at 2:02 AM, ayan guha  wrote:
>>
>>> Hi Chetan
>>>
>>> What do you mean by incremental load from HBase? There is a timestamp
>>> marker for each cell, but not at Row level.
>>>
>>> On Wed, Jan 4, 2017 at 10:37 PM, Chetan Khatri <
>>> chetan.opensou...@gmail.com> wrote:
>>>
 Ted Yu,

 You understood wrong, i said Incremental load from HBase to Hive,
 individually you can say Incremental Import from HBase.

 On Wed, Dec 21, 2016 at 10:04 PM, Ted Yu  wrote:

> Incremental load traditionally means generating hfiles and
> using org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles to load
> the data into hbase.
>
> For your use case, the producer needs to find rows where the flag is 0
> or 1.
> After such rows are obtained, it is up to you how the result of
> processing is delivered to hbase.
>
> Cheers
>
> On Wed, Dec 21, 2016 at 8:00 AM, Chetan Khatri <
> chetan.opensou...@gmail.com> wrote:
>
>> Ok, Sure will ask.
>>
>> But what would be generic best practice solution for Incremental load
>> from HBASE.
>>
>> On Wed, Dec 21, 2016 at 8:42 PM, Ted Yu  wrote:
>>
>>> I haven't used Gobblin.
>>> You can consider asking Gobblin mailing list of the first option.
>>>
>>> The second option would work.
>>>
>>>
>>> On Wed, Dec 21, 2016 at 2:28 AM, Chetan Khatri <
>>> chetan.opensou...@gmail.com> wrote:
>>>
 Hello Guys,

 I would like to understand different approach for Distributed
 Incremental load from HBase, Is there any *tool / incubactor tool* 
 which
 satisfy requirement ?

 *Approach 1:*

 Write Kafka Producer and maintain manually column flag for events
 and ingest it with Linkedin Gobblin to HDFS / S3.

 *Approach 2:*

 Run Scheduled Spark Job - Read from HBase and do transformations
 and maintain flag column at HBase Level.

 In above both approach, I need to maintain column level flags. such
 as 0 - by default, 1-sent,2-sent and acknowledged. So next time 
 Producer
 will take another 1000 rows of batch where flag is 0 or 1.

 I am looking for best practice approach with any distributed tool.

 Thanks.

 - Chetan Khatri

>>>
>>>
>>
>

>>>
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: Approach: Incremental data load from HBASE

2017-01-06 Thread ayan guha
IMHO you should not "think" of HBase in RDBMS terms, but you can use
column filters to filter out new records.
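
For illustration, a rough sketch of that idea from Spark (assuming an HBase 1.x client on the classpath; the table name, column family, qualifier and flag values are hypothetical):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.filter.{CompareFilter, SingleColumnValueFilter}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}

// push the "flag = 0" predicate into the HBase scan so only unprocessed rows reach Spark
val scan = new Scan()
scan.setFilter(new SingleColumnValueFilter(
  Bytes.toBytes("cf"), Bytes.toBytes("flag"),
  CompareFilter.CompareOp.EQUAL, Bytes.toBytes("0")))

val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(TableInputFormat.INPUT_TABLE, "my_table")
hbaseConf.set(TableInputFormat.SCAN,
  Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))

val rows = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])
// rows can then be mapped to a DataFrame and written into Hive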

On Fri, Jan 6, 2017 at 7:22 PM, Chetan Khatri 
wrote:

> Hi Ayan,
>
> I mean by Incremental load from HBase, weekly running batch jobs takes
> rows from HBase table and dump it out to Hive. Now when next i run Job it
> only takes newly arrived jobs.
>
> Same as if we use Sqoop for incremental load from RDBMS to Hive with below
> command,
>
> sqoop job --create myssb1 -- import --connect
> jdbc:mysql://:/sakila --username admin --password admin
> --driver=com.mysql.jdbc.Driver --query "SELECT address_id, address,
> district, city_id, postal_code, alast_update, cityid, city, country_id,
> clast_update FROM(SELECT a.address_id as address_id, a.address as address,
> a.district as district, a.city_id as city_id, a.postal_code as postal_code,
> a.last_update as alast_update, c.city_id as cityid, c.city as city,
> c.country_id as country_id, c.last_update as clast_update FROM
> sakila.address a INNER JOIN sakila.city c ON a.city_id=c.city_id) as sub
> WHERE $CONDITIONS" --incremental lastmodified --check-column alast_update
> --last-value 1900-01-01 --target-dir /user/cloudera/ssb7 --hive-import
> --hive-table test.sakila -m 1 --hive-drop-import-delims --map-column-java
> address=String
>
> Probably i am looking for any tool from HBase incubator family which does
> the job for me, or other alternative approaches can be done through reading
> Hbase tables in RDD and saving RDD to Hive.
>
> Thanks.
>
>
> On Thu, Jan 5, 2017 at 2:02 AM, ayan guha  wrote:
>
>> Hi Chetan
>>
>> What do you mean by incremental load from HBase? There is a timestamp
>> marker for each cell, but not at Row level.
>>
>> On Wed, Jan 4, 2017 at 10:37 PM, Chetan Khatri <
>> chetan.opensou...@gmail.com> wrote:
>>
>>> Ted Yu,
>>>
>>> You understood wrong, i said Incremental load from HBase to Hive,
>>> individually you can say Incremental Import from HBase.
>>>
>>> On Wed, Dec 21, 2016 at 10:04 PM, Ted Yu  wrote:
>>>
 Incremental load traditionally means generating hfiles and
 using org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles to load
 the data into hbase.

 For your use case, the producer needs to find rows where the flag is 0
 or 1.
 After such rows are obtained, it is up to you how the result of
 processing is delivered to hbase.

 Cheers

 On Wed, Dec 21, 2016 at 8:00 AM, Chetan Khatri <
 chetan.opensou...@gmail.com> wrote:

> Ok, Sure will ask.
>
> But what would be generic best practice solution for Incremental load
> from HBASE.
>
> On Wed, Dec 21, 2016 at 8:42 PM, Ted Yu  wrote:
>
>> I haven't used Gobblin.
>> You can consider asking Gobblin mailing list of the first option.
>>
>> The second option would work.
>>
>>
>> On Wed, Dec 21, 2016 at 2:28 AM, Chetan Khatri <
>> chetan.opensou...@gmail.com> wrote:
>>
>>> Hello Guys,
>>>
>>> I would like to understand different approach for Distributed
>>> Incremental load from HBase, Is there any *tool / incubactor tool* which
>>> satisfy requirement ?
>>>
>>> *Approach 1:*
>>>
>>> Write Kafka Producer and maintain manually column flag for events
>>> and ingest it with Linkedin Gobblin to HDFS / S3.
>>>
>>> *Approach 2:*
>>>
>>> Run Scheduled Spark Job - Read from HBase and do transformations and
>>> maintain flag column at HBase Level.
>>>
>>> In above both approach, I need to maintain column level flags. such
>>> as 0 - by default, 1-sent,2-sent and acknowledged. So next time Producer
>>> will take another 1000 rows of batch where flag is 0 or 1.
>>>
>>> I am looking for best practice approach with any distributed tool.
>>>
>>> Thanks.
>>>
>>> - Chetan Khatri
>>>
>>
>>
>

>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


-- 
Best Regards,
Ayan Guha


CompileException with Maps in Spark 2.1.0

2017-01-06 Thread nils.grabbert
Hi all,

the following code will run with Spark 2.0.2 but not with Spark 2.1.0:

//
case class Data(id: Int, param: Map[String, InnerData])
case class InnerData(name: String, value: Int)

import spark.implicits._

val e = Data(1, Map("key" -> InnerData("name", 123)))
val data = Seq(e)
val d = data.toDS()
//

Here is the exception:

Caused by: org.codehaus.commons.compiler.CompileException: File
'generated.java', Line 63, Column 46: Expression
"ExternalMapToCatalyst_value_isNull1" is not an rvalue
  at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11004)
  at
org.codehaus.janino.UnitCompiler.toRvalueOrCompileException(UnitCompiler.java:6639)
  at
org.codehaus.janino.UnitCompiler.getConstantValue2(UnitCompiler.java:5001)
  at org.codehaus.janino.UnitCompiler.access$10500(UnitCompiler.java:206)
  at
org.codehaus.janino.UnitCompiler$13.visitAmbiguousName(UnitCompiler.java:4984)
  at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:3633)
  at org.codehaus.janino.Java$Lvalue.accept(Java.java:3563)
  at
org.codehaus.janino.UnitCompiler.getConstantValue(UnitCompiler.java:4956)
  at
org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:4925)
  at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3189)
  at org.codehaus.janino.UnitCompiler.access$5100(UnitCompiler.java:206)
  at
org.codehaus.janino.UnitCompiler$9.visitAssignment(UnitCompiler.java:3143)
  at
org.codehaus.janino.UnitCompiler$9.visitAssignment(UnitCompiler.java:3139)
  at org.codehaus.janino.Java$Assignment.accept(Java.java:3847)
  at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3139)
  at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2112)
  at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:206)
  at
org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1377)
  at
org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1370)
  at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2558)
  at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1370)
  at
org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1450)
  at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2811)
  at
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1262)
  at
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1234)
  at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:538)
  at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:890)
  at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:894)
  at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:206)
  at
org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:377)
  at
org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:369)
  at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1128)
  at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:369)
  at
org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:1209)
  at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:564)
  at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:420)
  at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:206)
  at
org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:374)
  at
org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:369)
  at
org.codehaus.janino.Java$AbstractPackageMemberClassDeclaration.accept(Java.java:1309)
  at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:369)
  at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:345)
  at
org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:396)
  at
org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:311)
  at
org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:229)
  at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:196)
  at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:91)
  at
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:935)
  ... 77 more
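
A possible workaround I'm considering (untested) is to avoid the Map-of-case-class field and flatten it into a sequence of entries, which takes a different encoder path; the names below are only for illustration:

case class InnerEntry(key: String, name: String, value: Int)
case class FlatData(id: Int, param: Seq[InnerEntry])

import spark.implicits._

val flat = Seq(FlatData(1, Seq(InnerEntry("key", "name", 123)))).toDS()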

Best,
Nils



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/CompileException-with-Maps-in-Spark-2-1-0-tp28283.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




CompileException with Maps in Spark 2.1.0

2017-01-06 Thread Nils Grabbert
Hi all,

the following code will run with Spark 2.0.2 but not with Spark 2.1.0:

//
case class Data(id: Int, param: Map[String, InnerData])
case class InnerData(name: String, value: Int)

import spark.implicits._

val e = Data(1, Map("key" -> InnerData("name", 123)))
val data = Seq(e)
val d = data.toDS()
//

Here is the exception:

Caused by: org.codehaus.commons.compiler.CompileException: File
'generated.java', Line 63, Column 46: Expression
"ExternalMapToCatalyst_value_isNull1" is not an rvalue
  at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11004)
  at
org.codehaus.janino.UnitCompiler.toRvalueOrCompileException(UnitCompiler.java:6639)
  at
org.codehaus.janino.UnitCompiler.getConstantValue2(UnitCompiler.java:5001)
  at org.codehaus.janino.UnitCompiler.access$10500(UnitCompiler.java:206)
  at
org.codehaus.janino.UnitCompiler$13.visitAmbiguousName(UnitCompiler.java:4984)
  at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:3633)
  at org.codehaus.janino.Java$Lvalue.accept(Java.java:3563)
  at
org.codehaus.janino.UnitCompiler.getConstantValue(UnitCompiler.java:4956)
  at
org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:4925)
  at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3189)
  at org.codehaus.janino.UnitCompiler.access$5100(UnitCompiler.java:206)
  at
org.codehaus.janino.UnitCompiler$9.visitAssignment(UnitCompiler.java:3143)
  at
org.codehaus.janino.UnitCompiler$9.visitAssignment(UnitCompiler.java:3139)
  at org.codehaus.janino.Java$Assignment.accept(Java.java:3847)
  at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3139)
  at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2112)
  at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:206)
  at
org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1377)
  at
org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1370)
  at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2558)
  at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1370)
  at
org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1450)
  at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2811)
  at
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1262)
  at
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1234)
  at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:538)
  at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:890)
  at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:894)
  at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:206)
  at
org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:377)
  at
org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:369)
  at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1128)
  at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:369)
  at
org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:1209)
  at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:564)
  at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:420)
  at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:206)
  at
org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:374)
  at
org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:369)
  at
org.codehaus.janino.Java$AbstractPackageMemberClassDeclaration.accept(Java.java:1309)
  at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:369)
  at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:345)
  at
org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:396)
  at
org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:311)
  at
org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:229)
  at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:196)
  at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:91)
  at
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:935)
  ... 77 more

Best,
Nils
-- 
Nils Grabbert
VP Data Science

NumberFour AG
St. Annenufer 5
20457 Hamburg
Germany

Mobile: +49 170 2213300
Phone: +49 40 32507726
Fax: +49 40 32507736
nils.grabb...@numberfour.eu


Re: Approach: Incremental data load from HBASE

2017-01-06 Thread Chetan Khatri
Hi Ayan,

By incremental load from HBase I mean: a weekly batch job takes rows from an
HBase table and dumps them out to Hive. The next time I run the job, it should
only take the newly arrived rows.

Same as if we use Sqoop for incremental load from RDBMS to Hive with below
command,

sqoop job --create myssb1 -- import --connect
jdbc:mysql://:/sakila --username admin --password admin
--driver=com.mysql.jdbc.Driver --query "SELECT address_id, address,
district, city_id, postal_code, alast_update, cityid, city, country_id,
clast_update FROM(SELECT a.address_id as address_id, a.address as address,
a.district as district, a.city_id as city_id, a.postal_code as postal_code,
a.last_update as alast_update, c.city_id as cityid, c.city as city,
c.country_id as country_id, c.last_update as clast_update FROM
sakila.address a INNER JOIN sakila.city c ON a.city_id=c.city_id) as sub
WHERE $CONDITIONS" --incremental lastmodified --check-column alast_update
--last-value 1900-01-01 --target-dir /user/cloudera/ssb7 --hive-import
--hive-table test.sakila -m 1 --hive-drop-import-delims --map-column-java
address=String

I am probably looking for a tool from the HBase/incubator family that does the
job for me; alternatively, the approach could be to read the HBase tables into
an RDD and save that RDD to Hive.

Thanks.


On Thu, Jan 5, 2017 at 2:02 AM, ayan guha  wrote:

> Hi Chetan
>
> What do you mean by incremental load from HBase? There is a timestamp
> marker for each cell, but not at Row level.
>
> On Wed, Jan 4, 2017 at 10:37 PM, Chetan Khatri <
> chetan.opensou...@gmail.com> wrote:
>
>> Ted Yu,
>>
>> You understood wrong, i said Incremental load from HBase to Hive,
>> individually you can say Incremental Import from HBase.
>>
>> On Wed, Dec 21, 2016 at 10:04 PM, Ted Yu  wrote:
>>
>>> Incremental load traditionally means generating hfiles and
>>> using org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles to load
>>> the data into hbase.
>>>
>>> For your use case, the producer needs to find rows where the flag is 0
>>> or 1.
>>> After such rows are obtained, it is up to you how the result of
>>> processing is delivered to hbase.
>>>
>>> Cheers
>>>
>>> On Wed, Dec 21, 2016 at 8:00 AM, Chetan Khatri <
>>> chetan.opensou...@gmail.com> wrote:
>>>
 Ok, Sure will ask.

 But what would be generic best practice solution for Incremental load
 from HBASE.

 On Wed, Dec 21, 2016 at 8:42 PM, Ted Yu  wrote:

> I haven't used Gobblin.
> You can consider asking Gobblin mailing list of the first option.
>
> The second option would work.
>
>
> On Wed, Dec 21, 2016 at 2:28 AM, Chetan Khatri <
> chetan.opensou...@gmail.com> wrote:
>
>> Hello Guys,
>>
>> I would like to understand different approach for Distributed
>> Incremental load from HBase, Is there any *tool / incubactor tool* which
>> satisfy requirement ?
>>
>> *Approach 1:*
>>
>> Write Kafka Producer and maintain manually column flag for events and
>> ingest it with Linkedin Gobblin to HDFS / S3.
>>
>> *Approach 2:*
>>
>> Run Scheduled Spark Job - Read from HBase and do transformations and
>> maintain flag column at HBase Level.
>>
>> In above both approach, I need to maintain column level flags. such
>> as 0 - by default, 1-sent,2-sent and acknowledged. So next time Producer
>> will take another 1000 rows of batch where flag is 0 or 1.
>>
>> I am looking for best practice approach with any distributed tool.
>>
>> Thanks.
>>
>> - Chetan Khatri
>>
>
>

>>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>


How do I read data in dockerized kafka from a spark streaming application

2017-01-06 Thread shyla deshpande
My Kafka is in a Docker container.

How do I read this Kafka data in my Spark Streaming app?

Also, I need to write data from Spark Streaming to a Cassandra database which
is in a Docker container.

I appreciate any help.

Thanks.
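
For context, this is roughly the shape of what I'm trying (only a sketch, assuming Spark 2.x with the spark-streaming-kafka-0-10 and DataStax spark-cassandra-connector packages; all hostnames plus the topic, keyspace and table names are placeholders, and the containers need to publish addresses reachable from the Spark executors):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import com.datastax.spark.connector.streaming._

case class Event(key: String, value: String)

val conf = new SparkConf()
  .setAppName("kafka-to-cassandra")
  .set("spark.cassandra.connection.host", "cassandra-host")  // address the Cassandra container exposes

val ssc = new StreamingContext(conf, Seconds(10))

val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "kafka-host:9092",  // must match Kafka's advertised listener
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "my-streaming-app")

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("my-topic"), kafkaParams))

// write each micro-batch into a Cassandra table whose columns match the case class fields
stream.map(r => Event(r.key, r.value)).saveToCassandra("my_keyspace", "events")

ssc.start()
ssc.awaitTermination()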