Re: How to remove empty strings from JavaRDD

2016-04-07 Thread Chris Miller
flatmap?
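
Or just filter -- here's a quick sketch in Scala (the Java API's filter works
the same way, taking a Function<String, Boolean>); the sample values are made
up:

*
// Keep only the non-null, non-empty strings.
val lines = sc.parallelize(Seq("a", "", "b", null, "c"))
val nonEmpty = lines.filter(s => s != null && s.nonEmpty)
nonEmpty.collect().foreach(println)   // prints a, b, c
*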


--
Chris Miller

On Thu, Apr 7, 2016 at 10:25 PM, greg huang <debin.hu...@gmail.com> wrote:

> Hi All,
>
> Can someone give me an example of code to get rid of empty strings in a
> JavaRDD? I know there is a filter method in JavaRDD:
> https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/rdd/RDD.html#filter(scala.Function1)
>
> Regards,
>Greg
>


Re: Spark schema evolution

2016-03-22 Thread Chris Miller
With Avro you solve this by using a default value for the new field...
maybe Parquet is the same?
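
For reference, a minimal sketch of the two settings discussed below, set from
code (config names are taken from the original post; untested against this
exact data):

*
// Merge the schemas of the two Parquet files at read time, and turn off
// Parquet filter pushdown, which is the workaround described below.
sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")
sqlContext.setConf("spark.sql.parquet.filterPushdown", "false")
*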


--
Chris Miller

On Tue, Mar 22, 2016 at 9:34 PM, gtinside <gtins...@gmail.com> wrote:

> Hi ,
>
> I have a table sourced from *2 parquet files*, with a few extra columns in one
> of the parquet files. Simple queries work fine, but queries with a predicate
> on an extra column don't work and I get "column not found"
>
> *Column resp_party_type exist in just one parquet file*
>
> a) Query that works:
> select resp_party_type  from operational_analytics
>
> b) Query that doesn't work (complains about missing column
> *resp_party_type*):
> select category as Events, resp_party as Team, count(*) as Total from
> operational_analytics where application = 'PeopleMover' and resp_party_type
> = 'Team' group by category, resp_party
>
> *Query Plan for (b)*
> == Physical Plan ==
> TungstenAggregate(key=[category#30986,resp_party#31006],
> functions=[(count(1),mode=Final,isDistinct=false)],
> output=[Events#36266,Team#36267,Total#36268L])
>  TungstenExchange hashpartitioning(category#30986,resp_party#31006)
>   TungstenAggregate(key=[category#30986,resp_party#31006],
> functions=[(count(1),mode=Partial,isDistinct=false)],
> output=[category#30986,resp_party#31006,currentCount#36272L])
>Project [category#30986,resp_party#31006]
> Filter ((application#30983 = PeopleMover) && (resp_party_type#31007 =
> Team))
>  Scan
>
> ParquetRelation[snackfs://tst:9042/aladdin_data_beta/operational_analytics/operational_analytics_peoplemover.parquet,snackfs://tst:9042/aladdin_data_beta/operational_analytics/operational_analytics_mis.parquet][category#30986,resp_party#31006,application#30983,resp_party_type#31007]
>
>
> I have set spark.sql.parquet.mergeSchema = true and
> spark.sql.parquet.filterPushdown = true. When I set
> spark.sql.parquet.filterPushdown = false, Query (b) starts working. Here is the
> execution plan after setting filterPushdown = false for Query (b):
>
> == Physical Plan ==
> TungstenAggregate(key=[category#30986,resp_party#31006],
> functions=[(count(1),mode=Final,isDistinct=false)],
> output=[Events#36313,Team#36314,Total#36315L])
>  TungstenExchange hashpartitioning(category#30986,resp_party#31006)
>   TungstenAggregate(key=[category#30986,resp_party#31006],
> functions=[(count(1),mode=Partial,isDistinct=false)],
> output=[category#30986,resp_party#31006,currentCount#36319L])
>Project [category#30986,resp_party#31006]
> Filter ((application#30983 = PeopleMover) && (resp_party_type#31007 =
> Team))
>  Scan
>
> ParquetRelation[snackfs://tst:9042/aladdin_data_beta/operational_analytics/operational_analytics_peoplemover.parquet,snackfs://tst:9042/aladdin_data_beta/operational_analytics/operational_analytics_mis.parquet][category#30986,resp_party#31006,application#30983,resp_party_type#31007]
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-schema-evolution-tp26563.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: newbie HDFS S3 best practices

2016-03-16 Thread Chris Miller
If you have lots of small files, distcp should handle that well -- it's
supposed to distribute the transfer of files across the nodes in your
cluster. Conductor looks interesting if you're trying to distribute the
transfer of single, large file(s)...

right?

--
Chris Miller

On Wed, Mar 16, 2016 at 4:43 AM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> Hi Frank
>
> We have thousands of small files. Each file is between 6K and maybe 100K.
>
> Conductor looks interesting
>
> Andy
>
> From: Frank Austin Nothaft <fnoth...@berkeley.edu>
> Date: Tuesday, March 15, 2016 at 11:59 AM
> To: Andrew Davidson <a...@santacruzintegration.com>
> Cc: "user @spark" <user@spark.apache.org>
> Subject: Re: newbie HDFS S3 best practices
>
> Hard to say with #1 without knowing your application’s characteristics;
> for #2, we use conductor <https://github.com/BD2KGenomics/conductor> with
> IAM roles, .boto/.aws/credentials files.
>
> Frank Austin Nothaft
> fnoth...@berkeley.edu
> fnoth...@eecs.berkeley.edu
> 202-340-0466
>
> On Mar 15, 2016, at 11:45 AM, Andy Davidson <a...@santacruzintegration.com
> <a...@santacruzintegration.com>> wrote:
>
> We use the spark-ec2 script to create AWS clusters as needed (we do not
> use AWS EMR)
>
>    1. Will we get better performance if we copy data to HDFS before we
>    run, instead of reading directly from S3?
>
>    2. What is a good way to move results from HDFS to S3?
>
>
> It seems like there are many ways to bulk copy to S3. Many of them require
> that we explicitly use AWS_ACCESS_KEY_ID:AWS_SECRET_ACCESS_KEY@ in the URI.
> This seems like a bad idea?
>
> What would you recommend?
>
> Thanks
>
> Andy
>
>
>
>


Re: Does parallelize and collect preserve the original order of list?

2016-03-16 Thread Chris Miller
Short answer: Nope

Less short answer: Spark is not designed to maintain sort order in this
case... it *may*, but there's no guarantee... generally, it would not be in
the same order unless you implement something to order by and then sort the
result based on that.
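
A rough sketch of what I mean -- carry an explicit index and sort on it when
you read the data back (the values and paths here are made up):

*
val items = Seq("a", "b", "c", "d")          // stand-in for your list
val output = "/tmp/items-with-index"

// Write each element together with its position, separated by a tab
sc.parallelize(items)
  .zipWithIndex()
  .map { case (value, idx) => s"$idx\t$value" }
  .saveAsTextFile(output)

// Read back and restore the original order by sorting on that index
val restored = sc.textFile(output)
  .map { line =>
    val Array(idx, value) = line.split("\t", 2)
    (idx.toLong, value)
  }
  .sortByKey()
  .values
  .collect()
*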

--
Chris Miller

On Wed, Mar 16, 2016 at 10:16 AM, JoneZhang <joyoungzh...@gmail.com> wrote:

> Step1
> List items = new ArrayList();
> items.addAll(XXX);
> javaSparkContext.parallelize(items).saveAsTextFile(output);
> Step2
> final List items2 = ctx.textFile(output).collect();
>
> Do items and items2 have the same order?
>
>
> Best wishes.
> Thanks.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Does-parallelize-and-collect-preserve-the-original-order-of-list-tp26512.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: reading file from S3

2016-03-16 Thread Chris Miller
+1 for Sab's thoughtful answer...

Yasemin: As Gourav said, using IAM roles is considered best practice and
generally will give you fewer headaches in the end... but you may have a
reason for doing it the way you are, and certainly the way you posted
should be supported and not cause the error you described.
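
If you do end up sticking with key-based access rather than IAM roles, one way
to keep the keys out of the URI is to set them on the Hadoop configuration --
a sketch (the property names are the standard s3n ones; bucket and file come
from your example):

*
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", sys.env("AWS_ACCESS_KEY_ID"))
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", sys.env("AWS_SECRET_ACCESS_KEY"))

// Note: no credentials and no leading slash in the URI itself.
val lines = sc.textFile("s3n://yasemindeneme/deneme.txt")
lines.take(5).foreach(println)
*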

--
Chris Miller

On Tue, Mar 15, 2016 at 11:22 PM, Sabarish Sasidharan <
sabarish.sasidha...@manthan.com> wrote:

> There are many solutions to a problem.
>
> Also understand that sometimes your situation might be such. For example,
> what if you are accessing S3 from your Spark job running in your continuous
> integration server sitting in your data center, or maybe a box under your
> desk? And sometimes you are just trying something.
>
> Also understand that sometimes you want answers to solve your problem at
> hand without redirecting you to something else. Understand what you
> suggested is an appropriate way of doing it, which I myself have proposed
> before, but that doesn't solve the OP's problem at hand.
>
> Regards
> Sab
> On 15-Mar-2016 8:27 pm, "Gourav Sengupta" <gourav.sengu...@gmail.com>
> wrote:
>
>> Oh!!! What the hell
>>
>> Please never use the URI
>>
>> *s3n://AWS_ACCESS_KEY_ID:AWS_SECRET_ACCESS_KEY*. That is a major cause of
>> pain, security issues, and code maintenance issues, and of course something
>> that Amazon strongly suggests we do not use. Please use roles and you will
>> not have to worry about security.
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Tue, Mar 15, 2016 at 2:38 PM, Sabarish Sasidharan <
>> sabarish@gmail.com> wrote:
>>
>>> You have a slash before the bucket name. It should be @.
>>>
>>> Regards
>>> Sab
>>> On 15-Mar-2016 4:03 pm, "Yasemin Kaya" <godo...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am using Spark 1.6.0 standalone and I want to read a txt file from an S3
>>>> bucket named yasemindeneme; my file name is deneme.txt. But I am getting
>>>> this error. Here is the simple code
>>>> <https://gist.github.com/anonymous/6d174f8587f0f3fd2334>
>>>> Exception in thread "main" java.lang.IllegalArgumentException: Invalid
>>>> hostname in URI s3n://AWS_ACCESS_KEY_ID:AWS_SECRET_ACCESS_KEY@
>>>> /yasemindeneme/deneme.txt
>>>> at
>>>> org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:45)
>>>> at
>>>> org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:55)
>>>>
>>>>
>>>> I tried 2 options:
>>>> *sc.hadoopConfiguration() *and
>>>> *sc.textFile("s3n://AWS_ACCESS_KEY_ID:AWS_SECRET_ACCESS_KEY@/yasemindeneme/deneme.txt/");*
>>>>
>>>> Also I did export AWS_ACCESS_KEY_ID= .
>>>>  export AWS_SECRET_ACCESS_KEY=
>>>> But there is no change in the error.
>>>>
>>>> Could you please help me about this issue?
>>>>
>>>>
>>>> --
>>>> hiç ender hiç
>>>>
>>>
>>


Re: Correct way to use spark streaming with apache zeppelin

2016-03-13 Thread Chris Miller
Cool! Thanks for sharing.


--
Chris Miller

On Sun, Mar 13, 2016 at 12:53 AM, Todd Nist <tsind...@gmail.com> wrote:

> Below is a link to an example which Silvio Fiorito put together
> demonstrating how to link Zeppelin with Spark Stream for real-time charts.
> I think the original thread was pack in early November 2015, subject: Real
> time chart in Zeppelin, if you care to try to find it.
>
> https://gist.github.com/granturing/a09aed4a302a7367be92
>
> HTH.
>
> -Todd
>
> On Sat, Mar 12, 2016 at 6:21 AM, Chris Miller <cmiller11...@gmail.com>
> wrote:
>
>> I'm pretty new to all of this stuff, so bear with me.
>>
>> Zeppelin isn't really intended for realtime dashboards as far as I know.
>> Its reporting features (tables, graphs, etc.) are more for displaying the
>> results from the output of something. As far as I know, there isn't really
>> anything to "watch" a dataset and have updates pushed to the Zeppelin UI.
>>
>> As for Spark, unless you're doing a lot of processing that you didn't
>> mention here, I don't think it's a good fit just for this.
>>
>> If it were me (just off the top of my head), I'd just build a simple web
>> service that uses websockets to push updates to the client which could then
>> be used to update graphs, tables, etc. The data itself -- that is, the
>> accumulated totals -- you could store in something like Redis. When an
>> order comes in, just add that quantity and price to the existing value and
>> trigger your code to push out an updated value to any clients via the
>> websocket. You could use something like a Redis pub/sub channel to trigger
>> the web app to notify clients of an update.
>>
>> There are about 5 million other ways you could design this, but I would
>> just keep it as simple as possible. I just threw one idea out...
>>
>> Good luck.
>>
>>
>> --
>> Chris Miller
>>
>> On Sat, Mar 12, 2016 at 6:58 PM, trung kien <kient...@gmail.com> wrote:
>>
>>> Thanks Chris and Mich for replying.
>>>
>>> Sorry for not explaining my problem clearly. Yes, I am talking about a
>>> flexible dashboard when I mention Zeppelin.
>>>
>>> Here is the problem I am having:
>>>
>>> I am running a commercial website where we sell many products and we
>>> have many branches in many places. We have a lot of realtime transactions
>>> and want to analyze them in realtime.
>>>
>>> We don't want to aggregate every single transaction (each transaction
>>> has BranchID, ProductID, Qty, Price) every time we do analytics. So we
>>> maintain intermediate data which contains: BranchID, ProductID,
>>> totalQty, totalDollar.
>>>
>>> Ideally, we have 2 tables:
>>>    Transaction (BranchID, ProductID, Qty, Price, Timestamp)
>>>
>>> And the intermediate table Stats is just the sum of every transaction
>>> grouped by BranchID and ProductID (I am using Spark Streaming to
>>> calculate this table in realtime).
>>>
>>> My thinking is that doing statistics (a realtime dashboard) on the Stats
>>> table is much easier, and this table is also small enough to maintain.
>>>
>>> I'm just wondering, what's the best way to store the Stats table (a
>>> database or parquet file?)
>>> What exactly are you trying to do? Zeppelin is for interactive analysis
>>> of a dataset. What do you mean "realtime analytics" -- do you mean build a
>>> report or dashboard that automatically updates as new data comes in?
>>>
>>>
>>> --
>>> Chris Miller
>>>
>>> On Sat, Mar 12, 2016 at 3:13 PM, trung kien <kient...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I've just viewed some Zeppelin videos. The integration between
>>>> Zeppelin and Spark is really amazing and I want to use it for my
>>>> application.
>>>>
>>>> In my app, I will have a Spark Streaming app to do some basic realtime
>>>> aggregation (intermediate data). Then I want to use Zeppelin to do some
>>>> realtime analytics on the intermediate data.
>>>>
>>>> My question is: what's the most efficient storage engine for storing
>>>> realtime intermediate data? Is a parquet file somewhere suitable?
>>>>
>>>
>>>
>>
>


Re: Correct way to use spark streaming with apache zeppelin

2016-03-12 Thread Chris Miller
I'm pretty new to all of this stuff, so bear with me.

Zeppelin isn't really intended for realtime dashboards as far as I know.
Its reporting features (tables, graphs, etc.) are more for displaying the
results from the output of something. As far as I know, there isn't really
anything to "watch" a dataset and have updates pushed to the Zeppelin UI.

As for Spark, unless you're doing a lot of processing that you didn't
mention here, I don't think it's a good fit just for this.

If it were me (just off the top of my head), I'd just build a simple web
service that uses websockets to push updates to the client which could then
be used to update graphs, tables, etc. The data itself -- that is, the
accumulated totals -- you could store in something like Redis. When an
order comes in, just add that quantity and price to the existing value and
trigger your code to push out an updated value to any clients via the
websocket. You could use something like a Redis pub/sub channel to trigger
the web app to notify clients of an update.

There are about 5 million other ways you could design this, but I would
just keep it as simple as possible. I just threw one idea out...
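
For what it's worth, here's a very rough sketch of the Redis side of that idea,
using the Jedis client from Scala -- the key and channel names are made up, and
this is untested:

*
import redis.clients.jedis.Jedis

// Called once per incoming order: bump the running totals and notify the web
// app, which pushes the new totals to browsers over its websocket.
def recordOrder(branchId: String, productId: String, qty: Long, price: Double): Unit = {
  val jedis = new Jedis("localhost")
  try {
    val key = s"stats:$branchId:$productId"
    jedis.hincrBy(key, "totalQty", qty)
    jedis.hincrByFloat(key, "totalDollar", qty * price)
    jedis.publish("stats-updates", key)   // subscribers re-read the hash and push it out
  } finally {
    jedis.close()
  }
}
*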

Good luck.


--
Chris Miller

On Sat, Mar 12, 2016 at 6:58 PM, trung kien <kient...@gmail.com> wrote:

> Thanks Chris and Mich for replying.
>
> Sorry for not explaining my problem clearly. Yes, I am talking about a
> flexible dashboard when I mention Zeppelin.
>
> Here is the problem I am having:
>
> I am running a commercial website where we sell many products and we have
> many branches in many places. We have a lot of realtime transactions and
> want to analyze them in realtime.
>
> We don't want to aggregate every single transaction (each transaction has
> BranchID, ProductID, Qty, Price) every time we do analytics. So we
> maintain intermediate data which contains: BranchID, ProductID,
> totalQty, totalDollar.
>
> Ideally, we have 2 tables:
>    Transaction (BranchID, ProductID, Qty, Price, Timestamp)
>
> And the intermediate table Stats is just the sum of every transaction
> grouped by BranchID and ProductID (I am using Spark Streaming to calculate
> this table in realtime).
>
> My thinking is that doing statistics (a realtime dashboard) on the Stats
> table is much easier, and this table is also small enough to maintain.
>
> I'm just wondering, what's the best way to store the Stats table (a
> database or parquet file?)
> What exactly are you trying to do? Zeppelin is for interactive analysis of
> a dataset. What do you mean "realtime analytics" -- do you mean build a
> report or dashboard that automatically updates as new data comes in?
>
>
> --
> Chris Miller
>
> On Sat, Mar 12, 2016 at 3:13 PM, trung kien <kient...@gmail.com> wrote:
>
>> Hi all,
>>
>> I've just viewed some Zeppelin videos. The integration between
>> Zeppelin and Spark is really amazing and I want to use it for my
>> application.
>>
>> In my app, I will have a Spark Streaming app to do some basic realtime
>> aggregation (intermediate data). Then I want to use Zeppelin to do some
>> realtime analytics on the intermediate data.
>>
>> My question is: what's the most efficient storage engine for storing
>> realtime intermediate data? Is a parquet file somewhere suitable?
>>
>
>


Re: Repeating Records w/ Spark + Avro?

2016-03-12 Thread Chris Miller
Well, I kind of got it... this works below:

*
val rdd = sc.newAPIHadoopFile(path, classOf[AvroKeyInputFormat[GenericRecord]],
classOf[AvroKey[GenericRecord]], classOf[NullWritable]).map(_._1.datum)

rdd
  .map(i => {
    val item = i.copy()
    val record = item._1.datum()

    println(record.get("myValue"))
  })
  .take(10)
*

Seems strange to me that I have to iterate over the RDD effectively two
times -- one to create the RDD, and another to perform my action. It also
seems strange that I can't actually access the data in my RDD until I've
copied the records. I would think this is a *very* common use case of an
RDD -- accessing the data it contains (otherwise, what's the point?).

Is there a way to always enable cloning? There used to be a cloneRecords
parameter on the hadoopFile method, but that seems to have been removed.

Finally, if I add rdd.persist(), then it doesn't work. I guess I would need
to do .map(_._1.datum) again before the map that does the real work.
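
For anyone else who hits this: one thing that might sidestep the protected
clone() problem entirely is Avro's GenericData.deepCopy. A sketch (untested
here), assuming rdd is the raw (AvroKey, NullWritable) RDD from
newAPIHadoopFile -- that is, without the trailing .map(_._1.datum):

*
import org.apache.avro.generic.{GenericData, GenericRecord}

val copied = rdd.map { pair =>
  val datum = pair._1.datum()
  // deepCopy is a public API, unlike GenericRecord.clone()
  GenericData.get().deepCopy(datum.getSchema, datum)
}

copied.map(r => String.valueOf(r.get("myValue"))).take(10).foreach(println)
*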


--
Chris Miller

On Sat, Mar 12, 2016 at 4:15 PM, Chris Miller <cmiller11...@gmail.com>
wrote:

> Wow! That sure is buried in the documentation! But yeah, that's what I
> thought more or less.
>
> I tried copying as follows, but that didn't work.
>
> *
> val copyRDD = singleFileRDD.map(_.copy())
> *
>
> When I iterate over the new copyRDD (foreach or map), I still have the
> same problem of duplicate records. I also tried copying within the block
> where I'm using it, but that didn't work either:
>
> *
> rdd
>   .take(10)
>   .collect()
>   .map(i => {
>     val item = i.copy()
>     val record = item._1.datum()
>
>     println(record.get("myValue"))
>   })
> *
>
> What am I doing wrong?
>
> --
> Chris Miller
>
> On Sat, Mar 12, 2016 at 1:48 PM, Peyman Mohajerian <mohaj...@gmail.com>
> wrote:
>
>> Here is the reason for the behavior:
>> '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable
>> object for each record, directly caching the returned RDD or directly
>> passing it to an aggregation or shuffle operation will create many
>> references to the same object. If you plan to directly cache, sort, or
>> aggregate Hadoop writable objects, you should first copy them using a map
>>  function.
>>
>>
>> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/SparkContext.html
>>
>> So it is Hadoop related.
>>
>> On Fri, Mar 11, 2016 at 3:19 PM, Chris Miller <cmiller11...@gmail.com>
>> wrote:
>>
>>> I have a bit of a strange situation:
>>>
>>> *
>>> import org.apache.avro.generic.{GenericData, GenericRecord}
>>> import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper, AvroKey}
>>> import org.apache.avro.mapreduce.AvroKeyInputFormat
>>> import org.apache.hadoop.io.{NullWritable, WritableUtils}
>>>
>>> val path = "/path/to/data.avro"
>>>
>>> val rdd = sc.newAPIHadoopFile(path,
>>> classOf[AvroKeyInputFormat[GenericRecord]],
>>> classOf[AvroKey[GenericRecord]], classOf[NullWritable])
>>> rdd.take(10).foreach( x => println( x._1.datum() ))
>>> *
>>>
>>> In this situation, I get the right number of records returned, and if I
>>> look at the contents of rdd I see the individual records as tuple2's...
>>> however, if I println on each one as shown above, I get the same result
>>> every time.
>>>
>>> Apparently this has to do with something in Spark or Avro keeping a
>>> reference to the item it's iterating over, so I need to clone the object
>>> before I use it. However, if I try to clone it (from the spark-shell
>>> console), I get:
>>>
>>> *
>>> rdd.take(10).foreach( x => {
>>>   val clonedDatum = x._1.datum().clone()
>>>   println(clonedDatum.datum())
>>> })
>>>
>>> :37: error: method clone in class Object cannot be accessed in
>>> org.apache.avro.generic.GenericRecord
>>>  Access to protected method clone not permitted because
>>>  prefix type org.apache.avro.generic.GenericRecord does not conform to
>>>  class $iwC where the access take place
>>> val clonedDatum = x._1.datum().clone()
>>> *
>>>
>>> So, how can I clone the datum?
>>>
>>> Seems I'm not the only one who ran into this problem:
>>> https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/102. I
>>> can't figure out how to fix it in my case without hacking away like the
>>> person in the linked PR did.
>>>
>>> Suggestions?
>>>
>>> --
>>> Chris Miller
>>>
>>
>>
>


Re: Repeating Records w/ Spark + Avro?

2016-03-12 Thread Chris Miller
Wow! That sure is buried in the documentation! But yeah, that's what I
thought more or less.

I tried copying as follows, but that didn't work.

*
val copyRDD = singleFileRDD.map(_.copy())
*

When I iterate over the new copyRDD (foreach or map), I still have the same
problem of duplicate records. I also tried copying within the block where
I'm using it, but that didn't work either:

*
rdd
  .take(10)
  .collect()
  .map(i => {
    val item = i.copy()
    val record = item._1.datum()

    println(record.get("myValue"))
  })
*

What am I doing wrong?

--
Chris Miller

On Sat, Mar 12, 2016 at 1:48 PM, Peyman Mohajerian <mohaj...@gmail.com>
wrote:

> Here is the reason for the behavior:
> '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable
> object for each record, directly caching the returned RDD or directly
> passing it to an aggregation or shuffle operation will create many
> references to the same object. If you plan to directly cache, sort, or
> aggregate Hadoop writable objects, you should first copy them using a map
>  function.
>
>
> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/SparkContext.html
>
> So it is Hadoop related.
>
> On Fri, Mar 11, 2016 at 3:19 PM, Chris Miller <cmiller11...@gmail.com>
> wrote:
>
>> I have a bit of a strange situation:
>>
>> *
>> import org.apache.avro.generic.{GenericData, GenericRecord}
>> import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper, AvroKey}
>> import org.apache.avro.mapreduce.AvroKeyInputFormat
>> import org.apache.hadoop.io.{NullWritable, WritableUtils}
>>
>> val path = "/path/to/data.avro"
>>
>> val rdd = sc.newAPIHadoopFile(path,
>> classOf[AvroKeyInputFormat[GenericRecord]],
>> classOf[AvroKey[GenericRecord]], classOf[NullWritable])
>> rdd.take(10).foreach( x => println( x._1.datum() ))
>> *
>>
>> In this situation, I get the right number of records returned, and if I
>> look at the contents of rdd I see the individual records as tuple2's...
>> however, if I println on each one as shown above, I get the same result
>> every time.
>>
>> Apparently this has to do with something in Spark or Avro keeping a
>> reference to the item it's iterating over, so I need to clone the object
>> before I use it. However, if I try to clone it (from the spark-shell
>> console), I get:
>>
>> *
>> rdd.take(10).foreach( x => {
>>   val clonedDatum = x._1.datum().clone()
>>   println(clonedDatum.datum())
>> })
>>
>> :37: error: method clone in class Object cannot be accessed in
>> org.apache.avro.generic.GenericRecord
>>  Access to protected method clone not permitted because
>>  prefix type org.apache.avro.generic.GenericRecord does not conform to
>>  class $iwC where the access take place
>> val clonedDatum = x._1.datum().clone()
>> *********
>>
>> So, how can I clone the datum?
>>
>> Seems I'm not the only one who ran into this problem:
>> https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/102. I
>> can't figure out how to fix it in my case without hacking away like the
>> person in the linked PR did.
>>
>> Suggestions?
>>
>> --
>> Chris Miller
>>
>
>


Re: Correct way to use spark streaming with apache zeppelin

2016-03-12 Thread Chris Miller
What exactly are you trying to do? Zeppelin is for interactive analysis of
a dataset. What do you mean "realtime analytics" -- do you mean build a
report or dashboard that automatically updates as new data comes in?


--
Chris Miller

On Sat, Mar 12, 2016 at 3:13 PM, trung kien <kient...@gmail.com> wrote:

> Hi all,
>
> I've just viewed some Zeppenlin's videos. The intergration between
> Zeppenlin and Spark is really amazing and i want to use it for my
> application.
>
> In my app, i will have a Spark streaming app to do some basic realtime
> aggregation ( intermediate data). Then i want to use Zeppenlin to do some
> realtime analytics on the intermediate data.
>
> My question is what's the most efficient storage engine to store realtime
> intermediate data? Is parquet file somewhere is suitable?
>


Repeating Records w/ Spark + Avro?

2016-03-11 Thread Chris Miller
I have a bit of a strange situation:

*
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper, AvroKey}
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.{NullWritable, WritableUtils}

val path = "/path/to/data.avro"

val rdd = sc.newAPIHadoopFile(path,
classOf[AvroKeyInputFormat[GenericRecord]],
classOf[AvroKey[GenericRecord]], classOf[NullWritable])
rdd.take(10).foreach( x => println( x._1.datum() ))
*

In this situation, I get the right number of records returned, and if I
look at the contents of rdd I see the individual records as tuple2's...
however, if I println on each one as shown above, I get the same result
every time.

Apparently this has to do with something in Spark or Avro keeping a
reference to the item it's iterating over, so I need to clone the object
before I use it. However, if I try to clone it (from the spark-shell
console), I get:

*
rdd.take(10).foreach( x => {
  val clonedDatum = x._1.datum().clone()
  println(clonedDatum.datum())
})

:37: error: method clone in class Object cannot be accessed in
org.apache.avro.generic.GenericRecord
 Access to protected method clone not permitted because
 prefix type org.apache.avro.generic.GenericRecord does not conform to
 class $iwC where the access take place
val clonedDatum = x._1.datum().clone()
*

So, how can I clone the datum?

Seems I'm not the only one who ran into this problem:
https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/102. I can't
figure out how to fix it in my case without hacking away like the person in
the linked PR did.

Suggestions?

--
Chris Miller


Re: Avro SerDe Issue w/ Manual Partitions?

2016-03-06 Thread Chris Miller
For anyone running into this same issue, it looks like Avro deserialization
is just broken when used with SparkSQL and partitioned schemas. I created
a bug report with details and a simplified example of how to reproduce it:
https://issues.apache.org/jira/browse/SPARK-13709
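
Until that's fixed, one possible workaround (untested for this exact case) is
to bypass the Hive Avro SerDe and read the files directly with the separate
spark-avro package, then query the resulting DataFrame:

*
// Requires the com.databricks:spark-avro package on the classpath; the path
// is the same S3 location used earlier in this thread.
val df = sqlContext.read
  .format("com.databricks.spark.avro")
  .load("s3://analytics-bucket/prod/logs/avro/2016/03/02/")

df.registerTempTable("test_avro")
sqlContext.sql("SELECT * FROM test_avro LIMIT 1").show()
*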


--
Chris Miller

On Fri, Mar 4, 2016 at 12:11 AM, Chris Miller <cmiller11...@gmail.com>
wrote:

> One more thing -- just to set aside any question about my specific schema
> or data, I used the sample schema and data record from Oracle's
> documentation on Avro support. It's a pretty simple schema:
> https://docs.oracle.com/cd/E26161_02/html/GettingStartedGuide/jsonbinding-overview.html
>
> When I create a table with this schema and then try to query the
> Avro-encoded record, I get the same type of error:
>
> 
>  org.apache.avro.AvroTypeException: Found avro.FullName, expecting union
> at
> org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
> at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
> at
> org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
> at
> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
> at
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
> at
> org.apache.hadoop.hive.serde2.avro.AvroDeserializer$SchemaReEncoder.reencode(AvroDeserializer.java:111)
> at
> org.apache.hadoop.hive.serde2.avro.AvroDeserializer.deserialize(AvroDeserializer.java:175)
> at
> org.apache.hadoop.hive.serde2.avro.AvroSerDe.deserialize(AvroSerDe.java:201)
> at
> org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:409)
> at
> org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:408)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> at scala.collection.TraversableOnce$class.to
> (TraversableOnce.scala:273)
> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> 
>
> To me, this "feels" like a bug -- I just can't identify if it's a Spark
> issue or an Avro issue. Decoding the same files works fine with Hive, and I
> imagine the same deserializer code is used there too.
>
> Thoughts?
>
> --
> Chris Miller
>
> On Thu, Mar 3, 2016 at 9:38 PM, Igor Berman <igor.ber...@gmail.com> wrote:
>
>> your field name is
>> *enum1_values*
>>
>> but you have data
>> { "foo1": "test123", *"enum1"*: "BLUE" }
>>
>> i.e. since you defined enum and not union(null, enum)
>> it tries to find value for enum1_values and doesn't find one...
>>
>> On 3 March 2016 at 11:30, Chris Miller <cmiller11...@gmail.com>

Re: Is Spark right for us?

2016-03-06 Thread Chris Miller
Gut instinct is no, Spark is overkill for your needs... you should be able
to accomplish all of that with a relational database or a column-oriented
database (depending on the types of queries you most frequently run and the
performance requirements).

--
Chris Miller

On Mon, Mar 7, 2016 at 1:17 AM, Laumegui Deaulobi <
guillaume.bilod...@gmail.com> wrote:

> Our problem space is survey analytics.  Each survey comprises a set of
> questions, with each question having a set of possible answers.  Survey
> fill-out tasks are sent to users, who have until a certain date to complete
> it.  Based on these survey fill-outs, reports need to be generated.  Each
> report deals with a subset of the survey fill-outs, and comprises a set of
> data points (average rating for question 1, min/max for question 2, etc.)
>
> We are dealing with rather large data sets - although reading the internet
> we get the impression that everyone is analyzing petabytes of data...
>
> Users: up to 100,000
> Surveys: up to 100,000
> Questions per survey: up to 100
> Possible answers per question: up to 10
> Survey fill-outs / user: up to 10
> Reports: up to 100,000
> Data points per report: up to 100
>
> Data is currently stored in a relational database but a migration to a
> different kind of store is possible.
>
> The naive algorithm for report generation can be summed up as this:
>
> for each report to be generated {
>   for each report data point to be calculated {
> calculate data point
> add data point to report
>   }
>   publish report
> }
>
> In order to deal with the upper limits of these values, we will need to
> distribute this algorithm to a compute / data cluster as much as possible.
>
> I've read about frameworks such as Apache Spark but also Hadoop, GridGain,
> HazelCast and several others, and am still confused as to how each of these
> can help us and how they fit together.
>
> Is Spark the right framework for us?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-Spark-right-for-us-tp26412.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: MLLib + Streaming

2016-03-06 Thread Chris Miller
Guru: This is a really great response. Thanks for taking the time to explain
all of this. Helpful for me too.
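
For anyone who wants to see the trainOn()/predictOn() pattern Guru describes
in code, here's a minimal sketch (the paths, batch interval, and feature count
are placeholders; each file dropped into the directories becomes one batch):

*
import org.apache.spark.SparkConf
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("streaming-lr-sketch")
val ssc = new StreamingContext(conf, Seconds(10))

// Lines look like "(1.0,[0.5,0.2,0.1])", the format LabeledPoint.parse expects.
val trainingStream = ssc.textFileStream("/path/to/train").map(LabeledPoint.parse)
val testStream = ssc.textFileStream("/path/to/test").map(LabeledPoint.parse)

val model = new StreamingLinearRegressionWithSGD()
  .setInitialWeights(Vectors.zeros(3))

model.trainOn(trainingStream)   // the weights are updated on every batch
model.predictOnValues(testStream.map(lp => (lp.label, lp.features))).print()

ssc.start()
ssc.awaitTermination()
*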


--
Chris Miller

On Sun, Mar 6, 2016 at 1:54 PM, Guru Medasani <gdm...@gmail.com> wrote:

> Hi Lan,
>
> Streaming Means, Linear Regression and Logistic Regression support online
> machine learning as you mentioned. Online machine learning is where model
> is being trained and updated on every batch of streaming data. These models
> have trainOn() and predictOn() methods where you can simply pass in
> DStreams you want to train the model on and DStreams you want the model to
> predict on. So when the next batch of data arrives model is trained and
> updated again. In this case model weights are continually updated and
> hopefully model performs better in terms of convergence and accuracy over
> time. What we are really trying to do in online learning case is that we
> are only showing few examples of the data at a time ( stream of data) and
> updating the parameters in case of Linear and Logistic Regression and
> updating the centers in case of K-Means. In the case of Linear or Logistic
> Regression this is possible due to the optimizer that is chosen for
> minimizing the cost function which is Stochastic Gradient Descent. This
> optimizer helps us to move closer and closer to the optimal weights after
> every batch and over the time we will have a model that has learned how to
> represent our data and predict well.
>
> In the scenario of using any MLlib algorithms and doing training with
> DStream.transform() and DStream.foreachRDD() operations, when the first
> batch of data arrives we build a model, let’s call this model1. Once you
> have the model1 you can make predictions on the same DStream or a different
> DStream source. But for the next batch if you follow the same procedure and
> create a model, let’s call this model2. This model2 will be significantly
> different than model1 based on how different the data is in the second
> DStream vs the first DStream as it is not continually updating the model.
> It’s like weight vectors are jumping from one place to the other for every
> batch and we never know if the algorithm is converging to the optimal
> weights. So I believe it is not possible to do true online learning with
> other MLLib models in Spark Streaming.  I am not sure if this is because
> the models don’t generally support these streaming scenarios or if the
> streaming versions simply haven’t been implemented yet.
>
> Though technically you can use any of the MLlib algorithms in Spark
> Streaming with the procedure you mentioned and make predictions, it is
> important to figure out if the model you are choosing can converge by
> showing only a subset(batches  - DStreams) of the data over time. Based on
> the algorithm you choose certain optimizers won’t necessarily be able to
> converge by showing only individual data points and require to see majority
> of the data to be able to learn optimal weights.  In these cases, you can
> still do offline learning/training with Spark bach processing using any of
> the MLlib algorithms and save those models on hdfs. You can then start a
> streaming job and load these saved models into your streaming application
> and make predictions. This is traditional offline learning.
>
> In general, online learning is hard as it’s hard to evaluate since we are
> not holding any test data during the model training. We are simply training
> the model and predicting. So in the initial batches, results can vary quite
> a bit and have significant errors in terms of the predictions. So choosing
> online learning vs. offline learning depends on how much tolerance the
> application can have towards wild predictions in the beginning. Offline
> training is simple and cheap where as online training can be hard and needs
> to be constantly monitored to see how it is performing.
>
> Hope this helps in understanding offline learning vs. online learning and
> which algorithms you can choose for online learning in MLlib.
>
> Guru Medasani
> gdm...@gmail.com
>
>
>
> > On Mar 5, 2016, at 7:37 PM, Lan Jiang <ljia...@gmail.com> wrote:
> >
> > Hi, there
> >
> > I hope someone can clarify this for me.  It seems that some of the MLlib
> algorithms such as KMeans, Linear Regression and Logistic Regression have a
> Streaming version, which can do online machine learning. But does that mean
> other MLlib algorithms cannot be used in Spark Streaming applications, such
> as random forest, SVM, collaborative filtering, etc.?
> >
> > DStreams are essentially a sequence of RDDs. We can use
> DStream.transform() and DStream.foreachRDD() operations, which allows you
> access RDDs in a DStream and apply MLLib function

Re: Best way to merge files from streaming jobs‏ on S3

2016-03-04 Thread Chris Miller
Why does the order matter? Coalesce runs in parallel and if it's just
writing to the file, then I imagine it would do it in whatever order it
happens to be executed in each thread. If you want to sort the resulting
data, I imagine you'd need to save it to some sort of data structure
instead of writing to the file from coalesce, sort that data structure,
then write your file.


--
Chris Miller

On Sat, Mar 5, 2016 at 5:24 AM, jelez <je...@hotmail.com> wrote:

> My streaming job is creating files on S3.
> The problem is that those files end up very small if I just write them to
> S3
> directly.
> This is why I use coalesce() to reduce the number of files and make them
> larger.
>
> However, coalesce shuffles data and my job processing time ends up higher
> than sparkBatchIntervalMilliseconds.
>
> I have observed that if I coalesce the number of partitions to be equal to
> the cores in the cluster I get less shuffling - but that is
> unsubstantiated.
> Is there any dependency/rule between number of executors, number of cores
> etc. that I can use to minimize shuffling and at the same time achieve
> minimum number of output files per batch?
> What is the best practice?
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-merge-files-from-streaming-jobs-on-S3-tp26400.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Avro SerDe Issue w/ Manual Partitions?

2016-03-03 Thread Chris Miller
One more thing -- just to set aside any question about my specific schema
or data, I used the sample schema and data record from Oracle's
documentation on Avro support. It's a pretty simple schema:
https://docs.oracle.com/cd/E26161_02/html/GettingStartedGuide/jsonbinding-overview.html

When I create a table with this schema and then try to query the
Avro-encoded record, I get the same type of error:


 org.apache.avro.AvroTypeException: Found avro.FullName, expecting union
at
org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at
org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
at
org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
at
org.apache.hadoop.hive.serde2.avro.AvroDeserializer$SchemaReEncoder.reencode(AvroDeserializer.java:111)
at
org.apache.hadoop.hive.serde2.avro.AvroDeserializer.deserialize(AvroDeserializer.java:175)
at
org.apache.hadoop.hive.serde2.avro.AvroSerDe.deserialize(AvroSerDe.java:201)
at
org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:409)
at
org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:408)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


To me, this "feels" like a bug -- I just can't identify if it's a Spark
issue or an Avro issue. Decoding the same files works fine with Hive, and I
imagine the same deserializer code is used there too.

Thoughts?

--
Chris Miller

On Thu, Mar 3, 2016 at 9:38 PM, Igor Berman <igor.ber...@gmail.com> wrote:

> your field name is
> *enum1_values*
>
> but you have data
> { "foo1": "test123", *"enum1"*: "BLUE" }
>
> i.e. since you defined enum and not union(null, enum)
> it tries to find value for enum1_values and doesn't find one...
>
> On 3 March 2016 at 11:30, Chris Miller <cmiller11...@gmail.com> wrote:
>
>> I've been digging into this a little deeper. Here's what I've found:
>>
>> test1.avsc:
>> 
>> {
>>   "namespace": "com.cmiller",
>>   "name": "test1",
>>   "type": "record",
>>   "fields": [
>> { "name":"foo1", "type":"string" }
>>   ]
>> }
>> 
>>
>> test2.avsc:
>> 
>> {
>>   "namespace": "com.cmiller",
>>   "name": "test1",
>>   "type": "record",
>>   "fields": [
>> { "name":"foo1", "type":"string"

Re: Avro SerDe Issue w/ Manual Partitions?

2016-03-03 Thread Chris Miller
No, the name of the field is *enum1* -- the name of the field's type is
*enum1_values*. It should not be looking for enum1_values -- that's not how
the Avro specification says it should work, and it's not how any
other implementation reads Avro data.

For what it's worth, if I change enum1 to enum1_values, the data fails to
encode (as it should):


$ avro-tools fromjson --schema-file=test.avsc test.json > test.avro
Exception in thread "main" org.apache.avro.AvroTypeException: Expected
field name not found: enum1
at org.apache.avro.io.JsonDecoder.doAction(JsonDecoder.java:477)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.JsonDecoder.advance(JsonDecoder.java:139)
at org.apache.avro.io.JsonDecoder.readEnum(JsonDecoder.java:332)
at
org.apache.avro.io.ResolvingDecoder.readEnum(ResolvingDecoder.java:256)
at
org.apache.avro.generic.GenericDatumReader.readEnum(GenericDatumReader.java:199)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
at
org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
at org.apache.avro.tool.DataFileWriteTool.run(DataFileWriteTool.java:99)
at org.apache.avro.tool.Main.run(Main.java:84)
at org.apache.avro.tool.Main.main(Main.java:73)


Any other ideas?


--
Chris Miller

On Thu, Mar 3, 2016 at 9:38 PM, Igor Berman <igor.ber...@gmail.com> wrote:

> your field name is
> *enum1_values*
>
> but you have data
> { "foo1": "test123", *"enum1"*: "BLUE" }
>
> i.e. since you defined enum and not union(null, enum)
> it tries to find value for enum1_values and doesn't find one...
>
> On 3 March 2016 at 11:30, Chris Miller <cmiller11...@gmail.com> wrote:
>
>> I've been digging into this a little deeper. Here's what I've found:
>>
>> test1.avsc:
>> 
>> {
>>   "namespace": "com.cmiller",
>>   "name": "test1",
>>   "type": "record",
>>   "fields": [
>> { "name":"foo1", "type":"string" }
>>   ]
>> }
>> 
>>
>> test2.avsc:
>> 
>> {
>>   "namespace": "com.cmiller",
>>   "name": "test1",
>>   "type": "record",
>>   "fields": [
>> { "name":"foo1", "type":"string" },
>> { "name":"enum1", "type": { "type":"enum", "name":"enum1_values",
>> "symbols":["BLUE","RED", "GREEN"]} }
>>   ]
>> }
>> 
>>
>> test1.json (encoded and saved to test/test1.avro):
>> 
>> { "foo1": "test123" }
>> 
>>
>> test2.json (encoded and saved to test/test1.avro):
>> 
>> { "foo1": "test123", "enum1": "BLUE" }
>> 
>>
>> Here is how I create the tables and add the data:
>>
>> 
>> CREATE TABLE test1
>> PARTITIONED BY (ds STRING)
>> ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
>> STORED AS INPUTFORMAT
>> 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
>> OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
>> TBLPROPERTIES ('avro.schema.url'='hdfs:///path/to/schemas/test1.avsc');
>>
>> ALTER TABLE test1 ADD PARTITION (ds='1') LOCATION
>> 's3://spark-data/dev/test1';
>>
>>
>> CREATE TABLE test2
>> PARTITIONED BY (ds STRING)
>> ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
>> STORED AS INPUTFORMAT
>> 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
>> OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
>> TBLPROPERTIES ('avro.schema.url'='hdfs:///path/to/schemas/test2.avsc');
>>
>> ALTER TABLE test2 ADD PARTITION (ds='1') LOCATION
>> 's3://spark-data/dev/test2';
>> 
>>
>> And here's what I get:
>>
>> 
>> SELECT * FROM test1;
>> -- works fine, shows data
>>
>> SELECT *

Re: Avro SerDe Issue w/ Manual Partitions?

2016-03-03 Thread Chris Miller
e.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


In addition to the above, I also tried putting the test Avro files on HDFS
instead of S3 -- the error is the same. I also tried querying from Scala
instead of using Zeppelin, and I get the same error.

Where should I begin with troubleshooting this problem? This same query
runs fine on Hive. Based on the error, it appears to be something in the
deserializer though... but if it were a bug in the Avro deserializer, why
does it only appear with Spark? I imagine Hive queries would be using the
same deserializer, no?

Thanks!



--
Chris Miller

On Thu, Mar 3, 2016 at 4:33 AM, Chris Miller <cmiller11...@gmail.com> wrote:

> Hi,
>
> I have a strange issue occurring when I use manual partitions.
>
> If I create a table as follows, I am able to query the data with no
> problem:
>
> 
> CREATE TABLE test1
> ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
> STORED AS INPUTFORMAT
> 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
> OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
> LOCATION 's3://analytics-bucket/prod/logs/avro/2016/03/02/'
> TBLPROPERTIES ('avro.schema.url'='hdfs:///data/schemas/schema.avsc');
> 
>
> If I create the table like this, however, and then add a partition with a
> LOCATION specified, I am unable to query:
>
> 
> CREATE TABLE test2
> PARTITIONED BY (ds STRING)
> ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
> STORED AS INPUTFORMAT
> 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
> OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
> TBLPROPERTIES ('avro.schema.url'='hdfs:///data/schemas/schema.avsc');
>
> ALTER TABLE test2 ADD PARTITION (ds='1') LOCATION
> 's3://analytics-bucket/prod/logs/avro/2016/03/02/';
> 
>
> This is what happens
>
> 
> SELECT * FROM test2 LIMIT 1;
>
> org.apache.avro.AvroTypeException: Found ActionEnum, expecting union
> at
> org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
> at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
> at
> org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
> at
> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
> at
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
> at
> org.apache.hadoop.hive.serde2.avro.AvroDeserializer$SchemaReEncoder.reencode(AvroDeserializer.java:111)
> at
> org.apache.hadoop.hive.serde2.avro.AvroDeserializer.deserialize(AvroDeserializer.java:175)
> at
> org.apache.hadoop.hive.serde2.avro.AvroSerDe.deserialize(AvroSerDe.java:201)
> at
> org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:409)
> at
> org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:408)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> at scala.collection.TraversableOnce$class.to
> (TraversableOnce.scala:273)
> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
> at
> org.apache.spark.SparkConte

Avro SerDe Issue w/ Manual Partitions?

2016-03-02 Thread Chris Miller
Hi,

I have a strange issue occurring when I use manual partitions.

If I create a table as follows, I am able to query the data with no problem:


CREATE TABLE test1
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION 's3://analytics-bucket/prod/logs/avro/2016/03/02/'
TBLPROPERTIES ('avro.schema.url'='hdfs:///data/schemas/schema.avsc');


If I create the table like this, however, and then add a partition with a
LOCATION specified, I am unable to query:


CREATE TABLE test2
PARTITIONED BY (ds STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
TBLPROPERTIES ('avro.schema.url'='hdfs:///data/schemas/schema.avsc');

ALTER TABLE test2 ADD PARTITION (ds='1') LOCATION
's3://analytics-bucket/prod/logs/avro/2016/03/02/';


This is what happens


SELECT * FROM test2 LIMIT 1;

org.apache.avro.AvroTypeException: Found ActionEnum, expecting union
at
org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at
org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
at
org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
at
org.apache.hadoop.hive.serde2.avro.AvroDeserializer$SchemaReEncoder.reencode(AvroDeserializer.java:111)
at
org.apache.hadoop.hive.serde2.avro.AvroDeserializer.deserialize(AvroDeserializer.java:175)
at
org.apache.hadoop.hive.serde2.avro.AvroSerDe.deserialize(AvroSerDe.java:201)
at
org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:409)
at
org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:408)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


The data is exactly the same, and I can still go back and query the test1
table without issue. I don't have control over the directory structure, so
I need to add the partitions manually so that I can specify a location.

For what it's worth, "ActionEnum" is the first field in my schema. This
same table and query structure works fine with Hive. When I try to run this
with SparkSQL, however, I get the above error.

Anyone have any idea what the problem is here? Thanks!

--
Chris Miller