Re: dataset sort

2018-02-09 Thread Fabian Hueske
The reasons why this isn't working in Flink are that

* a file can only be written by a single process
* Flink does not support merging of sorted network partitions but reads
round-robin from incoming network channels.

I think if you sort the historic data in parallel (without range
partitioning, i.e., randomly partitioned) and write it out in multiple
files, you could implement a source function that reads all files in
parallel and generates ascending watermarks.
It would be important that you have as many parallel source tasks as you
have files to ensure that watermarks are properly generated. Apart from
that, this should result in a nicely sorted stream.
The watermark handling of the DataStream API will then take care of
"merging" the sorted files.
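
A minimal sketch of such a source in Scala (the class name, the
one-file-per-subtask convention, and the CSV layout of (timestamp, value)
pairs are assumptions for illustration):

import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.watermark.Watermark

// Sketch: one parallel subtask per pre-sorted file, so the source must
// run with parallelism == paths.size.
class SortedFileSource(paths: Seq[String])
    extends RichParallelSourceFunction[(Long, String)] {

  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[(Long, String)]): Unit = {
    // Each subtask reads exactly one file, assumed sorted by timestamp.
    val path = paths(getRuntimeContext.getIndexOfThisSubtask)
    for (line <- scala.io.Source.fromFile(path).getLines() if running) {
      val Array(ts, value) = line.split(",", 2)
      ctx.collectWithTimestamp((ts.toLong, value), ts.toLong)
      // Watermarks ascend within each file; downstream operators track the
      // minimum watermark across input channels, which merges the streams.
      ctx.emitWatermark(new Watermark(ts.toLong))
    }
  }

  override def cancel(): Unit = running = false
}

With one subtask per file, e.g.
env.addSource(new SortedFileSource(files)).setParallelism(files.size), any
event-time operator downstream sees a correctly merged, time-ordered stream.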

Best, Fabian


2018-02-09 16:23 GMT+01:00 david westwood :

> Thanks.
>
> I have to stream in the historical data, and its out-of-orderness is >>
> that of the real-time data. I thought there was some elegant way using
> mapPartition that I wasn't seeing.
>
> On Fri, Feb 9, 2018 at 5:10 AM, Fabian Hueske  wrote:
>
>> You can also partition by range and sort and write each partition. Once
>> all partitions have been written to files, you can concatenate the files.
>> As Till said, it is not possible to sort in parallel and write in order to
>> a single file.
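
A sketch of that approach with the Scala DataSet API (the output directory
name and the shell concatenation at the end are illustrative assumptions,
not Flink API):

// Range-partition and sort each partition locally; the ranges map to the
// numbered output files in order, so concatenating them in that order
// yields a globally sorted file.
dataset
  .partitionByRange(_._1)
  .sortPartition(_._1, Order.ASCENDING)
  .writeAsCsv("sorted-parts") // directory with one file per partition
env.execute()
// afterwards, outside of Flink:
//   cat sorted-parts/1 sorted-parts/2 ... sorted-parts/10 > mydata.csv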
>>
>> Best, Fabian
>>
>> 2018-02-09 10:35 GMT+01:00 Till Rohrmann :
>>
>>> Hi David,
>>>
>>> Flink only supports sorting within partitions. Thus, if you want to
>>> write out a globally sorted dataset, you should set the parallelism to 1,
>>> which effectively results in a single partition. Decreasing the
>>> parallelism of an operator causes the individual partitions to lose
>>> their sort order, because the partitions are read in a non-deterministic
>>> order.
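
In code, that means running both the sort and the sink with parallelism 1
(a sketch; note that setting parallelism 1 on the sink alone, as in the
snippet quoted below, is not enough):

// A single partition sorts and writes, giving a globally sorted file.
dataset
  .sortPartition(_._1, Order.ASCENDING).setParallelism(1)
  .writeAsCsv("mydata.csv").setParallelism(1)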
>>>
>>> Cheers,
>>> Till
>>>
>>>
>>> On Thu, Feb 8, 2018 at 8:07 PM, david westwood <
>>> david.d.westw...@gmail.com> wrote:
>>>
 Hi:

 I would like to sort historical data using the dataset api.

 env.setParallelism(10)

 val dataset: DataSet[(Long, String)] = ..
 dataset.partitionByRange(_._1)
   .sortPartition(_._1, Order.ASCENDING)
   .writeAsCsv("mydata.csv").setParallelism(1)

 the written data is out of global order (each partition is locally sorted),
 but
 .print()
 prints the data in the correct order. I have run a small toy sample
 multiple times.

 Is there a way to sort the entire dataset with parallelism > 1 and
 write it to a single file in ascending order?

>>>
>>>
>>
>

