Re: How to remove empty strings from JavaRDD
flatmap? -- Chris Miller On Thu, Apr 7, 2016 at 10:25 PM, greg huang <debin.hu...@gmail.com> wrote: > Hi All, > > Can someone give me an example of how to get rid of empty strings in > JavaRDD? I know there is a filter method in JavaRDD: > https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/rdd/RDD.html#filter(scala.Function1) > > Regards, > Greg >
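A minimal sketch of the `filter` approach on plain Java collections (class and method names here are illustrative): `JavaRDD.filter` takes the same kind of predicate, e.g. `rdd.filter(s -> !s.isEmpty())`, while `flatMap` is only needed if one input record may expand to zero or more outputs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class DropEmpty {
    // The same predicate you would pass to JavaRDD.filter(...)
    static List<String> dropEmpty(List<String> xs) {
        return xs.stream()
                 .filter(s -> !s.isEmpty())
                 .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(dropEmpty(Arrays.asList("a", "", "b", "", "c"))); // [a, b, c]
    }
}
```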
Re: Spark schema evolution
With Avro you solve this by using a default value for the new field... maybe Parquet is the same? -- Chris Miller On Tue, Mar 22, 2016 at 9:34 PM, gtinside <gtins...@gmail.com> wrote: > Hi, > > I have a table sourced from *2 parquet files* with a few extra columns in one > of the parquet files. Simple queries work fine, but queries with a predicate > on an extra column don't work and I get column not found > > *Column resp_party_type exists in just one parquet file* > > a) Query that works: > select resp_party_type from operational_analytics > > b) Query that doesn't work: (complains about missing column > *resp_party_type*) > select category as Events, resp_party as Team, count(*) as Total from > operational_analytics where application = 'PeopleMover' and resp_party_type > = 'Team' group by category, resp_party > > *Query Plan for (b)* > == Physical Plan == > TungstenAggregate(key=[category#30986,resp_party#31006], > functions=[(count(1),mode=Final,isDistinct=false)], > output=[Events#36266,Team#36267,Total#36268L]) > TungstenExchange hashpartitioning(category#30986,resp_party#31006) > TungstenAggregate(key=[category#30986,resp_party#31006], > functions=[(count(1),mode=Partial,isDistinct=false)], > output=[category#30986,resp_party#31006,currentCount#36272L]) > Project [category#30986,resp_party#31006] > Filter ((application#30983 = PeopleMover) && (resp_party_type#31007 = > Team)) > Scan > > ParquetRelation[snackfs://tst:9042/aladdin_data_beta/operational_analytics/operational_analytics_peoplemover.parquet,snackfs://tst:9042/aladdin_data_beta/operational_analytics/operational_analytics_mis.parquet][category#30986,resp_party#31006,application#30983,resp_party_type#31007] > > > I have set spark.sql.parquet.mergeSchema = true and > spark.sql.parquet.filterPushdown = true. 
When I set > spark.sql.parquet.filterPushdown = false Query (b) starts working, > execution > plan after setting the filterPushdown = false for Query(b) > > == Physical Plan == > TungstenAggregate(key=[category#30986,resp_party#31006], > functions=[(count(1),mode=Final,isDistinct=false)], > output=[Events#36313,Team#36314,Total#36315L]) > TungstenExchange hashpartitioning(category#30986,resp_party#31006) > TungstenAggregate(key=[category#30986,resp_party#31006], > functions=[(count(1),mode=Partial,isDistinct=false)], > output=[category#30986,resp_party#31006,currentCount#36319L]) >Project [category#30986,resp_party#31006] > Filter ((application#30983 = PeopleMover) && (resp_party_type#31007 = > Team)) > Scan > > ParquetRelation[snackfs://tst:9042/aladdin_data_beta/operational_analytics/operational_analytics_peoplemover.parquet,snackfs://tst:9042/aladdin_data_beta/operational_analytics/operational_analytics_mis.parquet][category#30986,resp_party#31006,application#30983,resp_party_type#31007] > > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-schema-evolution-tp26563.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >
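For reference, the two settings discussed above as they would appear in `spark-defaults.conf` (the workaround being to disable pushdown while schemas are merged; whether both can safely stay enabled depends on the Spark version):

```
spark.sql.parquet.mergeSchema      true
spark.sql.parquet.filterPushdown   false
```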
Re: newbie HDFS S3 best practices
If you have lots of small files, distcp should handle that well -- it's supposed to distribute the transfer of files across the nodes in your cluster. Conductor looks interesting if you're trying to distribute the transfer of single, large file(s)... right? -- Chris Miller On Wed, Mar 16, 2016 at 4:43 AM, Andy Davidson < a...@santacruzintegration.com> wrote: > Hi Frank > > We have thousands of small files. Each file is between 6K and maybe 100K. > > Conductor looks interesting > > Andy > > From: Frank Austin Nothaft <fnoth...@berkeley.edu> > Date: Tuesday, March 15, 2016 at 11:59 AM > To: Andrew Davidson <a...@santacruzintegration.com> > Cc: "user @spark" <user@spark.apache.org> > Subject: Re: newbie HDFS S3 best practices > > Hard to say with #1 without knowing your application’s characteristics; > for #2, we use conductor <https://github.com/BD2KGenomics/conductor> with > IAM roles, .boto/.aws/credentials files. > > Frank Austin Nothaft > fnoth...@berkeley.edu > fnoth...@eecs.berkeley.edu > 202-340-0466 > > On Mar 15, 2016, at 11:45 AM, Andy Davidson <a...@santacruzintegration.com > <a...@santacruzintegration.com>> wrote: > > We use the spark-ec2 script to create AWS clusters as needed (we do not > use AWS EMR) > > 1. will we get better performance if we copy data to HDFS before we > run instead of reading directly from S3? > > 2. What is a good way to move results from HDFS to S3? > > > It seems like there are many ways to bulk copy to s3. Many of them require > we explicitly use the AWS_ACCESS_KEY_ID:AWS_SECRET_ACCESS_KEY@. This seems like a bad > idea? > > What would you recommend? > > Thanks > > Andy > > > >
Re: Does parallelize and collect preserve the original order of list?
Short answer: Nope Less short answer: Spark is not designed to maintain sort order in this case... it *may*, but there's no guarantee... generally, it would not be in the same order unless you implement something to order by and then sort the result based on that. -- Chris Miller On Wed, Mar 16, 2016 at 10:16 AM, JoneZhang <joyoungzh...@gmail.com> wrote: > Step1 > List items = new ArrayList();items.addAll(XXX); > javaSparkContext.parallelize(items).saveAsTextFile(output); > Step2 > final List items2 = ctx.textFile(output).collect(); > > Do items and items2 have the same order? > > > Best wishes. > Thanks. > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Does-parallelize-and-collect-preserve-the-original-order-of-list-tp26512.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >
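A sketch of the usual fix, shown with plain Java collections (all names are illustrative): pair each element with its position before saving (on an RDD, `zipWithIndex` does this), then sort by that saved index after reading back.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class RestoreOrder {
    // Sort (index, value) pairs by the saved index and strip the index off.
    static List<String> restore(List<Map.Entry<Integer, String>> indexed) {
        return indexed.stream()
                .sorted(Comparator.comparingInt(e -> e.getKey()))
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Pairs as zipWithIndex would produce them, read back in arbitrary order.
        List<Map.Entry<Integer, String>> shuffled = new ArrayList<>(List.of(
                Map.entry(2, "c"), Map.entry(0, "a"), Map.entry(3, "d"), Map.entry(1, "b")));
        System.out.println(restore(shuffled)); // [a, b, c, d]
    }
}
```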
Re: reading file from S3
+1 for Sab's thoughtful answer... Yasemin: As Gourav said, using IAM roles is considered best practice and generally will give you fewer headaches in the end... but you may have a reason for doing it the way you are, and certainly the way you posted should be supported and not cause the error you described. -- Chris Miller On Tue, Mar 15, 2016 at 11:22 PM, Sabarish Sasidharan < sabarish.sasidha...@manthan.com> wrote: > There are many solutions to a problem. > > Also understand that sometimes your situation might be such. For example, what > if you are accessing S3 from your Spark job running in your continuous > integration server sitting in your data center, or maybe a box under your > desk. And sometimes you are just trying something. > > Also understand that sometimes you want answers to solve your problem at > hand without redirecting you to something else. Understand what you > suggested is an appropriate way of doing it, which I myself have proposed > before, but that doesn't solve the OP's problem at hand. > > Regards > Sab > On 15-Mar-2016 8:27 pm, "Gourav Sengupta" <gourav.sengu...@gmail.com> > wrote: > >> Oh!!! What the hell >> >> Please never use the URI >> >> *s3n://AWS_ACCESS_KEY_ID:AWS_SECRET_ACCESS_KEY*. That is a major cause of >> pain, security issues, code maintenance issues and of course something that >> Amazon strongly suggests that we do not use. Please use roles and you will >> not have to worry about security. >> >> Regards, >> Gourav Sengupta >> >> On Tue, Mar 15, 2016 at 2:38 PM, Sabarish Sasidharan < >> sabarish@gmail.com> wrote: >> >>> You have a slash before the bucket name. It should be @. >>> >>> Regards >>> Sab >>> On 15-Mar-2016 4:03 pm, "Yasemin Kaya" <godo...@gmail.com> wrote: >>> >>>> Hi, >>>> >>>> I am using Spark 1.6.0 standalone and I want to read a txt file from S3 >>>> bucket named yasemindeneme and my file name is deneme.txt. But I am getting >>>> this error. 
Here is the simple code >>>> <https://gist.github.com/anonymous/6d174f8587f0f3fd2334> >>>> Exception in thread "main" java.lang.IllegalArgumentException: Invalid >>>> hostname in URI s3n://AWS_ACCESS_KEY_ID:AWS_SECRET_ACCESS_KEY@ >>>> /yasemindeneme/deneme.txt >>>> at >>>> org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:45) >>>> at >>>> org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:55) >>>> >>>> >>>> I try 2 options >>>> *sc.hadoopConfiguration() *and >>>> *sc.textFile("s3n://AWS_ACCESS_KEY_ID:AWS_SECRET_ACCESS_KEY@/yasemindeneme/deneme.txt/");* >>>> >>>> Also I did export AWS_ACCESS_KEY_ID= . >>>> export AWS_SECRET_ACCESS_KEY= >>>> But there is no change about error. >>>> >>>> Could you please help me about this issue? >>>> >>>> >>>> -- >>>> hiç ender hiç >>>> >>> >>
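One illustrative alternative (IAM roles remain the better option when running on EC2): keep the keys out of the URI entirely by putting them in the Hadoop configuration, so the path becomes plain `s3n://yasemindeneme/deneme.txt`. A hypothetical `core-site.xml` fragment with placeholder values:

```xml
<!-- Placeholder values; with these set, sc.textFile("s3n://yasemindeneme/deneme.txt")
     works without credentials embedded in the URI. -->
<property>
  <name>fs.s3n.awsAccessKeyId</name>
  <value>YOUR_ACCESS_KEY</value>
</property>
<property>
  <name>fs.s3n.awsSecretAccessKey</name>
  <value>YOUR_SECRET_KEY</value>
</property>
```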
Re: Correct way to use spark streaming with apache zeppelin
Cool! Thanks for sharing. -- Chris Miller On Sun, Mar 13, 2016 at 12:53 AM, Todd Nist <tsind...@gmail.com> wrote: > Below is a link to an example which Silvio Fiorito put together > demonstrating how to link Zeppelin with Spark Streaming for real-time charts. > I think the original thread was back in early November 2015, subject: Real > time chart in Zeppelin, if you care to try to find it. > > https://gist.github.com/granturing/a09aed4a302a7367be92 > > HTH. > > -Todd > > On Sat, Mar 12, 2016 at 6:21 AM, Chris Miller <cmiller11...@gmail.com> > wrote: > >> I'm pretty new to all of this stuff, so bear with me. >> >> Zeppelin isn't really intended for realtime dashboards as far as I know. >> Its reporting features (tables, graphs, etc.) are more for displaying the >> results from the output of something. As far as I know, there isn't really >> anything to "watch" a dataset and have updates pushed to the Zeppelin UI. >> >> As for Spark, unless you're doing a lot of processing that you didn't >> mention here, I don't think it's a good fit just for this. >> >> If it were me (just off the top of my head), I'd just build a simple web >> service that uses websockets to push updates to the client which could then >> be used to update graphs, tables, etc. The data itself -- that is, the >> accumulated totals -- you could store in something like Redis. When an >> order comes in, just add that quantity and price to the existing value and >> trigger your code to push out an updated value to any clients via the >> websocket. You could use something like a Redis pub/sub channel to trigger >> the web app to notify clients of an update. >> >> There are about 5 million other ways you could design this, but I would >> just keep it as simple as possible. I just threw one idea out... >> >> Good luck. >> >> >> -- >> Chris Miller >> >> On Sat, Mar 12, 2016 at 6:58 PM, trung kien <kient...@gmail.com> wrote: >> >>> Thanks Chris and Mich for replying. 
>>> >>> Sorry for not explaining my problem clearly. Yes, I am talking about a >>> flexible dashboard when I mention Zeppelin. >>> >>> Here is the problem I am having: >>> >>> I am running a commercial website where we sell many products and we >>> have many branches in many places. We have a lot of realtime transactions >>> and want to analyze them in realtime. >>> >>> We don't want to have to aggregate every >>> single transaction every time we do analytics (each transaction has BranchID, ProductID, Qty, >>> Price). So, we maintain intermediate data which contains: BranchID, >>> ProductID, totalQty, totalDollar >>> >>> Ideally, we have 2 tables: >>> Transaction ( BranchID, ProductID, Qty, Price, Timestamp) >>> >>> And intermediate table Stats is just the sum of every transaction grouped by >>> BranchID and ProductID (I am using Spark Streaming to calculate this table in >>> realtime) >>> >>> My thinking is that doing statistics ( realtime dashboard) on the Stats >>> table is much easier, and this table is also not much to maintain. >>> >>> I'm just wondering, what's the best way to store the Stats table (a database >>> or parquet file?) >>> What exactly are you trying to do? Zeppelin is for interactive analysis >>> of a dataset. What do you mean "realtime analytics" -- do you mean build a >>> report or dashboard that automatically updates as new data comes in? >>> >>> >>> -- >>> Chris Miller >>> >>> On Sat, Mar 12, 2016 at 3:13 PM, trung kien <kient...@gmail.com> wrote: >>> >>>> Hi all, >>>> >>>> I've just viewed some of Zeppelin's videos. The integration between >>>> Zeppelin and Spark is really amazing and I want to use it for my >>>> application. >>>> >>>> In my app, I will have a Spark streaming app to do some basic realtime >>>> aggregation ( intermediate data). Then I want to use Zeppelin to do some >>>> realtime analytics on the intermediate data. >>>> >>>> My question is what's the most efficient storage engine to store >>>> realtime intermediate data? 
Is a Parquet file somewhere suitable? >>>> >>> >>> >> >
Re: Correct way to use spark streaming with apache zeppelin
I'm pretty new to all of this stuff, so bear with me. Zeppelin isn't really intended for realtime dashboards as far as I know. Its reporting features (tables, graphs, etc.) are more for displaying the results from the output of something. As far as I know, there isn't really anything to "watch" a dataset and have updates pushed to the Zeppelin UI. As for Spark, unless you're doing a lot of processing that you didn't mention here, I don't think it's a good fit just for this. If it were me (just off the top of my head), I'd just build a simple web service that uses websockets to push updates to the client which could then be used to update graphs, tables, etc. The data itself -- that is, the accumulated totals -- you could store in something like Redis. When an order comes in, just add that quantity and price to the existing value and trigger your code to push out an updated value to any clients via the websocket. You could use something like a Redis pub/sub channel to trigger the web app to notify clients of an update. There are about 5 million other ways you could design this, but I would just keep it as simple as possible. I just threw one idea out... Good luck. -- Chris Miller On Sat, Mar 12, 2016 at 6:58 PM, trung kien <kient...@gmail.com> wrote: > Thanks Chris and Mich for replying. > > Sorry for not explaining my problem clearly. Yes, I am talking about a > flexible dashboard when I mention Zeppelin. > > Here is the problem I am having: > > I am running a commercial website where we sell many products and we have > many branches in many places. We have a lot of realtime transactions and > want to analyze them in realtime. > > We don't want to have to aggregate every single > transaction every time we do analytics (each transaction has BranchID, ProductID, Qty, Price). 
So, > we maintain intermediate data which contains: BranchID, ProductID, > totalQty, totalDollar > > Ideally, we have 2 tables: > Transaction ( BranchID, ProductID, Qty, Price, Timestamp) > > And intermediate table Stats is just the sum of every transaction grouped by > BranchID and ProductID (I am using Spark Streaming to calculate this table in > realtime) > > My thinking is that doing statistics ( realtime dashboard) on the Stats table > is much easier, and this table is also not much to maintain. > > I'm just wondering, what's the best way to store the Stats table (a database or > parquet file?) > What exactly are you trying to do? Zeppelin is for interactive analysis of > a dataset. What do you mean "realtime analytics" -- do you mean build a > report or dashboard that automatically updates as new data comes in? > > > -- > Chris Miller > > On Sat, Mar 12, 2016 at 3:13 PM, trung kien <kient...@gmail.com> wrote: > >> Hi all, >> >> I've just viewed some of Zeppelin's videos. The integration between >> Zeppelin and Spark is really amazing and I want to use it for my >> application. >> >> In my app, I will have a Spark streaming app to do some basic realtime >> aggregation ( intermediate data). Then I want to use Zeppelin to do some >> realtime analytics on the intermediate data. >> >> My question is what's the most efficient storage engine to store realtime >> intermediate data? Is a Parquet file somewhere suitable? >> > >
Re: Repeating Records w/ Spark + Avro?
Well, I kind of got it... this works below: * val rdd = sc.newAPIHadoopFile(path, classOf[AvroKeyInputFormat[GenericRecord]], classOf[AvroKey[GenericRecord]], classOf[NullWritable]).map(_._1.datum) rdd .map(record => { println(record.get("myValue")) }) .take(10) * Seems strange to me that I have to iterate over the RDD effectively two times -- once to create the RDD, and another to perform my action. It also seems strange that I can't actually access the data in my RDD until I've copied the records. I would think this is a *very* common use case of an RDD -- accessing the data it contains (otherwise, what's the point?). Is there a way to always enable cloning? There used to be a cloneRecords parameter on the hadoopFile method, but that seems to have been removed. Finally, if I add rdd.persist(), then it doesn't work. I guess I would need to do .map(_._1.datum) again before the map that does the real work. -- Chris Miller On Sat, Mar 12, 2016 at 4:15 PM, Chris Miller <cmiller11...@gmail.com> wrote: > Wow! That sure is buried in the documentation! But yeah, that's what I > thought more or less. > > I tried copying as follows, but that didn't work. > > * > val copyRDD = singleFileRDD.map(_.copy()) > * > > When I iterate over the new copyRDD (foreach or map), I still have the > same problem of duplicate records. I also tried copying within the block > where I'm using it, but that didn't work either: > > * > rdd > .take(10) > .map(item => { > val copied = item.copy() > val record = copied._1.datum() > > println(record.get("myValue")) > }) > * > > What am I doing wrong? 
> > -- > Chris Miller > > On Sat, Mar 12, 2016 at 1:48 PM, Peyman Mohajerian <mohaj...@gmail.com> > wrote: > >> Here is the reason for the behavior: >> '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable >> object for each record, directly caching the returned RDD or directly >> passing it to an aggregation or shuffle operation will create many >> references to the same object. If you plan to directly cache, sort, or >> aggregate Hadoop writable objects, you should first copy them using a map >> function. >> >> >> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/SparkContext.html >> >> So it is Hadoop related. >> >> On Fri, Mar 11, 2016 at 3:19 PM, Chris Miller <cmiller11...@gmail.com> >> wrote: >> >>> I have a bit of a strange situation: >>> >>> * >>> import org.apache.avro.generic.{GenericData, GenericRecord} >>> import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper, AvroKey} >>> import org.apache.avro.mapreduce.AvroKeyInputFormat >>> import org.apache.hadoop.io.{NullWritable, WritableUtils} >>> >>> val path = "/path/to/data.avro" >>> >>> val rdd = sc.newAPIHadoopFile(path, >>> classOf[AvroKeyInputFormat[GenericRecord]], >>> classOf[AvroKey[GenericRecord]], classOf[NullWritable]) >>> rdd.take(10).foreach( x => println( x._1.datum() )) >>> * >>> >>> In this situation, I get the right number of records returned, and if I >>> look at the contents of rdd I see the individual records as tuple2's... >>> however, if I println on each one as shown above, I get the same result >>> every time. >>> >>> Apparently this has to do with something in Spark or Avro keeping a >>> reference to the item it's iterating over, so I need to clone the object >>> before I use it. 
However, if I try to clone it (from the spark-shell >>> console), I get: >>> >>> * >>> rdd.take(10).foreach( x => { >>> val clonedDatum = x._1.datum().clone() >>> println(clonedDatum.datum()) >>> }) >>> >>> :37: error: method clone in class Object cannot be accessed in >>> org.apache.avro.generic.GenericRecord >>> Access to protected method clone not permitted because >>> prefix type org.apache.avro.generic.GenericRecord does not conform to >>> class $iwC where the access take place >>> val clonedDatum = x._1.datum().clone() >>> * >>> >>> So, how can I clone the datum? >>> >>> Seems I'm not the only one who ran into this problem: >>> https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/102. I >>> can't figure out how to fix it in my case without hacking away like the >>> person in the linked PR did. >>> >>> Suggestions? >>> >>> -- >>> Chris Miller >>> >> >> >
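The reuse behavior Peyman quotes can be reproduced without Hadoop at all: the toy "reader" below returns the same mutable buffer for every record, just like Hadoop's RecordReader does (all names here are illustrative). For Avro records specifically, a deep copy along the lines of `GenericData.get().deepCopy(rec.getSchema(), rec)` is, as far as I know, the usual way to detach a record before caching.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ReuseDemo {
    // Caches references to a single reused buffer -- the RecordReader anti-pattern.
    static List<StringBuilder> readKeepingReferences(List<String> data) {
        StringBuilder buf = new StringBuilder();
        List<StringBuilder> out = new ArrayList<>();
        for (String s : data) {
            buf.setLength(0);
            buf.append(s);
            out.add(buf);            // same object added every time
        }
        return out;
    }

    // Copies each record while it is still the current one -- the documented fix.
    static List<String> readCopying(List<String> data) {
        StringBuilder buf = new StringBuilder();
        List<String> out = new ArrayList<>();
        for (String s : data) {
            buf.setLength(0);
            buf.append(s);
            out.add(buf.toString()); // copy before the buffer is reused
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("a", "b", "c");
        List<String> broken = readKeepingReferences(data).stream()
                .map(StringBuilder::toString).collect(Collectors.toList());
        System.out.println(broken);            // [c, c, c] -- every entry is the last record
        System.out.println(readCopying(data)); // [a, b, c]
    }
}
```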
Re: Repeating Records w/ Spark + Avro?
Wow! That sure is buried in the documentation! But yeah, that's what I thought more or less. I tried copying as follows, but that didn't work. * val copyRDD = singleFileRDD.map(_.copy()) * When I iterate over the new copyRDD (foreach or map), I still have the same problem of duplicate records. I also tried copying within the block where I'm using it, but that didn't work either: * rdd .take(10) .map(item => { val copied = item.copy() val record = copied._1.datum() println(record.get("myValue")) }) * What am I doing wrong? -- Chris Miller On Sat, Mar 12, 2016 at 1:48 PM, Peyman Mohajerian <mohaj...@gmail.com> wrote: > Here is the reason for the behavior: > '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable > object for each record, directly caching the returned RDD or directly > passing it to an aggregation or shuffle operation will create many > references to the same object. If you plan to directly cache, sort, or > aggregate Hadoop writable objects, you should first copy them using a map > function. > > > https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/SparkContext.html > > So it is Hadoop related. > > On Fri, Mar 11, 2016 at 3:19 PM, Chris Miller <cmiller11...@gmail.com> > wrote: > >> I have a bit of a strange situation: >> >> * >> import org.apache.avro.generic.{GenericData, GenericRecord} >> import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper, AvroKey} >> import org.apache.avro.mapreduce.AvroKeyInputFormat >> import org.apache.hadoop.io.{NullWritable, WritableUtils} >> >> val path = "/path/to/data.avro" >> >> val rdd = sc.newAPIHadoopFile(path, >> classOf[AvroKeyInputFormat[GenericRecord]], >> classOf[AvroKey[GenericRecord]], classOf[NullWritable]) >> rdd.take(10).foreach( x => println( x._1.datum() )) >> * >> >> In this situation, I get the right number of records returned, and if I >> look at the contents of rdd I see the individual records as tuple2's... 
>> however, if I println on each one as shown above, I get the same result >> every time. >> >> Apparently this has to do with something in Spark or Avro keeping a >> reference to the item it's iterating over, so I need to clone the object >> before I use it. However, if I try to clone it (from the spark-shell >> console), I get: >> >> * >> rdd.take(10).foreach( x => { >> val clonedDatum = x._1.datum().clone() >> println(clonedDatum.datum()) >> }) >> >> :37: error: method clone in class Object cannot be accessed in >> org.apache.avro.generic.GenericRecord >> Access to protected method clone not permitted because >> prefix type org.apache.avro.generic.GenericRecord does not conform to >> class $iwC where the access take place >> val clonedDatum = x._1.datum().clone() >> * >> >> So, how can I clone the datum? >> >> Seems I'm not the only one who ran into this problem: >> https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/102. I >> can't figure out how to fix it in my case without hacking away like the >> person in the linked PR did. >> >> Suggestions? >> >> -- >> Chris Miller >> > >
Re: Correct way to use spark streaming with apache zeppelin
What exactly are you trying to do? Zeppelin is for interactive analysis of a dataset. What do you mean "realtime analytics" -- do you mean build a report or dashboard that automatically updates as new data comes in? -- Chris Miller On Sat, Mar 12, 2016 at 3:13 PM, trung kien <kient...@gmail.com> wrote: > Hi all, > > I've just viewed some of Zeppelin's videos. The integration between > Zeppelin and Spark is really amazing and I want to use it for my > application. > > In my app, I will have a Spark streaming app to do some basic realtime > aggregation ( intermediate data). Then I want to use Zeppelin to do some > realtime analytics on the intermediate data. > > My question is what's the most efficient storage engine to store realtime > intermediate data? Is a Parquet file somewhere suitable? >
Repeating Records w/ Spark + Avro?
I have a bit of a strange situation: * import org.apache.avro.generic.{GenericData, GenericRecord} import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper, AvroKey} import org.apache.avro.mapreduce.AvroKeyInputFormat import org.apache.hadoop.io.{NullWritable, WritableUtils} val path = "/path/to/data.avro" val rdd = sc.newAPIHadoopFile(path, classOf[AvroKeyInputFormat[GenericRecord]], classOf[AvroKey[GenericRecord]], classOf[NullWritable]) rdd.take(10).foreach( x => println( x._1.datum() )) * In this situation, I get the right number of records returned, and if I look at the contents of rdd I see the individual records as tuple2's... however, if I println on each one as shown above, I get the same result every time. Apparently this has to do with something in Spark or Avro keeping a reference to the item it's iterating over, so I need to clone the object before I use it. However, if I try to clone it (from the spark-shell console), I get: * rdd.take(10).foreach( x => { val clonedDatum = x._1.datum().clone() println(clonedDatum.datum()) }) :37: error: method clone in class Object cannot be accessed in org.apache.avro.generic.GenericRecord Access to protected method clone not permitted because prefix type org.apache.avro.generic.GenericRecord does not conform to class $iwC where the access take place val clonedDatum = x._1.datum().clone() * So, how can I clone the datum? Seems I'm not the only one who ran into this problem: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/102. I can't figure out how to fix it in my case without hacking away like the person in the linked PR did. Suggestions? -- Chris Miller
Re: Avro SerDe Issue w/ Manual Partitions?
For anyone running into this same issue, it looks like Avro deserialization is just broken when used with SparkSQL and partitioned schemas. I created a bug report with details and a simplified example of how to reproduce: https://issues.apache.org/jira/browse/SPARK-13709 -- Chris Miller On Fri, Mar 4, 2016 at 12:11 AM, Chris Miller <cmiller11...@gmail.com> wrote: > One more thing -- just to set aside any question about my specific schema > or data, I used the sample schema and data record from Oracle's > documentation on Avro support. It's a pretty simple schema: > https://docs.oracle.com/cd/E26161_02/html/GettingStartedGuide/jsonbinding-overview.html > > When I create a table with this schema and then try to query the > Avro-encoded record, I get the same type of error: > > > org.apache.avro.AvroTypeException: Found avro.FullName, expecting union > at > org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292) > at org.apache.avro.io.parsing.Parser.advance(Parser.java:88) > at > org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155) > at > org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193) > at > org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142) > at > org.apache.hadoop.hive.serde2.avro.AvroDeserializer$SchemaReEncoder.reencode(AvroDeserializer.java:111) > at > org.apache.hadoop.hive.serde2.avro.AvroDeserializer.deserialize(AvroDeserializer.java:175) > at > org.apache.hadoop.hive.serde2.avro.AvroSerDe.deserialize(AvroSerDe.java:201) > at > org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:409) > at > 
org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:408) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at > scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) > at scala.collection.TraversableOnce$class.to > (TraversableOnce.scala:273) > at scala.collection.AbstractIterator.to(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) > at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) > at org.apache.spark.scheduler.Task.run(Task.scala:89) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > > > To me, this "feels" like a bug -- I just can't identify if it's a Spark > issue or an Avro issue. 
Decoding the same files works fine with Hive, and I > imagine the same deserializer code is used there too. > > Thoughts? > > -- > Chris Miller > > On Thu, Mar 3, 2016 at 9:38 PM, Igor Berman <igor.ber...@gmail.com> wrote: > >> your field name is >> *enum1_values* >> >> but you have data >> { "foo1": "test123", *"enum1"*: "BLUE" } >> >> i.e. since you defined enum and not union(null, enum) >> it tries to find value for enum1_values and doesn't find one... >> >> On 3 March 2016 at 11:30, Chris Miller <cmiller11...@gmail.com>
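Following Igor's point, the schema-side fix would be to declare the field as a union with null so a missing value resolves instead of failing (the field and enum names below are illustrative, not taken from the OP's schema):

```
{ "name": "enum1",
  "type": ["null", { "type": "enum", "name": "Enum1Values", "symbols": ["BLUE", "RED", "GREEN"] }],
  "default": null }
```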
Re: Is Spark right for us?
Gut instinct is no, Spark is overkill for your needs... you should be able to accomplish all of that with a relational database or a column oriented database (depending on the types of queries you most frequently run and the performance requirements). -- Chris Miller On Mon, Mar 7, 2016 at 1:17 AM, Laumegui Deaulobi < guillaume.bilod...@gmail.com> wrote: > Our problem space is survey analytics. Each survey comprises a set of > questions, with each question having a set of possible answers. Survey > fill-out tasks are sent to users, who have until a certain date to complete > it. Based on these survey fill-outs, reports need to be generated. Each > report deals with a subset of the survey fill-outs, and comprises a set of > data points (average rating for question 1, min/max for question 2, etc.) > > We are dealing with rather large data sets - although reading the internet > we get the impression that everyone is analyzing petabytes of data... > > Users: up to 100,000 > Surveys: up to 100,000 > Questions per survey: up to 100 > Possible answers per question: up to 10 > Survey fill-outs / user: up to 10 > Reports: up to 100,000 > Data points per report: up to 100 > > Data is currently stored in a relational database but a migration to a > different kind of store is possible. > > The naive algorithm for report generation can be summed up as this: > > for each report to be generated { > for each report data point to be calculated { > calculate data point > add data point to report > } > publish report > } > > In order to deal with the upper limits of these values, we will need to > distribute this algorithm to a compute / data cluster as much as possible. > > I've read about frameworks such as Apache Spark but also Hadoop, GridGain, > HazelCast and several others, and am still confused as to how each of these > can help us and how they fit together. > > Is Spark the right framework for us? 
> > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Is-Spark-right-for-us-tp26412.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >
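[Editor's note] A minimal plain-Python sketch of the naive report-generation loop from the thread (field names, data, and aggregate kinds are hypothetical). Each data point is a small aggregation over the relevant fill-outs, which at the stated volumes is the kind of workload a relational or column-oriented database handles well without a cluster framework:

```python
# Hypothetical data model: each fill-out maps question ids to numeric answers.
# A report spec lists (question_id, kind) data points to compute.

def calculate_data_point(fill_outs, question_id, kind):
    """Compute one report data point over the fill-outs that answered the question."""
    answers = [f["answers"][question_id] for f in fill_outs if question_id in f["answers"]]
    if kind == "average":
        return sum(answers) / len(answers)
    if kind == "minmax":
        return (min(answers), max(answers))
    raise ValueError(kind)

def generate_report(report_spec, fill_outs):
    report = {}
    for question_id, kind in report_spec:          # "for each data point"
        report[(question_id, kind)] = calculate_data_point(fill_outs, question_id, kind)
    return report                                   # "publish report"

fill_outs = [
    {"answers": {"q1": 4, "q2": 7}},
    {"answers": {"q1": 2, "q2": 9}},
    {"answers": {"q1": 3}},
]
report = generate_report([("q1", "average"), ("q2", "minmax")], fill_outs)
print(report)  # {('q1', 'average'): 3.0, ('q2', 'minmax'): (7, 9)}
```

Since reports are independent of one another, the outer loop parallelizes trivially (one report per worker), with or without Spark.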
Re: MLLib + Streaming
Guru: This is a really great response. Thanks for taking the time to explain all of this. Helpful for me too. -- Chris Miller On Sun, Mar 6, 2016 at 1:54 PM, Guru Medasani <gdm...@gmail.com> wrote: > Hi Lan, > > Streaming KMeans, Linear Regression and Logistic Regression support online > machine learning as you mentioned. Online machine learning is where the model > is trained and updated on every batch of streaming data. These models > have trainOn() and predictOn() methods where you can simply pass in > DStreams you want to train the model on and DStreams you want the model to > predict on. So when the next batch of data arrives, the model is trained and > updated again. In this case model weights are continually updated, and > hopefully the model performs better in terms of convergence and accuracy over > time. What we are really trying to do in the online learning case is that we > are only showing a few examples of the data at a time (a stream of data) and > updating the parameters in the case of Linear and Logistic Regression, and > updating the centers in the case of K-Means. In the case of Linear or Logistic > Regression this is possible due to the optimizer that is chosen for > minimizing the cost function, which is Stochastic Gradient Descent. This > optimizer helps us move closer and closer to the optimal weights after > every batch, and over time we will have a model that has learned how to > represent our data and predict well. > > In the scenario of using any MLlib algorithms and doing training with > DStream.transform() and DStream.foreachRDD() operations, when the first > batch of data arrives we build a model, let's call this model1. Once you > have model1 you can make predictions on the same DStream or a different > DStream source. But for the next batch, if you follow the same procedure and > create a model, let's call this model2. 
This model2 will be significantly > different from model1, depending on how different the data in the second > DStream is from the first DStream, as it is not continually updating the model. > It's like the weight vectors are jumping from one place to the other for every > batch, and we never know if the algorithm is converging to the optimal > weights. So I believe it is not possible to do true online learning with > other MLlib models in Spark Streaming. I am not sure if this is because > the models don't generally support these streaming scenarios or if the > streaming versions simply haven't been implemented yet. > > Though technically you can use any of the MLlib algorithms in Spark > Streaming with the procedure you mentioned and make predictions, it is > important to figure out if the model you are choosing can converge by > showing only a subset (batches/DStreams) of the data over time. Depending on > the algorithm you choose, certain optimizers won't necessarily be able to > converge by being shown only individual data points, and require seeing the majority > of the data to be able to learn optimal weights. In these cases, you can > still do offline learning/training with Spark batch processing using any of > the MLlib algorithms and save those models on HDFS. You can then start a > streaming job, load these saved models into your streaming application, > and make predictions. This is traditional offline learning. > > In general, online learning is hard, as it's hard to evaluate: we are > not holding out any test data during model training. We are simply training > the model and predicting. So in the initial batches, results can vary quite > a bit and have significant errors in terms of the predictions. So choosing > online learning vs. offline learning depends on how much tolerance the > application can have towards wild predictions in the beginning. 
Offline > training is simple and cheap, whereas online training can be hard and needs > to be constantly monitored to see how it is performing. > > Hope this helps in understanding offline learning vs. online learning and > which algorithms you can choose for online learning in MLlib. > > Guru Medasani > gdm...@gmail.com > > > > > On Mar 5, 2016, at 7:37 PM, Lan Jiang <ljia...@gmail.com> wrote: > > > > Hi, there > > > > I hope someone can clarify this for me. It seems that some of the MLlib > algorithms such as KMeans, Linear Regression and Logistic Regression have a > Streaming version, which can do online machine learning. But does that mean > other MLlib algorithms cannot be used in Spark Streaming applications, such > as random forest, SVM, collaborative filtering, etc.? > > > > DStreams are essentially a sequence of RDDs. We can use > DStream.transform() and DStream.foreachRDD() operations, which allow you to > access RDDs in a DStream and apply MLlib function
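[Editor's note] A toy, plain-Python illustration of the online-learning idea described above (this is not the Spark API; MLlib's StreamingLinearRegressionWithSGD wraps this kind of per-batch update behind trainOn()/predictOn()): one weight is updated by stochastic gradient descent on every incoming batch, so the model carries state across the stream instead of being rebuilt from scratch like the model1/model2 scenario.

```python
# Toy sketch of online SGD for 1-D linear regression y ~ w * x.
# The same weight vector (here a scalar) is updated on every batch,
# which is what "online learning" means in the discussion above.

def sgd_update(w, batch, lr=0.1):
    """One SGD pass over a batch of (x, y) pairs; returns the updated weight."""
    for x, y in batch:
        grad = 2 * (w * x - y) * x  # gradient of squared error w.r.t. w
        w -= lr * grad
    return w

# A "stream" of small batches drawn from the line y = 3 * x
batches = [[(1.0, 3.0), (2.0, 6.0)], [(1.5, 4.5)], [(0.5, 1.5), (2.0, 6.0)]]

w = 0.0
for batch in batches:  # analogous to trainOn(): one update per arriving batch
    w = sgd_update(w, batch)

print(round(w, 2))  # 2.95 -- approaching the true slope 3.0
```

Rebuilding a fresh model per batch (the model2 scenario) would instead restart `w` at 0.0 every time, which is why those per-batch models never converge together.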
Re: Best way to merge files from streaming jobs on S3
Why does the order matter? Coalesce runs in parallel and if it's just writing to the file, then I imagine it would do it in whatever order it happens to be executed in each thread. If you want to sort the resulting data, I imagine you'd need to save it to some sort of data structure instead of writing to the file from coalesce, sort that data structure, then write your file. -- Chris Miller On Sat, Mar 5, 2016 at 5:24 AM, jelez <je...@hotmail.com> wrote: > My streaming job is creating files on S3. > The problem is that those files end up very small if I just write them to > S3 > directly. > This is why I use coalesce() to reduce the number of files and make them > larger. > > However, coalesce shuffles data and my job processing time ends up higher > than sparkBatchIntervalMilliseconds. > > I have observed that if I coalesce the number of partitions to be equal to > the cores in the cluster I get less shuffling - but that is > unsubstantiated. > Is there any dependency/rule between number of executors, number of cores > etc. that I can use to minimize shuffling and at the same time achieve > minimum number of output files per batch? > What is the best practice? > > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-merge-files-from-streaming-jobs-on-S3-tp26400.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >
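[Editor's note] A plain-Python sketch of the coalesce trade-off discussed above (hypothetical data; not the Spark API): grouping many small partitions into a few larger outputs by concatenating whole partitions avoids a full shuffle of individual records, which is also why the merged output preserves partition order rather than any sorted order.

```python
# Coalesce-style grouping: assign whole partitions to output groups,
# never splitting a partition, so no record-level shuffle is needed.

def coalesce(partitions, num_output):
    groups = [[] for _ in range(num_output)]
    for i, part in enumerate(partitions):
        groups[i % num_output].extend(part)  # round-robin whole partitions
    return groups

small_parts = [[5], [1], [9], [2], [7], [4]]  # six tiny "files"
merged = coalesce(small_parts, 2)             # two larger output "files"
print(merged)  # [[5, 9, 7], [1, 2, 4]] -- fewer files, but not sorted
```

If a global ordering were required, the records would have to be sorted explicitly before (or instead of) this concatenation, which is exactly the shuffle the grouping avoids.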
Re: Avro SerDe Issue w/ Manual Partitions?
One more thing -- just to set aside any question about my specific schema or data, I used the sample schema and data record from Oracle's documentation on Avro support. It's a pretty simple schema: https://docs.oracle.com/cd/E26161_02/html/GettingStartedGuide/jsonbinding-overview.html When I create a table with this schema and then try to query the Avro-encoded record, I get the same type of error: org.apache.avro.AvroTypeException: Found avro.FullName, expecting union at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292) at org.apache.avro.io.parsing.Parser.advance(Parser.java:88) at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155) at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193) at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142) at org.apache.hadoop.hive.serde2.avro.AvroDeserializer$SchemaReEncoder.reencode(AvroDeserializer.java:111) at org.apache.hadoop.hive.serde2.avro.AvroDeserializer.deserialize(AvroDeserializer.java:175) at org.apache.hadoop.hive.serde2.avro.AvroSerDe.deserialize(AvroSerDe.java:201) at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:409) at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:408) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) To me, this "feels" like a bug -- I just can't identify if it's a Spark issue or an Avro issue. Decoding the same files work fine with Hive, and I imagine the same deserializer code is used there too. Thoughts? -- Chris Miller On Thu, Mar 3, 2016 at 9:38 PM, Igor Berman <igor.ber...@gmail.com> wrote: > your field name is > *enum1_values* > > but you have data > { "foo1": "test123", *"enum1"*: "BLUE" } > > i.e. since you defined enum and not union(null, enum) > it tries to find value for enum1_values and doesn't find one... > > On 3 March 2016 at 11:30, Chris Miller <cmiller11...@gmail.com> wrote: > >> I've been digging into this a little deeper. 
Here's what I've found: >> >> test1.avsc: >> >> { >> "namespace": "com.cmiller", >> "name": "test1", >> "type": "record", >> "fields": [ >> { "name":"foo1", "type":"string" } >> ] >> } >> >> >> test2.avsc: >> >> { >> "namespace": "com.cmiller", >> "name": "test1", >> "type": "record", >> "fields": [ >> { "name":"foo1", "type":"string"
Re: Avro SerDe Issue w/ Manual Partitions?
No, the name of the field is *enum1* -- the name of the field's type is *enum1_values*. It should not be looking for enum1_values -- that's not how the Avro specification says schema resolution works, and it's not how any other implementation reads Avro data. For what it's worth, if I change enum1 to enum1_values, the data fails to encode (as it should): $ avro-tools fromjson --schema-file=test.avsc test.json > test.avro Exception in thread "main" org.apache.avro.AvroTypeException: Expected field name not found: enum1 at org.apache.avro.io.JsonDecoder.doAction(JsonDecoder.java:477) at org.apache.avro.io.parsing.Parser.advance(Parser.java:88) at org.apache.avro.io.JsonDecoder.advance(JsonDecoder.java:139) at org.apache.avro.io.JsonDecoder.readEnum(JsonDecoder.java:332) at org.apache.avro.io.ResolvingDecoder.readEnum(ResolvingDecoder.java:256) at org.apache.avro.generic.GenericDatumReader.readEnum(GenericDatumReader.java:199) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152) at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193) at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142) at org.apache.avro.tool.DataFileWriteTool.run(DataFileWriteTool.java:99) at org.apache.avro.tool.Main.run(Main.java:84) at org.apache.avro.tool.Main.main(Main.java:73) Any other ideas? -- Chris Miller On Thu, Mar 3, 2016 at 9:38 PM, Igor Berman <igor.ber...@gmail.com> wrote: > your field name is > *enum1_values* > > but you have data > { "foo1": "test123", *"enum1"*: "BLUE" } > > i.e. since you defined enum and not union(null, enum) > it tries to find value for enum1_values and doesn't find one... > > On 3 March 2016 at 11:30, Chris Miller <cmiller11...@gmail.com> wrote: > >> I've been digging into this a little deeper. 
Here's what I've found: >> >> test1.avsc: >> >> { >> "namespace": "com.cmiller", >> "name": "test1", >> "type": "record", >> "fields": [ >> { "name":"foo1", "type":"string" } >> ] >> } >> >> >> test2.avsc: >> >> { >> "namespace": "com.cmiller", >> "name": "test1", >> "type": "record", >> "fields": [ >> { "name":"foo1", "type":"string" }, >> { "name":"enum1", "type": { "type":"enum", "name":"enum1_values", >> "symbols":["BLUE","RED", "GREEN"]} } >> ] >> } >> >> >> test1.json (encoded and saved to test/test1.avro): >> >> { "foo1": "test123" } >> >> >> test2.json (encoded and saved to test/test1.avro): >> >> { "foo1": "test123", "enum1": "BLUE" } >> >> >> Here is how I create the tables and add the data: >> >> >> CREATE TABLE test1 >> PARTITIONED BY (ds STRING) >> ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' >> STORED AS INPUTFORMAT >> 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' >> OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' >> TBLPROPERTIES ('avro.schema.url'='hdfs:///path/to/schemas/test1.avsc'); >> >> ALTER TABLE test1 ADD PARTITION (ds='1') LOCATION >> 's3://spark-data/dev/test1'; >> >> >> CREATE TABLE test2 >> PARTITIONED BY (ds STRING) >> ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' >> STORED AS INPUTFORMAT >> 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' >> OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' >> TBLPROPERTIES ('avro.schema.url'='hdfs:///path/to/schemas/test2.avsc'); >> >> ALTER TABLE test2 ADD PARTITION (ds='1') LOCATION >> 's3://spark-data/dev/test2'; >> >> >> And here's what I get: >> >> >> SELECT * FROM test1; >> -- works fine, shows data >> >> SELECT *
Re: Avro SerDe Issue w/ Manual Partitions?
e.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) In addition to the above, I also tried putting the test Avro files on HDFS instead of S3 -- the error is the same. I also tried querying from Scala instead of using Zeppelin, and I get the same error. Where should I begin with troubleshooting this problem? This same query runs fine on Hive. Based on the error, it appears to be something in the deserializer though... but if it were a bug in the Avro deserializer, why does it only appear with Spark? I imagine Hive queries would be using the same deserializer, no? Thanks! -- Chris Miller On Thu, Mar 3, 2016 at 4:33 AM, Chris Miller <cmiller11...@gmail.com> wrote: > Hi, > > I have a strange issue occurring when I use manual partitions. > > If I create a table as follows, I am able to query the data with no > problem: > > > CREATE TABLE test1 > ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' > STORED AS INPUTFORMAT > 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' > OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' > LOCATION 's3://analytics-bucket/prod/logs/avro/2016/03/02/' > TBLPROPERTIES ('avro.schema.url'='hdfs:///data/schemas/schema.avsc'); > > > If I create the table like this, however, and then add a partition with a > LOCATION specified, I am unable to query: > > > CREATE TABLE test2 > PARTITIONED BY (ds STRING) > ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' > STORED AS INPUTFORMAT > 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' > OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' > TBLPROPERTIES ('avro.schema.url'='hdfs:///data/schemas/schema.avsc'); > > ALTER TABLE test7 ADD PARTITION (ds='1') LOCATION > 
's3://analytics-bucket/prod/logs/avro/2016/03/02/'; > > > This is what happens > > > SELECT * FROM test2 LIMIT 1; > > org.apache.avro.AvroTypeException: Found ActionEnum, expecting union > at > org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292) > at org.apache.avro.io.parsing.Parser.advance(Parser.java:88) > at > org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155) > at > org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193) > at > org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142) > at > org.apache.hadoop.hive.serde2.avro.AvroDeserializer$SchemaReEncoder.reencode(AvroDeserializer.java:111) > at > org.apache.hadoop.hive.serde2.avro.AvroDeserializer.deserialize(AvroDeserializer.java:175) > at > org.apache.hadoop.hive.serde2.avro.AvroSerDe.deserialize(AvroSerDe.java:201) > at > org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:409) > at > org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:408) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at > scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) > at scala.collection.TraversableOnce$class.to > (TraversableOnce.scala:273) > at 
scala.collection.AbstractIterator.to(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) > at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) > at > org.apache.spark.SparkConte
Avro SerDe Issue w/ Manual Partitions?
Hi, I have a strange issue occurring when I use manual partitions. If I create a table as follows, I am able to query the data with no problem: CREATE TABLE test1 ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' LOCATION 's3://analytics-bucket/prod/logs/avro/2016/03/02/' TBLPROPERTIES ('avro.schema.url'='hdfs:///data/schemas/schema.avsc'); If I create the table like this, however, and then add a partition with a LOCATION specified, I am unable to query: CREATE TABLE test2 PARTITIONED BY (ds STRING) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' TBLPROPERTIES ('avro.schema.url'='hdfs:///data/schemas/schema.avsc'); ALTER TABLE test7 ADD PARTITION (ds='1') LOCATION 's3://analytics-bucket/prod/logs/avro/2016/03/02/'; This is what happens SELECT * FROM test2 LIMIT 1; org.apache.avro.AvroTypeException: Found ActionEnum, expecting union at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292) at org.apache.avro.io.parsing.Parser.advance(Parser.java:88) at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155) at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193) at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142) at org.apache.hadoop.hive.serde2.avro.AvroDeserializer$SchemaReEncoder.reencode(AvroDeserializer.java:111) at 
org.apache.hadoop.hive.serde2.avro.AvroDeserializer.deserialize(AvroDeserializer.java:175) at org.apache.hadoop.hive.serde2.avro.AvroSerDe.deserialize(AvroSerDe.java:201) at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:409) at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:408) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) The data is exactly the same, and I can still go back and query the test1 table without issue. I don't have control over the directory structure, so I need to add the partitions manually so that I can specify a location. For what it's worth, "ActionEnum" is the first field in my schema. This same table and query structure works fine with Hive. When I try to run this with SparkSQL, however, I get the above error. Anyone have any idea what the problem is here? Thanks! -- Chris Miller
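[Editor's note] One hedged suggestion, echoing Igor Berman's union(null, enum) remark earlier in this thread and the default-value approach mentioned for Avro schema evolution: wrapping the enum in a union with null (plus a null default) is the standard Avro way to make such a field optional and resolvable against readers that expect a union. Whether this resolves the Spark-specific SerDe behavior reported here is unconfirmed; a sketch against the test2.avsc used in the thread:

```json
{
  "namespace": "com.cmiller",
  "name": "test2",
  "type": "record",
  "fields": [
    { "name": "foo1", "type": "string" },
    { "name": "enum1",
      "type": [ "null",
                { "type": "enum", "name": "enum1_values",
                  "symbols": ["BLUE", "RED", "GREEN"] } ],
      "default": null }
  ]
}
```

With this schema, records written without enum1 decode as null rather than failing resolution, which matches the "Found ..., expecting union" shape of the error above.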