Sorted partition ranges without overlap
Hi,

I have an RDD that needs to be sorted lexicographically and then processed by partition. The RDD should be split into ranged blocks where sorted order is maintained, with each partition containing sequential, non-overlapping keys.

Given keys (1,2,3,4,5,6):

1. Correct:
   - 2 partitions = (1,2,3), (4,5,6)
   - 3 partitions = (1,2), (3,4), (5,6)

2. Incorrect (the ranges overlap even though each partition is sorted):
   - 2 partitions = (1,3,5), (2,4,6)
   - 3 partitions = (1,3), (2,5), (4,6)

Is this possible with Spark?

Cheers,
-Kristoffer

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
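For later readers of the archive: this is what Spark's sortByKey gives you. It uses a RangePartitioner, which samples the keys, picks ordered split points, and routes each key to the partition whose range contains it, so partitions hold sequential, non-overlapping key ranges. A minimal pure-Python model of that invariant (illustrative only — Spark picks its boundaries by sampling, so the exact blocks may differ):

```python
# Pure-Python sketch of range partitioning as done by Spark's
# RangePartitioner (used by RDD.sortByKey): sorted keys are cut into
# contiguous, non-overlapping blocks, one block per partition.

def range_partition(keys, num_partitions):
    """Split the sorted keys into contiguous, evenly sized ranges."""
    keys = sorted(keys)
    n = len(keys)
    # Spark approximates these boundaries by sampling the RDD's keys;
    # here we compute exact even cut points for illustration.
    cuts = [round(i * n / num_partitions) for i in range(num_partitions + 1)]
    return [keys[cuts[i]:cuts[i + 1]] for i in range(num_partitions)]

print(range_partition([1, 2, 3, 4, 5, 6], 2))  # [[1, 2, 3], [4, 5, 6]]
print(range_partition([1, 2, 3, 4, 5, 6], 3))  # [[1, 2], [3, 4], [5, 6]]
```

In Spark itself, rdd.sortByKey(...) with the desired number of partitions, followed by mapPartitions or foreachPartition, would then process each ordered, non-overlapping block.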
Re: DataFrame select non-existing column
The problem is that I do not know which data frames have the pass.mobile column. I just list an HDFS directory which contains the parquet files, and some files have the column and some don't. I really don't want to have conditional logic that inspects the schema. But maybe that's the only option?

Maybe I misunderstand you, but the following code fails with the same error as before.

DataFrame dataFrame = ctx.read().parquet(localPath)
    .select("pass")
    .withColumn("mobile", col("pass.mobile"));

The flatten option works for my use case. But the problem is that there seems to be no way of dropping nested columns, i.e. drop("pass.auction")

On Sun, Nov 20, 2016 at 10:55 AM, Mendelson, Assaf <assaf.mendel...@rsa.com> wrote:
> The issue is that you already have a struct called pass. What you did was add
> a new column called "pass.mobile" instead of adding the element to pass -
> the schema for the pass element is the same as before.
> When you do select pass.mobile, it finds the pass structure and checks for
> mobile in it.
>
> You can do it the other way around: set the name to be pass_mobile. Add it
> as before with lit(0) for those dataframes that do not have the mobile
> field and do something like withColumn("pass_mobile", df["pass.mobile"]) for
> those that do.
> Another option is to do something like df.select("pass.*") to flatten the
> pass structure and work on that (then you can do withColumn("mobile", ...)
> instead of "pass.mobile"), but this would change the schema.
>
> -Original Message-
> From: Kristoffer Sjögren [mailto:sto...@gmail.com]
> Sent: Saturday, November 19, 2016 4:57 PM
> To: Mendelson, Assaf
> Cc: user
> Subject: Re: DataFrame select non-existing column
>
> Thanks. Here's my code example [1] and the printSchema() output [2].
> > This code still fails with the following message: "No such struct field > mobile in auction, geo" > > By looking at the schema, it seems that pass.mobile did not get nested, which > is the way it needs to be for my use case. Is nested columns not supported by > withColumn()? > > [1] > > DataFrame df = ctx.read().parquet(localPath).withColumn("pass.mobile", > lit(0L)); dataFrame.printSchema(); dataFrame.select("pass.mobile"); > > [2] > > root > |-- pass: struct (nullable = true) > ||-- auction: struct (nullable = true) > |||-- id: integer (nullable = true) > ||-- geo: struct (nullable = true) > |||-- postalCode: string (nullable = true) > |-- pass.mobile: long (nullable = false) > > On Sat, Nov 19, 2016 at 7:45 AM, Mendelson, Assaf <assaf.mendel...@rsa.com> > wrote: >> In pyspark for example you would do something like: >> >> df.withColumn("newColName",pyspark.sql.functions.lit(None)) >> >> Assaf. >> -Original Message- >> From: Kristoffer Sjögren [mailto:sto...@gmail.com] >> Sent: Friday, November 18, 2016 9:19 PM >> To: Mendelson, Assaf >> Cc: user >> Subject: Re: DataFrame select non-existing column >> >> Thanks for your answer. I have been searching the API for doing that but I >> could not find how to do it? >> >> Could you give me a code snippet? >> >> On Fri, Nov 18, 2016 at 8:03 PM, Mendelson, Assaf <assaf.mendel...@rsa.com> >> wrote: >>> You can always add the columns to old dataframes giving them null (or some >>> literal) as a preprocessing. >>> >>> -Original Message- >>> From: Kristoffer Sjögren [mailto:sto...@gmail.com] >>> Sent: Friday, November 18, 2016 4:32 PM >>> To: user >>> Subject: DataFrame select non-existing column >>> >>> Hi >>> >>> We have evolved a DataFrame by adding a few columns but cannot write select >>> statements on these columns for older data that doesn't have them since >>> they fail with a AnalysisException with message "No such struct field". >>> >>> We also tried dropping columns but this doesn't work for nested columns. 
>>> >>> Any non-hacky ways to get around this? >>> >>> Cheers, >>> -Kristoffer >>> >>> - >>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org >>> - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
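For later readers: the schema-inspection route the thread converges on can stay small. Below is a hedged, plain-Python model of the idea (nested dicts stand in for Rows; in real Spark the existence check would be against df.schema and the fill-in done via withColumn/lit). All names here are illustrative, not Spark API:

```python
def ensure_nested_field(record, path, default):
    """Walk a dotted path like 'pass.mobile' and insert a default
    where the leaf field is missing, leaving existing values alone."""
    *parents, leaf = path.split(".")
    node = record
    for p in parents:
        node = node.setdefault(p, {})   # create missing struct levels
    node.setdefault(leaf, default)      # only fill when absent
    return record

old = {"pass": {"auction": {"id": 1}, "geo": {"postalCode": "70210"}}}
new = {"pass": {"auction": {"id": 2}, "mobile": 1}}

print(ensure_nested_field(old, "pass.mobile", 0))  # mobile filled with 0
print(ensure_nested_field(new, "pass.mobile", 0))  # existing mobile kept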
Re: DataFrame select non-existing column
Thanks. Here's my code example [1] and the printSchema() output [2]. This code still fails with the following message: "No such struct field mobile in auction, geo" By looking at the schema, it seems that pass.mobile did not get nested, which is the way it needs to be for my use case. Is nested columns not supported by withColumn()? [1] DataFrame df = ctx.read().parquet(localPath).withColumn("pass.mobile", lit(0L)); dataFrame.printSchema(); dataFrame.select("pass.mobile"); [2] root |-- pass: struct (nullable = true) ||-- auction: struct (nullable = true) |||-- id: integer (nullable = true) ||-- geo: struct (nullable = true) |||-- postalCode: string (nullable = true) |-- pass.mobile: long (nullable = false) On Sat, Nov 19, 2016 at 7:45 AM, Mendelson, Assaf <assaf.mendel...@rsa.com> wrote: > In pyspark for example you would do something like: > > df.withColumn("newColName",pyspark.sql.functions.lit(None)) > > Assaf. > -Original Message- > From: Kristoffer Sjögren [mailto:sto...@gmail.com] > Sent: Friday, November 18, 2016 9:19 PM > To: Mendelson, Assaf > Cc: user > Subject: Re: DataFrame select non-existing column > > Thanks for your answer. I have been searching the API for doing that but I > could not find how to do it? > > Could you give me a code snippet? > > On Fri, Nov 18, 2016 at 8:03 PM, Mendelson, Assaf <assaf.mendel...@rsa.com> > wrote: >> You can always add the columns to old dataframes giving them null (or some >> literal) as a preprocessing. >> >> -Original Message- >> From: Kristoffer Sjögren [mailto:sto...@gmail.com] >> Sent: Friday, November 18, 2016 4:32 PM >> To: user >> Subject: DataFrame select non-existing column >> >> Hi >> >> We have evolved a DataFrame by adding a few columns but cannot write select >> statements on these columns for older data that doesn't have them since they >> fail with a AnalysisException with message "No such struct field". >> >> We also tried dropping columns but this doesn't work for nested columns. 
>> >> Any non-hacky ways to get around this? >> >> Cheers, >> -Kristoffer >> >> - >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org >> - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: DataFrame select non-existing column
Thanks for your answer. I have been searching the API for doing that but I could not find how to do it? Could you give me a code snippet? On Fri, Nov 18, 2016 at 8:03 PM, Mendelson, Assaf <assaf.mendel...@rsa.com> wrote: > You can always add the columns to old dataframes giving them null (or some > literal) as a preprocessing. > > -Original Message- > From: Kristoffer Sjögren [mailto:sto...@gmail.com] > Sent: Friday, November 18, 2016 4:32 PM > To: user > Subject: DataFrame select non-existing column > > Hi > > We have evolved a DataFrame by adding a few columns but cannot write select > statements on these columns for older data that doesn't have them since they > fail with a AnalysisException with message "No such struct field". > > We also tried dropping columns but this doesn't work for nested columns. > > Any non-hacky ways to get around this? > > Cheers, > -Kristoffer > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
DataFrame select non-existing column
Hi,

We have evolved a DataFrame by adding a few columns, but we cannot write select statements on these columns for older data that doesn't have them, since they fail with an AnalysisException with the message "No such struct field".

We also tried dropping columns, but this doesn't work for nested columns.

Any non-hacky ways to get around this?

Cheers,
-Kristoffer

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Spark corrupts text lines
I managed to get remote debugging up and running and can in fact reproduce the error and get a breakpoint triggered as it happens. But it seems like the code does not go through TextInputFormat, or at least the breakpoint is not triggered from this class. I don't know in which other class to look for where the actual split occurs. Any pointers? On Tue, Jun 14, 2016 at 4:03 PM, Kristoffer Sjögren <sto...@gmail.com> wrote: > I'm pretty confident the lines are encoded correctly since I can read > them both locally and on Spark (by ignoring the faulty line and > proceed to next). I also get the correct number of lines through > Spark, again by ignoring the faulty line. > > I get the same error by reading the original file using Spark, save as > new text file, then try decoding again. > > context.textFile("/orgfile").saveAsTextFile("/newfile"); > > Ok, not much left than to do some remote debugging. > > > On Tue, Jun 14, 2016 at 3:38 PM, Kristoffer Sjögren <sto...@gmail.com> wrote: >> Thanks for you help. Really appreciate it! >> >> Give me some time i'll come back after I've tried your suggestions. >> >> On Tue, Jun 14, 2016 at 3:28 PM, Kristoffer Sjögren <sto...@gmail.com> wrote: >>> I cannot reproduce it by running the file through Spark in local mode >>> on my machine. So it does indeed seems to be something related to >>> split across partitions. >>> >>> On Tue, Jun 14, 2016 at 3:04 PM, Kristoffer Sjögren <sto...@gmail.com> >>> wrote: >>>> Can you do remote debugging in Spark? Didn't know that. Do you have a link? >>>> >>>> Also noticed isSplittable in >>>> org.apache.hadoop.mapreduce.lib.input.TextInputFormat which checks for >>>> org.apache.hadoop.io.compress.SplittableCompressionCodec. Maybe there >>>> are some way to tell it not to split? >>>> >>>> On Tue, Jun 14, 2016 at 2:42 PM, Sean Owen <so...@cloudera.com> wrote: >>>>> It really sounds like the line is being split across partitions.
This >>>>> is what TextInputFormat does but should be perfectly capable of >>>>> putting together lines that break across files (partitions). If you're >>>>> into debugging, that's where I would start if you can. Breakpoints >>>>> around how TextInputFormat is parsing lines. See if you can catch it >>>>> when it returns a line that doesn't contain what you expect. >>>>> >>>>> On Tue, Jun 14, 2016 at 1:38 PM, Kristoffer Sjögren <sto...@gmail.com> >>>>> wrote: >>>>>> That's funny. The line after is the rest of the whole line that got >>>>>> split in half. Every following lines after that are fine. >>>>>> >>>>>> I managed to reproduce without gzip also so maybe it's no gzip's fault >>>>>> after all.. >>>>>> >>>>>> I'm clueless... >>>>>> >>>>>> On Tue, Jun 14, 2016 at 12:53 PM, Kristoffer Sjögren <sto...@gmail.com> >>>>>> wrote: >>>>>>> Seems like it's the gzip. It works if download the file, gunzip and >>>>>>> put it back to another directory and read it the same way. >>>>>>> >>>>>>> Hm.. I wonder what happens with the lines after it.. >>>>>>> >>>>>>> >>>>>>> >>>>>>> On Tue, Jun 14, 2016 at 11:52 AM, Sean Owen <so...@cloudera.com> wrote: >>>>>>>> What if you read it uncompressed from HDFS? >>>>>>>> gzip compression is unfriendly to MR in that it can't split the file. >>>>>>>> It still should just work, certainly if the line is in one file. But, >>>>>>>> a data point worth having. >>>>>>>> >>>>>>>> On Tue, Jun 14, 2016 at 10:49 AM, Kristoffer Sjögren >>>>>>>> <sto...@gmail.com> wrote: >>>>>>>>> The line is in one file. I did download the file manually from HDFS, >>>>>>>>> read and decoded it line-by-line successfully without Spark. >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> On Tue, Jun 14, 2016 at 11:44 AM, Sean Owen <so...@cloudera.com> >>>>>>>>> wrote: >>>>>>>>>> The only thing I can think of is that a line is being broken across >>>>>>>>>
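If it helps anyone chasing the same bug: the split handling Sean describes lives in Hadoop's LineRecordReader (TextInputFormat mostly just constructs it), which may be why a breakpoint in TextInputFormat itself never fires. A simplified pure-Python model of LineRecordReader's contract — each split skips a leading partial line and reads past its end to finish the last line it started, so concatenating the splits should reproduce every line exactly:

```python
def read_split(data: bytes, start: int, end: int):
    """Toy model of Hadoop's LineRecordReader over the byte range
    [start, end): a split owns exactly the lines that begin inside it."""
    if start == 0:
        pos = 0
    else:
        # Skip the partial line the previous split will finish: begin
        # after the first newline found at or beyond start - 1.
        nl = data.find(b"\n", start - 1)
        pos = len(data) if nl == -1 else nl + 1
    lines = []
    while pos < min(end, len(data)):
        nl = data.find(b"\n", pos)
        if nl == -1:                 # last line with no trailing newline
            lines.append(data[pos:])
            break
        lines.append(data[pos:nl])   # may read past `end` here,
        pos = nl + 1                 # finishing the line we started
    return lines

data = b"aaa\nbbb\nccc\n"
# Any split point gives back all lines exactly once:
print(read_split(data, 0, 5) + read_split(data, 5, len(data)))
# [b'aaa', b'bbb', b'ccc']
```

On the "tell it not to split" question: a gzip file is already read as a single split (that's what the SplittableCompressionCodec check decides), and for plain text an InputFormat whose isSplitable() returns false has the same effect.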
Re: Spark corrupts text lines
I'm pretty confident the lines are encoded correctly since I can read them both locally and on Spark (by ignoring the faulty line and proceed to next). I also get the correct number of lines through Spark, again by ignoring the faulty line. I get the same error by reading the original file using Spark, save as new text file, then try decoding again. context.textFile("/orgfile").saveAsTextFile("/newfile"); Ok, not much left than to do some remote debugging. On Tue, Jun 14, 2016 at 3:38 PM, Kristoffer Sjögren <sto...@gmail.com> wrote: > Thanks for you help. Really appreciate it! > > Give me some time i'll come back after I've tried your suggestions. > > On Tue, Jun 14, 2016 at 3:28 PM, Kristoffer Sjögren <sto...@gmail.com> wrote: >> I cannot reproduce it by running the file through Spark in local mode >> on my machine. So it does indeed seems to be something related to >> split across partitions. >> >> On Tue, Jun 14, 2016 at 3:04 PM, Kristoffer Sjögren <sto...@gmail.com> wrote: >>> Can you do remote debugging in Spark? Didn't know that. Do you have a link? >>> >>> Also noticed isSplittable in >>> org.apache.hadoop.mapreduce.lib.input.TextInputFormat which checks for >>> org.apache.hadoop.io.compress.SplittableCompressionCodec. Maybe there >>> are some way to tell it not to split? >>> >>> On Tue, Jun 14, 2016 at 2:42 PM, Sean Owen <so...@cloudera.com> wrote: >>>> It really sounds like the line is being split across partitions. This >>>> is what TextInputFormat does but should be perfectly capable of >>>> putting together lines that break across files (partitions). If you're >>>> into debugging, that's where I would start if you can. Breakpoints >>>> around how TextInputFormat is parsing lines. See if you can catch it >>>> when it returns a line that doesn't contain what you expect. >>>> >>>> On Tue, Jun 14, 2016 at 1:38 PM, Kristoffer Sjögren <sto...@gmail.com> >>>> wrote: >>>>> That's funny. The line after is the rest of the whole line that got >>>>> split in half. 
Every following lines after that are fine. >>>>> >>>>> I managed to reproduce without gzip also so maybe it's no gzip's fault >>>>> after all.. >>>>> >>>>> I'm clueless... >>>>> >>>>> On Tue, Jun 14, 2016 at 12:53 PM, Kristoffer Sjögren <sto...@gmail.com> >>>>> wrote: >>>>>> Seems like it's the gzip. It works if download the file, gunzip and >>>>>> put it back to another directory and read it the same way. >>>>>> >>>>>> Hm.. I wonder what happens with the lines after it.. >>>>>> >>>>>> >>>>>> >>>>>> On Tue, Jun 14, 2016 at 11:52 AM, Sean Owen <so...@cloudera.com> wrote: >>>>>>> What if you read it uncompressed from HDFS? >>>>>>> gzip compression is unfriendly to MR in that it can't split the file. >>>>>>> It still should just work, certainly if the line is in one file. But, >>>>>>> a data point worth having. >>>>>>> >>>>>>> On Tue, Jun 14, 2016 at 10:49 AM, Kristoffer Sjögren <sto...@gmail.com> >>>>>>> wrote: >>>>>>>> The line is in one file. I did download the file manually from HDFS, >>>>>>>> read and decoded it line-by-line successfully without Spark. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Tue, Jun 14, 2016 at 11:44 AM, Sean Owen <so...@cloudera.com> wrote: >>>>>>>>> The only thing I can think of is that a line is being broken across >>>>>>>>> two files? >>>>>>>>> Hadoop easily puts things back together in this case, or should. There >>>>>>>>> could be some weird factor preventing that. One first place to look: >>>>>>>>> are you using a weird line separator? or at least different from the >>>>>>>>> host OS? >>>>>>>>> >>>>>>>>> On Tue, Jun 14, 2016 at 10:41 AM, Kristoffer Sjögren >>>>>>>>> <sto...@gmail.com> wrote: >>>>>>>>>> I should mention that we're in the end want to store the input from >>>>>>>>>> Protobuf binary to Parquet using the following code. But t
Re: Spark corrupts text lines
Thanks for your help. Really appreciate it! Give me some time; I'll come back after I've tried your suggestions. On Tue, Jun 14, 2016 at 3:28 PM, Kristoffer Sjögren <sto...@gmail.com> wrote: > I cannot reproduce it by running the file through Spark in local mode > on my machine. So it does indeed seems to be something related to > split across partitions. > > On Tue, Jun 14, 2016 at 3:04 PM, Kristoffer Sjögren <sto...@gmail.com> wrote: >> Can you do remote debugging in Spark? Didn't know that. Do you have a link? >> >> Also noticed isSplittable in >> org.apache.hadoop.mapreduce.lib.input.TextInputFormat which checks for >> org.apache.hadoop.io.compress.SplittableCompressionCodec. Maybe there >> are some way to tell it not to split? >> >> On Tue, Jun 14, 2016 at 2:42 PM, Sean Owen <so...@cloudera.com> wrote: >>> It really sounds like the line is being split across partitions. This >>> is what TextInputFormat does but should be perfectly capable of >>> putting together lines that break across files (partitions). If you're >>> into debugging, that's where I would start if you can. Breakpoints >>> around how TextInputFormat is parsing lines. See if you can catch it >>> when it returns a line that doesn't contain what you expect. >>> >>> On Tue, Jun 14, 2016 at 1:38 PM, Kristoffer Sjögren <sto...@gmail.com> >>> wrote: >>>> That's funny. The line after is the rest of the whole line that got >>>> split in half. Every following lines after that are fine. >>>> >>>> I managed to reproduce without gzip also so maybe it's no gzip's fault >>>> after all.. >>>> >>>> I'm clueless... >>>> >>>> On Tue, Jun 14, 2016 at 12:53 PM, Kristoffer Sjögren <sto...@gmail.com> >>>> wrote: >>>>> Seems like it's the gzip. It works if download the file, gunzip and >>>>> put it back to another directory and read it the same way. >>>>> >>>>> Hm.. I wonder what happens with the lines after it..
>>>>> >>>>> >>>>> >>>>> On Tue, Jun 14, 2016 at 11:52 AM, Sean Owen <so...@cloudera.com> wrote: >>>>>> What if you read it uncompressed from HDFS? >>>>>> gzip compression is unfriendly to MR in that it can't split the file. >>>>>> It still should just work, certainly if the line is in one file. But, >>>>>> a data point worth having. >>>>>> >>>>>> On Tue, Jun 14, 2016 at 10:49 AM, Kristoffer Sjögren <sto...@gmail.com> >>>>>> wrote: >>>>>>> The line is in one file. I did download the file manually from HDFS, >>>>>>> read and decoded it line-by-line successfully without Spark. >>>>>>> >>>>>>> >>>>>>> >>>>>>> On Tue, Jun 14, 2016 at 11:44 AM, Sean Owen <so...@cloudera.com> wrote: >>>>>>>> The only thing I can think of is that a line is being broken across >>>>>>>> two files? >>>>>>>> Hadoop easily puts things back together in this case, or should. There >>>>>>>> could be some weird factor preventing that. One first place to look: >>>>>>>> are you using a weird line separator? or at least different from the >>>>>>>> host OS? >>>>>>>> >>>>>>>> On Tue, Jun 14, 2016 at 10:41 AM, Kristoffer Sjögren >>>>>>>> <sto...@gmail.com> wrote: >>>>>>>>> I should mention that we're in the end want to store the input from >>>>>>>>> Protobuf binary to Parquet using the following code. But this comes >>>>>>>>> after the lines has been decoded from base64 into binary. >>>>>>>>> >>>>>>>>> >>>>>>>>> public static void save(JavaRDD rdd, Class >>>>>>>>> clazz, String path) { >>>>>>>>> try { >>>>>>>>> Job job = Job.getInstance(); >>>>>>>>> ParquetOutputFormat.setWriteSupportClass(job, >>>>>>>>> ProtoWriteSupport.class); >>>>>>>>> ProtoParquetOutputFormat.setProtobufClass(job, clazz); >>>>>>>>> rdd.mapToPair(order -> new Tuple2<>(null, order)) >>>>>>>>> .saveAsNewAPIHadoopFile(path
Re: Spark corrupts text lines
I cannot reproduce it by running the file through Spark in local mode on my machine. So it does indeed seems to be something related to split across partitions. On Tue, Jun 14, 2016 at 3:04 PM, Kristoffer Sjögren <sto...@gmail.com> wrote: > Can you do remote debugging in Spark? Didn't know that. Do you have a link? > > Also noticed isSplittable in > org.apache.hadoop.mapreduce.lib.input.TextInputFormat which checks for > org.apache.hadoop.io.compress.SplittableCompressionCodec. Maybe there > are some way to tell it not to split? > > On Tue, Jun 14, 2016 at 2:42 PM, Sean Owen <so...@cloudera.com> wrote: >> It really sounds like the line is being split across partitions. This >> is what TextInputFormat does but should be perfectly capable of >> putting together lines that break across files (partitions). If you're >> into debugging, that's where I would start if you can. Breakpoints >> around how TextInputFormat is parsing lines. See if you can catch it >> when it returns a line that doesn't contain what you expect. >> >> On Tue, Jun 14, 2016 at 1:38 PM, Kristoffer Sjögren <sto...@gmail.com> wrote: >>> That's funny. The line after is the rest of the whole line that got >>> split in half. Every following lines after that are fine. >>> >>> I managed to reproduce without gzip also so maybe it's no gzip's fault >>> after all.. >>> >>> I'm clueless... >>> >>> On Tue, Jun 14, 2016 at 12:53 PM, Kristoffer Sjögren <sto...@gmail.com> >>> wrote: >>>> Seems like it's the gzip. It works if download the file, gunzip and >>>> put it back to another directory and read it the same way. >>>> >>>> Hm.. I wonder what happens with the lines after it.. >>>> >>>> >>>> >>>> On Tue, Jun 14, 2016 at 11:52 AM, Sean Owen <so...@cloudera.com> wrote: >>>>> What if you read it uncompressed from HDFS? >>>>> gzip compression is unfriendly to MR in that it can't split the file. >>>>> It still should just work, certainly if the line is in one file. But, >>>>> a data point worth having. 
>>>>> >>>>> On Tue, Jun 14, 2016 at 10:49 AM, Kristoffer Sjögren <sto...@gmail.com> >>>>> wrote: >>>>>> The line is in one file. I did download the file manually from HDFS, >>>>>> read and decoded it line-by-line successfully without Spark. >>>>>> >>>>>> >>>>>> >>>>>> On Tue, Jun 14, 2016 at 11:44 AM, Sean Owen <so...@cloudera.com> wrote: >>>>>>> The only thing I can think of is that a line is being broken across two >>>>>>> files? >>>>>>> Hadoop easily puts things back together in this case, or should. There >>>>>>> could be some weird factor preventing that. One first place to look: >>>>>>> are you using a weird line separator? or at least different from the >>>>>>> host OS? >>>>>>> >>>>>>> On Tue, Jun 14, 2016 at 10:41 AM, Kristoffer Sjögren <sto...@gmail.com> >>>>>>> wrote: >>>>>>>> I should mention that we're in the end want to store the input from >>>>>>>> Protobuf binary to Parquet using the following code. But this comes >>>>>>>> after the lines has been decoded from base64 into binary. >>>>>>>> >>>>>>>> >>>>>>>> public static void save(JavaRDD rdd, Class >>>>>>>> clazz, String path) { >>>>>>>> try { >>>>>>>> Job job = Job.getInstance(); >>>>>>>> ParquetOutputFormat.setWriteSupportClass(job, >>>>>>>> ProtoWriteSupport.class); >>>>>>>> ProtoParquetOutputFormat.setProtobufClass(job, clazz); >>>>>>>> rdd.mapToPair(order -> new Tuple2<>(null, order)) >>>>>>>> .saveAsNewAPIHadoopFile(path, Void.class, clazz, >>>>>>>> ParquetOutputFormat.class, job.getConfiguration()); >>>>>>>> } catch (IOException e) { >>>>>>>> throw new RuntimeException(e); >>>>>>>> } >>>>>>>> } >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> org.apache.parquet >>>>>&
Re: Spark corrupts text lines
Can you do remote debugging in Spark? Didn't know that. Do you have a link? Also noticed isSplittable in org.apache.hadoop.mapreduce.lib.input.TextInputFormat which checks for org.apache.hadoop.io.compress.SplittableCompressionCodec. Maybe there are some way to tell it not to split? On Tue, Jun 14, 2016 at 2:42 PM, Sean Owen <so...@cloudera.com> wrote: > It really sounds like the line is being split across partitions. This > is what TextInputFormat does but should be perfectly capable of > putting together lines that break across files (partitions). If you're > into debugging, that's where I would start if you can. Breakpoints > around how TextInputFormat is parsing lines. See if you can catch it > when it returns a line that doesn't contain what you expect. > > On Tue, Jun 14, 2016 at 1:38 PM, Kristoffer Sjögren <sto...@gmail.com> wrote: >> That's funny. The line after is the rest of the whole line that got >> split in half. Every following lines after that are fine. >> >> I managed to reproduce without gzip also so maybe it's no gzip's fault >> after all.. >> >> I'm clueless... >> >> On Tue, Jun 14, 2016 at 12:53 PM, Kristoffer Sjögren <sto...@gmail.com> >> wrote: >>> Seems like it's the gzip. It works if download the file, gunzip and >>> put it back to another directory and read it the same way. >>> >>> Hm.. I wonder what happens with the lines after it.. >>> >>> >>> >>> On Tue, Jun 14, 2016 at 11:52 AM, Sean Owen <so...@cloudera.com> wrote: >>>> What if you read it uncompressed from HDFS? >>>> gzip compression is unfriendly to MR in that it can't split the file. >>>> It still should just work, certainly if the line is in one file. But, >>>> a data point worth having. >>>> >>>> On Tue, Jun 14, 2016 at 10:49 AM, Kristoffer Sjögren <sto...@gmail.com> >>>> wrote: >>>>> The line is in one file. I did download the file manually from HDFS, >>>>> read and decoded it line-by-line successfully without Spark. 
>>>>> >>>>> >>>>> >>>>> On Tue, Jun 14, 2016 at 11:44 AM, Sean Owen <so...@cloudera.com> wrote: >>>>>> The only thing I can think of is that a line is being broken across two >>>>>> files? >>>>>> Hadoop easily puts things back together in this case, or should. There >>>>>> could be some weird factor preventing that. One first place to look: >>>>>> are you using a weird line separator? or at least different from the >>>>>> host OS? >>>>>> >>>>>> On Tue, Jun 14, 2016 at 10:41 AM, Kristoffer Sjögren <sto...@gmail.com> >>>>>> wrote: >>>>>>> I should mention that we're in the end want to store the input from >>>>>>> Protobuf binary to Parquet using the following code. But this comes >>>>>>> after the lines has been decoded from base64 into binary. >>>>>>> >>>>>>> >>>>>>> public static void save(JavaRDD rdd, Class >>>>>>> clazz, String path) { >>>>>>> try { >>>>>>> Job job = Job.getInstance(); >>>>>>> ParquetOutputFormat.setWriteSupportClass(job, >>>>>>> ProtoWriteSupport.class); >>>>>>> ProtoParquetOutputFormat.setProtobufClass(job, clazz); >>>>>>> rdd.mapToPair(order -> new Tuple2<>(null, order)) >>>>>>> .saveAsNewAPIHadoopFile(path, Void.class, clazz, >>>>>>> ParquetOutputFormat.class, job.getConfiguration()); >>>>>>> } catch (IOException e) { >>>>>>> throw new RuntimeException(e); >>>>>>> } >>>>>>> } >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> org.apache.parquet >>>>>>> parquet-protobuf >>>>>>> 1.8.1 >>>>>>> >>>>>>> >>>>>>> On Tue, Jun 14, 2016 at 11:37 AM, Kristoffer Sjögren <sto...@gmail.com> >>>>>>> wrote: >>>>>>>> I'm trying to figure out exactly what information could be useful but >>>>>>>> it's all as straight forward. >>>>>>>> >>>>>>>> - It's text files >>>>>>>> - Lines ends
Re: Spark corrupts text lines
That's funny. The line after is the rest of the whole line that got split in half. Every following line after that is fine. I managed to reproduce it without gzip also, so maybe it's not gzip's fault after all.. I'm clueless... On Tue, Jun 14, 2016 at 12:53 PM, Kristoffer Sjögren <sto...@gmail.com> wrote: > Seems like it's the gzip. It works if download the file, gunzip and > put it back to another directory and read it the same way. > > Hm.. I wonder what happens with the lines after it.. > > > > On Tue, Jun 14, 2016 at 11:52 AM, Sean Owen <so...@cloudera.com> wrote: >> What if you read it uncompressed from HDFS? >> gzip compression is unfriendly to MR in that it can't split the file. >> It still should just work, certainly if the line is in one file. But, >> a data point worth having. >> >> On Tue, Jun 14, 2016 at 10:49 AM, Kristoffer Sjögren <sto...@gmail.com> >> wrote: >>> The line is in one file. I did download the file manually from HDFS, >>> read and decoded it line-by-line successfully without Spark. >>> >>> >>> >>> On Tue, Jun 14, 2016 at 11:44 AM, Sean Owen <so...@cloudera.com> wrote: >>>> The only thing I can think of is that a line is being broken across two >>>> files? >>>> Hadoop easily puts things back together in this case, or should. There >>>> could be some weird factor preventing that. One first place to look: >>>> are you using a weird line separator? or at least different from the >>>> host OS? >>>> >>>> On Tue, Jun 14, 2016 at 10:41 AM, Kristoffer Sjögren <sto...@gmail.com> >>>> wrote: >>>>> I should mention that we're in the end want to store the input from >>>>> Protobuf binary to Parquet using the following code. But this comes >>>>> after the lines has been decoded from base64 into binary.
>>>>> >>>>> >>>>> public static void save(JavaRDD rdd, Class >>>>> clazz, String path) { >>>>> try { >>>>> Job job = Job.getInstance(); >>>>> ParquetOutputFormat.setWriteSupportClass(job, >>>>> ProtoWriteSupport.class); >>>>> ProtoParquetOutputFormat.setProtobufClass(job, clazz); >>>>> rdd.mapToPair(order -> new Tuple2<>(null, order)) >>>>> .saveAsNewAPIHadoopFile(path, Void.class, clazz, >>>>> ParquetOutputFormat.class, job.getConfiguration()); >>>>> } catch (IOException e) { >>>>> throw new RuntimeException(e); >>>>> } >>>>> } >>>>> >>>>> >>>>> >>>>> >>>>> org.apache.parquet >>>>> parquet-protobuf >>>>> 1.8.1 >>>>> >>>>> >>>>> On Tue, Jun 14, 2016 at 11:37 AM, Kristoffer Sjögren <sto...@gmail.com> >>>>> wrote: >>>>>> I'm trying to figure out exactly what information could be useful but >>>>>> it's all as straight forward. >>>>>> >>>>>> - It's text files >>>>>> - Lines ends with a new line character. >>>>>> - Files are gzipped before added to HDFS >>>>>> - Files are read as gzipped files from HDFS by Spark >>>>>> - There are some extra configuration >>>>>> >>>>>> conf.set("spark.files.overwrite", "true"); >>>>>> conf.set("spark.hadoop.validateOutputSpecs", "false"); >>>>>> >>>>>> Here's the code using Java 8 Base64 class. >>>>>> >>>>>> context.textFile("/log.gz") >>>>>> .map(line -> line.split("=")) >>>>>> .map(split -> Base64.getDecoder().decode(split[0])); >>>>>> >>>>>> >>>>>> On Tue, Jun 14, 2016 at 11:26 AM, Sean Owen <so...@cloudera.com> wrote: >>>>>>> It's really the MR InputSplit code that splits files into records. >>>>>>> Nothing particularly interesting happens in that process, except for >>>>>>> breaking on newlines. >>>>>>> >>>>>>> Do you have one huge line in the file? are you reading as a text file? >>>>>>> can you give any more detail about exactly how you parse it? it could >>>>>>> be something else in your code. >
Spark corrupts text lines
Hi,

We have log files written as base64-encoded text files (gzipped), where each line ends with a newline character.

For some reason a particular line [1] is split by Spark [2], making it unparsable by the base64 decoder. It does this consistently, no matter if I give it just the particular file that contains the line or a bunch of files. I know the line is not corrupt because I can manually download the file from HDFS, gunzip it and read/decode all the lines without problems.

I was thinking that maybe there is a limit to the number of characters per line, but that doesn't sound right? Maybe the combination of characters makes Spark think there's a new line? I'm clueless.

Cheers,
-Kristoffer

[1] Original line:

CsAJCtwGCghwYWdlVmlldxC4PhjM1v66BSJFaHR0cDovL25hLnNlL3Nwb3J0ZW4vc3BvcnR0dC8xLjM5MTU5MjEtdXBwZ2lmdGVyLXNtZWRlcm5hLW1vdC1rb25rdXJzKjhVcHBnaWZ0ZXI6IFNtZWRlcm5hIG1vdCBrb25rdXJzIC0gU3BvcnQgKFRUKSAtIHd3dy5uYS5zZTJXaHR0cDovL25hLnNlL255aGV0ZXIvb3JlYnJvLzEuMzk2OTU0My1rcnlwaGFsLW9wcG5hci1mb3Itb2JvLWF0dC1iZWhhbGxhLXRqYW5zdGViaWxhcm5hOqECcWdrZWplNGpmLHBkMzBmdDRuNCxwbHB0b3JqNncscGxwczBvamZvLHBkYjVrZGM4eCxwbHBzN293Y3UscGE0czN1bXp5LHBhNHJla25sNyxwYTRyd3dxam4scGE0c21ra2Z4LHBkM2tpa3BmMixwZDNqcjE5dGMscGQ0ZGQ0M2F3LHAwZ3MwbmlqMSxwYTRvZTNrbXoscGE0cWJ3eDZxLHBkM2s2NW00dyxwYTRyazc3Z3IscGQzMHAzdW8wLHBkNGM1ajV5dixwbHB0c211NmcscGM3bXNibmM5LHBhNHFpaTdsZCxwbHB0dnpqdnUscGE0bmlsdmFnLHBhNHB6cjN2cyxwZDNsZDNnYmkscGl1a2M2NmVlLHB5dHoyOThzNErIAgoTNzI0NTY2NzU0MzQxNTUyOTQ4ORAAGAAioQJxZ2tlamU0amYscGQzMGZ0NG40LHBscHRvcmo2dyxwbHBzMG9qZm8scGRiNWtkYzh4LHBscHM3b3djdSxwYTRzM3VtenkscGE0cmVrbmw3LHBhNHJ3d3FqbixwYTRzbWtrZngscGQza2lrcGYyLHBkM2pyMTl0YyxwZDRkZDQzYXcscDBnczBuaWoxLHBhNG9lM2tteixwYTRxYnd4NnEscGQzazY1bTR3LHBhNHJrNzdncixwZDMwcDN1bzAscGQ0YzVqNXl2LHBscHRzbXU2ZyxwYzdtc2JuYzkscGE0cWlpN2xkLHBscHR2emp2dSxwYTRuaWx2YWcscGE0cHpyM3ZzLHBkM2xkM2diaSxwaXVrYzY2ZWUscHl0ejI5OHM0KgkzOTUxLDM5NjAS3gIIxNjxhJTVsJcVEqUBTW96aWxsYS81LjAgKExpbnV4OyBBbmRyb2lkIDUuMS4xOyBTQU1TVU5HIFNNLUczODhGIEJ1aWxkL0xNWTQ4QikgQXBwbGVXZWJLaXQvNTM3LjM
2IChLSFRNTCwgbGlrZSBHZWNrbykgU2Ftc3VuZ0Jyb3dzZXIvMy4zIENocm9tZS8zOC4wLjIxMjUuMTAyIE1vYmlsZSBTYWZhcmkvNTM3LjM2IjUKDDYyLjIwLjE5Ni44MBWgd3NBHRgibUIiAlNFKgfDlnJlYnJvMg5UZWxpYVNvbmVyYSBBQigAMdejcD0K1+s/OABCCAiAAhWamRlAQgcIURUAAOBAQggIlAEVzczMP0IHCFQVmpkJQUIICJYBFTMzE0BCBwhYFZqZ+UBCCAj6ARWamdk/QggImwEVzcysQEoHCAYVO6ysPkoHCAQVRYO4PkoHCAEVIg0APw===1465887564 [2] Line as spark hands it over: CsAJCtwGCghwYWdlVmlldxC4PhjM1v66BSJFaHR0cDovL25hLnNlL3Nwb3J0ZW4vc3BvcnR0dC8xLjM5MTU5MjEtdXBwZ2lmdGVyLXNtZWRlcm5hLW1vdC1rb25rdXJzKjhVcHBnaWZ0ZXI6IFNtZWRlcm5hIG1vdCBrb25rdXJzIC0gU3BvcnQgKFRUKSAtIHd3dy5uYS5zZTJXaHR0cDovL25hLnNlL255aGV0ZXIvb3JlYnJvLzEuMzk2OTU0My1rcnlwaGFsLW9wcG5hci1mb3Itb2JvLWF0dC1iZWhhbGxhLXRqYW5zdGViaWxhcm5hOqECcWdrZWplNGpmLHBkMzBmdDRuNCxwbHB0b3JqNncscGxwczBvamZvLHBkYjVrZGM4eCxwbHBzN293Y3UscGE0czN1bXp5LHBhNHJla25sNyxwYTRyd3dxam4scGE0c21ra2Z4LHBkM2tpa3BmMixwZDNqcjE5dGMscGQ0ZGQ0M2F3LHAwZ3MwbmlqMSxwYTRvZTNrbXoscGE0cWJ3eDZxLHBkM2s2NW00dyxwYTRyazc3Z3IscGQzMHAzdW8wLHBkNGM1ajV5dixwbHB0c211NmcscGM3bXNibmM5LHBhNHFpaTdsZCxwbHB0dnpqdnUscGE0bmlsdmFnLHBhNHB6cjN2cyxwZDNsZDNnYmkscGl1a2M2NmVlLHB5dHoyOThzNErIAgoTNzI0NTY2NzU0MzQxNTUyOTQ4ORAAGAAioQJxZ2tlamU0amYscGQzMGZ0NG40LHBscHRvcmo2dyxwbHBzMG9qZm8scGRiNWtkYzh4LHBscHM3b3djdSxwYTRzM3VtenkscGE0cmVrbmw3LHBhNHJ3d3FqbixwYTRzbWtrZngscGQza2lrcGYyLHBkM2pyMTl0YyxwZDRkZDQzYXcscDBnczBuaWoxLHBhNG9lM2tteixwYTRxYnd4NnEscGQzazY1bTR3LHBhNHJrNzdncixwZDMwcDN1bzAscGQ0YzVqNXl2LHBscHRzbXU2ZyxwYzdtc2JuYzkscGE0cWlpN2xkLHBscHR2emp2dSxwYTRuaWx2YWcscGE0 - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
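One way to narrow a problem like this down: valid base64 never contains a newline character, so any record that fails to decode was either split by the reader or contains a stray delimiter. A minimal sketch of a decode check (the helper is plain Java; the Spark usage in the comment assumes only the standard textFile API):

```java
import java.util.Base64;

public class Base64Check {
    // Returns true if the line decodes cleanly, false otherwise. A line that
    // was split mid-record will typically fail with a length or illegal-character error.
    static boolean decodes(String line) {
        try {
            Base64.getDecoder().decode(line);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // In a Spark job this could count/log bad records, e.g.:
        //   long bad = sc.textFile(path).filter(l -> !Base64Check.decodes(l)).count();
        System.out.println(decodes("aGVsbG8="));  // a well-formed line
        System.out.println(decodes("###"));       // not valid base64
    }
}
```

Comparing the lengths of the failing lines against a known maximum record size would also quickly confirm or rule out the "limit per line" theory.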
Spark and HBase RDD join/get
Hi We have an RDD that needs to be mapped with information from HBase, where the exact key is the user id. What are the alternatives for doing this? - Is it possible to do HBase.get() requests from a map function in Spark? - Or should we join the RDD against a full HBase table scan? I ask because full table scans feel inefficient, especially if the input RDD is really small compared to the full table. But I realize that a full table scan may not be what happens in reality? Cheers, -Kristoffer
Re: Spark and HBase RDD join/get
Thanks Ted! On Thu, Jan 14, 2016 at 4:49 PM, Ted Yu <yuzhih...@gmail.com> wrote: > For #1, yes it is possible. > > You can find some examples in the hbase-spark module of HBase, where HBase as a DataSource is provided, e.g. > > https://github.com/apache/hbase/blob/master/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala > > Cheers
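For reference, alternative #1 usually looks something like the sketch below: open one HBase connection per partition (not per record) and issue a single batched get per partition. This is a sketch, not tested code — the "users" table name is made up, and the Iterable-returning lambda assumes the Spark 1.x Java API (Spark 2.x expects an Iterator):

```java
// Point lookups against HBase from mapPartitions: one connection per
// partition, one multi-get instead of N round trips.
JavaRDD<Tuple2<String, Result>> joined = userIds.mapPartitions(ids -> {
    Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
    Table table = conn.getTable(TableName.valueOf("users")); // assumed table name
    List<String> keys = new ArrayList<>();
    List<Get> gets = new ArrayList<>();
    while (ids.hasNext()) {
        String id = ids.next();
        keys.add(id);
        gets.add(new Get(Bytes.toBytes(id)));
    }
    Result[] results = table.get(gets); // batched lookup, no table scan involved
    table.close();
    conn.close();
    List<Tuple2<String, Result>> out = new ArrayList<>();
    for (int i = 0; i < results.length; i++) {
        out.add(new Tuple2<>(keys.get(i), results[i]));
    }
    return out;
});
```

This avoids the full-scan join entirely, which is usually the right trade when the input RDD is small relative to the table.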
Use TCP client for id lookup
Hi I'm trying to understand how to look up certain id fields of RDDs in an external mapping table. The table is accessed through a two-way binary TCP client where an id is provided and an entry returned. Entries cannot be listed/scanned. What's the simplest way of managing the TCP client and its connections towards the external system? I suppose I cannot use the TCP client in a mapToPair() call? Cheers, -Kristoffer
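A common pattern for this situation, as a sketch: the client can't be created on the driver and used inside mapToPair(), since it won't serialize, but one client can be created per partition inside mapPartitionsToPair() and closed when the partition is done. LookupClient, its constructor arguments, lookup() and Entry are stand-ins for whatever the real TCP client exposes:

```java
// Per-partition lifecycle for a non-serializable TCP client.
// LookupClient and Entry are hypothetical; substitute the real client API.
JavaPairRDD<String, Entry> resolved = ids.mapPartitionsToPair(iter -> {
    LookupClient client = new LookupClient("lookup-host", 9000); // opens the connection
    List<Tuple2<String, Entry>> out = new ArrayList<>();
    try {
        while (iter.hasNext()) {
            String id = iter.next();
            out.add(new Tuple2<>(id, client.lookup(id)));
        }
    } finally {
        client.close(); // exactly one connection opened and closed per partition
    }
    return out; // Spark 1.x PairFlatMapFunction returns an Iterable
});
```

If opening the connection is expensive, a lazily initialized static client (one per executor JVM) is the usual refinement, at the cost of managing its shutdown yourself.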
Java 8 lambdas
Hi Is there a way to execute Spark jobs with Java 8 lambdas instead of the anonymous inner classes seen in the examples? I think I remember seeing real lambdas in the examples before and in articles [1]? Cheers, -Kristoffer [1] http://blog.cloudera.com/blog/2014/04/making-apache-spark-easier-to-use-in-java-with-java-8
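The reason lambdas work at all here is that Spark's Java function interfaces (org.apache.spark.api.java.function.Function and friends) each have a single abstract method, so a Java 8 lambda can be passed anywhere an anonymous inner class was used. The self-contained snippet below uses a local stand-in interface of the same shape (not Spark's actual class) to show the two styles side by side:

```java
import java.io.Serializable;

public class LambdaDemo {
    // Same shape as org.apache.spark.api.java.function.Function<T, R>:
    // a serializable interface with a single abstract method.
    interface Function<T, R> extends Serializable {
        R call(T t) throws Exception;
    }

    static <T, R> R apply(Function<T, R> f, T t) throws Exception {
        return f.call(t);
    }

    public static void main(String[] args) throws Exception {
        // Pre-Java-8 style: anonymous inner class.
        Function<String, Integer> anon = new Function<String, Integer>() {
            @Override
            public Integer call(String s) {
                return s.length();
            }
        };
        // Java 8 style: a lambda satisfies the same single-method interface,
        // so e.g. lines.map(s -> s.length()) compiles against Spark's Java API.
        Function<String, Integer> lambda = s -> s.length();

        System.out.println(apply(anon, "spark"));   // 5
        System.out.println(apply(lambda, "spark")); // 5
    }
}
```

The only requirement is compiling and running the driver and executors under a Java 8 runtime.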
Job aborted due to stage failure: Master removed our application: FAILED
Hi I have trouble executing a really simple Java job on Spark 1.0.0-cdh5.1.0 that runs inside a Docker container:

SparkConf sparkConf = new SparkConf().setAppName("TestApplication").setMaster("spark://localhost:7077");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = ctx.textFile("hdfs://localhost:8020/tmp/hbase-env.sh", 1);

Everything seems fine until the job is handed over to the worker. It seems like the job is rejected for some reason, but the logs are _very_ quiet, giving no clue of what's wrong, making it hard to find the source of the problem. I have attached the logs from the client, master and worker. Any pointers? Cheers, -Kristoffer

ala:633) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
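One thing worth checking in a Docker setup (an assumption about the cause, not a diagnosis from the attached logs): the workers must be able to open connections back to the driver, and inside a container the driver often advertises an address the workers cannot reach, which shows up as silent executor/application failures. spark.driver.host is a standard Spark setting; the IP below is a placeholder for whatever address the workers can actually route to:

```java
SparkConf sparkConf = new SparkConf()
    .setAppName("TestApplication")
    .setMaster("spark://localhost:7077")
    // Address on which the workers can reach back to the driver; the value
    // here is a placeholder for the container's externally visible IP.
    .set("spark.driver.host", "172.17.0.2");
```

A Spark-version mismatch between the client (1.0.0-cdh5.1.0) and the standalone master is the other classic cause of a quietly "removed" application.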
Re: Spark and Java 8
Running Hadoop and HDFS on an unsupported JVM runtime sounds a little adventurous. But as long as Spark can run in a separate Java 8 runtime it's all good. I think having lambdas and type inference is huge when writing these jobs, and using Scala (paying the price of complexity, poor tooling, etc.) for this tiny feature is often not justified.

On Wed, May 7, 2014 at 2:03 AM, Dean Wampler <deanwamp...@gmail.com> wrote:
> Cloudera customers will need to put pressure on them to support Java 8. They only officially supported Java 7 when Oracle stopped supporting Java 6. dean
>
> On Wed, May 7, 2014 at 5:05 AM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>> Java 8 support is a feature in Spark, but vendors need to decide for themselves when they’d like to support Java 8 commercially. You can still run Spark on Java 7 or 6 without taking advantage of the new features (indeed our builds are always against Java 6). Matei
>>
>> On May 6, 2014, at 8:59 AM, Ian O'Connell <i...@ianoconnell.com> wrote:
>>> I think the distinction there might be that they never said they ran that code under CDH5, just that Spark supports it and Spark runs under CDH5. Not that you can use these features while running under CDH5. They could use Mesos or the standalone scheduler to run them.
>>>
>>> On Tue, May 6, 2014 at 6:16 AM, Kristoffer Sjögren <sto...@gmail.com> wrote:
>>>> Hi I just read an article [1] about Spark, CDH5 and Java 8 but did not get exactly how Spark can run Java 8 on a YARN cluster at runtime. Is Spark using a separate JVM that runs on the data nodes or is it reusing the YARN JVM runtime somehow, like hadoop1? CDH5 only supports Java 7 [2] as far as I know? Cheers, -Kristoffer
>>>> [1] http://blog.cloudera.com/blog/2014/04/making-apache-spark-easier-to-use-in-java-with-java-8/
>>>> [2] http://www.cloudera.com/content/cloudera-content/cloudera-docs/CDH5/latest/CDH5-Requirements-and-Supported-Versions/CDH5-Requirements-and-Supported-Versions.html
>
> --
> Dean Wampler, Ph.D. Typesafe @deanwampler http://typesafe.com http://polyglotprogramming.com
Spark and Java 8
Hi I just read an article [1] about Spark, CDH5 and Java 8, but did not get exactly how Spark can run Java 8 on a YARN cluster at runtime. Is Spark using a separate JVM that runs on the data nodes, or is it reusing the YARN JVM runtime somehow, like hadoop1? CDH5 only supports Java 7 [2] as far as I know? Cheers, -Kristoffer [1] http://blog.cloudera.com/blog/2014/04/making-apache-spark-easier-to-use-in-java-with-java-8/ [2] http://www.cloudera.com/content/cloudera-content/cloudera-docs/CDH5/latest/CDH5-Requirements-and-Supported-Versions/CDH5-Requirements-and-Supported-Versions.html
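Since Spark executors run in their own JVMs inside YARN containers, they can in principle be pointed at a different JDK than the one the cluster daemons use. One way to do that, as a sketch — it assumes Java 8 is installed at the same path on every node (the path is made up), and the exact property names should be checked against the docs for your Spark version:

```shell
# Run the executors and the YARN application master under a separate
# Java 8 install; /opt/jdk1.8.0 is an assumed per-node path.
spark-submit \
  --conf spark.executorEnv.JAVA_HOME=/opt/jdk1.8.0 \
  --conf spark.yarn.appMasterEnv.JAVA_HOME=/opt/jdk1.8.0 \
  --class com.example.MyApp my-app.jar
```

The NodeManagers themselves keep running on the vendor-supported Java 7, which is what makes the separate-runtime setup described above workable.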