The point is that this only works if you already know the file is
presented in order within and across partitions, which was the
original problem anyway. I don't think that ordering is guaranteed in
general, but in practice I'd expect the lines to come back from
textFile in file order already. Maybe under the hood this ends up
being ensured by TextInputFormat.

So, adding the index and sorting on it doesn't add anything.
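
To make that concrete, here is a rough, Spark-free sketch that models
partitions as plain Scala collections. It is only an illustration of the
argument, not Spark's implementation: keyOf, the sample rows, and the object
name are made up for the example, and globallyIndexed just mimics the
documented zipWithIndex behaviour (ordered by partition index first, then by
position within each partition).

```scala
object IndexOrderSketch {
  // Hypothetical key extractor: treats the first comma-separated field as the key.
  def keyOf(row: String): String = row.split(",", 2)(0)

  // Models the (partitionIndex, localIndex) tagging from the original question.
  def pairIndexed(partitions: Seq[Seq[String]]): Seq[((Int, Int), String)] =
    for {
      (rows, p) <- partitions.zipWithIndex
      (row, i)  <- rows.zipWithIndex
    } yield ((p, i), row)

  // Models RDD.zipWithIndex: indices follow partition order first, then
  // position within each partition.
  def globallyIndexed(partitions: Seq[Seq[String]]): Seq[(Long, String)] =
    partitions.flatten.zipWithIndex.map { case (row, i) => (i.toLong, row) }

  // Keeps, for each key, the row with the smallest index (the effect of a
  // reduceByKey with a min ordering), returned in index order.
  def firstPerKey(partitions: Seq[Seq[String]]): Seq[String] =
    globallyIndexed(partitions)
      .groupBy { case (_, row) => keyOf(row) }
      .values
      .map(_.minBy(_._1))
      .toSeq
      .sortBy(_._1)
      .map(_._2)

  def main(args: Array[String]): Unit = {
    val inOrder = Seq(Seq("a,1", "b,2"), Seq("a,3", "c,4"))

    // Both taggings sort the rows identically, so the global index
    // carries no information the (partition, local) pair lacks.
    val byPair   = pairIndexed(inOrder).sortBy(_._1).map(_._2)
    val byGlobal = globallyIndexed(inOrder).sortBy(_._1).map(_._2)
    println(byPair == byGlobal)

    // Partitions in file order: the first row per key is kept.
    println(firstPerKey(inOrder))

    // Partitions out of file order: the index is wrong in the same way,
    // so a later row for key "a" wins.
    println(firstPerKey(inOrder.reverse))
  }
}
```

Either tagging is only as good as the order the partitions arrive in, which
is the part that has to be assumed.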

On Tue, Sep 22, 2015 at 4:38 PM, Adrian Tanase <atan...@adobe.com> wrote:
> just give zipWithIndex a shot, use it early in the pipeline. I think it
> provides exactly the info you need, as the index is the original line number
> in the file, not the index in the partition.
>
> On 22 Sep 2015, at 17:50, Philip Weaver <philip.wea...@gmail.com> wrote:
>
> Thanks. If textFile can be used in a way that preserves order, then both the
> partition index and the index within each partition should be consistent,
> right?
>
> I overcomplicated the question by asking about removing duplicates.
> Fundamentally I think my question is, how does one sort lines in a file by
> line number.
>
> On Tue, Sep 22, 2015 at 6:15 AM, Adrian Tanase <atan...@adobe.com> wrote:
>>
>> By looking through the docs and source code, I think you can get away with
>> rdd.zipWithIndex to get the index of each line in the file, as long as you
>> define the parallelism upfront:
>> sc.textFile("README.md", 4)
>>
>> You can then just do .groupBy(…).mapValues(_.sortBy(…).head) - I’m
>> glossing over the tuple details, hopefully this is clear enough.
>>
>> -adrian
>>
>> From: Philip Weaver
>> Date: Tuesday, September 22, 2015 at 3:26 AM
>> To: user
>> Subject: Remove duplicate keys by always choosing first in file.
>>
>> I am processing a single file and want to remove duplicate rows by some
>> key by always choosing the first row in the file for that key.
>>
>> The best solution I could come up with is to zip each row with the
>> partition index and local index, like this:
>>
>> rdd.mapPartitionsWithIndex { case (partitionIndex, rows) =>
>>   rows.zipWithIndex.map { case (row, localIndex) =>
>>     (row.key, ((partitionIndex, localIndex), row))
>>   }
>> }
>>
>>
>> And then using reduceByKey with a min ordering on the (partitionIndex,
>> localIndex) pair.
>>
>> First, can I count on SparkContext.textFile to read the lines in such a way
>> that the partition indexes are always increasing, so that the above works?
>>
>> And, is there a better way to accomplish the same effect?
>>
>> Thanks!
>>
>> - Philip
>>
>
