Re: Problem getting watermark right with event time

2020-04-20 Thread Fabian Hueske
Hi Sudan,

I noticed a few issues with your code:

1) Please check the computation of timestamps. Your code

public long extractAscendingTimestamp(Eventi.Event element) {
  return element.getEventTime().getSeconds() * 1000;
}

only seems to look at the seconds of a timestamp. Typically, you would just
return the whole timestamp encoded as a long that represents the
milliseconds since epoch (1970-01-01 00:00:00.000).
Why do you multiply by 1000?
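
To make the arithmetic concrete, here is a minimal sketch. It assumes that getEventTime() returns a protobuf-style timestamp consisting of whole seconds plus a nanosecond remainder; the thread does not show the definition of Eventi.Event, so that is an assumption. The class and helper names are hypothetical. It shows that multiplying the seconds by 1000 gives epoch milliseconds truncated to the full second, and one way the sub-second part could be kept.

```java
final class EpochMillisSketch {

    // Hypothetical helper: converts (seconds, nanos) since the epoch
    // into milliseconds since the epoch (1970-01-01 00:00:00.000).
    static long toEpochMillis(long seconds, int nanos) {
        // multiplyExact/addExact throw ArithmeticException on overflow
        // instead of silently wrapping around.
        return Math.addExact(Math.multiplyExact(seconds, 1000L), nanos / 1_000_000);
    }

    public static void main(String[] args) {
        long seconds = 1_587_325_020L; // example value, seconds since epoch
        int nanos = 750_000_000;       // example value, 0.75 s

        // Seconds only: the sub-second part is dropped.
        System.out.println(seconds * 1000);
        // Seconds plus the nanosecond remainder, in milliseconds.
        System.out.println(toEpochMillis(seconds, nanos));
    }
}
```

If the event time already is a long in milliseconds since epoch, no conversion is needed and the value can be returned as-is.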

2) An AscendingTimestampExtractor assumes that records arrive with strictly
ascending timestamps.
If the timestamps in your data are slightly out of order, you probably want
another watermark assigner, for example a
BoundedOutOfOrdernessTimestampExtractor [1].
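
The idea behind a bounded out-of-orderness assigner can be sketched in plain Java. This is not Flink API code, only a simulation of the principle as I understand it: the watermark trails the highest timestamp seen so far by a fixed bound, so records that arrive slightly late are still ahead of the watermark. The class name and the 5-second bound are hypothetical.

```java
final class BoundedWatermarkSketch {

    private final long maxOutOfOrdernessMillis;
    private long currentMaxTimestamp;

    BoundedWatermarkSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Start low enough that subtracting the bound cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    // Records one event timestamp and returns the watermark after seeing it.
    long onEvent(long timestampMillis) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestampMillis);
        return currentMaxTimestamp - maxOutOfOrdernessMillis;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch wm = new BoundedWatermarkSketch(5_000L);
        // The third event is older than the second one (out of order).
        long[] timestamps = {10_000L, 12_000L, 11_000L, 20_000L};
        for (long ts : timestamps) {
            System.out.println("event " + ts + " -> watermark " + wm.onEvent(ts));
        }
    }
}
```

In this sketch the out-of-order event at 11000 does not move the watermark backwards; it simply leaves it at 7000.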

3) You probably don't want to key on event time:

keyBy(Eventi.Event::getEventTime)

Usually, you choose a partitioning key here. If you cannot partition your
data and all records should be grouped in a single stream of windows, you
should use DataStream.windowAll().
Note, however, that this means your code cannot run in parallel. See
[2] for details.
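
A small plain-Java sketch (not Flink code) of why the key choice matters: grouping by event time creates one group per distinct timestamp, so each group holds only records with identical timestamps, while grouping by a partitioning key collects all records of that key. The Event class and the userId field are hypothetical; the thread does not say which partitioning key the data offers.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

final class KeyChoiceSketch {

    // Hypothetical stand-in for Eventi.Event.
    static final class Event {
        final String userId;
        final long eventTimeSeconds;

        Event(String userId, long eventTimeSeconds) {
            this.userId = userId;
            this.eventTimeSeconds = eventTimeSeconds;
        }
    }

    // Counts records per key: one group per distinct key value.
    static <K> Map<K, Long> countPerKey(List<Event> events, Function<Event, K> keySelector) {
        return events.stream()
                .collect(Collectors.groupingBy(keySelector, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event("alice", 100L),
                new Event("bob", 101L),
                new Event("alice", 102L),
                new Event("bob", 103L),
                new Event("alice", 103L));

        // Keyed by event time: almost every record ends up in its own group.
        System.out.println(countPerKey(events, e -> e.eventTimeSeconds)); // {100=1, 101=1, 102=1, 103=2}
        // Keyed by a partitioning key: records are grouped per user.
        System.out.println(countPerKey(events, e -> e.userId));           // {alice=3, bob=2}
    }
}
```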

Best, Fabian

[1]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/BoundedOutOfOrdernessTimestampExtractor.java
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#keyed-vs-non-keyed-windows

On Sun, 19 Apr 2020 at 21:37, Sudan S wrote:



Problem getting watermark right with event time

2020-04-19 Thread Sudan S
Hi,

I am having a problem getting the watermark right. The setup is:
- I have a Flink job which reads from a Kafka topic, uses Protobuf
deserialization, applies a sliding window of (120 seconds, 30 seconds), sums up
the values and finally returns the result.

The code is pasted below.

The problem is that I'm not able to reach the sink. I am able to reach the
assignTimestamp step when the timestamp arrives, but past that, neither the
process function nor the sink function is invoked, in spite of pumping
events regularly. I'm not able to figure out how to debug this issue.
Please help.

public class StreamingJob {

    public static void main(String[] args) throws Exception {

        Properties kafkaConsumerProps = new Properties();
        kafkaConsumerProps.setProperty("bootstrap.servers", "{bootstrap_servers}");
        kafkaConsumerProps.setProperty("group.id", "{group_id}");

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.enableCheckpointing(100);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setMaxParallelism(5);
        env.setParallelism(5);

        SingleOutputStreamOperator<Eventi.Event> texStream = env
                .addSource(new FlinkKafkaConsumer011<>("auth", new EventiSchema(), kafkaConsumerProps))
                .setParallelism(5)
                .setMaxParallelism(5);
        SlidingEventTimeWindows window =
                SlidingEventTimeWindows.of(Time.seconds(120), Time.seconds(30));
        texStream
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Eventi.Event>() {
                    @Override
                    public long extractAscendingTimestamp(Eventi.Event element) {
                        return element.getEventTime().getSeconds() * 1000;
                    }
                })
                .keyBy(Eventi.Event::getEventTime)
                .window(window)
                .process(new ProcessWindowFunction<Eventi.Event, Integer, Timestamp, TimeWindow>() {
                    @Override
                    public void process(Timestamp timestamp, Context context,
                            Iterable<Eventi.Event> elements, Collector<Integer> out) throws Exception {
                        int sum = 0;
                        for (Eventi.Event element : elements) {
                            sum++;
                        }
                        out.collect(sum);
                    }
                })
                .print();

        env.execute();
    }
}

-- 
*"The information contained in this e-mail and any accompanying documents 
may contain information that is confidential or otherwise protected from 
disclosure. If you are not the intended recipient of this message, or if 
this message has been addressed to you in error, please immediately alert 
the sender by replying to this e-mail and then delete this message, 
including any attachments. Any dissemination, distribution or other use of 
the contents of this message by anyone other than the intended recipient is 
strictly prohibited. All messages sent to and from this e-mail address may 
be monitored as permitted by applicable law and regulations to ensure 
compliance with our internal policies and to protect our business."*