Re: Ordering of stream from different kafka partitions

2018-06-21 Thread Amol S - iProgrammer
Hello Andrey,

Thanks for the help.

I am trying to implement the code you gave above:

sourceStream
    .setParallelism(4)
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<>(…) {…})
    .windowAll(TumblingEventTimeWindows.of(Time...))
    .process(new OrderTheRecords());

but I am having trouble writing the *OrderTheRecords* class, as I am new to
this framework. Can you please suggest the optimal way to sort the records?

I have implemented the window function below:

import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// windowAll(...).process(...) expects a ProcessAllWindowFunction (no key parameter).
public class OrderTheRecords
        extends ProcessAllWindowFunction<Oplog, Oplog, TimeWindow> {

    @Override
    public void process(Context context, Iterable<Oplog> elements,
                        Collector<Oplog> out) throws Exception {
        // Forwards the window's records in arrival order; no sorting yet.
        for (Oplog oplog : elements) {
            out.collect(oplog);
        }
    }
}


import com.mongodb.BasicDBObject;

public class Oplog {

    private OplogTimestamp ts;
    private String op;
    private BasicDBObject o;

}

Here, *ts* represents the event timestamp.
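One possible shape for the sorting step, as a minimal sketch: copy the window's records into a list, sort them by event timestamp, and emit them in that order. To stay self-contained it uses only the JDK: `SketchOplog` (with a `long ts`) is a hypothetical stand-in for `Oplog`/`OplogTimestamp`, whose accessors are not shown here, and a `Consumer` stands in for Flink's `Collector`. Inside `OrderTheRecords.process(...)` the same loop would run over `elements` and call `out.collect(...)`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for Oplog: only an event timestamp and an operation.
final class SketchOplog {
    final long ts;   // event timestamp, assumed to be epoch millis
    final String op;

    SketchOplog(long ts, String op) {
        this.ts = ts;
        this.op = op;
    }
}

final class WindowSorter {
    // Buffers all records of one window, sorts them by event timestamp and emits
    // them in ascending order. `emit` plays the role of Flink's Collector.collect.
    static void emitSorted(Iterable<SketchOplog> elements, Consumer<SketchOplog> emit) {
        List<SketchOplog> buffer = new ArrayList<>();
        for (SketchOplog oplog : elements) {
            buffer.add(oplog);
        }
        // List.sort is stable: records with equal timestamps keep arrival order.
        buffer.sort(Comparator.comparingLong((SketchOplog r) -> r.ts));
        for (SketchOplog oplog : buffer) {
            emit.accept(oplog);
        }
    }
}
```

With a process-style window function Flink already keeps all of the window's elements in state until the window fires, so this adds one in-memory copy per firing; that is fine for modest window sizes.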

---
*Amol Suryawanshi*
Java Developer
am...@iprogrammer.com


*iProgrammer Solutions Pvt. Ltd.*



*Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016,
MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer*
www.iprogrammer.com 



Re: Ordering of stream from different kafka partitions

2018-06-20 Thread Andrey Zagrebin
Hi Amol,

> Even with the above code, it will only sort the records within a specific time window.

Each window is emitted when the watermark passes the end of that window, and
the watermark only ever increases. So non-overlapping windows are also emitted
in time order, and as a consequence so are the records across windows, in case
your concern is that records are sorted only within a specific time window.
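To illustrate why ordering carries across windows, here is a small JDK-only simulation (not Flink code) of event-time tumbling windows driven by a bounded-out-of-orderness watermark. Simplifying assumptions: the watermark is recomputed after every record (Flink emits it periodically), a window [start, start + size) fires once the watermark reaches start + size - 1, and a record for an already-fired window is dropped, as with zero allowed lateness.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeMap;

final class TumblingWindowSim {
    // Returns the records in the order the simulated windows would emit them.
    static List<Long> run(long[] timestamps, long size, long bound) {
        TreeMap<Long, List<Long>> open = new TreeMap<>(); // window start -> buffered records
        List<Long> output = new ArrayList<>();
        long watermark = Long.MIN_VALUE;
        long maxSeen = Long.MIN_VALUE;
        for (long ts : timestamps) {
            long start = ts - Math.floorMod(ts, size);
            if (start + size - 1 <= watermark) {
                continue; // window already fired: late record is dropped
            }
            open.computeIfAbsent(start, k -> new ArrayList<>()).add(ts);
            maxSeen = Math.max(maxSeen, ts);
            watermark = maxSeen - bound;
            fire(open, watermark, size, output);
        }
        fire(open, Long.MAX_VALUE, size, output); // end of input fires all windows
        return output;
    }

    // Windows fire in ascending start order because the watermark only grows.
    private static void fire(TreeMap<Long, List<Long>> open, long watermark,
                             long size, List<Long> output) {
        while (!open.isEmpty() && open.firstKey() + size - 1 <= watermark) {
            List<Long> window = open.pollFirstEntry().getValue();
            Collections.sort(window); // sort inside the window, like OrderTheRecords
            output.addAll(window);
        }
    }
}
```

For the input 3, 1, 7, 5, 12, 9, 15, 11 with 5 ms windows, a bound of 4 lets every record arrive in time and the concatenated output is fully sorted. With a bound of 0, records 9 and 11 arrive after their windows fired, so the output stays sorted but loses them.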

> 1. How should I create N consumers dynamically based on partition count?

Use sourceStream.setParallelism(N); each parallel subtask of the Flink consumer
will then serve one Kafka partition.
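A toy model of the one-partition-per-subtask relationship, assuming a plain round-robin rule (partition p goes to subtask p % parallelism). This is only an illustration: Flink's actual partition-to-subtask assignment is an implementation detail, as noted in answer 3, and may differ, but the counting argument is the same.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionAssignmentSketch {
    // Hypothetical round-robin rule: partition p is served by subtask p % parallelism.
    // Returns, for each subtask index, the list of partitions it would consume.
    static List<List<Integer>> assign(int partitionCount, int parallelism) {
        List<List<Integer>> bySubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            bySubtask.add(new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            bySubtask.get(p % parallelism).add(p);
        }
        return bySubtask;
    }
}
```

With parallelism equal to the partition count each subtask owns exactly one partition. With fewer subtasks, one subtask interleaves several partitions (and cross-partition order is lost); with more subtasks, some stay idle.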

> 2. Does the number of consumers grow dynamically as the number of partitions
> increases in the middle of execution?

Dynamically added Kafka partitions will eventually be discovered by the Flink
consumers (when flink.partition-discovery.interval-millis is set) and picked up
by one of the consumer subtasks. The Flink job itself has to be rescaled
separately.

Currently, the parallelism of a Flink operator cannot be changed while the job
is running. The way to go now is to take a savepoint/checkpoint, stop the job,
and start a new one with the changed parallelism from the previous
savepoint/checkpoint (see the Flink docs). The new job will pick up from the
partition offsets of the previous job.
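For reference, a sketch of consumer properties with partition discovery turned on. The key `flink.partition-discovery.interval-millis` is the one named above; discovery stays off unless it is set. The broker address and group id are placeholders, and the resulting `Properties` would be passed to the Flink Kafka consumer's constructor. This only makes new partitions visible to the existing subtasks; changing the parallelism still requires the savepoint-and-restart procedure described above.

```java
import java.util.Properties;

final class KafkaConsumerConfigSketch {
    static final String DISCOVERY_KEY = "flink.partition-discovery.interval-millis";

    // Builds Kafka consumer properties with periodic partition discovery enabled.
    static Properties consumerProperties(long discoveryIntervalMillis) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("group.id", "oplog-ordering");          // placeholder group id
        props.setProperty(DISCOVERY_KEY, Long.toString(discoveryIntervalMillis));
        return props;
    }
}
```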

> 3. How do I create a partition-specific Kafka consumer in Flink?

The partition-to-consumer assignment is currently an implementation detail of
Flink. There is an open issue for custom assignment, e.g. if you need a specific
locality of keys/consumers: https://issues.apache.org/jira/browse/FLINK-8570

I would simply suggest assigning some key to each record and letting all
records for a particular key go into the same Kafka partition. On the Flink
side, if a corresponding keyBy() is applied to the Kafka source, all the records
for this particular key will go to the same parallel subtask of the subsequent
operator, sorted by time if they were originally sorted in their Kafka
partition. This is a more scalable approach than total global ordering.
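A JDK-only simulation of this key-based approach. Assumptions: the producer routes each record by `(key.hashCode() & 0x7fffffff) % partitions` (hypothetical; Kafka's default partitioner hashes the serialized key with murmur2), records are produced in ascending time, and one consumer reads the partitions back one after another as a worst-case interleaving. Grouping by key afterwards stands in for keyBy().

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class KeyedOrderingSketch {
    static final class Rec {
        final String key;
        final long ts;

        Rec(String key, long ts) {
            this.key = key;
            this.ts = ts;
        }
    }

    // Hypothetical key -> partition rule, used only for illustration.
    static int partitionFor(String key, int partitions) {
        return (key.hashCode() & 0x7fffffff) % partitions;
    }

    // Appends records (produced in ascending ts order) to per-partition logs, then
    // reads the partitions back one after another (highest partition first).
    static List<Rec> produceThenConsume(List<Rec> produced, int partitions) {
        List<List<Rec>> logs = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            logs.add(new ArrayList<>());
        }
        for (Rec r : produced) {
            logs.get(partitionFor(r.key, partitions)).add(r);
        }
        List<Rec> consumed = new ArrayList<>();
        for (int p = partitions - 1; p >= 0; p--) {
            consumed.addAll(logs.get(p));
        }
        return consumed;
    }

    // Groups consumed records by key in arrival order, like a keyBy() downstream.
    static Map<String, List<Long>> perKey(List<Rec> consumed) {
        Map<String, List<Long>> byKey = new LinkedHashMap<>();
        for (Rec r : consumed) {
            byKey.computeIfAbsent(r.key, k -> new ArrayList<>()).add(r.ts);
        }
        return byKey;
    }
}
```

For keys a, b and c over two partitions, the consumed stream is no longer globally sorted, but each key's timestamps are still ascending, which is the per-key guarantee described above.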

Cheers,
Andrey


Re: Ordering of stream from different kafka partitions

2018-06-20 Thread Amol S - iProgrammer
Hello Andrey,

Even with the above code, it will only sort the records within a specific time
window. Anyway, we agreed to create N partitions with N consumers based on some
key, as order is maintained per Kafka partition.

I have some questions about this.

1. How should I create N consumers dynamically based on partition count?
2. Does the number of consumers grow dynamically as the number of partitions
increases in the middle of execution?
3. How do I create a partition-specific Kafka consumer in Flink?


Re: Ordering of stream from different kafka partitions

2018-06-20 Thread Andrey Zagrebin
Hi,

Good point, sorry for the confusion: BoundedOutOfOrdernessTimestampExtractor of
course does not buffer records. You need to apply windowing (e.g.
TumblingEventTimeWindows) for that, then sort the window output by time and emit
the records in sorted order.

You can also use windowAll, which effectively does keyBy((record) -> 0) and
makes the stream non-parallel:

sourceStream
    .setParallelism(4)
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<>(…) {…})
    .windowAll(TumblingEventTimeWindows.of(Time...))
    .process(new OrderTheRecords());

Cheers,
Andrey


Re: Ordering of stream from different kafka partitions

2018-06-20 Thread sihua zhou
Hi,

I think global ordering is a bit impractical in production, but in theory you
can still do it. You need to:

- First, fix the operator's parallelism to 1 (except for the source node).
- If you want to sort the records within a bounded time, you can keyBy() a
constant and window the stream, buffer the records in state, and sort them
when the window is triggered. The code might look as follows:
{code}
sourceStream
    .setParallelism(4)
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.milliseconds(3500)) {
            @Override
            public long extractTimestamp(Event element) {
                // Assumes the record stores its event time as ts.value (epoch millis).
                Map<?, ?> timeStamp = (Map<?, ?>) element.get("ts");
                return ((Number) timeStamp.get("value")).longValue();
            }
        })
    .keyBy((record) -> 0) // key by a constant value
    .window(...)
    .process(new OrderTheRecords())
    .setParallelism(1);
{code}


- If you want to sort the records truly globally (non-window), you could keyBy
a constant, store the records in state, and sort them in the process() function
for every incoming record. If you want perfectly correct output, you may also
need to do retraction (because every incoming record may change the global
order). The code might look as follows:
{code}
sourceStream
    .setParallelism(4)
    .keyBy((record) -> 0) // key by a constant value
    .process(new OrderTheRecords())
    .setParallelism(1);
{code}
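A JDK-only sketch of the non-window variant: keep every record seen so far in sorted order and, for each new record, emit a changelog that retracts the records it displaces and re-emits them after it. The "+x"/"-x" string encoding is made up for illustration; in a Flink job this sorted buffer would presumably live in keyed state (e.g. a ListState) of the single OrderTheRecords subtask.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class GlobalSortWithRetraction {
    // Every record seen so far, kept in ascending timestamp order.
    private final List<Long> sorted = new ArrayList<>();

    // Inserts one record and returns the changes a downstream copy of the sorted
    // sequence must apply: "-x" retracts x, "+x" emits (or re-emits) x.
    List<String> add(long ts) {
        int pos = Collections.binarySearch(sorted, ts);
        if (pos < 0) {
            pos = -pos - 1; // insertion point for a value not seen yet
        } else {
            while (pos < sorted.size() && sorted.get(pos) == ts) {
                pos++; // place equal timestamps after earlier arrivals
            }
        }
        List<Long> displaced = new ArrayList<>(sorted.subList(pos, sorted.size()));
        List<String> changes = new ArrayList<>();
        for (int i = displaced.size() - 1; i >= 0; i--) {
            changes.add("-" + displaced.get(i)); // retract the suffix from its end
        }
        sorted.add(pos, ts);
        changes.add("+" + ts);
        for (long t : displaced) {
            changes.add("+" + t);
        }
        return changes;
    }
}
```

Feeding 5, 9, 7, 1 shows the cost: the late-arriving 1 forces the whole sorted suffix to be retracted and re-sent, and the buffer grows without bound, which is why this only works for very low-volume sources.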

In all cases, you need to fix the parallelism of the OrderTheRecords operator to
1, which makes your job non-scalable and turns that operator into the
bottleneck. So global ordering may not be practical in production (unless the
source's TPS is very low).


Best, Sihua



Re: Ordering of stream from different kafka partitions

2018-06-20 Thread Amol S - iProgrammer
Hello Andrey,

Thanks for your quick response. I have tried your code above, but it didn't
suit my requirement. I need global ordering of my records across multiple Kafka
partitions. Please suggest any workaround for this. As mentioned in this link,
is it possible to buffer data for some amount of time and then sort it, or is
there any other way?


Re: Ordering of stream from different kafka partitions

2018-06-19 Thread Andrey Zagrebin
Hi Amol,

I think you could try (based on your Stack Overflow code)
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
like this:

DataStream<Event> streamSource = env
    .addSource(kafkaConsumer)
    .setParallelism(4)
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.milliseconds(3500)) {
            @Override
            public long extractTimestamp(Event element) {
                // Assumes the record stores its event time as ts.value (epoch millis).
                Map<?, ?> timeStamp = (Map<?, ?>) element.get("ts");
                return ((Number) timeStamp.get("value")).longValue();
            }
        });
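To make the watermark semantics concrete, here is a JDK-only model of what BoundedOutOfOrdernessTimestampExtractor computes, in simplified form (an approximation, not the Flink class itself): it tracks the highest event timestamp seen and reports a watermark that trails it by the configured bound (3500 ms above). As Andrey clarifies in his follow-up, the extractor by itself does not buffer or reorder records. The `isBehindWatermark` helper is a hypothetical addition showing which records would count as late.

```java
final class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
        // Chosen so the first watermark is Long.MIN_VALUE without overflowing.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    // Called per record: remembers the highest event timestamp seen so far.
    long extractTimestamp(long eventTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        return eventTimestamp;
    }

    // The watermark trails the highest timestamp by the out-of-orderness bound.
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    // A watermark W claims no more records with timestamp <= W will arrive,
    // so a record at or below it is late.
    boolean isBehindWatermark(long eventTimestamp) {
        return eventTimestamp <= currentWatermark();
    }
}
```

After records at 10000, 12000 and 9000 ms the watermark is 12000 - 3500 = 8500, so a record at 8000 would be late while one at 9000 would not.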

In general, if records are sorted by anything within a Kafka partition, the
parallel subtask of the Flink Kafka source will consume these records and push
them to user operators in the same order. There is at most one consuming
subtask per Kafka partition, but several partitions might be served by one
subtask. This means you get the same guarantees as in Kafka: ordering per
partition but not across partitions, and hence no global ordering.

The cases of global and per-window ordering have already been described by
Sihua. Global ordering might be impractical in a distributed system.

If a subtask of your Flink operator consumes from several partitions, or there
is no ordering at all, you can try the above approach with
BoundedOutOfOrdernessTimestampExtractor to get approximate ordering across
these partitions, per key or for all records. It is similar to ordering within
a window. It means there could still be late records arriving after the
out-of-orderness period, which can break the ordering. This operator buffers
records in state to maintain the order, but only for the out-of-orderness
period, which also increases latency.
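The approximate ordering described above can be sketched as a buffer released by the watermark. This JDK-only simulation (a simplified model, not Flink's implementation) keeps records in a min-heap and emits them once the watermark, the max timestamp seen minus the bound, has passed them. A record arriving at or below the current watermark is late and is emitted immediately, which is where the ordering can break; holding records for the out-of-orderness period is also where the extra latency comes from.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

final class WatermarkSortBuffer {
    // Returns records in emission order for a given out-of-orderness bound.
    static List<Long> run(long[] timestamps, long bound) {
        PriorityQueue<Long> buffer = new PriorityQueue<>(); // smallest timestamp first
        List<Long> output = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE + bound; // first watermark is Long.MIN_VALUE
        for (long ts : timestamps) {
            if (ts <= maxSeen - bound) {
                output.add(ts); // late: emitted immediately, possibly out of order
                continue;
            }
            buffer.add(ts);
            maxSeen = Math.max(maxSeen, ts);
            long watermark = maxSeen - bound;
            while (!buffer.isEmpty() && buffer.peek() <= watermark) {
                output.add(buffer.poll()); // released in timestamp order
            }
        }
        while (!buffer.isEmpty()) {
            output.add(buffer.poll()); // end of input flushes the buffer
        }
        return output;
    }
}
```

For the input 3, 1, 7, 5, 12, 9, 15, 11, a bound of 5 produces a fully sorted output. With a bound of 2, record 11 arrives after the watermark has already released 12, so the output becomes 1, 3, 5, 7, 9, 12, 11, 15.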

Cheers,
Andrey

> On 19 Jun 2018, at 14:12, sihua zhou  wrote:
> 
> Hi Amol,
> 
> I'm not sure whether this is possible at all, especially when you need to
> process the records with a parallelism greater than one.
> 
> IMO, in theory, we can only get an ordered stream when there is a single Kafka
> partition and we process it with parallelism 1 in Flink. Even in this case, if
> you only want to order the records in a window, then you need to store the
> records in state and order them when the window is triggered. But if you want
> to order the records with a single `keyBy()` (non-window), I think that's
> probably impossible in practice, because you need to store all the incoming
> records and re-sort all the data for every incoming record. You also need to
> send retraction messages for the previous result (because every incoming
> record might change the global order of the records).
> 
> 
> Best, Sihua
> On 06/19/2018 19:19,Amol S - iProgrammer wrote:
> Hi,
> 
> I have used the Flink streaming API in my application, where the source of
> the stream is Kafka. My Kafka producer publishes data in ascending order of
> time to different Kafka partitions, and the consumer reads data from these
> partitions. However, some Kafka partitions may be slow due to some operation
> and produce late results. Is there any way to maintain order in this stream
> even though the data arrives out of order? I have tried
> BoundedOutOfOrdernessTimestampExtractor, but it didn't serve the purpose.
> While digging into this problem I came across your documentation (URL:
> https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams)
> and tried to implement it, but it didn't work. I also tried the Table API's
> ORDER BY, but it seems orderBy is not supported in Flink 1.5.
> Please suggest any workaround for this.
> 
> I have raised the same concern on Stack Overflow:
> 
> https://stackoverflow.com/questions/50904615/ordering-of-streams-while-reading-data-from-multiple-kafka-partitions
> 
> Thanks,
> 



Re: Ordering of stream from different kafka partitions

2018-06-19 Thread Andrey Zagrebin
Hi Amol,

I think you could try (based on your Stack Overflow code)
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
like this:

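import java.util.Map;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Event is the record type from your Stack Overflow code; it is assumed to expose get(String).
DataStream<Event> streamSource = env
    .addSource(kafkaConsumer)
    .setParallelism(4)
    // Watermark = highest timestamp seen so far minus 3.5 s of allowed out-of-orderness
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.milliseconds(3500)) {
            @Override
            public long extractTimestamp(Event element) {
                // "ts" is a nested map whose "value" entry holds the epoch-millis timestamp as a Long
                Map<?, ?> timeStamp = (Map<?, ?>) element.get("ts");
                return (long) timeStamp.get("value");
            }
        });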

In general, if the records in a Kafka partition are sorted by anything, the
parallel subtask of the Flink Kafka source that reads that partition will
consume them and push them to the user operators in the same order. There is
at most one consuming subtask per Kafka partition, but several partitions
might be served by one subtask. This means you get the same guarantees as in
Kafka: ordering per partition but not across partitions, and no global
ordering.
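
To make the per-partition guarantee concrete: if every partition is sorted, a consumer can still produce a globally sorted stream by buffering records and releasing only those whose timestamp is at or below the latest timestamp of the slowest partition. Flink's watermarks follow a similar "minimum across inputs" rule. The following is only a plain-Java sketch (no Flink involved); the class PartitionMerger and its methods are hypothetical names, and records are reduced to bare timestamps.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Hypothetical sketch: merges partitions that are each sorted by timestamp into
 * one globally sorted output. A record is released only once every partition has
 * advanced to at least its timestamp.
 */
class PartitionMerger {
    private final long[] lastSeen;                 // latest timestamp seen per partition
    private final PriorityQueue<long[]> buffer =   // entries are {timestamp, partition}
            new PriorityQueue<>(Comparator.comparingLong((long[] r) -> r[0]));
    private final List<Long> emitted = new ArrayList<>();

    PartitionMerger(int partitions) {
        lastSeen = new long[partitions];
        Arrays.fill(lastSeen, Long.MIN_VALUE);     // nothing seen yet from any partition
    }

    /** Accepts the next record of one partition (timestamps ascending within a partition). */
    void onRecord(int partition, long ts) {
        lastSeen[partition] = ts;
        buffer.add(new long[] {ts, partition});
        // The slowest partition bounds what can safely be released.
        long watermark = Arrays.stream(lastSeen).min().getAsLong();
        while (!buffer.isEmpty() && buffer.peek()[0] <= watermark) {
            emitted.add(buffer.poll()[0]);
        }
    }

    List<Long> emitted() {
        return emitted;
    }
}
```

With two partitions, feeding 1, 4, 7 from partition 0 and then 2, 3, 9 from partition 1 releases 1, 2, 3, 4, 7 in order while 9 stays buffered. Note that a partition that never sends anything keeps the minimum at Long.MIN_VALUE and stalls all output, which is the same idle-partition problem watermarks have.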

The cases of global and per-window ordering have already been described by
Sihua. Global ordering might be impractical in a distributed system.
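
Per-window ordering, as Sihua describes it, amounts to buffering records per window and sorting each window when it fires. The plain-Java sketch below mimics that outside of Flink for tumbling windows; WindowSorter and advanceWatermark are hypothetical names, and records are reduced to their timestamps. In an actual Flink job the sorting step would live in something like a ProcessAllWindowFunction over the window's Iterable.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch of per-window ordering: records are buffered per tumbling
 * window, and each window is sorted and released once the watermark reaches its
 * last timestamp. Windows do not overlap and fire in ascending order, so the
 * concatenated output is sorted as well.
 */
class WindowSorter {
    private final long size;                                            // window length in ms
    private final TreeMap<Long, List<Long>> windows = new TreeMap<>();  // window start -> buffered timestamps

    WindowSorter(long size) {
        this.size = size;
    }

    void add(long ts) {
        long start = ts - Math.floorMod(ts, size);                      // tumbling window assignment
        windows.computeIfAbsent(start, k -> new ArrayList<>()).add(ts);
    }

    /** Fires every window whose last timestamp (start + size - 1) is <= watermark. */
    List<Long> advanceWatermark(long watermark) {
        List<Long> out = new ArrayList<>();
        Iterator<Map.Entry<Long, List<Long>>> it = windows.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<Long>> w = it.next();
            if (w.getKey() + size - 1 > watermark) {
                break;                                                  // this and later windows are incomplete
            }
            List<Long> records = w.getValue();
            Collections.sort(records);                                  // order within the window
            out.addAll(records);
            it.remove();
        }
        return out;
    }
}
```

With 10 ms windows, adding 7, 3, 12, 5, 15 and advancing the watermark to 9 returns 3, 5, 7; window [10, 20) is held back until the watermark reaches 19. This sketch does not handle records that arrive after their window has fired; Flink drops such late records by default unless allowed lateness is configured.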

If a subtask of your Flink operator consumes from several partitions, or
there is no ordering at all, you can try the above approach with
BoundedOutOfOrdernessTimestampExtractor to get approximate ordering across
these partitions, either per key or over all records. It is similar to
ordering within a window. This means late records can still arrive after
the out-of-orderness period and break the ordering. Records are buffered in
state (by the downstream event-time operator, e.g. a window) to maintain the
order, but only for the out-of-orderness period, which also increases
latency.
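
The trade-off above (approximate ordering, possible late records, extra latency) can be illustrated with a plain-Java sketch, no Flink involved. The watermark is the highest timestamp seen minus a fixed bound, which is how BoundedOutOfOrdernessTimestampExtractor derives it (Flink emits it periodically; here it is updated per record). BoundedReorderBuffer and its methods are hypothetical names, and records are reduced to timestamps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Hypothetical sketch of bounded-out-of-orderness reordering: buffered records at
 * or below the watermark are released in timestamp order; a record arriving at or
 * below the current watermark is late and can no longer be placed in order.
 */
class BoundedReorderBuffer {
    private final long bound;                              // max out-of-orderness in ms
    private final PriorityQueue<Long> buffer = new PriorityQueue<>();
    private long maxSeen = Long.MIN_VALUE;
    private long watermark = Long.MIN_VALUE;
    private final List<Long> ordered = new ArrayList<>();
    private final List<Long> late = new ArrayList<>();

    BoundedReorderBuffer(long bound) {
        this.bound = bound;
    }

    void onRecord(long ts) {
        if (ts <= watermark) {
            late.add(ts);                                  // its slot in the output has already passed
            return;
        }
        buffer.add(ts);
        maxSeen = Math.max(maxSeen, ts);
        watermark = maxSeen - bound;                       // trails the highest timestamp by the bound
        while (!buffer.isEmpty() && buffer.peek() <= watermark) {
            ordered.add(buffer.poll());                    // released in ascending timestamp order
        }
    }

    List<Long> ordered() {
        return ordered;
    }

    List<Long> late() {
        return late;
    }
}
```

With a bound of 3, feeding 5, 3, 8, 4, 6, 12, 2 releases 3, 5, 6, 8 in order, reports 4 and 2 as late, and keeps 12 buffered. A larger bound catches more stragglers but holds every record longer, which is the latency cost mentioned above.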

Cheers,
Andrey

> On 19 Jun 2018, at 14:12, sihua zhou  wrote:
> 
> 
> 
> Hi Amol,
> 
> 
> I'm not sure whether this is possible, especially when you need to process
> the records with a parallelism greater than one.
> 
> 
> IMO, in theory, we can only get an ordered stream when there is a single
> Kafka partition and it is processed with a parallelism of one in Flink. Even
> in this case, if you only want to order the records within a window, then
> you need to store the records in state and sort them when the window is
> triggered. But if you want to order the records after a single
> `keyBy()` (no window), I think that's probably impossible in practice,
> because you need to store all the incoming records and re-sort all the data
> for every incoming record; you also need to send a retraction message for
> the previous result (because every incoming record might change the global
> order of the records).
> 
> 
> Best, Sihua



Ordering of stream from different kafka partitions

2018-06-19 Thread Amol S - iProgrammer
Hi,

I am using the Flink streaming API in my application, where the streaming
source is Kafka. My Kafka producer publishes data in ascending time order
to different Kafka partitions, and the consumer reads data from these
partitions. However, some Kafka partitions may be slow due to some
operation and produce late results. Is there any way to maintain the order
of this stream even though the data arrives out of order? I have tried
BoundedOutOfOrdernessTimestampExtractor, but it didn't serve the purpose.
While digging into this problem I came across your documentation (URL:
https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams)
and tried to implement it, but it didn't work. I also tried the Table API
orderBy, but it seems orderBy is not supported in Flink 1.5.
Please suggest a workaround for this.

I have raised the same question on Stack Overflow:

https://stackoverflow.com/questions/50904615/ordering-of-streams-while-reading-data-from-multiple-kafka-partitions

Thanks,

---
*Amol Suryawanshi*
Java Developer
am...@iprogrammer.com


*iProgrammer Solutions Pvt. Ltd.*



*Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016,
MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer*
www.iprogrammer.com