Re: Minicluster Flink tests, checkpoints and inprogress part files

2021-02-05 Thread Aljoscha Krettek

Hi Dan,

I'm afraid this is not easily possible using the DataStream API in 
STREAMING execution mode today. However, there is one possible solution 
and we're introducing changes that will also make this work in STREAMING 
mode.


The possible solution is to use the `FileSink` instead of the 
`StreamingFileSink`. This is an updated version of the sink that works 
in both BATCH and STREAMING mode (see [1]). If you use BATCH execution 
mode all your files should be "completed" at the end.
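
To make that concrete, here is a minimal sketch of the wiring (the record type, output path, bulk writer factory and the `env`/`stream` variables are placeholders for whatever your test already uses):

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;

// Hypothetical POJO written as Parquet; only the sink wiring is the point here.
FileSink<MyRecord> sink = FileSink
    .forBulkFormat(new Path("/tmp/test-output"),
        ParquetAvroWriters.forReflectRecord(MyRecord.class))
    .build();

env.setRuntimeMode(RuntimeExecutionMode.BATCH);
stream.sinkTo(sink);
env.execute();

With BATCH execution the pending part files should be finalized when the bounded input is fully processed.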


[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_execution_mode.html

The thing we're currently working on is FLIP-147 [2], which will allow 
sinks (and other operators) to always do one final checkpoint before 
shutting down. This will allow them to move the last outstanding 
in-progress files over to finished as well.


[2] https://cwiki.apache.org/confluence/x/mw-ZCQ

I hope that helps!

Best,
Aljoscha

On 2021/02/04 21:37, Dan Hill wrote:

Hi Flink user group,

*Background*
I'm changing a Flink SQL job to use Datastream.  I'm updating an existing
Minicluster test in my code.  It has a similar structure to other tests in
flink-tests.  I call StreamExecutionEnvironment.execute.  My tests sink
using StreamingFileSink Bulk Formats to tmp local disk.

*Issue*
When I try to check the files on local disk, I see
".part-0-0.inprogress.1234abcd-5678-uuid...".

*Question*
What's the best way to get the test to complete the outputs?  I tried
checkpointing very frequently, sleeping, etc but these didn't work.

Thanks!
- Dan


Re: Timers not firing until stream end

2021-01-27 Thread Aljoscha Krettek

On 2021/01/27 15:09, Chesnay Schepler wrote:
Put another way, if you use any of the built-in WatermarkGenerators and 
use event-time, then it appears that you *must* set this interval.


This behavior is...less than ideal I must admit, and it does not 
appear to be properly documented.


Setting the watermark interval is done when calling 
`env.setStreamTimeCharacteristic()`. It's the first thing we documented 
for working with event time [1].

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_time.html
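
Concretely, for the pre-1.12 API this is a sketch of what enabling event time looks like (200 ms is the default interval that gets set):

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enables event time; this also sets a non-zero auto-watermark interval (200 ms).
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Alternatively, set the interval explicitly (in milliseconds):
env.getConfig().setAutoWatermarkInterval(200L);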


To me it was always a usability problem that we didn't have event time 
enabled by default. We didn't have this because of "performance 
considerations". This changed in Flink 1.12 [2].


[2] https://issues.apache.org/jira/browse/FLINK-19317

@Pilgrim: Which version of Flink are you using?


Re: Flink upgrade to Flink-1.12

2021-01-27 Thread Aljoscha Krettek
I'm afraid I also don't know more than that. But I agree with Ufuk that 
it should just work.


I think the best way would be to try it in a test environment and then 
go forward with upgrading the production jobs/cluster.


Best,
Aljoscha

On 2021/01/25 18:59, Ufuk Celebi wrote:

Thanks for reaching out. Semi-asynchronous does *not* refer to incremental 
checkpoints and Savepoints are always triggered as full snapshots (not 
incremental).

Earlier versions of the RocksDb state backend supported two snapshotting modes, 
fully and semi-asynchronous snapshots. Semi-asynchronous state snapshots for 
RocksDb have been removed a long time ago by Aljoscha in 
https://github.com/apache/flink/pull/2345 (FLINK-4340). The notes you are 
referencing were added around that time and I'm afraid they might have become 
mostly obsolete.

I'm pulling in Aljoscha who should be able to give a definitive answer here.

To make a long story short, it should simply work for you to upgrade from 1.11 
to 1.12 via a Savepoint.

Cheers,

Ufuk

On Wed, Jan 20, 2021, at 3:58 AM, 耿延杰 wrote:

Hi all,

As flink doc says:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/upgrading.html#preconditions


We do not support migration for state in RocksDB that was checkpointed using 
`semi-asynchronous` mode. In case your old job was using this mode, you can 
still change your job to use `fully-asynchronous` mode before taking the 
savepoint that is used as the basis for the migration.


So, my first question:
Is "semi-asynchronous" means "incremental checkpoint"?

And second question:
If so, assume I'm using flink-1.11 and RocksDB with incremental asynchronous 
checkpoint as state backend.
I should:
 1. take a savepoint for the old version (flink-1.11),
 2. change the job to use "fully-asynchronous" checkpoint mode,
 3. restart the old version (flink-1.11) job with the new config (fully-asynchronous 
checkpoints),
 4. then take a savepoint,
 5. and finally, stop the old version (flink-1.11) and upgrade to flink-1.12.

Do I understand correctly?

Best regards


Re: Flink app logs to Elastic Search

2021-01-15 Thread Aljoscha Krettek

On 2021/01/15 10:43, bat man wrote:

I was able to make it work with a fresh Elastic installation. Now
taskmanager and jobmanager logs are available in elastic.
Thanks for the pointers.


Thanks for letting us know!


Re: Dead code in ES Sink

2021-01-14 Thread Aljoscha Krettek

On 2021/01/13 07:50, Rex Fenley wrote:

Are you saying that this option does get passed along to Elasticsearch
still or that it's just arbitrarily validated? According to [1] it's been
deprecated in ES 6 and removed in ES 7.

[1] https://github.com/elastic/elasticsearch/pull/38085


Sorry, I wasn't being very clear. I meant that we just pass it on to ES.  
In light of it being deprecated, I think it makes sense to keep it as 
long as we support ES 6. What do you think?


Side note: we still have an ES 5 connector... 😅 There was a discussion 
about dropping it, but it wasn't conclusive. [1]


[1] 
https://lists.apache.org/thread.html/rb957e7d7d5fb9bbe25e5fbc56662749ee1bc551d36e26c58644f60d4%40%3Cdev.flink.apache.org%3E


Best,
Aljoscha


Re: Flink app logs to Elastic Search

2021-01-13 Thread Aljoscha Krettek

On 2021/01/11 01:29, bat man wrote:

Yes, no entries to the elastic search. No indices were created in elastic.
Jar is getting picked up which I can see from yarn logs. Pre-defined text
based logging is also available.


Hmm, I can't imagine much that could go wrong. Maybe there is some 
interference from other configuration files. Could you try and make sure 
that you only have the configuration and logging system in the classpath 
that you want to use?


Best,
Aljoscha


Re: Flink to get historical data from kafka between timespan t1 & t2

2021-01-13 Thread Aljoscha Krettek

On 2021/01/13 12:07, vinay.raic...@t-systems.com wrote:

Ok. Attached is the PPT of what I am attempting to achieve w.r.t. time

Hope I am all set to achieve the three bullets mentioned in attached 
slide to create reports with KafkaSource and KafkaBuilder approach.


If you have any additional tips to share please do so after going 
through the slide attached (example for live dashboards use case)


I think that should work with `KafkaSource`, yes. You just need to set 
the correct start timestamps and end timestamps, respectively. I believe 
that's all there is to it, off the top of my head I can't think of any 
additional tips.
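
Roughly along these lines (broker address, topic, group id are placeholders, t1/t2 are epoch milliseconds, `env` is an existing StreamExecutionEnvironment, and the deserializer setter may be named slightly differently depending on the exact Flink version):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;

long t1 = 1609459200000L;  // example: start of the report range
long t2 = 1609462800000L;  // example: end of the report range

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("broker:9092")
    .setTopics("events")
    .setGroupId("report-job")
    .setStartingOffsets(OffsetsInitializer.timestamp(t1))  // start timestamp
    .setBounded(OffsetsInitializer.timestamp(t2))          // end timestamp; the source finishes here
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

DataStream<String> records =
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");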


Best,
Aljoscha


Re: Dead code in ES Sink

2021-01-13 Thread Aljoscha Krettek

On 2021/01/12 15:04, Rex Fenley wrote:

[2]
https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java#L131

Should [2] be removed?


The link seems to not work anymore but I'm guessing you're referring to 
`CONNECTION_MAX_RETRY_TIMEOUT_OPTION`. This is used in the 
`*DynamicSinkFactory` classes, such as [1]. These can be used when 
defining Table API/SQL sources using DDL or the programmatic API. The 
actual value of the field is never used, but the option itself is used to check the allowed 
options when verifying what users specify via "string" options.


[1] 
https://github.com/apache/flink/blob/ee653778689023ddfdf007d5bde1daad8fbbc081/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java#L98


Re: Flink to get historical data from kafka between timespan t1 & t2

2021-01-13 Thread Aljoscha Krettek

On 2021/01/13 07:58, vinay.raic...@t-systems.com wrote:

Not sure about your proposal regarding Point 3:
*	firstly how is it ensured that the stream is closed? If I understand 
the doc correctly the stream will be established starting with the 
latest timestamp (hmm... is it not a standard behaviour?) and will 
never finish (UNBOUNDED),


On the first question of standard behaviour: the default is to start 
from the group offsets that are available in Kafka. This uses the 
configured consumer group. I think it's better to be explicit, though, 
and specify sth like `EARLIEST` or `LATEST`, etc.


And yes, the stream will start but never stop with this version of the 
Kafka connector. Only when you use the new `KafkaSource` can you also 
specify an end timestamp that will make the Kafka source shut down 
eventually.


*	secondly it is still not clear how to get the latest event  at a 
given time point in the past?


You are referring to getting a single record, correct? I don't think 
this is possible with Flink. All you can do is get a stream from Kafka 
that is potentially bounded by a start timestamp and/or end timestamp.


Best,
Aljoscha


Re: Flink kafka exceptions handling

2021-01-12 Thread Aljoscha Krettek

On 2021/01/07 14:36, BELGHITH Amira (EXT) wrote:
--> Our processing system is supposed to continue streaming data even 
though there are some Kafka errors; we are expecting that the 
KafkaConsumer fails but not the Flink job, do you think it is possible?


I'm afraid that's not possible with Flink right now. We treat all 
exceptions as errors, which lead to job restarts and eventually complete 
job failure if the restarts exceed the configured limit.


What you could do right now is copy the code for the 
`FlinkKafkaConsumer` and insert exception handling code for the 
exceptions that you would like to exclude. You could even go so far and 
add generic handling code that you can then configure with a list of 
exceptions to ignore when creating the consumer.


I hope that helps!

Best,
Aljoscha


Re: Flink to get historical data from kafka between timespan t1 & t2

2021-01-12 Thread Aljoscha Krettek

On 2021/01/11 14:12, vinay.raic...@t-systems.com wrote:
a) As mentioned by you "KafkaSource" was introduced in Flink 1.12 so, I 
suppose we have to upgrade to this version of Flink. Can you share the 
link of the stable Flink image (containerized version) to be used in 
our set-up keeping in mind we are to use KafkaSource.


Unfortunately, there is a problem with publishing those images. You can 
check out [1] to follow progress. In the meantime you can try and build 
the image yourself or use the one that Robert pushed to his private 
account.


b) Also kindly share details of the corresponding compatible version of 
"Kafka" (containerized version) to be used for making this work as I 
understand there is constraint on version of Kafka required to make it 
work.


I believe Kafka is backwards compatible since at least version 1.0, so 
any recent enough version should work.


Best,
Aljoscha


Re: How should I process a cumulative counter?

2021-01-12 Thread Aljoscha Krettek

Hi Larry,

By now, it seems to me that the windowing API might not be the right 
solution for your use case. The fact that sensors can shut down 
arbitrarily makes it hard to calculate what window an event should fall 
into.


Have you tried looking into `ProcessFunction`? With this you can keep 
state and set timers to fire, both based on event time and processing 
time. You could store your sensor data and calculate results and evict 
events on timer firings.
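
A very rough sketch of the state-plus-timers mechanics (assuming a hypothetical POJO SensorReading with public fields machineId, timestamp in ms and value, the cumulative counter; the per-hour bucketing of the deltas and the eviction interval are left as placeholders):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class CumulativeCounterDeltas
        extends KeyedProcessFunction<String, SensorReading, Long> {

    private static final long EVICTION_MS = 2 * 60 * 60 * 1000;  // assumed idle timeout

    private transient ValueState<SensorReading> lastReading;

    @Override
    public void open(Configuration parameters) {
        lastReading = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastReading", SensorReading.class));
    }

    @Override
    public void processElement(SensorReading reading, Context ctx, Collector<Long> out)
            throws Exception {
        SensorReading previous = lastReading.value();
        if (previous != null) {
            out.collect(reading.value - previous.value);  // delta between consecutive readings
        }
        lastReading.update(reading);
        ctx.timerService().registerEventTimeTimer(reading.timestamp + EVICTION_MS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        SensorReading previous = lastReading.value();
        // Only evict if no newer reading arrived since this timer was registered.
        if (previous != null && timestamp >= previous.timestamp + EVICTION_MS) {
            lastReading.clear();
        }
    }
}

The emitted deltas could then be summed per hour downstream; that part is not shown here.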


Best,
Aljoscha


Re: Flink to get historical data from kafka between timespan t1 & t2

2021-01-11 Thread Aljoscha Krettek

On 2021/01/08 16:55, vinay.raic...@t-systems.com wrote:

Could you also attach the code snippet for `KafkaSource`, `KafkaSourceBuilder`, 
and `OffsetInitializers` that you were referring to in your previous reply, for 
my reference please, to make it clearer for me.


Ah sorry, there I was referring to the Flink code. You can start with 
`KafkaSource` [1], which has an example block in the Javadocs that shows 
how to use it.


[1] 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java


Re: Roadmap for Execution Mode (Batch/Streaming) and interaction with Table/SQL APIs

2021-01-11 Thread Aljoscha Krettek

Also cc'ing d...@flink.apache.org

On 2021/01/06 09:19, burkaygur wrote:

1) How do these changes impact the Table and SQL APIs? Are they completely
orthogonal or can we get the benefits of the new Batch Mode with Flink SQL
as well?


The answer here is a bit complicated. The Table API/SQL already use 
similar techniques under the hood for BATCH execution. Our recent 
changes just made similar functionality available for the DataStream 
API. So in a way the changes are orthogonal.


However, the changes are relevant when you want to interoperate with the 
DataStream and Table API. There it becomes relevant that both parts can 
do STREAMING/BATCH execution well. We're not 100% there yet on this 
front but we're tracking some work under FLIP-136 [1].


[1] https://cwiki.apache.org/confluence/x/0DN4CQ


2) What is the best ticket to follow the roadmap & track the progress of
this whole project. Specifically the parts about bootstrapping of state. I
would love to help contribute to it.


I would say the best way to follow progress is the dev mailing list and 
the FLIP overview page [2]. That's not super intuitive and can be 
hard to follow for outsiders. Sometimes, people such as myself will 
write blog posts on the Flink website or private blogs that try and 
shine a light on the development so it might help to follow some 
relevant people from the project on Twitter, where such posts are often 
announced.


Specifically about state bootstrapping, we don't have many concrete 
thoughts yet. It would help if you could talk about some of the 
requirements that you would have for this.


Best,
Aljoscha

[2] 
https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals




Re: Flink to get historical data from kafka between timespan t1 & t2

2021-01-08 Thread Aljoscha Krettek

Hi,

for your point 3. you can look at 
`FlinkKafkaConsumerBase.setStartFromTimestamp(...)`.
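
A small sketch of that (topic name, schema, properties and the start timestamp are placeholders, `env` is assumed to exist):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");
props.setProperty("group.id", "dashboard");

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props);
// Start reading at the given point in time (epoch millis); the stream then
// keeps running until the client cancels the job.
consumer.setStartFromTimestamp(1609459200000L);

DataStream<String> stream = env.addSource(consumer);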


Points 1. and 2. will not work with the well-established 
`FlinkKafkaConsumer`. However, it should be possible to do it with the 
new `KafkaSource` that was introduced in Flink 1.12. It might be a bit 
rough around the edges, though.


With the `KafkaSource` you can specify `OffsetInitializers` for both the 
starting and stopping offset of the source. Take a look at 
`KafkaSource`, `KafkaSourceBuilder`, and `OffsetInitializers` in the 
code.


I hope this helps.

Best,
Aljoscha

On 2021/01/08 07:51, vinay.raic...@t-systems.com wrote:

Hi Flink Community Team,

This is a desperate request for your help on below.

I am new to the Flink and trying to use it with Kafka for Event-based data 
stream processing in my project. I am struggling using Flink to find solutions 
to my requirements of project below:


 1.  Get all Kafka topic records at a given time point 't' (now or in the 
past). Also how to pull latest-record only* from Kafka using Flink
 2.  Getting all records from Kafka for a given time interval in the past between 
t1 & t2 time period.
 3.  Continuously getting data from Kafka starting at a given time point (now 
or in the past). The client will actively cancel/close the data streaming. 
Examples: live dashboards. How to do it using Flink?
Please provide me sample "Flink code snippet" for pulling data from kafka for 
above three requirements and oblige. I am stuck for last one month without much progress 
and your timely help will be a savior for me!
Thanks & Regards,
Vinay Raichur
T-Systems India | Digital Solutions
Mail: vinay.raic...@t-systems.com
Mobile: +91 9739488992



Re: How should I process a cumulative counter?

2021-01-08 Thread Aljoscha Krettek

Hi Larry,

the basic problem for your use case is that window boundaries are 
inclusive for the start timestamp and exclusive for the end timestamp.  
It's set up like this to ensure that consecutive tumbling windows don't 
overlap. This is only a function of how our `WindowAssigner` works, so 
it could be done differently in a different system.


Have you tried using a sliding window where the `slide` is `size - 1ms`?  
With this, you would ensure that elements that fall exactly on the 
boundary, i.e. your hourly sensor updates would end up in both of the 
consecutive windows. It seems a bit unorthodox but could work in your 
case.
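
In code, the workaround could look roughly like this (the input stream, key field and window function are placeholders for Larry's job):

import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

long hourMillis = Time.hours(1).toMilliseconds();

input
    .keyBy(reading -> reading.machineId)            // hypothetical key field
    .window(SlidingEventTimeWindows.of(
            Time.hours(1),
            Time.milliseconds(hourMillis - 1)))      // slide = size - 1 ms
    .process(new SumOfDeltasFunction());             // user-defined window function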


Best,
Aljoscha

On 2021/01/08 08:56, Larry Aspen wrote:

Hi,

I'm evaluating Flink for our company's IoT use case and read a blog post
by Fabian Hueske from 2015 [1]. We have a similar situation except the
sensor is sending the value of a cumulative counter instead of a count.
We would like to calculate the sum of deltas of consecutive cumulative
counter values that occur during a time window.

Here is a scenario of a cumulative counter measuring runtime in seconds
and a machine starting for the first time at 12:00:00 and running for
the whole hour (sensor records values when it starts, every 15 minutes
and on hour change):

timestamp, cumulative counter value in seconds
12:00:00, 0
12:15:00, 900
12:30:00, 1800
12:45:00, 2700
13:00:00, 3600

This would produce the following deltas:
12:00:00, 900 -0 = 900
12:15:00, 1800 - 900 = 900
12:30:00, 2700 - 1800 = 900
12:45:00, 3600 - 2700 = 900

We would then sum the deltas to get runtime in seconds for the hour:
900 + 900 + 900 + 900 = 3600

What would be a good way to handle this kind of calculation in Flink?

I have already tried using a tumbling event time window of one hour,
but then the last value is only part of the next window and the delta
of 12:45:00 is missing and the sum is 900 + 900 + 900 = 2700.

I have also tried a sliding event time window of two hours where the sum
is calculated for the first hour. This produces the correct sum in this
scenario but fails if the next hour is later (e.g. 12:45:00, 14:00:00
i.e. machine is shutdown between 12:45:00 - 13:00:00 and started at
14:00:00).

My latest attempt has been to use a global window where I try to keep
the values for the last two hours and calculate the sum for the older
hour. This seems to work in my experiments where I read values from
a file and use parallelism of one. If I increase the parallelism, the
values are processed out of order and the results are incorrect as
older values are received after newer values which causes them to be
evicted.

Any advice on this would be appreciated.

Best regards,
Larry Aspen

[1] https://flink.apache.org/news/2015/12/04/Introducing-windows.html


Re: Flink app logs to Elastic Search

2021-01-08 Thread Aljoscha Krettek
So you're saying there is no logging output whatsoever being sent to 
Elasticsearch? Did you try and see if the jar file is being picked up?  
Are you still getting the pre-defined, text-based logging output?


Best,
Aljoscha

On 2021/01/07 17:04, bat man wrote:

Hi Team,

I have a requirement to push the flink app logs to Elastic Search for log
management. Can anyone guide if you are already doing this.
I have tried this -
https://cristian.io/post/flink-log4j/
I’m not getting any error for a sample job I tried.

I am using EMR to run Flink 1.9 and Elastic Search latest version running
on ec2 machine.

Thanks,
Hemant


Re: Question about "NoWatermark" in Flink 1.9.2

2021-01-08 Thread Aljoscha Krettek

Thanks for the update!

Best,
Aljoscha

On 2021/01/07 16:45, Peter Huang wrote:

Hi,

We ended up finding the root cause. Since a certain point in time, two of the partitions
of the input topic haven't had any data, which means the second window
operator in the pipeline can't receive the watermark from all of the
partitions of the first operator. Thus, the watermark can't be determined.
Hopefully, this finding will be useful for other users in the community.


Best Regards
Peter Huang

On Thu, Jan 7, 2021 at 8:11 AM Peter Huang 
wrote:


Hi,

We have a pipeline that has been running for a while in both datacenters. Since Jan 5th, one
of the instances has the issue of "No Watermark". In the program, we are
using BoundedOutOfOrdernessTimestampExtractor to get a ts field from each
message. The same code runs well in the other DC. We checked the input
message, the ts field is correctly set. Even if we switch to auto-watermark, the
issue is still there. The graph below shows no watermark in the second
operator. Did anyone experience the same issue before?



[image: image.png]


Best Regards
Peter Huang






Re: Flink kafka exceptions handling

2021-01-07 Thread Aljoscha Krettek

Hi,

When you say that the `JobManager` goes down, you're referring to the 
fact that the Flink job will finish in a failed state after too many 
exceptions have occurred in the `FlinkKafkaConsumer`. Is that correct?


I'm afraid right now there is no code path that would allow catching 
those `TopicAuthorizationException`s. We basically treat most exceptions 
coming from Kafka as errors that require recovery.


What behaviour would you have in mind as a reaction to those exceptions?

Best,
Aljoscha

On 2021/01/06 17:59, BELGHITH Amira (EXT) wrote:


Thank you for your answer.
I have been subscribed.

This is the previous topic I’m referring to 
http://mail-archives.apache.org/mod_mbox/flink-user/202008.mbox/%3CCACzKVZQ093HixMewb_prtP41ceXgmxCv=cmpsbphw-9+h8b...@mail.gmail.com%3E

Our Flink job manager fails after multiple restarts, when the Kafka consumer 
does not find a topic, for example. We get a Kafka exception, 
TopicAuthorizationException. We listen to a list of topics and whenever one is 
down, all our streaming system is down.. is there a way to handle those 
exceptions in the FlinkKafkaConsumer so the job manager does not fail?


From: Amira Belghith
Sent: Wednesday, January 6, 2021 18:36
To: BELGHITH Amira (EXT) ResgGtsOpmOptVdf; amira.belghith-...@soge.com
Subject: Fwd: Flink kafka exceptions handling




-- Forwarded message --
From: Piotr Nowojski <pnowoj...@apache.org>
Date: Wed, 6 Jan 2021 at 17:26
Subject: Re: Flink kafka exceptions handling
To: Amira Belghith <belghith.am...@gmail.com>
CC: buggi...@gmail.com

I think you first need to be subscribed, as it's explained here [1]. Could you 
also link to which previous topic you are referring to?

Piotrek

[1] https://flink.apache.org/community.html#mailing-lists

On Wed, 6 Jan 2021 at 17:09, Amira Belghith <belghith.am...@gmail.com> wrote:
Hey,
Thanks for your fast reply.
The mail couldnt be delivered to the mailing list.

On Wed, 6 Jan 2021 at 16:59, Piotr Nowojski <pnowoj...@apache.org> wrote:
Hey,

could you post the question on the user (user@flink.apache.org) mailing list?

Thanks,
Piotrek

On Wed, 6 Jan 2021 at 15:11, Amira Belghith <belghith.am...@gmail.com> wrote:
Hi Nick, Piotr,

I'm a software engineer working for Societe Generale bank.
I saw your discussion about FlinkKafkaConsumer and exceptions handling.
I have had the same problem for a week now, and I wanted to know if you have found 
a solution.
Our Flink job manager fails after multiple restarts, when the Kafka consumer 
does not find a topic, for example. We get a Kafka exception, 
TopicAuthorizationException. We listen to a list of topics and whenever one is 
down, all our streaming system is down.. is there a way to handle those 
exceptions in the FlinkKafkaConsumer so the job manager does not fail?

Thanks a lot for your help,
Amira belghith



Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-07 Thread Aljoscha Krettek
This is somewhat unrelated to the discussion about how to actually do 
the triggering when sources shut down, I'll write on that separately. I 
just wanted to get this quick thought out.


This is about letting operators decide whether they actually want to wait for a 
final checkpoint, which is relevant at least for Async I/O and 
potentially for sinks.


We could introduce an interface, sth like `RequiresFinalization` or 
`FinalizationListener` (all bad names). The operator itself knows when 
it is ready to completely shut down: Async I/O would wait for all 
requests, a sink would potentially wait for a given number of checkpoints.  
The interface would have a method like `isFinalized()` that the 
framework can call after each checkpoint (and potentially at other 
points).
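
As a sketch (the name is one of the "bad names" from above, purely illustrative):

public interface RequiresFinalization {

    /**
     * Called by the framework after each checkpoint (and potentially at other
     * points) to check whether the operator has finished all outstanding work,
     * e.g. in-flight async requests or not-yet-committed files.
     */
    boolean isFinalized();
}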


This way we would decouple that logic from things that don't actually 
need it. What do you think?


Best,
Aljoscha


Re: Issue in WordCount Example with DataStream API in BATCH RuntimeExecutionMode

2020-12-24 Thread Aljoscha Krettek
Thanks for reporting this! This is not the expected behaviour, I created 
a Jira Issue: https://issues.apache.org/jira/browse/FLINK-20764.


Best,
Aljoscha

On 23.12.20 22:26, David Anderson wrote:

I did a little experiment, and I was able to reproduce this if I use the
sum aggregator on KeyedStream to do the counting.

However, if I implement my own counting in a KeyedProcessFunction, or if I
use the Table API, I get correct results with RuntimeExecutionMode.BATCH --
though the results are produced incrementally, as they would be in
streaming mode.

In FLIP-134: Batch execution for the DataStream API [1] it was decided to
deprecate these relational methods -- such as sum -- on KeyedStream. But I
don't know if this means this behavior is to be expected, or not.

I've cc'ed @Aljoscha Krettek , who should be able to
shed some light on this.

Best,
David

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API

On Wed, Dec 23, 2020 at 8:22 PM Derek Sheng 
wrote:


Hi team,

Recently I am trying to explore the new features of Flink 1.12 with Batch
Execution.

I locally wrote a classic WordCount program to read from text file and
count the words (almost same as the one in Flink Github repo
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala),
and after reading
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html
I added `env.setRuntimeMode(RuntimeExecutionMode.BATCH);` after declare the
"env" to make it execute under BATCH mode. After running the code, the
printed results showed only final count results instead of incremental
results, which is expected. *But I also notice, all the words that only
appear once have NOT been printed out*. I have tried different things
like wrap the word in a case class etc, and read more details and see if I
have missed anything but still not able to figure out (And I have tried the
default examples come with the Flink package and got same results, and with
using DataSet API I do not see this issue).

Is there anything extra user need to specify or notice when using BATCH
execution mode in datastream API with Flink 1.12 or this is kind of a bug
please? The flink version I used is 1.12 with scala 2.11 (also tried java
1.8 and observed same issue)

Please let me know if you need other info to help diagnose. Thank you very
much!

Bests,

Derek Sheng







Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-12-15 Thread Aljoscha Krettek

Thanks for the thorough update! I'll answer inline.

On 14.12.20 16:33, Yun Gao wrote:

 1. To include EndOfPartition in the consideration for barrier alignment on 
the TM side, we now tend to decouple the logic for EndOfPartition from the 
normal alignment behaviors to avoid complex interference (which seems to be 
a bit hard to track). We could do so by inserting suitable barriers for input 
channels that have received but not yet processed EndOfPartition. For example, if a task with 
four inputs has received barrier 2 from two input channels, but the other two 
inputs did not receive barrier 2 before EndOfPartition because the preceding 
tasks have finished, we could then insert barrier 2 for the last two channels so 
that we could still finish checkpoint 2.


You mean we would insert "artificial" barriers for barrier 2 in case we 
receive  EndOfPartition while other inputs have already received barrier 
2? I think that makes sense, yes.



 2. As we have discussed, if a task finishes while we are triggering the 
tasks, it would cause checkpoint failure and we should re-trigger its 
descendants. But if possible we think we might skip this issue in the first 
version to reduce the implementation complexity since it should not affect 
correctness. We could consider supporting it in the following versions.


I think this should be completely fine.


 3. We would have to add a field isFinished to OperatorState so that we 
do not re-run finished sources after failover. However, this would require a 
new version of the checkpoint meta. Currently Flink has an abstract 
MetaV2V3SerializerBase and has V2 and V3 extend it to share some 
implementation. To add a V4 which differs from V3 in only one field, the 
current PoC wants to introduce a new MetaV3V4SerializerBase extending 
MetaV2V3SerializerBase to share implementation between V3 and V4. This might 
look a little complex and we might need a general mechanism to extend the 
checkpoint meta format.


This indeed seems complex. Maybe we could switch to using composition 
instead of inheritance to make this more extensible?



 4. With the change, StreamTask would have two types of subclasses according 
to how triggerCheckpoint is implemented: one is source tasks that perform 
checkpoints immediately, and the other is non-source tasks that would notify 
the CheckpointBarrierHandler in some way. However, since we have multiple source 
tasks (legacy and new source) and multiple non-source tasks (one-input, 
two-input, multiple-input), this leads to cases where multiple subclasses 
share the same implementation and cause code repetition. Currently the PoC 
introduces a new level of abstraction, namely SourceStreamTasks and 
NonSourceStreamTasks, but what makes it more complicated is that 
StreamingIterationHead extends OneInputStreamTask but it needs to perform 
checkpoints like source tasks.


Don't we currently have the same problem? Even right now source tasks 
and non-source tasks behave differently when it comes to checkpoints. 
Are you saying we should fix that or would the new work introduce even 
more duplicate code?




Re: Logs of JobExecutionListener

2020-11-25 Thread Aljoscha Krettek
One bigger problem here is that the code base is very inconsistent when 
it comes to the @Public/@Internal annotations. Initially, only the 
packages that were meant to be "public" had them. For example flink-core 
has thorough annotations. Packages that were not meant to have any 
user-facing code didn't initially have annotations, so flink-runtime has 
barely any annotations.


The reason why some classes in non-public-facing packages have 
annotations is just that at some point someone decided to make something 
consciously @Public or @Internal.


On 24.11.20 12:25, Flavio Pompermaier wrote:

ok that's fine to me, just add an @internal annotation on the
RestClusterClient if it is intended only for internal use.. but wouldn't be
easier to provide some sort of client generation facility (e.g. swagger or
similar)?

Il mar 24 nov 2020, 11:38 Till Rohrmann  ha scritto:


I see the point in having a richer RestClusterClient. However, I think we
first have to make a decision whether the RestClusterClient is something
internal or not. If it is something internal, then only extending the
RestClusterClient and not adding these convenience methods to ClusterClient
could be quite easy. However if it is internal, then we don't need these
methods because the RestClusterClient is not used in such a way that it is
needed. I believe Flavio that you could build your own RestClusterClient
offering these methods.

If we say that these methods should be usable by users, then they should
also be added to the ClusterClient. Here I see the problem that some of
these methods won't work with a per-job or application deployment. For
example, public JarUploadResponseBody uploadJar(Path uploadedFile) would
not work.

Long story short, I think the easiest solution would be to build yourself
an utility class which offers the required methods. The second best option
in my opinion would be to add these methods to the RestClusterClient w/o
giving guarantees for their stability.

Cheers,
Till

On Mon, Nov 23, 2020 at 8:29 PM Flavio Pompermaier 
wrote:


For the sake of simplification (so everybody looking for missing methods
in RestClusterClient) I just shared the new methods at [1].
In this way you can add them to the RestClusterClient when you want (if
you want to).
I also had to change the visibility of some variables and methods in
order to make it work.
Probably it would be useful to put DTOs of flink-webmonitor in a
standalone project in order to be "importable" in the client project..

Best,
Flavio

[1]
https://github.com/fpompermaier/flink-job-server/blob/main/flink-rest-client/src/main/java/org/apache/flink/client/program/rest/RestClusterClientExtended.java

On Mon, Nov 23, 2020 at 4:38 PM Flavio Pompermaier 
wrote:


I don't know if they need to be added also to the ClusterClient but for
sure they are missing in the RestClusterClient

On Mon, Nov 23, 2020 at 4:31 PM Aljoscha Krettek 
wrote:


On 23.11.20 16:26, Flavio Pompermaier wrote:

Thank you Aljoscha, now that's more clear!
I didn't know that jobGraph.getJobID() was the solution for my use case.. I
was convinced that the job ID was assigned by the cluster!
And to me it's really weird that the job listener was not called by the
submitJob... Probably this should be documented at least.
In the meanwhile I extended a little bit the RestClusterClient.. do you
think it could be worth issuing a PR to add some unimplemented methods?

For example I added:
- public JobExceptionsInfo getFlinkJobExceptionsInfo(JobID flinkJobId);
- public EmptyResponseBody deleteJar(String jarFileName);
- public boolean isJobRunning(JobID fjid)
- public JarUploadResponseBody uploadJar(Path uploadedFile);

and I was also going to add jarRun..


I would be OK with adding these. But you would also need to add them to
the base ClusterClient, right?

@Till or @Chesnay, any concerns with this?









Re: Logs of JobExecutionListener

2020-11-23 Thread Aljoscha Krettek

On 23.11.20 16:26, Flavio Pompermaier wrote:

Thank you Aljoscha, now that's more clear!
I didn't know that jobGraph.getJobID() was the solution for my use case..I
was convinced that the job ID was assigned by the cluster!
And to me it's really weird that the job listener was not called by the
submitJob...Probably this should be documented at least.
In the meanwhile I extended a little bit the RestClusterClient..do you
think it could be worth issuing a PR to add some unimplemented methods?

For example I added:
- public JobExceptionsInfo getFlinkJobExceptionsInfo(JobID flinkJobId);
- public EmptyResponseBody deleteJar(String jarFileName);
- public boolean isJobRunning(JobID fjid)
- public JarUploadResponseBody uploadJar(Path uploadedFile);

and I was also going to add jarRun..


I would be OK with adding these. But you would also need to add them to 
the base ClusterClient, right?


@Till or @Chesnay, any concerns with this?


Re: Logs of JobExecutionListener

2020-11-23 Thread Aljoscha Krettek

On 20.11.20 22:09, Flavio Pompermaier wrote:

To achieve this, I was using the
RestClusterClient because with that I can use the
following code and retrieve the JobID:

 (1) JobID flinkJobId =
client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get().getJobID();


All you want to do is get the JobID, correct? If yes, you can just do a 
`jobGraph.getJobID()`. The job id is not set on the cluster but it's 
actually set client side, on the JobGraph object.
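
Roughly (one possible way; here the JobGraph is created via PackagedProgramUtils, and `program`, `configuration`, `parallelism` and `client` come from your existing setup):

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;

JobGraph jobGraph =
        PackagedProgramUtils.createJobGraph(program, configuration, parallelism, false);
JobID flinkJobId = jobGraph.getJobID();   // already available here, before submission
client.submitJob(jobGraph).get();         // the cluster keeps the same id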


Does that help in your case?

A general comment on your other questions: yes, the listener logic is 
only used when using the environments. It's not integrated with the 
RestClusterClient, which is considered more of an internal 
implementation detail.


Aljoscha



Re: How to use EventTimeSessionWindows.withDynamicGap()

2020-11-20 Thread Aljoscha Krettek

Sure, my pleasure!

Aljoscha

On 19.11.20 16:12, Simone Cavallarin wrote:

Many thanks for the Help!!

Simone


From: Aljoscha Krettek 
Sent: 19 November 2020 11:46
To: user@flink.apache.org 
Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()

On 17.11.20 17:37, Simone Cavallarin wrote:

Hi,

I have been working on the suggestion that you gave me, thanks! The first part is to add the 
gap to the message. 1) I receive the event, 2) I take that event and I map it using 
StatefulSessionCalculator, that is where I put together "the message" and the 
"long" that is my gap in millis.

DataStream<MyMessageType> source = ...

Operation in front of the window that keeps track of session gaps

DataStream<Tuple2<MyMessageType, Long>> enriched = source
    .keyBy(...)
    .map(new StatefulSessionCalculator()); // or process()

This is my StatefulSessionCalculator():

Tuple2<MyMessageType, Long> map(MyMessageType input) {
    ValueState<MyState> valueState = getState(myModelStateDescriptor);
    MyState state = valueState.value();
    state.update(input);
    long suggestedGap = state.getSuggestedGap();
    valueState.update(state);
    return Tuple2.of(input, suggestedGap);
}

If the "gap" calculated is "1234",
the result would be: <[Tom, 1.70, 50, 1605612588995], [1234]>?


That looks correct, yes.


The second step is to use the gap calculated through 
DynamicWindowGapExtractor().

DataStream<...> result = enriched
    .keyBy(new MyKeySelector())
    .window(EventTimeSessionWindows.withDynamicGap(new DynamicWindowGapExtractor()))


The DynamicWindowGapExtractor() extracts the gap from the message and feeds it 
back to Flink.
Could you please give me an example also for this one?


This would just be a class that extends
SessionWindowTimeGapExtractor<Tuple2<MyMessageType, Long>> and returns the gap
from the extract() method.


One thing that I don't understand is that after enriching the message my event
that contains a POJO is nested inside a tuple. How can I access it?


You would just read the first field of the tuple, i.e. tuple.f0.



The last point, when you said: "I think, however, that it might be easier at this
point to just use a stateful ProcessFunction", you meant a completely different
approach; would that be better?


That's what I meant, yes. Because it seems too complicated to split the
logic into the part that determines the dynamic gap and then another
part that does the computation per session. It seems easier to just roll
that into one operator that does everything. And with state and timers
you should have enough flexibility.

Best,
Aljoscha






Re: Logs of JobExecutionListener

2020-11-19 Thread Aljoscha Krettek
Hmm, there was this issue: 
https://issues.apache.org/jira/browse/FLINK-17744 But it should be fixed 
in your version.


On 19.11.20 12:58, Flavio Pompermaier wrote:

Which version are you using?
I used the exact same commands on Flink 1.11.0 and I didn't get the job
listener output..

Il gio 19 nov 2020, 12:53 Andrey Zagrebin  ha scritto:


Hi Flavio and Aljoscha,

Sorry for the late heads up. I could not actually reproduce the reported
problem with 'flink run' and local standalone cluster on master.
I get the expected output with the suggested modification of WordCount
program:

$ bin/start-cluster.sh

$ rm -rf out; bin/flink run
flink/flink-examples/flink-examples-batch/target/WordCount.jar --output
flink/build-target/out

Executing WordCount example with default input data set.
Use --input to specify file input.
 SUBMITTED
Job has been submitted with JobID c454a894d0524ccb69943b95838eea07
Program execution finished
Job with JobID c454a894d0524ccb69943b95838eea07 has finished.
Job Runtime: 139 ms

 EXECUTED

Best,
Andrey

On Thu, Nov 19, 2020 at 2:40 PM Aljoscha Krettek 
wrote:


JobListener.onJobExecuted() is only invoked in
ExecutionEnvironment.execute() and ContextEnvironment.execute(). If none
of these is still in the call chain with that setup then the listener
will not be invoked.

Also, this would only happen on the client, not on the broker (in your
case) or the server (JobManager).

Does that help to debug the problem?

Aljoscha

On 19.11.20 09:49, Flavio Pompermaier wrote:

I have a spring boot job server that acts as a broker towards our
application and a Flink session cluster. To submit a job I use the
FlinkRestClient (that is also the one used in the CLI client when I use
the run action, if I'm not wrong). However both methods don't trigger the
job listener.

On Thu, 19 Nov 2020 at 09:39, Aljoscha Krettek wrote:



@Flavio, when you're saying you're using the RestClusterClient, you are
not actually using that manually, right? You're just submitting your
job via "bin/flink run ...", right?

What's the exact invocation of "bin/flink run" that you're using?

On 19.11.20 09:29, Andrey Zagrebin wrote:

Hi Flavio,

I think I can reproduce what you are reporting (assuming you also pass
'--output' to 'flink run').
I am not sure why it behaves like this. I would suggest filing a Jira
ticket for this.

Best,
Andrey

On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:


is this a bug or is it a documentation problem...?

On Sat, 14 Nov 2020 at 18:44, Flavio Pompermaier wrote:


I've also verified that the problem persists also using a modified version
of the WordCount class.
If you add the code pasted at the end of this email at the end of its
main method you can verify that the listener is called if you run the
program from the IDE, but it's not called if you submit the job using the
CLI client using the command

 - bin/flink run /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar

Maybe this is an expected result but I didn't find any documentation of
this behaviour (neither in the Javadoc nor on the Flink web site, where I
can't find any documentation about JobListener at all).

[Code to add to main()]
  // emit result
  if (params.has("output")) {
    counts.writeAsCsv(params.get("output"), "\n", " ");
    // execute program
    env.registerJobListener(new JobListener() {

      @Override
      public void onJobSubmitted(JobClient arg0, Throwable arg1) {
        System.out.println(" SUBMITTED");
      }

      @Override
      public void onJobExecuted(JobExecutionResult arg0, Throwable arg1) {
        System.out.println(" EXECUTED");
      }
    });
    env.execute("WordCount Example");
  } else {
    System.out.println("Printing result to stdout. Use --output to specify output path.");
    counts.print();
  }

On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:


see inline

On Fri, 13 Nov 2020 at 14:31, Matthias Pohl wrote:


Hi Flavio,
thanks for sharing this with the Flink community. Could you answer the
following questions, please:
- What's the code of your Job's main method?


it's actually very simple... the main class creates a batch execution env
using ExecutionEnvironment.getExecutionEnvironment(), I register a job
listener to the env and I do some stuff before calling env.execute().
The listener is executed correctly but if I use the RestClusterClient to
submit the jobGraph extracted from that main contained in a jar, the
program is executed as usual but the job listener is not called.

Re: How to use EventTimeSessionWindows.withDynamicGap()

2020-11-19 Thread Aljoscha Krettek

On 17.11.20 17:37, Simone Cavallarin wrote:

Hi,

I have been working on the suggestion that you gave me, thanks! The first part is to add the 
gap to the message. 1) I receive the event, 2) I take that event and I map it using 
StatefulSessionCalculator, that is where I put together "the message" and the 
"long" that is my gap in millis.

DataStream<MyMessageType> source = ...

Operation in front of the window that keeps track of session gaps

DataStream<Tuple2<MyMessageType, Long>> enriched = source
    .keyBy(...)
    .map(new StatefulSessionCalculator()); // or process()

This is my StatefulSessionCalculator():

Tuple2<MyMessageType, Long> map(MyMessageType input) {
    ValueState<MyState> valueState = getState(myModelStateDescriptor);
    MyState state = valueState.value();
    state.update(input);
    long suggestedGap = state.getSuggestedGap();
    valueState.update(state);
    return Tuple2.of(input, suggestedGap);
}

If the "gap" calculated is "1234",
the result would be: <[Tom, 1.70, 50, 1605612588995], [1234]>?


That looks correct, yes.


The second step is to use the gap calculated through 
DynamicWindowGapExtractor().

DataStream<...> result = enriched
    .keyBy(new MyKeySelector())
    .window(EventTimeSessionWindows.withDynamicGap(new DynamicWindowGapExtractor()))


The DynamicWindowGapExtractor() extracts the gap from the message and feeds it 
back to Flink.
Could you please give me an example also for this one?


This would just be a class that extends 
SessionWindowTimeGapExtractor<Tuple2<MyMessageType, Long>> and returns the gap 
from the extract() method.
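
For example, something along these lines (assuming the Tuple2<MyMessageType, Long> elements produced by the map step above):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor;

public class DynamicWindowGapExtractor
        implements SessionWindowTimeGapExtractor<Tuple2<MyMessageType, Long>> {

    @Override
    public long extract(Tuple2<MyMessageType, Long> element) {
        return element.f1;  // the gap (in milliseconds) computed by StatefulSessionCalculator
    }
}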



One thing that I don't understand is that after enriching the message my event 
that contains a POJO is nested inside a tuple. How can I access it?


You would just read the first field of the tuple, i.e. tuple.f0.



The last point, when you said: "I think, however, that it might be easier at this 
point to just use a stateful ProcessFunction", you meant a completely different 
approach; would that be better?


That's what I meant, yes. Because it seems too complicated to split the 
logic into the part that determines the dynamic gap and then another 
part that does the computation per session. It seems easier to just roll 
that into one operator that does everything. And with state and timers 
you should have enough flexibility.


Best,
Aljoscha



Re: execution.runtime-mode=BATCH when reading from Hive

2020-11-19 Thread Aljoscha Krettek

Thanks! It's good to see that it is helpful to you.

Best,
Aljoscha

On 18.11.20 18:11, Dongwon Kim wrote:

Hi Aljoscha,

Unfortunately, it's not that easy right now because normal Sinks that
rely on checkpointing to write out data, such as Kafka, don't work in
BATCH execution mode because we don't have checkpoints there. It will
work, however, if you use a sink that doesn't rely on checkpointing.
The FlinkKafkaProducer with Semantic.NONE should work, for example.


As the output produced to Kafka is eventually stored on Cassandra, I might
use a different sink to write output directly to Cassandra for BATCH
execution.
In such a case, I have to replace both (A) source and (E) sink.

There is HiveSource, which is built on the new Source API that will work

well with both BATCH and STREAMING. It's quite new and it was only added
to be used by a Table/SQL connector but you might have some success with
that one.


Oh, this one is a new one which will be introduced in the upcoming 1.12
release.
How I've missed this one.
I'm going to give it a try :-)

BTW, thanks a lot for the input and the nice presentation - it's very
helpful and insightful.

Best,

Dongwon

On Wed, Nov 18, 2020 at 9:44 PM Aljoscha Krettek 
wrote:


Hi Dongwon,

Unfortunately, it's not that easy right now because normal Sinks that
rely on checkpointing to write out data, such as Kafka, don't work in
BATCH execution mode because we don't have checkpoints there. It will
work, however, if you use a sink that doesn't rely on checkpointing.
The FlinkKafkaProducer with Semantic.NONE should work, for example.
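
A rough sketch of such a sink (topic, serialization, properties and the `result` stream are placeholders):

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");

KafkaSerializationSchema<String> schema = (element, timestamp) ->
        new ProducerRecord<>("output-topic", element.getBytes(StandardCharsets.UTF_8));

FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
        "output-topic",                     // default topic
        schema,
        props,
        FlinkKafkaProducer.Semantic.NONE);  // no checkpoint-based guarantees, so usable in BATCH mode

result.addSink(producer);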

There is HiveSource, which is built on the new Source API that will work
well with both BATCH and STREAMING. It's quite new and it was only added
to be used by a Table/SQL connector but you might have some success with
that one.

Best,
Aljoscha

On 18.11.20 07:03, Dongwon Kim wrote:

Hi,

Recently I've been working on a real-time data stream processing pipeline
with DataStream API while preparing for a new service to launch.
Now it's time to develop a back-fill job to produce the same result by
reading data stored on Hive which we use for long-term storage.

Meanwhile, I watched Aljoscha's talk [1] and just wondered if I could reuse
major components of the pipeline written in DataStream API.
The pipeline conceptually looks as follows:
(A) reads input from Kafka
(B) performs AsyncIO to Redis in order to enrich the input data
(C) appends timestamps and emits watermarks before time-based window
(D) keyBy followed by a session window with a custom trigger for early
firing
(E) writes output to Kafka

I have simple (maybe stupid) questions on reusing components of the
pipeline written in DataStream API.
(1) By replacing (A) with a bounded source, can I execute the pipeline with
a new BATCH execution mode without modifying (B)~(E)?
(2) Is there a bounded source for Hive available for DataStream API?

Best,

Dongwon

[1] https://www.youtube.com/watch?v=z9ye4jzp4DQ










Re: Logs of JobExecutionListener

2020-11-19 Thread Aljoscha Krettek
JobListener.onJobExecuted() is only invoked in 
ExecutionEnvironment.execute() and ContextEnvironment.execute(). If none 
of these is still in the call chain with that setup then the listener 
will not be invoked.


Also, this would only happen on the client, not on the broker (in your 
case) or the server (JobManager).


Does that help to debug the problem?

Aljoscha

On 19.11.20 09:49, Flavio Pompermaier wrote:

I have a spring boot job server that acts as a broker towards our
application and a Flink session cluster. To submit a job I use the
FlinkRestClient (that is also the one used in the CLI client when I use the
run action, if I'm not wrong). However both methods don't trigger the job
listener.

On Thu, 19 Nov 2020 at 09:39, Aljoscha Krettek wrote:


@Flavio, when you're saying you're using the RestClusterClient, you are
not actually using that manually, right? You're just submitting your job
via "bin/flink run ...", right?

What's the exact invocation of "bin/flink run" that you're using?

On 19.11.20 09:29, Andrey Zagrebin wrote:

Hi Flavio,

I think I can reproduce what you are reporting (assuming you also pass
'--output' to 'flink run').
I am not sure why it behaves like this. I would suggest filing a Jira
ticket for this.

Best,
Andrey

On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier wrote:

is this a bug or is it a documentation problem...?

On Sat, 14 Nov 2020 at 18:44, Flavio Pompermaier wrote:


I've also verified that the problem persists also using a modified version
of the WordCount class.
If you add the code pasted at the end of this email at the end of its
main method you can verify that the listener is called if you run the
program from the IDE, but it's not called if you submit the job using the
CLI client using the command

 - bin/flink run /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar

Maybe this is an expected result but I didn't find any documentation of
this behaviour (neither in the Javadoc nor on the Flink web site, where I
can't find any documentation about JobListener at all).

[Code to add to main()]
  // emit result
  if (params.has("output")) {
counts.writeAsCsv(params.get("output"), "\n", " ");
// execute program
env.registerJobListener(new JobListener() {

  @Override
  public void onJobSubmitted(JobClient arg0, Throwable arg1) {
System.out.println(" SUBMITTED");
  }

  @Override
  public void onJobExecuted(JobExecutionResult arg0, Throwable
arg1) {
System.out.println(" EXECUTED");
  }
});
env.execute("WordCount Example");
  } else {
System.out.println("Printing result to stdout. Use --output to
specify output path.");
counts.print();
  }

On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:


see inline

On Fri, 13 Nov 2020 at 14:31, Matthias Pohl wrote:


Hi Flavio,
thanks for sharing this with the Flink community. Could you answer the
following questions, please:
- What's the code of your Job's main method?



it's actually very simple... the main class creates a batch execution env
using ExecutionEnvironment.getExecutionEnvironment(), I register a job
listener to the env and I do some stuff before calling env.execute().
The listener is executed correctly but if I use the RestClusterClient to
submit the jobGraph extracted from that main contained in a jar, the
program is executed as usual but the job listener is not called.

- What cluster backend and application do you use to execute the job?




I use a standalone session cluster for the moment

- Is there anything suspicious you can find in the logs that might be related?



no unfortunately..



Best,
Matthias

On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier <
pomperma...@okkam.it> wrote:


Actually what I'm experiencing is that the JobListener is executed
successfully if I run my main class from the IDE, while the job listener is
not fired at all if I submit the JobGraph of the application to a cluster
using the RestClusterClient..
Am I doing something wrong?

My main class ends with the env.execute() and I do
env.registerJobListener() when I create the execution env
via ExecutionEnvironment.getExecutionEnvironment().

Thanks in advance for any help,
Flavio

On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier <
pomperma...@okkam.it> wrote:


Hello everybody,
I'm trying to use the JobListener to track when a job finishes (with
Flink 1.11.0).
It works great but I have the problem that logs inside
the onJobExecuted are not logged anywhere.. is it normal?

Best,
Flavio














Re: Logs of JobExecutionListener

2020-11-19 Thread Aljoscha Krettek
@Flavio, when you're saying you're using the RestClusterClient, you are 
not actually using that manually, right? You're just submitting your job 
via "bin/flink run ...", right?


What's the exact invocation of "bin/flink run" that you're using?

On 19.11.20 09:29, Andrey Zagrebin wrote:

Hi Flavio,

I think I can reproduce what you are reporting (assuming you also pass
'--output' to 'flink run').
I am not sure why it behaves like this. I would suggest filing a Jira
ticket for this.

Best,
Andrey

On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier 
wrote:


is this a bug or is it a documentation problem...?

On Sat, 14 Nov 2020 at 18:44, Flavio Pompermaier wrote:


I've also verified that the problem persists also using a modified version
of the WordCount class.
If you add the code pasted at the end of this email at the end of its
main method you can verify that the listener is called if you run the
program from the IDE, but it's not called if you submit the job using the
CLI client using the command

- bin/flink run

/home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar

Maybe this is an expected result but I didn't find any documentation of
this behaviour (neither in the Javadoc or in the flink web site, where I
can't find any documentation about JobListener at all).

[Code to add to main()]
 // emit result
 if (params.has("output")) {
   counts.writeAsCsv(params.get("output"), "\n", " ");
   // execute program
   env.registerJobListener(new JobListener() {

 @Override
 public void onJobSubmitted(JobClient arg0, Throwable arg1) {
   System.out.println(" SUBMITTED");
 }

 @Override
 public void onJobExecuted(JobExecutionResult arg0, Throwable
arg1) {
   System.out.println(" EXECUTED");
 }
   });
   env.execute("WordCount Example");
 } else {
   System.out.println("Printing result to stdout. Use --output to
specify output path.");
   counts.print();
 }

On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier 
wrote:


see inline

Il ven 13 nov 2020, 14:31 Matthias Pohl  ha
scritto:


Hi Flavio,
thanks for sharing this with the Flink community. Could you answer the
following questions, please:
- What's the code of your Job's main method?



it's actually very simple...the main class creates a batch execution env
using ExecutionEnvironment.getExecutionEnvironment(), I register a job
listener to the env and I do some stuff before calling env.execute().
The listener is executed correctly but if I use the RestClusterClient to
submit the jobGraph extracted from that main contained in a jar, the
program is executed as usual but the job listener is not called.

- What cluster backend and application do you use to execute the job?




I use a standalone session cluster for the moment

- Is there anything suspicious you can find in the logs that might be
related?



no unfortunately..



Best,
Matthias

On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier <
pomperma...@okkam.it> wrote:


Actually what I'm experiencing is that the JobListener is executed
successfully if I run my main class from the IDE, while the job listener is
not fired at all if I submit the JobGraph of the application to a cluster
using the RestClusterClient..
Am I doing something wrong?

My main class ends with the env.execute() and I do
env.registerJobListener() when I create the Execution env
via ExecutionEnvironment.getExecutionEnvironment().

Thanks in advance for any help,
Flavio

On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier <
pomperma...@okkam.it> wrote:


Hello everybody,
I'm trying to use the JobListener to track when a job finishes (with
Flink 1.11.0).
It works great but I have the problem that logs inside
the onJobExecuted are not logged anywhere..is it normal?

Best,
Flavio









Re: Union SingleOutputSteramOperator and getSideOutput doesn't work

2020-11-18 Thread Aljoscha Krettek

Hi,

I'm afraid you stumbled across an inconsistency in the API. In the Java 
API we differentiate between DataStream and SingleOutputStreamOperator 
where the latter is used for "physical" operations that, among other 
things, allow things like getting side outputs.


The Scala API hides this difference but internally still must play by 
the same rules.


In your case, you would have to get the side outputs individually from 
the two operations that you union and then union the result of that one. 
This is quite cumbersome but the resulting runtime graph will be okay. 
The union operation does not exist there since it is only a logical 
operation that affects how operators/tasks are wired together.
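
In code the workaround looks roughly like this (sketched in Java for brevity, 
the Scala API version is analogous; MyProcessFunction, stream1/stream2 and the 
ODD tag are placeholders mirroring your example):

OutputTag<Integer> ODD = new OutputTag<Integer>("odd") {};

SingleOutputStreamOperator<Integer> processed1 =
    stream1.process(new MyProcessFunction());
SingleOutputStreamOperator<Integer> processed2 =
    stream2.process(new MyProcessFunction());

// get the side output from each physical operation first, then union those
DataStream<Integer> oddElements = processed1
    .getSideOutput(ODD)
    .union(processed2.getSideOutput(ODD));

// the main outputs can still be unioned as before
DataStream<Integer> mainOutput = processed1.union(processed2);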


We could try and fix this in the Scala API by trying to do what I 
suggested above underneath the covers in the getSideOutput() call. We 
would have to "unwrap" the union, apply getSideOutput() individually and 
then form the union of that and return it.


Best,
Aljoscha

On 18.11.20 11:09, Efrat Abramovitz wrote:

Cheers,


We have stumbled upon an issue regarding union streams after they have a tagged 
side output, it seems we cannot extract side output anymore.


Issue:

A SingleOutputStreamOperator stream ceases to be a SingleOutputStreamOperator after 
union, and cannot perform getSideOutput.

Specifically, in the Scala docs there is no indication of the existence or importance 
of the type SingleOutputStreamOperator when using side outputs.


According to the documentation it is unclear in what terms data streams keep 
their side outputs, and the code below will fail with a surprising MatchError:

using scala

flink version: 1.9


import org.apache.flink.streaming.api.scala._

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

def main(args: Array[String]): Unit = {

   val flink = StreamExecutionEnvironment.getExecutionEnvironment

   val dataStream1 = processInt(flink.fromCollection(Seq(1, 3, 5, 7, 9, 12)))

   val dataStream2 = processInt(flink.fromCollection(Seq(2, 3, 4, 6, 8, 13)))


   dataStream1.union(dataStream2).getSideOutput(ODD).map(elem => println(elem)) 
//this line fails


   val _ = flink.execute(JOB_NAME)
}

def processInt(data: DataStream[Int]): DataStream[Int] = {

 data.process(new ProcessFunction[Int, Int] {
   override def processElement(value: Int, ctx: ProcessFunction[Int, 
Int]#Context, out: Collector[Int]): Unit = {
 if (value % 2 == 0 && value < 10) {
   ctx.output(EVEN, value)
 } else if (value % 1 == 0 && value < 10) {
   ctx.output(ODD, value)
 }
 out.collect(value)
   }
 })
   }
}


Best regards








Re: How to run a per-job cluster for a Beam pipeline w/ FlinkRunner on YARN

2020-11-18 Thread Aljoscha Krettek
Yes, these options are yarn-specific, but you can specify arbitrary 
options using -Dfoo=bar.


And yes, sorry about the confusion but -e is the parameter to use on 
Flink 1.10, it's equivalent.


Best,
Aljoscha

On 17.11.20 16:37, Dongwon Kim wrote:

Hi Aljoscha,

Thanks for the input.
The '-t' option seems to be available as of flink-1.11 while the latest
FlinkRunner is based on flink-1.10.
So I use '-e' option which is available in 1.10:

$ flink run -e yarn-per-job -d <...>


A short question here is that this command ignores *-yD* and *--yarnship*
options.
Are these options only for yarn session mode?

Best,

Dongwon




On Tue, Nov 17, 2020 at 5:16 PM Aljoscha Krettek 
wrote:


Hi,

to ensure that we really are using per-job mode, could you try and use

$ flink run -t yarn-per-job -d <...>

This will directly specify that we want to use the YARN per-job
executor, which bypasses some of the logic in the older YARN code paths
that differentiate between YARN session mode and YARN per-job mode.

Best,
Aljoscha

On 17.11.20 07:02, Tzu-Li (Gordon) Tai wrote:

Hi,

Not sure if this question would be more suitable for the Apache Beam
mailing lists, but I'm pulling in Aljoscha (CC'ed) who would know more
about Beam and whether or not this is an expected behaviour.

Cheers,
Gordon

On Mon, Nov 16, 2020 at 10:35 PM Dongwon Kim 

wrote:



Hi,

I'm trying to run a per-job cluster for a Beam pipeline w/ FlinkRunner

on

YARN as follows:


flink run -m yarn-cluster -d \


  my-beam-pipeline.jar \

  --runner=FlinkRunner \
  --flinkMaster=[auto] \
  --parallelism=8



Instead of creating a per-job cluster as wished, the above command seems
to create a session cluster and then submit a job onto the cluster.

I doubt it because
(1) In the attached file, there's "Submit New Job" which is not shown in
other per-job applications that are written in Flink APIs and submitted

to

YARN similar to the above command.

[image: beam on yarn.png]
(2) When the job is finished, the YARN application is still in its

RUNNING

state without being terminated. I had to kill the YARN application

manually.


FYI, I'm using
- Beam v2.24.0 (Flink 1.10)
- Hadoop v3.1.1

Thanks in advance,

Best,

Dongwon












Re: execution.runtime-mode=BATCH when reading from Hive

2020-11-18 Thread Aljoscha Krettek

Hi Dongwon,

Unfortunately, it's not that easy right now because normal Sinks that 
rely on checkpointing to write out data, such as Kafka, don't work in 
BATCH execution mode because we don't have checkpoints there. It will 
work, however, if you use a sink that doesn't rely on checkpointing. The 
FlinkKafkaProducer with Semantic.NONE should work, for example.
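
A rough sketch of what such a sink could look like (topic name, bootstrap 
servers and the String-typed stream are placeholders):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");

// Semantic.NONE disables Kafka transactions, so the sink does not depend on checkpoints
FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(
    "output-topic",
    new KafkaSerializationSchema<String>() {
      @Override
      public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        return new ProducerRecord<>("output-topic", element.getBytes(StandardCharsets.UTF_8));
      }
    },
    props,
    FlinkKafkaProducer.Semantic.NONE);

stream.addSink(kafkaSink);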


There is HiveSource, which is built on the new Source API that will work 
well with both BATCH and STREAMING. It's quite new and it was only added 
to be used by a Table/SQL connector but you might have some success with 
that one.


Best,
Aljoscha

On 18.11.20 07:03, Dongwon Kim wrote:

Hi,

Recently I've been working on a real-time data stream processing pipeline
with DataStream API while preparing for a new service to launch.
Now it's time to develop a back-fill job to produce the same result by
reading data stored on Hive which we use for long-term storage.

Meanwhile, I watched Aljoscha's talk [1] and just wondered if I could reuse
major components of the pipeline written in DataStream API.
The pipeline conceptually looks as follows:
(A) reads input from Kafka
(B) performs AsyncIO to Redis in order to enrich the input data
(C) appends timestamps and emits watermarks before time-based window
(D) keyBy followed by a session window with a custom trigger for early
firing
(E) writes output to Kafka

I have simple (maybe stupid) questions on reusing components of the
pipeline written in DataStream API.
(1) By replacing (A) with a bounded source, can I execute the pipeline with
a new BATCH execution mode without modifying (B)~(E)?
(2) Is there a bounded source for Hive available for DataStream API?

Best,

Dongwon

[1] https://www.youtube.com/watch?v=z9ye4jzp4DQ





Re: How to run a per-job cluster for a Beam pipeline w/ FlinkRunner on YARN

2020-11-17 Thread Aljoscha Krettek

Hi,

to ensure that we really are using per-job mode, could you try and use

$ flink run -t yarn-per-job -d <...>

This will directly specify that we want to use the YARN per-job 
executor, which bypasses some of the logic in the older YARN code paths 
that differentiate between YARN session mode and YARN per-job mode.


Best,
Aljoscha

On 17.11.20 07:02, Tzu-Li (Gordon) Tai wrote:

Hi,

Not sure if this question would be more suitable for the Apache Beam
mailing lists, but I'm pulling in Aljoscha (CC'ed) who would know more
about Beam and whether or not this is an expected behaviour.

Cheers,
Gordon

On Mon, Nov 16, 2020 at 10:35 PM Dongwon Kim  wrote:


Hi,

I'm trying to run a per-job cluster for a Beam pipeline w/ FlinkRunner on
YARN as follows:


flink run -m yarn-cluster -d \


 my-beam-pipeline.jar \

 --runner=FlinkRunner \
 --flinkMaster=[auto] \
 --parallelism=8



Instead of creating a per-job cluster as wished, the above command seems
to create a session cluster and then submit a job onto the cluster.

I doubt it because
(1) In the attached file, there's "Submit New Job" which is not shown in
other per-job applications that are written in Flink APIs and submitted to
YARN similar to the above command.

[image: beam on yarn.png]
(2) When the job is finished, the YARN application is still in its RUNNING
state without being terminated. I had to kill the YARN application manually.

FYI, I'm using
- Beam v2.24.0 (Flink 1.10)
- Hadoop v3.1.1

Thanks in advance,

Best,

Dongwon







Re: How to use EventTimeSessionWindows.withDynamicGap()

2020-11-16 Thread Aljoscha Krettek

Hi,

thanks for the pointer, I should have remembered that thread earlier!

I'll try and sketch what the pipeline might look like to show what I 
mean by "enriching the message" and where the operations would sit.


DataStream<MyMessageType> source = ...

DataStream<Tuple2<MyMessageType, Long>> enriched = source
  .keyBy(...)
  .map(new StatefulSessionCalculator()); // or process()

DataStream<...> result = enriched
  .keyBy(new MyKeySelector())
  .window(EventTimeSessionWindows.withDynamicGap(
    new DynamicWindowGapExtractor()))
  .sideOutputLateData(lateDataSideOutputTag)
  .trigger(ContinuousEventTimeTrigger.of(Time.minutes(10)))
  .process(new ProcessWindowFunction(...));

The stateful map function could look something like this:

Tuple2<MyMessageType, Long> map(MyMessageType input) {
  ValueState<MyState> valueState = getState(myModelStateDescriptor);
  MyState state = valueState.value();
  state.update(input);
  long suggestedGap = state.getSuggestedGap();
  valueState.update(state);
  return Tuple2.of(input, suggestedGap);
}

The two operations have to be separate because the session gap extractor 
cannot be stateful.
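
For completeness, the extractor side of that setup is then trivial; a sketch 
(MyMessageType is a placeholder):

public class DynamicWindowGapExtractor
    implements SessionWindowTimeGapExtractor<Tuple2<MyMessageType, Long>> {

  @Override
  public long extract(Tuple2<MyMessageType, Long> element) {
    // just read the gap that the stateful map put into the record
    return element.f1;
  }
}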


I think, however, that it might be easier at this point to just use a 
stateful ProcessFunction to not have to deal with the somewhat finicky 
setup of the stateful extractor just to force it into the requirements 
of the session windows API.


Best,
Aljoscha

On 14.11.20 10:50, Simone Cavallarin wrote:

Hi Aljoscha,

I found a similar question of mine by 
KristoffSC<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=user_nodes&user=2311>
 Jan, 2020, called Session Windows with dynamic gap.

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Session-Window-with-dynamic-gap-td31893.html

The idea is the same and at the end of the thread this was the solution that you 
suggested: "There are no plans of adding state support to the gap extractors but you 
could do this using a two-step approach, i.e. have an operation in front of the window 
that keeps track of session gaps, enriches the message with the gap that should be used 
and then the extractor extracts that gap. This is a more modular approach compared to 
putting everything in one operator/extractor."


1) Operation in front of the windows -> keep track of the session gaps (I have 
been reading all around for this)

   *   
(https://ci.apache.org/projects/flink/flink-docs-stable/api/java/index.html?org/apache/flink/streaming/api/windowing/assigners/SessionWindowTimeGapExtractor.html)
   *   
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SessionWindowTimeGapExtractor.java
   *   
https://www.codota.com/code/java/classes/org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor


2) Enrich the message with the gap that should be use (this is a parameter can 
be for example an average of the last 10 gaps?)

   *   (I got lost.) How can I enrich a message coming from Kafka, maybe adding 
this parameter to the next event?

3) The extractor extract the gap (that will be used to calculate a new gap 
parameter so it needs to be sent back on point 1 and be used on the windowing 
process)


   *   (m.. okay now complitely lost...)

Thanks
s


From: Simone Cavallarin 
Sent: 13 November 2020 16:55
To: Aljoscha Krettek 
Cc: user 
Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()

+user@


From: Simone Cavallarin 
Sent: 13 November 2020 16:46
To: Aljoscha Krettek 
Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()

Hi Aljoscha,

When you said: You could use a stateful operation (like a ProcessFunction) to put a 
dynamic "gap" into the records and then use that gap with 
EventTimeSessionWindows. I understand the theory but I'm struggling to put in practice in 
code terms.

<https://stackoverflow.com/questions/61960485/flink-session-window-not-triggered-even-with-continuouseventtimetrigger>

stream = steam
 .keyBy(new MyKeySelector())
 .window(EventTimeSessionWindows.withDynamicGap(new 
DynamicWindowGapExtractor()))
 .sideOutputLateData(lateDataSideOutputTag)
 .trigger(ContinuousEventTimeTrigger.of(Time.minutes(10))) // in case some 
key is continuously coming within the session window gap
 .process(new ProcessWindowFunction(……));


Where ProcessWindowFunction(……)update a parameter that is used inside 
DynamicWindowGapExtractor()...

I found this on the following link: 
https://stackoverflow.com/questions/61960485/flink-session-window-not-triggered-even-with-continuouseventtimetrigger

If you could help me with some examples where i can read some code it would be 
so helpful.

Thanks!


From: Aljoscha Krettek 
Sent: 13 November 2020 09:43
To: user@flink.apache.org 
Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()

Y

Re: How to use EventTimeSessionWindows.withDynamicGap()

2020-11-13 Thread Aljoscha Krettek
Yes, you're right that Flink can do this with session windows but the 
assignment will be static. In general, the smaller the session gap (or 
session timeout) the more windows there will be, because fewer events are 
merged into the same session.


You're also right that you would have to somehow maintain information 
about how dense you records are in time and then use that to adjust the 
session gap. So you could use a stateful operation (like a 
ProcessFunction) to put a dynamic "gap" into the records and then use 
that gap with EventTimeSessionWindows.


Best,
Aljoscha

On 12.11.20 18:16, Simone Cavallarin wrote:

Hi Aljoscha,

Yes correct i would like to have more windows when there are more events for a 
given time frame. That is when
the events are more dense in time. I can calculate the time difference between 
each event and create a parameter that can create windows of different sizes 
dynamically based on past events. Maybe on the beginning it will be starting 
for a fix parameter but then the parameter should be learning and accommodate 
the data accordingly

Could you please give me an example on how to set the timeout?

I have been reading all around and I'm a bit confused. I thought that flink can 
create more windows when the events are more dense in time quite easily 
(https://www.ververica.com/blog/session-windowing-in-flink ).


To avoid having the successive sessions become bigger and bigger so should I  
create a cap for example 1 min?

Many thanks for the help!
Best
Simon

________
From: Aljoscha Krettek 
Sent: 12 November 2020 16:34
To: user@flink.apache.org 
Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()

Hi,

I'm not sure that what you want is possible. You say you want more
windows when there are more events for a given time frame? That is when
the events are more dense in time?

Also, using the event timestamp as the gap doesn't look correct. The gap
basically specifies the timeout for a session (and I now realize that
maybe "gap" is not a good word for that). So if your timeout increases
as time goes on your successive sessions will just get bigger and bigger.

Best,
Aljoscha

On 12.11.20 15:56, Simone Cavallarin wrote:

Hi All,

I'm trying to use EventTimeSessionWindows.withDynamicGap in my application. I 
have understood that the gap is computed dynamically by a function on each 
element. What I should be able to obtain is a Flink application that can 
automatically manage the windows based on the frequency of the data. (if I have 
understood correctly)

But I'm wondering if there is any parameter to adjust the computation to do 
more windows or less windows considering the same data.

I have my event that provide "millis" of which I would like to pass to the 
function but I don't understand how, for the moment I'm trying with the code below but no 
luck.. Can you please give me some help? Thanks!


  FlinkKafkaConsumer kafkaData =
  new FlinkKafkaConsumer("CorID_1", new 
EventDeserializationSchema(), p);
  WatermarkStrategy wmStrategy =
  WatermarkStrategy
  .forMonotonousTimestamps()
  .withIdleness(Duration.ofMinutes(1))
  .withTimestampAssigner((event, timestamp) -> { return 
event.get_Time();

  });

  DataStream stream = env.addSource(
  kafkaData.assignTimestampsAndWatermarks(wmStrategy));


  DataStream Data = stream
  .keyBy((Event ride) -> ride.CorrID)
  .window(EventTimeSessionWindows.withDynamicGap((event)->{
  return event.get_Time();}));



Where from the load of the message which i receive from Kafka i convert the 
date time in millis.

   public long get_Time() {
  long tn = OffsetDateTime.parse(a_t_rt).toInstant().toEpochMilli();
  this.millis = tn;
  return millis;
  }
  public void set_a_t_rt(String a_t_rt) {
  this.a_t_rt = a_t_rt;
  }












Re: DataStream.connect semantics for Flink SQL / Table API

2020-11-12 Thread Aljoscha Krettek

Hi,

I think if you don't do any operations that are sensitive to event-time 
then just using a UNION/UNION ALL should work because then there won't 
be any buffering by event time which could delay your output.
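
For example, something along these lines (table and column names are made up; 
the important part is that it's a plain UNION ALL with no time-based operation 
on top):

Table unified = tableEnv.sqlQuery(
    "SELECT id, payload, event_time FROM table_a " +
    "UNION ALL " +
    "SELECT id, payload, event_time FROM table_b");
tableEnv.createTemporaryView("unified_view", unified);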


Have you tried this and have you seen an actual delay in your output?

Best,
Aljoscha

On 12.11.20 12:57, Yuval Itzchakov wrote:

Hi,

I want to create an abstraction over N source tables (streams), and unify
them into 1 table. I know UNION and UNION ALL exist, but I'm looking for
DataStream.connect like semantics in regards to watermark control. I don't
want to take the minimum watermark over all N streams, as I know for sure
there won't be any time based calculations over the result table and I
don't want data to be delayed as the different tables composing the stream
highly vary in times of events flowing into the stream (one stream can
receive events once an hour, the other only once a day).

I don't want to turn the Table into a DataStream, since I want to leverage
predicate pushdown for the definition of the result table. Does anything
like this currently exist?





Re: How to use EventTimeSessionWindows.withDynamicGap()

2020-11-12 Thread Aljoscha Krettek

Hi,

I'm not sure that what you want is possible. You say you want more 
windows when there are more events for a given time frame? That is when 
the events are more dense in time?


Also, using the event timestamp as the gap doesn't look correct. The gap 
basically specifies the timeout for a session (and I now realize that 
maybe "gap" is not a good word for that). So if your timeout increases 
as time goes on your successive sessions will just get bigger and bigger.


Best,
Aljoscha

On 12.11.20 15:56, Simone Cavallarin wrote:

Hi All,

I'm trying to use EventTimeSessionWindows.withDynamicGap in my application. I 
have understood that the gap is computed dynamically by a function on each 
element. What I should be able to obtain is a Flink application that can 
automatically manage the windows based on the frequency of the data. (if I have 
understood correctly)

But I'm wondering if there is any parameter to adjust the computation to do 
more windows or less windows considering the same data.

I have my event that provide "millis" of which I would like to pass to the 
function but I don't understand how, for the moment I'm trying with the code below but no 
luck.. Can you please give me some help? Thanks!


 FlinkKafkaConsumer kafkaData =
 new FlinkKafkaConsumer("CorID_1", new 
EventDeserializationSchema(), p);
 WatermarkStrategy wmStrategy =
 WatermarkStrategy
 .forMonotonousTimestamps()
 .withIdleness(Duration.ofMinutes(1))
 .withTimestampAssigner((event, timestamp) -> { return 
event.get_Time();

 });

 DataStream stream = env.addSource(
 kafkaData.assignTimestampsAndWatermarks(wmStrategy));


 DataStream Data = stream
 .keyBy((Event ride) -> ride.CorrID)
 .window(EventTimeSessionWindows.withDynamicGap((event)->{
 return event.get_Time();}));



Where from the load of the message which i receive from Kafka i convert the 
date time in millis.

  public long get_Time() {
 long tn = OffsetDateTime.parse(a_t_rt).toInstant().toEpochMilli();
 this.millis = tn;
 return millis;
 }
 public void set_a_t_rt(String a_t_rt) {
 this.a_t_rt = a_t_rt;
 }









Re: Flink 1.8.3 GC issues

2020-11-12 Thread Aljoscha Krettek

Created an issue for this: https://issues.apache.org/jira/browse/BEAM-11251

On 11.11.20 19:09, Aljoscha Krettek wrote:

Hi,

nice work on debugging this!

We need the synchronized block in the source because the call to 
reader.advance() (via the invoker) and reader.getCurrent() (via 
emitElement()) must be atomic with respect to state. We cannot advance 
the reader state, not emit that record but still checkpoint the new 
reader state. The monitor ensures that no checkpoint can happen in 
between those to calls.


The basic problem is now that we starve checkpointing because the 
monitor/lock is not fair. This could be solved by using a fair lock but 
that would require Flink proper to be changed to use a fair lock instead 
of a monitor/synchronized. I don't see this as an immediate solution.


One thing that exacerbates this problem is that too many things are 
happening "under" the synchronized block. All the transforms before a 
shuffle/rebalance/keyBy are chained to the source, which means that they 
are invoked from the emitElement() call. You could see this by 
printing/logging a stacktrace in your user function that does the Redis 
lookups.


A possible mitigation would be to disable chaining globally by inserting 
a `flinkStreamEnv.disableOperatorChaining()` in [1].


A more surgical version would be to only disable chaining for sources. 
I'm attaching a patch for that in case you're willing to try it out. 
This is for latest master but it's easy enough to apply manually.


Best,
Aljoscha

[1] 
https://github.com/apache/beam/blob/d4923711cdd7b83175e21a6a422638ce530bc80c/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L225 



On 23.10.20 09:47, Piotr Nowojski wrote:

Hi Josson,

Thanks for great investigation and coming back to use. Aljoscha, could 
you

help us here? It looks like you were involved in this original BEAM-3087
issue.

Best,
Piotrek

pt., 23 paź 2020 o 07:36 Josson Paul  napisał(a):

@Piotr Nowojski   @Nico Kruber 



An update.

I am able to figure out the problem code. A change in the Apache Beam 
code

is causing this problem.





Beam introduced a lock on the “emit” in Unbounded Source. The lock is on
the Flink’s check point lock. Now the same lock is used by Flink’s timer
service to emit the Watermarks. Flink’s timer service is starved to get
hold of the lock and for some reason it never gets that lock. 
Aftereffect

  of this situation is that the ‘WaterMark’ is never emitted by Flink’s
timer service.  Because there is no Watermarks flowing through the 
system,

Sliding Windows are never closed. Data gets accumulated in the Window.



This problem occurs only if we have external lookup calls (like Redis)
happen before the data goes to Sliding Window. Something like below.



KafkaSource à Transforms (Occasional Redis
lookup)->SlidingWindow->Transforms->Kafka Sink






https://github.com/apache/beam/blob/60e0a22ea95921636c392b5aae77cb48196dd700/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L256 

. This is Beam 2.4 and you can see that there is no synchronized 
block at

line 257 and 270.




https://github.com/apache/beam/blob/7931ec055e2da7214c82e368ef7d7fd679faaef1/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L264 

. This is Beam 2.15. See the synchronized block introduced in line 
264 and

280. We are using Beam 2.15 and Flink 1.8.



Beam introduced this synchronized block because of this bug.
https://issues.apache.org/jira/browse/BEAM-3087



After I removed that synchronized keyword everything started working 
fine

in my application.



What do you guys think about this?. Why does Beam need a Synchronized
block there?



Beam is using this lock ->


https://github.com/apache/flink/blob/d54807ba10d0392a60663f030f9fe0bfa1c66754/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java#L282 





Thanks,

Josson

On Mon, Sep 14, 2020 at 5:03 AM Piotr Nowojski 
wrote:


Hi Josson,

The TM logs that you attached are only from a 5 minutes time period. 
Are
you sure they are encompassing the period before the potential 
failure and
after the potential failure? It would be also nice if you would 
provide the

logs matching to the charts (like the one you were providing in the
previous messages), to correlate events (spike in latency/GC with some
timestamp from the logs).

I was not asking necessarily to upgrade to Java9, but an updated/bug
fixed version of Java8 [1].


1) In Flink 1.4 set up, the data in the Heap is throttled. It never

goes out of memory whatever be the ingestion rate. our Windows are 5
minutes windows.

2) In Flink 1.8 set up, HeapKeyedStateBackend is never throttled and
fills up fast. When Old-gen space goes beyond 60-70% even the Mixed 
GC or

Full GC doesn't 

Re: Flink 1.8.3 GC issues

2020-11-11 Thread Aljoscha Krettek
s a finalize method. I have attached spreadsheet

(

*Object-explorer.csv*) to give you a high level view
3) The difference between working cluster and NON working

cluster is

only on Beam and Flink. Hardware, Input message rate,

Application

jars,
Kafka are all the same between those 2 clusters. Working

cluster was

with
Flink 1.4 and Beam 2.4.0

Any insights into this will help me to debug further

Thanks,
Josson


On Thu, Sep 3, 2020 at 3:34 AM Piotr Nowojski <

pnowoj...@apache.org>


wrote:

Hi,

Have you tried using a more recent Flink version? 1.8.x is

no longer

supported, and latest versions might not have this issue

anymore.


Secondly, have you tried backtracking those references to the
Finalizers? Assuming that Finalizer is indeed the class

causing

problems.

Also it may well be a non Flink issue [1].

Best regards,
Piotrek

[1] https://issues.apache.org/jira/browse/KAFKA-8546

czw., 3 wrz 2020 o 04:47 Josson Paul 

napisał(a):

Hi All,

*ISSUE*
--
Flink application runs for sometime and suddenly the CPU

shoots up

and touches the peak, POD memory reaches to the peak, GC

count

increases,
Old-gen spaces reach close to 100%. Full GC doesn't clean

up heap

space. At
this point I stopped sending the data and cancelled the

Flink Jobs.

Still
the Old-Gen space doesn't come down. I took a heap dump and

can see

that
lot of Objects in the java.lang.Finalizer class. I have

attached the

details in a word document. I do have the heap dump but it

is close

to 2GB
of compressed size. Is it safe to upload somewhere and

share it

here?.

This issue doesn't happen in Flink: 1.4.0 and Beam:

release-2.4.0


*WORKING CLUSTER INFO* (Flink: 1.4.0 and Beam:

release-2.4.0)



Application reads from Kafka and does aggregations and

writes into

Kafka. Application has 5 minutes windows. Application uses

Beam

constructs
to build the pipeline. To read and write we use Beam

connectors.


Flink version: 1.4.0
Beam version: release-2.4.0
Backend State: State backend is in the Heap and check

pointing

happening to the distributed File System.

No of task Managers: 1
Heap: 6.4 GB
CPU: 4 Cores
Standalone cluster deployment on a Kubernetes pod

*NOT WORKING CLUSTER INFO* (Flink version: 1.8.3 and Beam

version:

release-2.15.0)
--
Application details are same as above

*No change in application and the rate at which data is

injected.

But change in Flink and Beam versions*


Flink version: 1.8.3
Beam version: release-2.15.0
Backend State: State backend is in the Heap and check

pointing

happening to the distributed File System.

No of task Managers: 1
Heap: 6.5 GB
CPU: 4 Cores

Deployment: Standalone cluster deployment on a Kubernetes

pod


My Observations
-

1) CPU flame graph shows that in the working version, the

cpu time

on GC is lesser compared to non-working version (Please see

the

attached
Flame Graph. *CPU-flame-WORKING.svg* for working cluster and
*CPU-flame-NOT-working.svg*)

2) I have attached the flame graph for native memory MALLOC

calls

when the issue was happening. Please find the attached SVG

image (

*malloc-NOT-working.svg*). The POD memory peaks when this

issue

happens. For me, it looks like the GC process is requesting

a lot of

native
memory.

3) When the issue is happening the GC cpu usage is very

high. Please

see the flame graph (*CPU-graph-at-issuetime.svg*)

Note: SVG file can be opened using any browser and it is

clickable

while opened.
--
Thanks
Josson


--
Thanks
Josson


--
Thanks
Josson


--
Thanks
Josson








--
Thanks
Josson





--
Thanks
Josson





--
Thanks
Josson





From c93636ad96b98eb1e56d7aa8dc9fe1e09272cbe3 Mon Sep 17 00:00:00 2001
From: Aljoscha Krettek 
Date: Wed, 11 Nov 2020 19:05:10 +0100
Subject: [PATCH] [BEAM-X] Don't chain sources to avoid checkpoint
 starvation

---
 .../flink/FlinkStreamingPortablePipelineTranslator.java | 3 ++-
 .../runners/flink/FlinkStreamingTransformTranslators.java   | 6 --
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index 2112941350..77627a3fb7 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -542,7 +542,8 @@ public class FlinkStreamingPortablePipelineTranslator
 source =
 nonDedupSource
 .flatMap(new 
FlinkStreamingTransformTranslators.StripIdsMap<>(pipelineOptions))
-.returns(outputTypeInfo);
+.returns(outputTypeInfo)
+.disableChaining();
   }
 } catch (Exception e) {
   throw new RuntimeExc

Re: Flink-Kafka exactly once errors even with transaction.timeout.ms set

2020-11-11 Thread Aljoscha Krettek
Hmm, could you please post the full stack trace that leads to the 
TimeoutException?


Best,
Aljoscha

On 10.11.20 17:54, Tim Josefsson wrote:

Hey Aljoscha,

I'm setting the transaction.timeout.ms when I create the FlinkKafkaProducer:

I create a Properties object and then set the property and finally add
those properties when creating the producer.

Properties producerProps = new Properties();
producerProps.setProperty("transaction.timeout.ms", "90");

If I don't set that property I instead get the following config when
starting the job:
11:41:56,345 INFO  org.apache.kafka.clients.producer.ProducerConfig
  - ProducerConfig values:
acks = 1
[omitted for brevity]
transaction.timeout.ms = 6
transactional.id = Source: Read player events from Kafka -> Map
  Json to HashMap -> Add CanonicalTime as timestamp -> Filter dates not
needed for backfill -> Sink: Post events to playerEvents
Kafka-a15b4dd4812495cebdc94e33125ef858-1
value.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer

So I imagine the Producer is picking up the change but it still returns
errors when running the job.

Best regards,
Tim


On Tue, 10 Nov 2020 at 16:14, Aljoscha Krettek  wrote:


On 10.11.20 11:53, Tim Josefsson wrote:

Also when checking my logs I see the following message:
11:41:56,345 INFO  org.apache.kafka.clients.producer.ProducerConfig
   - ProducerConfig values:
 acks = 1
 [omitted for brevity]
 transaction.timeout.ms = 90
 transactional.id = Source: Read player events from Kafka -> Map
   Json to HashMap -> Add CanonicalTime as timestamp -> Filter dates not
needed for backfill -> Sink: Post events to playerEvents
Kafka-a15b4dd4812495cebdc94e33125ef858-1
 value.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer


The interesting thing would be to figure out where that
`transaction.timeout.ms = 90` is coming from. The default from Flink
would be 6, if nothing is configured. Are you specifying that value,
maybe from the commandline or in code?

Maybe it's a funny coincidence, but our StreamingKafkaITCase also
specifies that timeout value.

Best,
Aljoscha








Re: Flink-Kafka exactly once errors even with transaction.timeout.ms set

2020-11-10 Thread Aljoscha Krettek

On 10.11.20 11:53, Tim Josefsson wrote:

Also when checking my logs I see the following message:
11:41:56,345 INFO  org.apache.kafka.clients.producer.ProducerConfig
  - ProducerConfig values:
acks = 1
[omitted for brevity]
transaction.timeout.ms = 90
transactional.id = Source: Read player events from Kafka -> Map
  Json to HashMap -> Add CanonicalTime as timestamp -> Filter dates not
needed for backfill -> Sink: Post events to playerEvents
Kafka-a15b4dd4812495cebdc94e33125ef858-1
value.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer


The interesting thing would be to figure out where that 
`transaction.timeout.ms = 90` is coming from. The default from Flink 
would be 6, if nothing is configured. Are you specifying that value, 
maybe from the commandline or in code?


Maybe it's a funny coincidence, but our StreamingKafkaITCase also 
specifies that timeout value.


Best,
Aljoscha



Re: How to use properly the function: withTimestampAssigner((event, timestamp) ->..

2020-11-09 Thread Aljoscha Krettek
@Till For instances where we use withTimestampAssigner() the examples in 
the docs always use the explicit generic parameter. (See 
event_timestamps_watermarks.md and streaming_analytics.md). For cases 
where we don't use withTimestampAssigner() we don't need the extra 
generic parameter because the compiler can figure it out.

But yes, the Java compiler is not very helpful here... 😅
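
For reference, this is the pattern the docs use, applied to the Event type from 
the quoted code below:

WatermarkStrategy<Event> wmStrategy =
    WatermarkStrategy
        .<Event>forMonotonousTimestamps()   // explicit type argument so inference survives the chained call
        .withTimestampAssigner((event, timestamp) -> event.getTime());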


Best,
Aljoscha

On 09.11.20 09:35, Till Rohrmann wrote:
> Glad to hear it!
> 
> Cheers,
> Till
> 
> On Sun, Nov 8, 2020 at 8:02 PM Simone Cavallarin 
> wrote:
> 
>> Hi Till,
>>
>> That's great! thank you so much!!! I have spent one week on this. I'm so
>> relieved!
>>
>> Cheers
>>
>> s
>>
>>
>> --
>> *From:* Till Rohrmann 
>> *Sent:* 06 November 2020 17:56
>> *To:* Simone Cavallarin 
>> *Cc:* user@flink.apache.org ; Aljoscha Krettek <
>> aljos...@apache.org>
>> *Subject:* Re: How to use properly the function:
>> withTimestampAssigner((event, timestamp) ->..
>>
>> Hi Simone,
>>
>> The problem is that the Java 1.8 compiler cannot do type inference when
>> chaining methods [1].
>>
>> The solution would be
>>
>> WatermarkStrategy wmStrategy =
>>      WatermarkStrategy
>>  .forMonotonousTimestamps()
>>  .withTimestampAssigner((event, timestamp) -> {
>> return event.getTime();
>>  });
>>
>> @Aljoscha Krettek  I think we need to update the
>> documentation about it. We have some examples which don't take this into
>> account.
>>
>> [1]
>> https://e.printstacktrace.blog/java-type-inference-generic-methods-chain-call/
>>
>> Cheers,
>> Till
>>
>> On Fri, Nov 6, 2020 at 4:19 PM Simone Cavallarin 
>> wrote:
>>
>> Hi,
>>
>> I'm taking the timestamp from the event payload that I'm receiving from
>> Kafka.
>>
>> I'm struggling to get the time and I'm confused on how I should use the
>> function ".withTimestampAssigner()". I'm receiving an error on event.
>> getTime() that is telling me: *"cannot resolve method "Get Time" in
>> "Object"* and I really don't understand how I can fix it.  My class is
>> providing a long so the variable itself should be fine. Any help would be
>> really appreciated.
>>
>> *This is my code:*
>>
>> *FlinkKafkaConsumer kafkaData =*
>> *new FlinkKafkaConsumer("CorID_0", new
>> EventDeserializationSchema(), p);*
>> *WatermarkStrategy wmStrategy =*
>> *WatermarkStrategy*
>> *.forMonotonousTimestamps()*
>> *.withTimestampAssigner((event, timestamp) -> {
>> return event.**getTime();*
>> *});*
>>
>> *DataStream stream = env.addSource(*
>> *kafkaData.assignTimestampsAndWatermarks(wmStrategy));*
>>
>>
>> And to give you the idea of the whole project,
>>
>> *This is the EventDeserializationSchema class:*
>>
>> *public class EventDeserializationSchema implements
>> DeserializationSchema {*
>>
>> *private static final long serialVersionUID = 1L;*
>>
>>
>> *private static final CsvSchema schema = CsvSchema.builder()*
>> *.addColumn("firstName")*
>> *.addColumn("lastName")*
>> *.addColumn("age", CsvSchema.ColumnType.NUMBER)*
>> *.addColumn("time")*
>> *.build();*
>>
>> *private static final ObjectMapper mapper = new CsvMapper();*
>>
>> *@Override*
>> *public Event deserialize(byte[] message) throws IOException {*
>> *return
>> mapper.readerFor(Event.class).with(schema).readValue(message);*
>> *}*
>>
>> *@Override*
>> *public boolean isEndOfStream(Event nextElement) {*
>> *return false;*
>> *}*
>>
>> *@Override*
>> *public TypeInformation getProducedType() {*
>>
>> *return TypeInformation.of(Event.class);*
>> *}*
>> *}*
>>
>> *And this is the Event Class:*
>>
>> *public class Event implements Serializable {*
>> *public String firstName;*
>> *public String lastName;*
>> *private int age;*
>> *public Long time;*
>>
>>
>>
>> *public Event() {*
>> *}*
>>
>> *public String getFirstName() {*
>> *return firstName;*
>> *}*
>>
>> *public void setFirstName(String firstName) {*
>> *this.firstName = firstName;*
>> *}*
>>
>> *public String getLastName() {*
>> *return lastName;*
>> *}*
>>
>> *public void setLastName(String lastName) {*
>> *this.lastName = lastName;*
>> *}*
>>
>> *public int getAge() {*
>> *return age;*
>> *}*
>>
>> *public void setAge(int age) {*
>> *this.age = age;*
>> *}*
>>
>> *public long getTime() {*
>> *return time;*
>> *}*
>>
>> *public void setTime(String kafkaTime) {*
>> *long tn =
>> OffsetDateTime.parse(kafkaTime).toInstant().toEpochMilli();*
>> *this.time = tn;*
>> *}*
>> *}*
>>
>>
>>
>>
>>
>>
> 



Re: Flink kafka - Message Prioritization

2020-11-04 Thread Aljoscha Krettek

I'm afraid there's nothing in Flink that would make this possible right now.

Have you thought about whether this would be possible using the vanilla 
Kafka Consumer APIs? I'm not sure that it's possible to read messages 
with prioritization using their APIs.


Best,
Aljoscha

On 04.11.20 08:34, Robert Metzger wrote:

Hi Vignesh,

I'm adding Aljoscha to the thread, he might have an idea how to solve this
with the existing Flink APIs (the closest idea I had was the N-ary stream
operator, but I guess that doesn't support backpressuring individual
upstream operators -- side inputs would be needed for that?)

The only somewhat feasible idea I came up with, which only makes sense if
you don't need any exactly once guarantees, is implementing your own Kafka
connector (or forking the existing Kafka connector in Flink (then you could
also get exactly once)).
In this custom Kafka connector, you could, conceptually have two Kafka
consumers each feeding messages into their bounded queue. A third thread is
always emptying the messages from the queue with priority.

Best,
Robert


On Thu, Oct 29, 2020 at 1:29 PM Vignesh Ramesh 
wrote:


Hi,

I have a flink pipeline which reads from a kafka topic does a map
operation(builds an ElasticSearch model) and sinks it to Elasticsearch

*Pipeline-1:*

Flink-Kafka-Connector-Consumer(topic1) (parallelism 8) -> Map (parallelism
8) -> Flink-Es-connector-Sink(es1) (parallelism 8)

Now i want some messages to be prioritized(processed quickly not
necessarily in any order). I am okay in creating a new topic and placing
the priority messages in it (or) do a partition based buckets(Ex:
https://github.com/riferrei/bucket-priority-pattern i don't think it's
possible in flink kafka connector since partition assignment is present
inside FlinkKafkaConsumerBase ).

*I tried the below solution:*

I created another topic (topic2 in which i placed the priority messages)
and with it a new Flink pipeline

*Pipeline-2:*

Flink-Kafka-Connector-Consumer(topic2) (parallelism 8) -> Map (parallelism
8) -> Flink-Es-connector-Sink(es1) (parallelism 8)

But the problem is, I want to consume topic2 as soon as possible. I can
have a delay/slowness in topic1 because of that. If there is no message in
topic2 then topic1 should be given more priority. But in the above case
both the pipelines are getting processed equally. Increasing the
parallelism of pipeline-2 to a big number doesn't help as when there is no
message in topic2 then topic1 is still very slow(parallelism of topic 2 is
wasted).

How can i achieve this using Flink Kafka connector? Is it possible to
achieve it in any other way?


Regards,

Vignesh







Re: Filter By Value in List

2020-11-02 Thread Aljoscha Krettek
I believe this is happening because the type system does not recognize 
that list of Strings as anything special but treats it as a black-box type.


@Timo: Would this work with the new type system?

Best,
Aljoscha

On 02.11.20 06:47, Rex Fenley wrote:

Hello,

I'm trying to filter the rows of a table by whether or not a value exists
in an array column of a table.
Simple example:
table.where("apple".in($"fruits"))

In this example, each row has a "fruits" Array column that could
have 1 or many fruit strings which may or may not be "apple".

However, I keep receiving the following error when I do something similar
to the example above:
"IN operator on incompatible types: String and GenericType"

Is there any way to accomplish this?

Thanks!





Re: Error parsing annotations in flink-table-planner_blink_2.12-1.11.2

2020-11-02 Thread Aljoscha Krettek
But you're not using apiguardian yourself or have it as a dependency 
before this, right?


Best,
Aljoscha

On 02.11.20 14:59, Yuval Itzchakov wrote:

Yes, I'm using SBT.

I managed to resolve this by adding:

"org.apiguardian" % "apiguardian-api" % "1.1.0"

To the dependency list. Perhaps this depedency needs to be shaded as well
in flink-core?

My SBT looks roughly like this:

   lazy val flinkVersion = "1.11.2"
   libraryDependencies ++= Seq(
 "org.apache.flink"%% "flink-table-planner-blink"
  % flinkVersion,
 "org.apache.flink"%% "flink-table-runtime-blink"
  % flinkVersion,
 "org.apache.flink"%% "flink-table-api-scala-bridge"
% flinkVersion,
 "org.apache.flink" % "flink-s3-fs-hadoop"
% flinkVersion,
 "org.apache.flink"%% "flink-container"
  % flinkVersion,
 "org.apache.flink"%% "flink-connector-kafka"
  % flinkVersion,
 "org.apache.flink" % "flink-connector-base"
% flinkVersion,
 "org.apache.flink" % "flink-table-common"
% flinkVersion,
 "org.apache.flink"%% "flink-cep"
  % flinkVersion,
 "org.apache.flink"%% "flink-scala"
  % flinkVersion % "provided",
 "org.apache.flink"%% "flink-streaming-scala"
  % flinkVersion % "provided",
 "org.apache.flink" % "flink-json"
% flinkVersion % "provided",
 "org.apache.flink" % "flink-avro"
% flinkVersion % "provided",
 "org.apache.flink"%% "flink-parquet"
  % flinkVersion % "provided",
 "org.apache.flink"%% "flink-runtime-web"
  % flinkVersion % "provided",
 "org.apache.flink"%% "flink-runtime"
  % flinkVersion % "test" classifier "tests",
 "org.apache.flink"%% "flink-streaming-java"
% flinkVersion % "test" classifier "tests",
 "org.apache.flink"%% "flink-test-utils"
% flinkVersion % "test",
   )

On Mon, Nov 2, 2020 at 3:21 PM Aljoscha Krettek  wrote:


@Timo and/or @Jark, have you seen this problem before?

@Yuval, I'm assuming you're using sbt as a build system, is that
correct? Could you maybe also post a snippet of your build file that
shows the dependency setup or maybe the whole file(s).

Best,
Aljoscha

On 01.11.20 13:34, Yuval Itzchakov wrote:

Hi,

While trying to compile an application with a dependency on
flink-table-planner_blink_2.12-1.11.2, I receive the following error
message during compilation:

scalac: While parsing annotations in /Library/Caches/Coursier/v1/https/


repo1.maven.org/maven2/org/apache/flink/flink-table-planner-blink_2.12/1.11.2/flink-table-planner-blink_2.12-1.11.2.jar(org/apache/calcite/sql/SqlKind.class)
,

could not find EXPERIMENTAL in enum .
This is likely due to an implementation restriction: an annotation

argument

cannot refer to a member of the annotated class (scala/bug#7014).

Has anyone encountered this issue?










Re: LEGACY('STRUCTURED_TYPE' to pojo

2020-11-02 Thread Aljoscha Krettek
@Timo: Is this something that would work when using the new type stack? From 
the message I'm assuming it's using the older type stack.


@Rex: Which Flink version are you using and could you maybe post the 
code snipped that you use to do conversions?


Best,
Aljoscha

On 02.11.20 06:50, Rex Fenley wrote:

Maybe this is related to this issue?
https://issues.apache.org/jira/browse/FLINK-17683

On Fri, Oct 30, 2020 at 8:43 PM Rex Fenley  wrote:


Correction, I'm using Scala case classes not strictly Java POJOs just to
be clear.

On Fri, Oct 30, 2020 at 7:56 PM Rex Fenley  wrote:


Hello,

I keep running into trouble moving between DataStream and SQL with POJOs
because my nested POJOs turn into LEGACY('STRUCTURED_TYPE', is there any
way to convert them back to POJOs in Flink when converting a SQL Table back
to a DataStream?

Thanks!

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG 
  |  FOLLOW US   |  LIKE US





--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG   |
  FOLLOW US   |  LIKE US









Re: Error parsing annotations in flink-table-planner_blink_2.12-1.11.2

2020-11-02 Thread Aljoscha Krettek

@Timo and/or @Jark, have you seen this problem before?

@Yuval, I'm assuming you're using sbt as a build system, is that 
correct? Could you maybe also post a snippet of your build file that 
shows the dependency setup or maybe the whole file(s).


Best,
Aljoscha

On 01.11.20 13:34, Yuval Itzchakov wrote:

Hi,

While trying to compile an application with a dependency on
flink-table-planner_blink_2.12-1.11.2, I receive the following error
message during compilation:

scalac: While parsing annotations in /Library/Caches/Coursier/v1/https/
repo1.maven.org/maven2/org/apache/flink/flink-table-planner-blink_2.12/1.11.2/flink-table-planner-blink_2.12-1.11.2.jar(org/apache/calcite/sql/SqlKind.class),
could not find EXPERIMENTAL in enum .
This is likely due to an implementation restriction: an annotation argument
cannot refer to a member of the annotated class (scala/bug#7014).

Has anyone encountered this issue?





Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

2020-10-10 Thread Aljoscha Krettek

Hi Dan,

did you try using the JobClient you can get from the TableResult to wait 
for job completion? You can get a CompletableFuture for the JobResult 
which should help you.
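
Something along these lines (a sketch; tEnv and the statement are placeholders, 
and note that in 1.11 getJobExecutionResult() takes the user class loader while 
later versions drop that parameter):

TableResult result = tEnv.executeSql("INSERT INTO my_sink SELECT ...");
result.getJobClient()
    .orElseThrow(() -> new IllegalStateException("statement did not spawn a job"))
    .getJobExecutionResult(Thread.currentThread().getContextClassLoader())
    .get();   // block until the job backing the INSERT has finished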


Best,
Aljoscha

On 08.10.20 23:55, Dan Hill wrote:

I figured out the issue.  The join caused part of the job's execution to be
delayed.  I added my own hacky wait condition into the test to make sure
the join job finishes first and it's fine.

What common test utilities exist for Flink?  I found
flink/flink-test-utils-parent.  I implemented a simple sleep loop to wait
for jobs to finish.  I'm guessing this can be done with one of the other
utilities.

Are there any open source test examples?

How are watermarks usually sent with Table API in tests?

After I collect some answers, I'm fine updating the Flink testing page.
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#testing-flink-jobs

On Thu, Oct 8, 2020 at 8:52 AM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:


Can't comment on the SQL issues, but here's our exact setup for Bazel and
Junit5 w/ the resource files approach:
https://github.com/fintechstudios/vp-operator-demo-ff-virtual-2020/tree/master/tools/junit

Best,
Austin

On Thu, Oct 8, 2020 at 2:41 AM Dan Hill  wrote:


I was able to get finer grained logs showing.  I switched from
-Dlog4j.configuration to -Dlog4j.configurationFile and it worked.  With my
larger test case, I was hitting a silent log4j error.  When I created a
small test case to just test logging, I received a log4j error.

Here is a tar
<https://drive.google.com/file/d/1b6vJR_hfaRZwA28jKNlUBxDso7YiTIbk/view?usp=sharing>
with the info logs for:
- (test-nojoin.log) this one works as expected
- (test-join.log) this does not work as expected

I don't see an obvious issue just by scanning the logs.  I'll take a
deeper in 9 hours.




On Wed, Oct 7, 2020 at 8:28 PM Dan Hill  wrote:


Switching to junit4 did not help.

If I make a request to the url returned from
MiniClusterWithClientResource.flinkCluster.getClusterClient().getWebInterfaceURL(),
I get
{"errors":["Not found."]}.  I'm not sure if this is intentional.




On Tue, Oct 6, 2020 at 4:16 PM Dan Hill  wrote:


@Aljoscha - Thanks!  That setup lets me fix the hacky absolute path
reference.  However, the actual log calls are not printing to the console.
Only errors appear in my terminal window and the test logs.  Maybe console
logger does not work for this junit setup.  I'll see if the file version
works.

On Tue, Oct 6, 2020 at 4:08 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:


What Aljoscha suggested is what works for us!

On Tue, Oct 6, 2020 at 6:58 PM Aljoscha Krettek 
wrote:


Hi Dan,

to make the log properties file work this should do it: assuming the
log4j.properties is in //src/main/resources. You will need a
BUILD.bazel
in that directory that has only the line
"exports_files(["log4j.properties"]). Then you can reference it in
your
test via "resources = ["//src/main/resources:log4j.properties"],". Of
course you also need to have the right log4j deps (or slf4j if you're
using that)

Hope that helps!

Aljoscha

On 07.10.20 00:41, Dan Hill wrote:

I'm trying to use Table API for my job.  I'll soon try to get a test
working for my stream job.
- I'll parameterize so I can have different sources and sink for

tests.

How should I mock out a Kafka source?  For my test, I was planning

on

changing the input to be from a temp file (instead of Kafka).
- What's a good way of forcing a watermark using the Table API?


On Tue, Oct 6, 2020 at 3:35 PM Dan Hill 

wrote:



Thanks!

Great to know.  I copied this junit5-jupiter-starter-bazel
<

https://github.com/junit-team/junit5-samples/tree/main/junit5-jupiter-starter-bazel>
rule

into my repository (I don't think junit5 is supported directly with
java_test yet).  I tried a few ways of bundling `log4j.properties`

into the

jar and didn't get them to work.  My current iteration hacks the
log4j.properties file as an absolute path.  My failed attempts

would spit

an error saying log4j.properties file was not found.  This route

finds it

but the log properties are not used for the java logger.

Are there a better set of rules to use for junit5?

# build rule
java_junit5_test(
  name = "tests",
  srcs = glob(["*.java"]),
  test_package = "ai.promoted.logprocessor.batch",
  deps = [...],
  jvm_flags =


["-Dlog4j.configuration=file:///Users/danhill/code/src/ai/promoted/logprocessor/batch/log4j.properties"],

)

# log4j.properties
status = error
name = Log4j2PropertiesConfig
appenders = console
appender.console.type = Console
appender.console.name = LogToConsole
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d [%t] %-5p %c - %m%n
rootLogger.level = info

Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

2020-10-06 Thread Aljoscha Krettek

Hi Dan,

to make the log properties file work this should do it: assuming the 
log4j.properties is in //src/main/resources. You will need a BUILD.bazel 
in that directory that has only the line 
"exports_files(["log4j.properties"]). Then you can reference it in your 
test via "resources = ["//src/main/resources:log4j.properties"],". Of 
course you also need to have the right log4j deps (or slf4j if you're 
using that)


Hope that helps!

Aljoscha

On 07.10.20 00:41, Dan Hill wrote:

I'm trying to use Table API for my job.  I'll soon try to get a test
working for my stream job.
- I'll parameterize so I can have different sources and sink for tests.
How should I mock out a Kafka source?  For my test, I was planning on
changing the input to be from a temp file (instead of Kafka).
- What's a good way of forcing a watermark using the Table API?


On Tue, Oct 6, 2020 at 3:35 PM Dan Hill  wrote:


Thanks!

Great to know.  I copied this junit5-jupiter-starter-bazel
<https://github.com/junit-team/junit5-samples/tree/main/junit5-jupiter-starter-bazel>
 rule
into my repository (I don't think junit5 is supported directly with
java_test yet).  I tried a few ways of bundling `log4j.properties` into the
jar and didn't get them to work.  My current iteration hacks the
log4j.properties file as an absolute path.  My failed attempts would spit
an error saying log4j.properties file was not found.  This route finds it
but the log properties are not used for the java logger.

Are there a better set of rules to use for junit5?

# build rule
java_junit5_test(
 name = "tests",
 srcs = glob(["*.java"]),
 test_package = "ai.promoted.logprocessor.batch",
 deps = [...],
 jvm_flags =
["-Dlog4j.configuration=file:///Users/danhill/code/src/ai/promoted/logprocessor/batch/log4j.properties"],
)

# log4j.properties
status = error
name = Log4j2PropertiesConfig
appenders = console
appender.console.type = Console
appender.console.name = LogToConsole
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d [%t] %-5p %c - %m%n
rootLogger.level = info
rootLogger.appenderRefs = stdout
rootLogger.appenderRef.stdout.ref = LogToConsole

On Tue, Oct 6, 2020 at 3:34 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:


Oops, this is actually the JOIN issue thread [1]. Guess I should revise
my previous "haven't had issues" statement hah. Sorry for the spam!

[1]:
apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Streaming-SQL-Job-Switches-to-FINISHED-before-all-records-processed-td38382.html

On Tue, Oct 6, 2020 at 6:32 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:


Unless it's related to this issue[1], which was w/ my JOIN and time
characteristics, though not sure that applies for batch.

Best,
Austin

[1]:
apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-SQL-Streaming-Join-Creates-Duplicates-td37764.html


On Tue, Oct 6, 2020 at 6:20 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:


Hey Dan,

We use Junit5 and Bazel to run Flink SQL tests on a mini cluster and
haven’t had issues, though we’re only testing on streaming jobs.

Happy to help setting up logging with that if you’d like.

Best,
Austin

On Tue, Oct 6, 2020 at 6:02 PM Dan Hill  wrote:


I don't think any of the gotchas apply to me (at the bottom of this
link).

https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#junit-rule-miniclusterwithclientresource

I'm assuming for a batch job that I don't have to do anything for:
"You can implement a custom parallel source function for emitting
watermarks if your job uses event time timers."

On Tue, Oct 6, 2020 at 2:42 PM Dan Hill  wrote:


I've tried to enable additional logging for a few hours today.  I
think something with junit5 is swallowing the logs.  I'm using Bazel and
junit5.  I setup MiniClusterResourceConfiguration using a custom
extension.  Are there any known issues with Flink and junit5?  I can try
switching to junit4.

When I've binary searched this issue, this failure happens if my
query in step 3 has a join it.  If I remove the join, I can remove step 4
and the code still works.  I've renamed a bunch of my tables too and the
problem still exists.





On Tue, Oct 6, 2020, 00:42 Aljoscha Krettek 
wrote:


Hi Dan,

there were some bugs and quirks in the MiniCluster that we recently
fixed:

   - https://issues.apache.org/jira/browse/FLINK-19123
   - https://issues.apache.org/jira/browse/FLINK-19264

But I think they are probably unrelated to your case. Could you
enable
logging and see from the logs whether the 2) and 3) jobs execute
correctly on the MiniCluster?

Best,
Aljoscha

On 06.10.20 08:08, Dan Hill wrote:

I'm writing a test for a batch job using

MiniClusterResourceConfiguration.


Here's 

Re: 回复: need help about "incremental checkpoint",Thanks

2020-10-06 Thread Aljoscha Krettek

I'm forwarding my comment from the Jira Issue [1]:

In 
https://github.com/appleyuchi/wrongcheckpoint/blob/master/src/main/scala/wordcount_increstate.scala 
you set the RocksDBStateBackend, in 
https://github.com/appleyuchi/wrongcheckpoint/blob/master/src/main/scala/StateWordCount.scala 
you set the FsStateBackend. This will not work because the RocksDB 
savepoint is not compatible.


Best,
Aljoscha

[1] https://issues.apache.org/jira/browse/FLINK-19486

On 06.10.20 11:02, ?? wrote:

I don't know where I did change the state backends.


There are two meanings of "restarting":
1. Restarting automatically (success in my experiment)
2. Restarting manually (failure in my experiment)


The whole experiment (just a wordcount case) and the steps are listed in my GitHub repo:
https://github.com/appleyuchi/wrongcheckpoint
Could you spare some time for me to check it?
Thanks for your help~!




------ Original Message ------
From: "David Anderson"
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/checkpoints.html#directory-structure
>
> at the end of above link,it said:
>
> $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
>
> I have tried the above command in a previous experiment, but still no luck.
> And why does the above official command have " :" after "run -s"?
> I guess the " :" is not necessary.
>
> Could you tell me what the right command is to recover (resume) from an 
incremental checkpoint (RocksDBStateBackend)?
>
> Much Thanks~!
>
>
> See https://flink.apache.org/zh/community.html for how to join the list.
>
> Best,
> David
>
> On Fri, Oct 2, 2020 at 4:45 PM:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>
>> Best,
>> David
>>
>> On Fri, Oct 2, 2020 at 4:07 PM:
>> https://paste.ubuntu.com/p/DpTyQKq6Vk/
>>>
>>> pom.xml is:
>>> https://paste.ubuntu.com/p/49HRYXFzR2/
>>> (with https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala
>>> and https://mvnrepository.com/artifact/org.apache.flink/flink-cep)
>>>
>>> some of the above error is:
>>>
>>> Caused by: java.lang.IllegalStateException: Unexpected state 
handle type, expected: class org.apache.flink.runtime.state.KeyGroupsStateHandle, but 
found: class org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle
>>>
>>>  
>>>
>>>  
>>>
>>> The steps are:
>>>
>>> 1. mvn clean scala:compile compile package
>>>
>>> 2. nc -lk 
>>>
>>> 3. flink run -c wordcount_increstate datastream_api-1.0-SNAPSHOT.jar
>>> Job has been submitted with JobID df6d62a43620f258155b8538ef0ddf1b
>>>
>>> 4. Input the following contents in nc -lk 
>>>
>>> before
>>> error
>>> error
>>> error
>>> error
>>>
>>> 5. flink run -s hdfs://Desktop:9000/tmp/flinkck/df6d62a43620f258155b8538ef0ddf1b/chk-22 -c StateWordCount datastream_api-1.0-SNAPSHOT.jar
>>>
>>> Then the above error happens.
>>>
>>>  
>>>
>>> Please help,Thanks~!
>>>
>>>
>>> I have tried to subscribe to user@flink.apache.org
>>>
>>> but got no replies. If possible, send to appleyu...@foxmail.com with 
your valuable replies, thanks.
>>>
>>>  





Re: Issue with Flink - unrelated executeSql causes other executeSqls to fail.

2020-10-06 Thread Aljoscha Krettek

Hi Dan,

there were some bugs and quirks in the MiniCluster that we recently fixed:

 - https://issues.apache.org/jira/browse/FLINK-19123
 - https://issues.apache.org/jira/browse/FLINK-19264

But I think they are probably unrelated to your case. Could you enable 
logging and see from the logs whether the 2) and 3) jobs execute 
correctly on the MiniCluster?


Best,
Aljoscha

On 06.10.20 08:08, Dan Hill wrote:

I'm writing a test for a batch job using MiniClusterResourceConfiguration.

Here's a simple description of my working test case:
1) I use TableEnvironment.executeSql(...) to create a source and sink table
using tmp filesystem directory.
2) I use executeSql to insert some test data into the source table.
3) I use executeSql to select from source and insert into sink.
4) I use executeSql from the same source to a different sink.

When I do these steps, it works.  If I remove step 4, no data gets written
to the sink.  My actual code is more complex than this (has create view,
join and more tables).  This is a simplified description but highlights the
weird error.

Has anyone hit issues like this?  I'm assuming I have a small code bug in
my queries that's causing issues.  These queries appear to work in
production so I'm confused.  Are there ways of viewing failed jobs or
queries with MiniClusterResourceConfiguration?

Thanks!
- Dan





Re: ConnectionPool to DB and parallelism of operator question

2020-10-06 Thread Aljoscha Krettek

Hi,

since I don't know the implementation of the Sink I can only guess. I 
would say you get 82 * 300 connections because you will get 82 instances 
of a sink operator and each of those would then have a connection pool 
of 300 connections. The individual sink instances will (potentially) run 
on different machines and not share the connection pool.
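
Just to illustrate the point, here is a rough sketch of what such a sink 
typically looks like (class names, the database name, and the line-protocol 
input are placeholders, not the actual implementation): the client, and with 
it the connection pool, is created in open(), which runs once per parallel 
instance.

import java.util.concurrent.TimeUnit;

import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;

public class InfluxMonitoringSink extends RichSinkFunction<String> {

    private final String host;
    private final String userName;
    private final String pwd;
    private transient InfluxDB influxDB;

    public InfluxMonitoringSink(String host, String userName, String pwd) {
        this.host = host;
        this.userName = userName;
        this.pwd = pwd;
    }

    @Override
    public void open(Configuration parameters) {
        // runs once per parallel instance: with parallelism 82 there are 82
        // of these clients, each with its own pool of up to 300 idle connections
        OkHttpClient.Builder builder = new OkHttpClient.Builder()
                .connectionPool(new ConnectionPool(300, 60, TimeUnit.SECONDS));
        influxDB = InfluxDBFactory.connect(host, userName, pwd, builder);
        influxDB.setDatabase("metrics"); // placeholder database name
    }

    @Override
    public void invoke(String lineProtocolRecord, Context context) {
        influxDB.write(lineProtocolRecord); // expects InfluxDB line protocol
    }

    @Override
    public void close() {
        if (influxDB != null) {
            influxDB.close();
        }
    }
}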


Best,
Aljoscha

On 05.10.20 22:28, Vijay Balakrishnan wrote:

HI,
Basic question on parallelism of operators and ConnectionPool to DB:
Will this result in 82 * 300 connections to InfluxDB or just 300
connections to InfluxDB ?
main() {
   sink = createInfluxMonitoringSink(..);
   keyStream.addSink(sink).setParallelism(82); // will this result in 82 * 300
connections to InfluxDB or just 300 connections to InfluxDB?
}


private . createInfluxMonitoringSink(...) {


   final OkHttpClient.Builder okHttpClientBuilder = new
OkHttpClient.Builder()
.readTimeout(timeout, TimeUnit.MILLISECONDS)
.connectTimeout(timeout, TimeUnit.MILLISECONDS)
.writeTimeout(timeout, TimeUnit.MILLISECONDS)
.connectionPool(new ConnectionPool(300, 60,
TimeUnit.SECONDS));

try (InfluxDB influxDB = InfluxDBFactory.connect
(host, userName, pwd, okHttpClientBuilder)) { ..}

}

TIA,





Re: Reading from HDFS and publishing to Kafka

2020-09-29 Thread Aljoscha Krettek

Hi,

I actually have no experience running a Flink job on K8s against a 
kerberized HDFS so please take what I'll say with a grain of salt.


The only thing you should need to do is to configure the path of your 
keytab and possibly some other Kerberos settings. For that check out [1] 
and [2].
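
For reference, the relevant entries in flink-conf.yaml look roughly like this 
(keytab path, principal, and contexts are placeholders; [1] and [2] list all 
the options):

security.kerberos.login.keytab: /path/to/user.keytab
security.kerberos.login.principal: flinkuser@EXAMPLE.COM
security.kerberos.login.use-ticket-cache: false
# only needed if ZooKeeper and/or Kafka are also secured with Kerberos:
security.kerberos.login.contexts: Client,KafkaClient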


I think in general this looks like the right approach and Roman's 
comments are correct as well.


Aljoscha

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/security-kerberos.html#yarnmesos-mode
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#auth-with-external-systems


On 27.09.20 21:54, Khachatryan Roman wrote:

Hi,

1. Yes, StreamExecutionEnvironment.readFile can be used for files on HDFS
2. I think this is a valid concern. Besides that, there are plans to
deprecate DataSet API [1]
4. Yes, the approach looks good

I'm pulling in Aljoscha for your 3rd question (and probably some
clarifications on others).

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741

Regards,
Roman


On Fri, Sep 25, 2020 at 12:50 PM Damien Hawes 
wrote:


Hi folks,

I've got the following use case, where I need to read data from HDFS and
publish the data to Kafka, such that it can be reprocessed by another job.

I've searched the web and read the docs. This has turned up no
concrete examples or information on how this is achieved, or even if it's
possible at all.

Further context:

1. Flink will be deployed to Kubernetes.
2. Kerberos is active on Hadoop.
3. The data is stored on HDFS as Avro.
4. I cannot install Flink on our Hadoop environment.
5. No stateful computations will be performed.

I've noticed that the flink-avro package provides a class called
AvroInputFormat, with a nullable path field, and I think this is my goto.

Apologies for the poor formatting ahead, but the code I have in mind looks
something like this:



StreamExecutionEnvironment env = ...;
AvroInputFormat<Source> inf = new AvroInputFormat<>(null, Source.class);
DataStreamSource<Source> stream = env.readFile(inf, "hdfs://path/to/data");
// rest, + publishing to Kafka using the FlinkKafkaProducer



My major questions and concerns are:

1. Is it possible to read from HDFS using the
StreamExecutionEnvironment object? I'm planning on using the DataStream
API because of point (2) below.
2. Because Flink will be deployed on Kubernetes, I have a major concern
that if I were to use the Data Set API, once Flink completes and exits, the
pods will restart, causing unnecessary duplication of data. Is the pod
restart a valid concern?
3. Is there anything special I need to be worried about regarding Kerberos
in this instance? The key tab will be materialised on the pods upon start
up.
4. Is this even a valid approach? The dataset I need to read and replay is
small (12 TB).

Any help, even in part will be appreciated.

Kind regards,

Damien










Re: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z

2020-09-21 Thread Aljoscha Krettek

Hi All,

Avro was finally bumped in 
https://issues.apache.org/jira/browse/FLINK-18192.


The implementers didn't see 
https://issues.apache.org/jira/browse/FLINK-12532, but it is also 
updated now.


Best,
Aljoscha

On 21.09.20 08:04, Arvid Heise wrote:

Hi Lian,

we had a similar discussion on [1].

TL;DR you are using Avro 1.9.x while Flink still bundles Avro 1.8 [2] until
Hive bumps it [3]. In the thread, I gave some options to avoid running into
the issue.
The easiest fix is to use Avro 1.8.2 all the way, but you may run into [4]
if your logical type is nullable (which is not necessary in most cases).

Still, I think it's time for us to revise the decision to wait for Hive to
bump and rather upgrade independently. Avro was for a long time stuck on
1.8 but the project gained traction again in the past two years. On the
other hand, Hive seems to be rather slow to respond to that, and we
shouldn't let a slow-moving component block us from supporting a fast-moving
component when it's so apparent that users want it.
@Aljoscha Krettek  could you please pick that topic up
and ping the respective maintainers?

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Reading-from-AVRO-files-td35850.html
[2] https://issues.apache.org/jira/browse/FLINK-12532
[3] https://issues.apache.org/jira/browse/HIVE-21737
[4] https://issues.apache.org/jira/browse/AVRO-1891

On Sun, Sep 20, 2020 at 9:56 PM Lian Jiang  wrote:


Thanks Dawid for proposing ConfluentRegistryDeserializationSchema. I am
trying ConfluentRegistryAvroDeserializationSchema (if this is what you
mean) but got "java.lang.Long cannot be cast to java.time.Instant". This
may be caused by https://issues.apache.org/jira/browse/FLINK-11030.
<https://issues.apache.org/jira/browse/FLINK-11030> Is there any progress
for this JIRA? Thanks. Regards!


Stacktrace:
java.lang.ClassCastException: java.lang.Long cannot be cast to
java.time.Instant
at com.mycompany.mypayload.MetadataRecord.put(MetadataRecord.java:136)
at org.apache.avro.generic.GenericData.setField(GenericData.java:795)
at
org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:139)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
at
org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
at
org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
at
org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
at
org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:74)
at
com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:89)
at
com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:16)
at
org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema.deserialize(KafkaDeserializationSchema.java:80)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)



Code:

import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import 
org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import 
org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.io.Serializable;

public class SpecificRecordSerDe implements
 KafkaSe

Re: [DISCUSS] Deprecate and remove UnionList OperatorState

2020-09-18 Thread Aljoscha Krettek

On 14.09.20 02:20, Steven Wu wrote:

Right now, we use UnionState to store the `nextCheckpointId` in the Iceberg
sink use case, because we can't retrieve the checkpointId from
the FunctionInitializationContext during the restore case. But we can move
away from it if the restore context provides the checkpointId.


Is the code for this available in the open source? I checked the Iceberg 
sink that's available in Iceberg proper and the one in Netflix 
Skunkworks: 
https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java#L228


Both of them are only using operator state, not the union variant.

Best,
Aljoscha


Re: How to access state in TimestampAssigner in Flink 1.11?

2020-09-11 Thread Aljoscha Krettek

Hi Theo,

I think you're right that there is currently no good built-in solution 
for your use case. What you would ideally like to have is some operation 
that can buffer records and "hold back" the watermark according to the 
timestamps of the records that are in the buffer. This has the benefit 
that it doesn't require adding a watermark operation in the middle of 
the graph, which can lead to problems because you need to make sure to 
do good per-partition watermarking, which is ideally done in a source.


It's certainly possible to write an operator that can do such buffering 
and holding back the watermark: you would have to override 
processWatermark(), keep track of the timestamps of records and the max 
watermark seen so far, and emit a modified watermark downstream whenever 
there are updates and you evict records from the buffer.
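
A very rough sketch of what such an operator could look like (names are 
invented for the example; checkpointing the buffer, records without 
timestamps, and cleanup at end-of-input are all ignored here):

import java.util.Comparator;
import java.util.PriorityQueue;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class HoldBackWatermarkOperator<T>
        extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T> {

    private final long delayMs;
    private transient PriorityQueue<StreamRecord<T>> buffer;
    private long maxSeenWatermark = Long.MIN_VALUE;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    public HoldBackWatermarkOperator(long delayMs) {
        this.delayMs = delayMs;
    }

    @Override
    public void open() throws Exception {
        super.open();
        buffer = new PriorityQueue<>(
                Comparator.comparingLong((StreamRecord<T> r) -> r.getTimestamp()));
    }

    @Override
    public void processElement(StreamRecord<T> element) {
        buffer.add(element); // assumes timestamps were assigned upstream
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        maxSeenWatermark = Math.max(maxSeenWatermark, mark.getTimestamp());

        // release everything that has been buffered for longer than the delay
        while (!buffer.isEmpty()
                && buffer.peek().getTimestamp() <= maxSeenWatermark - delayMs) {
            output.collect(buffer.poll());
        }

        // the forwarded watermark must not overtake the oldest buffered record
        long candidate = buffer.isEmpty()
                ? maxSeenWatermark
                : Math.min(maxSeenWatermark, buffer.peek().getTimestamp() - 1);
        if (candidate > lastEmittedWatermark) {
            lastEmittedWatermark = candidate;
            super.processWatermark(new Watermark(candidate));
        }
    }
}

It would be applied with something like 
stream.transform("hold back watermark", stream.getType(), new HoldBackWatermarkOperator<>(60_000)).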


I'd be open to suggestions if someone wants to add a proper solution/API 
for this to Flink.


Best,
Aljoscha

On 07.09.20 16:51, Theo Diefenthal wrote:

Hi Aljoscha,

We have a ProcessFunction which does some processing per kafka partition. It 
basically buffers the incoming data over 1 minute and throws out some events 
from the stream if within the minute another related event arrived.

In order to buffer the data and store the events over 1 minute, we perform a 
groupBy(kafkaPartition) and thus have KeyedState per kafka-partition in there. 
The process function however messes up the Watermarks (Due to the buffering). 
Concluding, we have to reassign the watermarks and want to do it the same way 
as the source kafka assigner would have done. We can only do this if the 
Timestamp assigner is aware of the Subtask index.

I solved this issue here by not using "assignTimestampsAndWatermarks" but instead extending 
TimestampsAndWatermarksOperator and putting a "transform" into the DataStream. I don't 
like this solution much but it solves the needs here.

Ideally, I would love to not need state here at all (and buffer in RAM only). I would really love 
to extend the "buffer logic" directly inside the KafkaConsumer so that there is nothing 
to store on a checkpoint (except for the kafka offsets which would be stored anyways, but just 
different ones). But that's another topic, I will open a thread on the dev mailing list with a 
feature request for it soon. Or do you have any other idea how to buffer the data per kafka 
partition? We have the nice semantic that one kafka partition receives data from one of our servers 
so that we have ascending timestamps per partition and can guarantee that some related events 
always arrive in the same partition. I think that's a rather common usecase in Flink which can 
optimize the latency a lot, so I would love to have some more features directly from Flink to 
better support "processing per kafka partition" without the need to shuffle.

Best regards
Theo

- Ursprüngliche Mail -
Von: "Aljoscha Krettek" 
An: "user" 
Gesendet: Montag, 7. September 2020 11:07:35
Betreff: Re: How to access state in TimestampAssigner in Flink 1.11?

Hi,

sorry for the inconvenience! I'm sure we can find a solution together.

Why do you need to keep state in the Watermark Assigner? The Kafka
source will by itself maintain the watermark per partition, so just
specifying a WatermarkStrategy will already correctly compute the
watermark per partition and then combine them together.

Best,
Aljoscha

On 20.08.20 08:08, Till Rohrmann wrote:

Hi Theo,

thanks for reaching out to the community. I am pulling in Aljoscha and Klou
who have worked on the new WatermarkStrategy/WatermarkGenerator abstraction
and might be able to help you with your problem. At the moment, it looks to
me that there is no way to combine state with the new WatermarkGenerator
abstraction.

Cheers,
Till

On Wed, Aug 19, 2020 at 3:37 PM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:


Hi there,

Right now I'm in the process of upgrading our Flink 1.9 jobs to Flink 1.11.

In Flink 1.9, I was able to write a AssignerWithperiodicWatermarks which
also extended AbstractRichFunction and could thus utilize State and
getRuntimeContext() in there. This worked as the
TimestampsAndWatermarksOperator was a AbstractUdfStreamOperator and passed
my assigner in as the userFunction to that operator.

I used this feature for some "per partition processing" which Flink
somehow isn't ideally suited for at the moment I guess. We have ascending
watermarks per kafka partition and do some processing on that. In order to
maintain state per kafka-partition, I now keyby kafkapartition in our
stream (not ideal but better than operatorstate in terms of rescaling) but
afterwards need to emulate the watermark strategy from the initial kafka
source, i.e. reassign watermarks the same way as the kafka source did (per
kafka partition within the operator). Via getRunti

Re: [DISCUSS] Drop Scala 2.11

2020-09-10 Thread Aljoscha Krettek
Yes! I would be in favour of this since it's blocking us from upgrading 
certain dependencies.


I would also be in favour of dropping Scala completely but that's a 
different story.


Aljoscha

On 10.09.20 16:51, Seth Wiesman wrote:

Hi Everyone,

Think of this as a pre-flip, but what does everyone think about dropping
Scala 2.11 support from Flink.

The last patch release was in 2017 and in that time the scala community has
released 2.13 and is working towards a 3.0 release. Apache Kafka and Spark
have both dropped 2.11 support in recent versions. In fact, Flink's
universal Kafka connector is stuck on 2.4 because that is the last version
with scala 2.11 support.

What are people's thoughts on dropping Scala 2.11? How many are still using
it in production?

Seth





Re: Watermark generation issues with File sources in Flink 1.11.1

2020-09-10 Thread Aljoscha Krettek

Thanks David! This saved me quite some time.

Aljoscha

On 09.09.20 19:58, David Anderson wrote:

Arti,

The problem with watermarks and the File source operator will be fixed in
1.11.2 [1]. This bug was introduced in 1.10.0, and isn't related to the new
WatermarkStrategy api.

[1] https://issues.apache.org/jira/browse/FLINK-19109

David

On Wed, Sep 9, 2020 at 2:52 PM Arti Pande  wrote:


Hi Aljoscha,

By "checkpoints do not work" what I mean is ever since Flink 1.9.2 till
1.11.1 when using File source the source operator (guessing split
enumerator or metadata reader) finishes immediately after starting (and
assigning the splits to split readers) hence when first checkpoint is
triggered, it sees the state of the first operator i.e. source as finished
and hence does not do any checkpointing. That's what you can see in logs
and also on the Flink UI for checkpoints. It assumes that the pipeline is
about to finish shortly and aborts the checkpoint.

This along with the watermark generation problems kind of make it
difficult to use file source in production.


On Mon, Aug 24, 2020 at 4:01 PM Aljoscha Krettek 
wrote:


Hi Arti,

what exactly do you mean by "checkpoints do not work"? Are there
exceptions being thrown? How are you writing your file-based sources,
what API methods are you using?

Best,
Aljoscha

On 20.08.20 16:21, Arti Pande wrote:

Hi Till,

Thank you for your quick response. Both the

AssignerWithPeriodicWatermarks

and WatermarkStrategy I am using are very simple ones.

*Code for AssignerWithPeriodicWatermarks:*

public class CustomEventTimeWatermarkGenerator implements
AssignerWithPeriodicWatermarks<MyPojo> {

  private final long maxOutOfOrderness = 0;
  private long currentMaxTimestamp;

  @Override
  public long extractTimestamp(MyPojo myPojo, long

previousTimestamp) {

  long timestamp = myPojo.getInitiationTime().toEpochMilli();
  currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
  return timestamp;
  }

  @Override
  public Watermark getCurrentWatermark() {
  return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
  }
}


*Code for WatermarkStrategy :*

WatermarkStrategy<MyPojo> watermarkStrategy =
  WatermarkStrategy.<MyPojo>forBoundedOutOfOrderness(Duration.ofMillis(0))
      .withTimestampAssigner((event, timestamp) ->
          event.getInitiationTime().toEpochMilli());


Thanks & regards,
Arti


On Thu, Aug 20, 2020 at 11:43 AM Till Rohrmann 

wrote:



Hi Arti,

thanks for sharing this feedback with us. The WatermarkStrategy has

been

introduced quite recently and might have some rough edges. I am

pulling in

Aljoscha and Klou who have worked on this feature and might be able to

help

you. For better understanding your problem, it would be great if you

could

share the AssignerWithPeriodicWatermarks/WatermarkStrategy code with

us.


For the file source, the Flink community has recently introduced a new
source abstraction which will also support checkpoints for file sources
once the file source connector has been migrated to the new

interfaces. The

community is currently working on it.

Cheers,
Till

On Wed, Aug 19, 2020 at 5:38 PM Arti Pande 

wrote:



Hi,

When migrating Stream API based Flink application from 1.9.2 to 1.11.1
the watermark generation has issues with file source alone. It works

well

with Kafka source.

With 1.9.2 a custom watermark generator implementation of
AssignerWithPeriodicWatermarks was to be used. Starting 1.11.1 it is
deprecated and to be replaced with WatermarkStrategy (that combines

both

WatermarkGenerator and TimestampAssigner).

With Flink 1.11.1 when using Kafka source both the above options (i.e.
old  AssignerWithPeriodicWatermarks  and new WatermarkStrategy) work
perfectly well but with file source none of them works. The watermark
assigner never increments the watermarks resulting in stateful

operators

not clearing their state ever, leading to erroneous results and
continuously increasing memory usage.

Same code works well with Kafka source. Is this a known issue? If so,

any

fix planned shortly?

A side note (and probably a candidate for separate email, but I will
write it here) even checkpoints do not work with File Source since

1.9.2

and it is still the problem with 1.11.1. Just wondering if File

source with

stream API is not a priority in Flink development? If so we can

rethink our

sources.

Thanks & regards,
Arti














[DISCUSS] Deprecate and remove UnionList OperatorState

2020-09-09 Thread Aljoscha Krettek

Hi Devs,

@Users: I'm cc'ing the user ML to see if there are any users that are 
relying on this feature. Please comment here if that is the case.


I'd like to discuss the deprecation and eventual removal of UnionList 
Operator State, aka Operator State with Union Redistribution. If you 
don't know what I'm talking about you can take a look in the 
documentation: [1]. It's not documented thoroughly because it started 
out as mostly an internal feature.


The immediate main reason for removing this is also mentioned in the 
documentation: "Do not use this feature if your list may have high 
cardinality. Checkpoint metadata will store an offset to each list 
entry, which could lead to RPC framesize or out-of-memory errors." The 
insidious part of this limitation is that you will only notice that 
there is a problem when it is too late. Checkpointing will still work 
and a program can continue when the state size is too big. The system 
will only fail when trying to restore from a snapshot that has union 
state that is too big. This could be fixed by working around that issue 
but I think there are more long-term issues with this type of state.


I think we need to deprecate and remove API for state that is not tied 
to a key. Keyed state is easy to reason about, the system can 
re-partition state and also re-partition records and therefore scale the 
system in and out. Operator state, on the other hand is not tied to a 
key but an operator. This is a more "physical" concept, if you will, 
that potentially ties business logic closer to the underlying runtime 
execution model, which in turn means fewer degrees of freedom for the 
framework, that is Flink. This is future work, though, but we should 
start with deprecating union list state because it is the potentially 
most dangerous type of state.


We currently use this state type internally in at least the 
StreamingFileSink, FlinkKafkaConsumer, and FlinkKafkaProducer. However, 
we're in the process of hopefully getting rid of it there with our work 
on sources and sinks. Before we fully remove it, we should of course 
signal this to users by deprecating it.


What do you think?

Best,
Aljoscha


Re: Should the StreamingFileSink mark the files "finished" when all bounded input sources are depleted?

2020-09-08 Thread Aljoscha Krettek

Hi,

this is indeed the correct behaviour right now. Which doesn't mean that 
it's the behaviour that we would like to have.


The reason why we can't move the "pending" files to "final" is that we 
don't have a point where we can do this in an idempotent and retryable 
fashion. When we do regular STREAMING execution we do checkpoints and 
when the checkpoints complete we can move pending files. If this fails, 
it will be retried because we still have all the information we need for 
that in the checkpoint. (It's basically a two-phase commit protocol). 
When a bounded STREAMING program simply finishes, we don't have a point 
where we can do that. A colleague of mine (Yun in cc) is actually 
working on a proposal to do one "final checkpoint" for exactly this.
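
To make the checkpoint part concrete, a minimal sketch (the paths and the 
socket source are only an example): with checkpointing enabled, 
in-progress/pending part files are promoted to finished files whenever a 
checkpoint completes.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // pending part files are finalized on every completed checkpoint

StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("hdfs:///tmp/out"), new SimpleStringEncoder<String>("UTF-8"))
        .build();

env.socketTextStream("localhost", 9999)
   .addSink(sink);

env.execute("streaming file sink example");

What is missing for bounded input is exactly one final such checkpoint after 
the sources have finished, which is what the proposal mentioned above is about.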


We're also working on better support for bounded programs on the 
DataStream API, I'll try and summarise this below.


A couple of colleagues and I are currently thinking about how we can 
bring support for good BATCH execution to the DataStream API. The 
starting point is https://s.apache.org/FLIP-131 which discusses eventual 
deprecation of the DataSet API, followed by 
https://s.apache.org/FLIP-134 which outlines the semantics of BATCH 
execution on the DataStraem API, and https://s.apache.org/FLIP-140 which 
discusses improved runtime behaviour for BATCH programs on the 
DataStream API.


The last piece of the puzzle will be sinks, which also need to work well 
for both BATCH and STREAMING programs on the DataStream API. We're 
expecting to publish a FLIP for this shortly.


Best,
Aljoscha

On 07.09.20 19:29, Ken Krugler wrote:

Hi Fred,

I think this is the current behavior (though it would be helpful to know which 
version of Flink you’re using).

 From an email conversation with Kostas in January of this year:


Hi Ken, Jingsong and Li,

Sorry for the late reply.

As Jingsong pointed out, upon calling close() the StreamingFileSink
does not commit the in-progress/pending files.
The reason for this is that the close() method of any UDF including
sink functions is called on both normal termination and termination
due to failure.
Given this, we cannot commit the files, because in case of failure
they should be reverted.

Actually we are currently updating the StreamingFileSink docs to
includes this among other things.
Also the differentiation between normal termination and termination
due to failure will hopefully be part of Flink 1.11 and
this is the FLIP to check
https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs
 
.

Cheers,
Kostas


Though it looks like FLIP-46 is still under discussion, and thus 1.11 doesn’t 
have a fix for this?

— Ken


On Sep 7, 2020, at 8:39 AM, Teunissen, F.G.J. (Fred) <fred.teunis...@ing.com> wrote:

Hi All,
  
My flink-job is using bounded input sources and writes the results to a StreamingFileSink.

When it has processed all the input the job is finished and closes. But the 
output files are still
named “-0-0..inprogress.”. I expected them to be named 
““-0-0.”.

Did I forget some setting or something else?
  
Regards,

Fred
  
-

ATTENTION:
The information in this e-mail is confidential and only meant for the intended 
recipient. If you are not the intended recipient, don't use or disclose it in 
any way. Please let the sender know and delete the message immediately.
-


--
Ken Krugler
http://www.scaleunlimited.com 
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr






Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-09-08 Thread Aljoscha Krettek

I agree with almost all of your points!

The only one where I could see that users want different behaviour is for BATCH 
jobs on the DataStream API. I agree that processing-time does not make 
much sense in batch jobs. However, if users have written some business 
logic using processing-time timers their jobs will silently not work if 
we set the default to IGNORE. Setting it to FAIL would at least make 
users aware that something is not right.


I can also see a small group of users wanting processing-time timers for 
BATCH. We could, for example, fire all processing-time timers at the 
"end of input", then we also set the watermark to +Inf.


Another thing is: what should we do with new triggers that are set after 
the end-of-input? If we have TRIGGER and users keep setting new 
processing-time timers in the callback, would we continue firing them? 
Or should the behaviour be QUIESCE_AND_TRIGGER, where we work off 
remaining timers but don't add new ones? Do we silently ignore adding 
new ones?


By the way, I assume WAIT means we wait for processing-time to actually 
reach the time of pending timers? Or did you have something else in mind 
with this?


Aljoscha

On 08.09.20 09:19, Dawid Wysakowicz wrote:

Hey Aljoscha

A couple of thoughts for the two remaining TODOs in the doc:

# Processing Time Support in BATCH/BOUNDED execution mode

I think there are two somewhat orthogonal problems around this topic:
     1. Firing processing timers at the end of the job
     2. Having processing timers in the BATCH mode
The way I see it there are three main use cases for different
combinations of the aforementioned dimensions:
     1. Regular streaming jobs: STREAM mode with UNBOUNDED sources
        - we do want to have processing timers
        - there is no end of the job
     2. Debugging/Testing streaming jobs: STREAM mode with BOUNDED sources
        - we do want to have processing timers
        - we want to fire/wait for the timers at the end
     3. batch jobs with DataStream API:
        - we do **NOT** want to have processing timers either during
processing or at the end. We want to either fail-hard or ignore the
timers. Generally speaking, in BATCH mode the processing timers do not
make sense, therefore it would be better to fail-hard. It would be the
safest option, as some of the user logic might depend on the processing
timers. Failing hard would give the user opportunity to react to the
changed behaviour. On the other hand if we want to make it possible to
run exact same program both in STREAM and BATCH mode we must have an
option to simply ignore processing     timers.
        - we never want to actually trigger the timers. Neither during
runtime nor at the end

Having the above in mind, I am thinking if we should introduce two
separate options:
  * processing-time.timers = ENABLE/FAIL/IGNORE
  * processing-time.on-end-of-input = CANCEL/WAIT/TRIGGER
With the two options we can satisfy all the above cases. The default
settings would be:
STREAM:
       processing-time.timers = ENABLE
       processing-time.on-end-of-input = TRIGGER
BATCH:
  processing-time.timers = IGNORE
  processing-time.on-end-of-input = CANCEL

# Event time triggers
I do say that from the implementation perspective, but I find it hard to
actually ignore the event-time triggers. We would have to adjust the
implementation of WindowOperator to do that. At the same time I see no
problem with simply keeping them working. I am wondering if we
should/could just leave them as they are.

# Broadcast State
As far as I am concerned there are no core semantical problems with the
Broadcast State. As of now, it does not give any guarantees about the
order in which the broadcast and non-broadcast sides are executed even
in streaming. It also does not expose any mechanisms to implement an
event/processing-time alignments (you cannot register timers in the
broadcast side). I can't see any of the guarantees breaking in the BATCH
mode.
I do agree it would give somewhat nicer properties in BATCH if we
consumed the broadcast side first. It would make the operation
deterministic and let users implement a broadcast join properly on top
of this method. Nevertheless I see it as an extension of the DataStream
API for BATCH execution rather than making the DataStream API work for
BATCH.  Therefore I'd be fine with the leaving the Broadcast State out
of the FLIP

What do you think?

On 01/09/2020 13:46, Aljoscha Krettek wrote:

Hmm, it seems I left out the Dev ML in my mail. Looping that back in..


On 28.08.20 13:54, Dawid Wysakowicz wrote:

@Aljoscha Let me bring back to the ML some of the points we discussed
offline.

Ad. 1 Yes I agree it's not just about scheduling. It includes more
changes to the runtime. We might need to make it more prominent in the
write up.

Ad. 2 You have a good point here that switching the default value for
TimeCharacteristic to INGESTION time might not be the best option as it
mi

Re: How to access state in TimestampAssigner in Flink 1.11?

2020-09-07 Thread Aljoscha Krettek

Hi,

sorry for the inconvenience! I'm sure we can find a solution together.

Why do you need to keep state in the Watermark Assigner? The Kafka 
source will by itself maintain the watermark per partition, so just 
specifying a WatermarkStrategy will already correctly compute the 
watermark per partition and then combine them together.
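
Concretely, something along these lines (the event type, its deserialization 
schema, and the timestamp getter are placeholders); because the strategy is 
set on the consumer itself, the watermarking stays per Kafka partition:

FlinkKafkaConsumer<MyEvent> consumer =
        new FlinkKafkaConsumer<>("my-topic", myEventDeserializationSchema, kafkaProperties);

consumer.assignTimestampsAndWatermarks(
        WatermarkStrategy.<MyEvent>forMonotonousTimestamps() // timestamps ascend per partition
                .withTimestampAssigner((event, timestamp) -> event.getEventTime()));

DataStream<MyEvent> events = env.addSource(consumer);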


Best,
Aljoscha

On 20.08.20 08:08, Till Rohrmann wrote:

Hi Theo,

thanks for reaching out to the community. I am pulling in Aljoscha and Klou
who have worked on the new WatermarkStrategy/WatermarkGenerator abstraction
and might be able to help you with your problem. At the moment, it looks to
me that there is no way to combine state with the new WatermarkGenerator
abstraction.

Cheers,
Till

On Wed, Aug 19, 2020 at 3:37 PM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:


Hi there,

Right now I'm in the process of upgrading our Flink 1.9 jobs to Flink 1.11.

In Flink 1.9, I was able to write a AssignerWithperiodicWatermarks which
also extended AbstractRichFunction and could thus utilize State and
getRuntimeContext() in there. This worked as the
TimestampsAndWatermarksOperator was a AbstractUdfStreamOperator and passed
my assigner in as the userFunction to that operator.

I used this feature for some "per partition processing" which Flink
somehow isn't ideally suited for at the moment I guess. We have ascending
watermarks per kafka partition and do some processing on that. In order to
maintain state per kafka-partition, I now keyby kafkapartition in our
stream (not ideal but better than operatorstate in terms of rescaling) but
afterwards need to emulate the watermark strategy from the initial kafka
source, i.e. reassign watermarks the same way as the kafka source did (per
kafka partition within the operator). Via getRuntimeContext() I am/was able
to identify the kafkaPartitions one operatorinstance was responsible for
and could produce the outputwatermark accordingly. (min over all
responsible partitions).

In Flink 1.11, how can I rebuild this behavior? Do I really need to build
my own TimestampsAndWatermarksOperator which works like the old one? Or is
there a better approach?

Best regards
Theo







Re: Idle stream does not advance watermark in connected stream

2020-09-01 Thread Aljoscha Krettek

I can only agree with Dawid, who explained it better than me... 😅

Aljoscha

On 31.08.20 12:10, Dawid Wysakowicz wrote:

Hey Arvid,

The problem is that the StreamStatus.IDLE is set on the Task level. It
is not propagated to the operator. Combining of the Watermark for a
TwoInputStreamOperator happens in the AbstractStreamOperator:

     public void processWatermark(Watermark mark) throws Exception {
         if (timeServiceManager != null) {
             timeServiceManager.advanceWatermark(mark);
         }
         output.emitWatermark(mark);
     }

     public void processWatermark1(Watermark mark) throws Exception {
         input1Watermark = mark.getTimestamp();
         long newMin = Math.min(input1Watermark, input2Watermark);
         if (newMin > combinedWatermark) {
             combinedWatermark = newMin;
             processWatermark(new Watermark(combinedWatermark));
         }
     }

     public void processWatermark2(Watermark mark) throws Exception {
         input2Watermark = mark.getTimestamp();
         long newMin = Math.min(input1Watermark, input2Watermark);
         if (newMin > combinedWatermark) {
             combinedWatermark = newMin;
             processWatermark(new Watermark(combinedWatermark));
         }
     }

There we do not know that e.g. the whole input 1 is idle. Therefore if
we do not receive any Watermarks from it (it became IDLE) we do not
progress the Watermark starting from any two input operator. We are
missing similar handling of the IDLE status from the task level which
works well for one input operators and multiple parallel upstream instances.

Best,

Dawid

On 31/08/2020 11:05, Arvid Heise wrote:

Hi Aljoscha,

I don't quite follow your analysis. If both sources are configured
with idleness, they should send a periodic watermark on timeout.
So the code that you posted would receive watermarks on the idle
source and thus advance watermarks periodically.

If an idle source does not emit a watermark at all, then I'm curious
why it's not mapped to StreamStatus.IDLE [1], which would trigger the
desired behavior.

[1]
https://github.com/apache/flink/blob/72cd5921684e6daac4a7dd791669898b56d5/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java#L79

On Wed, Aug 26, 2020 at 5:46 PM Aljoscha Krettek <aljos...@apache.org> wrote:

 Yes, I'm afraid this analysis is correct. The StreamOperator,
 AbstractStreamOperator to be specific, computes the combined
 watermarks
 from both inputs here:
 
https://github.com/apache/flink/blob/f0ed29c06d331892a06ee9bddea4173d6300835d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L573.

 The operator layer is not aware of idleness so it will never
 notice. The
 idleness only works on the level of inputs but is never forwarded
 to an
 operator itself.

 To fix this we would have to also make operators aware of idleness
 such
 that they can take this into account when computing the combined
 output
 watermark.

 Best,
 Aljoscha

 On 26.08.20 10:02, Dawid Wysakowicz wrote:
 > Hi Kien,
 >
 > I am afraid this is a valid bug. I am not 100% sure but the way I
 > understand the code the idleness mechanism applies to input
 channels,
 > which means e.g. when multiple parallel instances shuffle their
 results
 > to downstream operators.
 >
 > In case of a two input operator, combining the watermark of two
 > different upstream operators happens inside of the operator itself.
 > There we do not have the idleness status. We do not have a
 status that a
 > whole upstream operator became idle. That's definitely a
 bug/limitation.
 >
 > I'm also cc'ing Aljoscha who could maybe confirm my analysis.
 >
 > Best,
 >
 > Dawid
 >
 > On 24/08/2020 16:00, Truong Duc Kien wrote:
 >> Hi all,
 >> We are testing the new Idleness detection feature in Flink 1.11,
 >> however, it does not work as we expected:
 >> When we connect two data streams, of which one is idle, the output
 >> watermark CoProcessOperator does not increase, hence the program
 >> cannot progress.
 >>
 >> I've made a small project to illustrate the problem. The watermark
 >> received by the sink does not increase at all until the idle
 source is
 >> stopped.
 >>
 >> https://github.com/kien-truong/flink-idleness-testing
 >>
 >> Is this a bug or does the idleness detection not support this
 use case ?
 >>
 >> Regards.
 >> Kien
 >



--

Arvid Heise| Senior Java Developer

<https://www.ververica.com/>


Follow us @V

Re: [DISCUSS] Remove Kafka 0.10.x connector (and possibly 0.11.x)

2020-08-27 Thread Aljoscha Krettek
@Konstantin: Yes, I'm talking about dropping those modules. We don't 
have any special code for supporting Kafka 0.10/0.11 in the "modern" 
connector, that comes from the Kafka Consumer/Producer code we're using.


@Paul: The modern Kafka connector works with Kafka brokers as far back 
as 0.10. Would that be enough, or do you still think we should have the 
actual Kafka 0.10 Consumer code in Flink as well?


Best,
Aljoscha

On 25.08.20 23:15, Chesnay Schepler wrote:

+1 to remove both the 1.10 and 1.11 connectors.

The connectors have not been actively developed for some time. They are 
basically just sitting around causing noise by causing test 
instabilities and eating CI time.
It would  also allow us to really simplify the module structure of the 
Kafka connectors.


Users may continue to use the 1.11 version of the connectors with future 
Flink versions, and we may even provide critical bug fixes in a 1.11 
bugfix release (albeit unlikely).


While ultimately this is a separate topic I would also be in favor of 
removing any migration paths we have from 0.11 to the universal connector;
as these are already present in 1.11 users may migrate to the universal 
connector before jumping to Flink 1.12+.


On 25/08/2020 18:49, Konstantin Knauf wrote:

Hi Aljoscha,

I am assuming you're asking about dropping the 
flink-connector-kafka-0.10/0.11 modules, right? Or are you talking 
about removing support for Kafka 0.10/0.11 from the universal connector?


I am in favor of removing flink-connector-kafka-0.10/0.11 in the next 
release. These modules would still be available in Flink 1.11- as a 
reference, and could be used with Flink 1.12+ with small or no 
modifications. To my knowledge, you also use the universal Kafka 
connector with 0.10 brokers, but there might be a performance 
penalty if I remember correctly. In general, I find it important 
to continuously reduce baggage that accumulates over time and this 
seems like a good opportunity.


Best,

Konstantin

On Tue, Aug 25, 2020 at 4:59 AM Paul Lam <paullin3...@gmail.com> wrote:


    Hi Aljoscha,

    I'm lightly leaning towards keeping the 0.10 connector, for Kafka
    0.10 still has a steady user base in my observation.

    But if we drop 0.10 connector, can we ensure the users would be
    able to smoothly migrate to 0.11 connector/universal connector?

    If I remember correctly, the universal connector is compatible
    with 0.10 brokers, but I want to double check that.

    Best,
    Paul Lam


On Aug 24, 2020, at 22:46, Aljoscha Krettek <aljos...@apache.org> wrote:

    Hi all,

    this thought came up on FLINK-17260 [1] but I think it would be a
    good idea in general. The issue reminded us that Kafka didn't
    have an idempotent/fault-tolerant Producer before Kafka 0.11.0.
    By now we have had the "modern" Kafka connector that roughly
    follows new Kafka releases for a while and this one supports
    Kafka cluster versions as far back as 0.10.2.0 (I believe).

    What are your thoughts on removing support for older Kafka
    versions? And yes, I know that we had multiple discussions like
    this in the past but I'm trying to gauge the current sentiment.

    I'm cross-posting to the user-ml since this is important for both
    users and developers.

    Best,
    Aljoscha

    [1] https://issues.apache.org/jira/browse/FLINK-17260




--

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk








Re: Idle stream does not advance watermark in connected stream

2020-08-26 Thread Aljoscha Krettek
Yes, I'm afraid this analysis is correct. The StreamOperator, 
AbstractStreamOperator to be specific, computes the combined watermarks 
from both inputs here: 
https://github.com/apache/flink/blob/f0ed29c06d331892a06ee9bddea4173d6300835d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L573. 
The operator layer is not aware of idleness so it will never notice. The 
idleness only works on the level of inputs but is never forwarded to an 
operator itself.


To fix this we would have to also make operators aware of idleness such 
that they can take this into account when computing the combined output 
watermark.


Best,
Aljoscha

On 26.08.20 10:02, Dawid Wysakowicz wrote:

Hi Kien,

I am afraid this is a valid bug. I am not 100% sure but the way I
understand the code the idleness mechanism applies to input channels,
which means e.g. when multiple parallel instances shuffle their results
to downstream operators.

In case of a two input operator, combining the watermark of two
different upstream operators happens inside of the operator itself.
There we do not have the idleness status. We do not have a status that a
whole upstream operator became idle. That's definitely a bug/limitation.

I'm also cc'ing Aljoscha who could maybe confirm my analysis.

Best,

Dawid

On 24/08/2020 16:00, Truong Duc Kien wrote:

Hi all,
We are testing the new Idleness detection feature in Flink 1.11,
however, it does not work as we expected:
When we connect two data streams, of which one is idle, the output
watermark CoProcessOperator does not increase, hence the program
cannot progress.

I've made a small project to illustrate the problem. The watermark
received by the sink does not increase at all until the idle source is
stopped.

https://github.com/kien-truong/flink-idleness-testing

Is this a bug or does the idleness detection not support this use case ?

Regards.
Kien






Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-08-25 Thread Aljoscha Krettek
Thanks for creating this FLIP! I think the general direction is very 
good but I think there are some specifics that we should also put in 
there and that we may need to discuss here as well.


## About batch vs streaming scheduling

I think we shouldn't call it "scheduling", because the decision between 
bounded and unbounded affects more than just scheduling. It affects how 
we do network transfers and the semantics of time, among other things. 
So maybe we should differentiate between batch-style and streaming-style 
execution, though I'm not sure I like those terms either.


## About processing-time support in batch

It's not just about "batch": changing the default to ingestion time is a 
change for stream processing as well. Actually, I don't know if 
ingestion time even makes sense for batch processing. IIRC, with the new 
sources we actually always have a timestamp, so this discussion might be 
moot. Maybe Becket and/or Stephan (cc'ed) could chime in on this.


Also, I think it's right that we currently ignore processing-time timers 
at the end of input in streaming jobs, but this has been a source of 
trouble for users. See [1] and several discussions on the ML. I'm also 
cc'ing Flavio here who also ran into this problem. I think we should 
solve this quickly after laying the foundations of bounded processing on 
the DataStream API.


## About broadcast state support

I think as a low-hanging fruit we could just read the broadcast side 
first and then switch to the regular input. We do need to be careful 
with creating distributed deadlocks, though, so this might be trickier 
than it seems at first.


## Loose ends and weird semantics

There are some operations in the DataStream API that have semantics that 
might make sense for stream processing but should behave differently for 
batch. For example, KeyedStream.reduce() is essentially a reduce on a 
GlobalWindow with a Trigger that fires on every element. In DB terms it 
produces an UPSERT stream as an output, if you get ten input elements 
for a key you also get ten output records. For batch processing it might 
make more sense to instead only produce one output record per key with 
the result of the aggregation. This would be correct for downstream 
consumers that expect an UPSERT stream but it would change the actual 
physical output stream that they see.
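
To make that concrete, a small self-contained example of the current 
streaming behaviour (the class name is made up):

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReduceUpsertExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("a", 3))
            .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                @Override
                public String getKey(Tuple2<String, Integer> value) {
                    return value.f0;
                }
            })
            .reduce((x, y) -> Tuple2.of(x.f0, x.f1 + y.f1))
            // streaming-style execution emits the running aggregate for every
            // input element: (a,1), (a,3), (a,6); for batch-style execution one
            // would arguably only want the final (a,6) per key
            .print();

        env.execute();
    }
}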


There might be other such operations in the DataStream API that have 
slightly weird behaviour that doesn't make much sense when you do 
bounded processing.


Best,
Aljoscha

[1] https://issues.apache.org/jira/browse/FLINK-18647

On 24.08.20 11:29, Kostas Kloudas wrote:

Thanks a lot for the discussion!

I will open a voting thread shortly!

Kostas

On Mon, Aug 24, 2020 at 9:46 AM Kostas Kloudas  wrote:


Hi Guowei,

Thanks for the insightful comment!

I agree that this can be a limitation of the current runtime, but I
think that this FLIP can go on as it discusses mainly the semantics
that the DataStream API will expose when applied on bounded data.
There will definitely be other FLIPs that will actually handle the
runtime-related topics.

But it is good to document them nevertheless so that we start soon
ironing out the remaining rough edges.

Cheers,
Kostas

On Mon, Aug 24, 2020 at 9:16 AM Guowei Ma  wrote:


Hi, Klou

Thanks for your proposal. It's a very good idea.
Just a little comment about the "Batch vs Streaming Scheduling".  In the 
AUTOMATIC execution mode maybe we could not pick BATCH execution mode even if all sources 
are bounded. For example some applications would use the `CheckpointListener`, which is 
not available in the BATCH mode in current implementation.
So maybe we need more checks in the AUTOMATIC execution mode.

Best,
Guowei


On Thu, Aug 20, 2020 at 10:27 PM Kostas Kloudas  wrote:


Hi all,

Thanks for the comments!

@Dawid: "execution.mode" can be a nice alternative and from a quick
look it is not used currently by any configuration option. I will
update the FLIP accordingly.

@David: Given that having the option to allow timers to fire at the
end of the job is already in the FLIP, I will leave it as is and I
will update the default policy to be "ignore processing time timers
set by the user". This will allow existing dataStream programs to run
on bounded inputs. This update will affect point 2 in the "Processing
Time Support in Batch" section.

If these changes cover your proposals, then I would like to start a
voting thread tomorrow evening if this is ok with you.

Please let me know until then.

Kostas

On Tue, Aug 18, 2020 at 3:54 PM David Anderson  wrote:


Being able to optionally fire registered processing time timers at the end of a 
job would be interesting, and would help in (at least some of) the cases I have 
in mind. I don't have a better idea.

David

On Mon, Aug 17, 2020 at 8:24 PM Kostas Kloudas  wrote:


Hi Kurt and David,

Thanks a lot for the insightful feedback!

@Kurt: For the topic of checkpointing with Batch Scheduling, I totally
agree with

[DISCUSS] Remove Kafka 0.10.x connector (and possibly 0.11.x)

2020-08-24 Thread Aljoscha Krettek

Hi all,

this thought came up on FLINK-17260 [1] but I think it would be a good 
idea in general. The issue reminded us that Kafka didn't have an 
idempotent/fault-tolerant Producer before Kafka 0.11.0. By now we have 
had the "modern" Kafka connector that roughly follows new Kafka releases 
for a while and this one supports Kafka cluster versions as far back as 
0.10.2.0 (I believe).


What are your thoughts on removing support for older Kafka versions? And 
yes, I know that we had multiple discussions like this in the past but 
I'm trying to gauge the current sentiment.


I'm cross-posting to the user-ml since this is important for both users 
and developers.


Best,
Aljoscha

[1] https://issues.apache.org/jira/browse/FLINK-17260


Re: Watermark generation issues with File sources in Flink 1.11.1

2020-08-24 Thread Aljoscha Krettek

Hi Arti,

what exactly do you mean by "checkpoints do not work"? Are there 
exceptions being thrown? How are you writing your file-based sources, 
what API methods are you using?


Best,
Aljoscha

On 20.08.20 16:21, Arti Pande wrote:

Hi Till,

Thank you for your quick response. Both the AssignerWithPeriodicWatermarks
and WatermarkStrategy I am using are very simple ones.

*Code for AssignerWithPeriodicWatermarks:*

public class CustomEventTimeWatermarkGenerator implements
        AssignerWithPeriodicWatermarks<MyPojo> {

    private final long maxOutOfOrderness = 0;
    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyPojo myPojo, long previousTimestamp) {
        long timestamp = myPojo.getInitiationTime().toEpochMilli();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}


*Code for WatermarkStrategy :*

WatermarkStrategy<MyPojo> watermarkStrategy =
        WatermarkStrategy.<MyPojo>forBoundedOutOfOrderness(Duration.ofMillis(0))
                .withTimestampAssigner((event, timestamp) ->
                        event.getInitiationTime().toEpochMilli());


Thanks & regards,
Arti


On Thu, Aug 20, 2020 at 11:43 AM Till Rohrmann  wrote:


Hi Arti,

thanks for sharing this feedback with us. The WatermarkStrategy has been
introduced quite recently and might have some rough edges. I am pulling in
Aljoscha and Klou who have worked on this feature and might be able to help
you. For better understanding your problem, it would be great if you could
share the AssignerWithPeriodicWatermarks/WatermarkStrategy code with us.

For the file source, the Flink community has recently introduced a new
source abstraction which will also support checkpoints for file sources
once the file source connector has been migrated to the new interfaces. The
community is currently working on it.

Cheers,
Till

On Wed, Aug 19, 2020 at 5:38 PM Arti Pande  wrote:


Hi,

When migrating Stream API based Flink application from 1.9.2 to 1.11.1
the watermark generation has issues with file source alone. It works well
with Kafka source.

With 1.9.2 a custom watermark generator implementation of
AssignerWithPeriodicWatermarks was to be used. Starting 1.11.1 it is
deprecated and to be replaced with WatermarkStrategy (that combines both
WatermarkGenerator and TimestampAssigner).

With Flink 1.11.1 when using Kafka source both the above options (i.e.
old  AssignerWithPeriodicWatermarks  and new WatermarkStrategy) work
perfectly well but with file source none of them works. The watermark
assigner never increments the watermarks resulting in stateful operators
not clearing their state ever, leading to erroneous results and
continuously increasing memory usage.

Same code works well with Kafka source. Is this a known issue? If so, any
fix planned shortly?

A side note (and probably a candidate for separate email, but I will
write it here) even checkpoints do not work with File Source since 1.9.2
and it is still the problem with 1.11.1. Just wondering if File source with
stream API is not a priority in Flink development? If so we can rethink our
sources.

Thanks & regards,
Arti









Re: Customization of execution environment

2020-07-31 Thread Aljoscha Krettek
I agree! My long-term goal is that a Configuration is the basis of truth 
and that the programmatic setter methods and everything else just modify 
the underlying configuration.


We have made big steps in at least allowing to configure most (if not 
all) StreamExecutionEnvironment and TableEnvironment settings via a 
Configuration but we're not completely there yet.


To me it's not yet clear whether modifications on the Configuration of 
the TableEnvironment should go back to the Configuration of the 
StreamExecutionEnvironment. It might be that some users find it 
surprising that changes propagate.


Best,
Aljoscha



On 30.07.20 15:41, Flavio Pompermaier wrote:

That's fine and it's basically what I do as well..I was arguing that it's
bad (IMHO) that you could access the config from the BatchTableEnvironment
(via bte.getConfig().getConfiguration()).
You legitimately think that you are customizing the env but that's
illusory. You should not be able to set properties if they are read only.

Cheers,
Flavio


On Thu, Jul 30, 2020 at 12:15 PM Arvid Heise  wrote:


I'm not entirely sure if I completely understand the interaction of BTE
and ExecEnv, but I'd create it this way:

Configuration conf = new Configuration();
conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, PARALLELISM);

ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
BatchTableEnvironment bte = BatchTableEnvironment.create(env);


On Wed, Jul 29, 2020 at 8:14 AM Robert Metzger 
wrote:


Hi Flavio,

I think the recommended approach is as follows: (then you don't need to
create to environments)

final Configuration conf = new Configuration();
conf.setLong(...)
env = new LocalEnvironment(conf);

I agree that in theory it would be nicer if the configuration returned
was editable, but the handling of configs in Flink is pretty involved
already.


On Tue, Jul 28, 2020 at 10:13 AM Flavio Pompermaier 
wrote:


Hi to all,
migrating to Flink 1.11 I've tried to customize the exec env in this way:

ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment bte = BatchTableEnvironment.create(env);
final Configuration conf = bte.getConfig().getConfiguration();
conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);
conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");
conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");
conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");
conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));

This seems to not have any influence on the setting in my local env and
I need to create env as a new LocalEnvironment if I want to customize it
during tests:

final Configuration conf = env.getConfiguration();
conf.setLong(...)
env = new LocalEnvironment(conf);

Is this the desired behaviour or is it a bug?
Wouldn't it be possible to allow customization of env config it's
actually editable?

Best,
Flavio





--

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng







Re: [DISCUSS] FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API)

2020-07-30 Thread Aljoscha Krettek
That is good input! I was not aware that anyone was actually using 
`runCustomOperation()`. Out of curiosity, what are you using that for?


We have definitely thought about the first two points you mentioned, 
though. Especially processing-time will make it tricky to define unified 
execution semantics.


Best,
Aljoscha

On 30.07.20 17:10, Flavio Pompermaier wrote:

I just wanted to be propositive about missing api.. :D

On Thu, Jul 30, 2020 at 4:29 PM Seth Wiesman  wrote:


+1 Its time to drop DataSet

Flavio, those issues are expected. This FLIP isn't just to drop DataSet
but to also add the necessary enhancements to DataStream such that it works
well on bounded input.

On Thu, Jul 30, 2020 at 8:49 AM Flavio Pompermaier 
wrote:


Just to contribute to the discussion, when we tried to do the migration we
faced some problems that could make migration quite difficult.
1 - It's difficult to test because of
https://issues.apache.org/jira/browse/FLINK-18647
2 - missing mapPartition
3 - missing DataSet<X> runOperation(CustomUnaryOperation<T, X> operation)

On Thu, Jul 30, 2020 at 12:40 PM Arvid Heise  wrote:


+1 of getting rid of the DataSet API. Is DataStream#iterate already
superseding DataSet iterations or would that also need to be accounted for?


In general, all surviving APIs should also offer a smooth experience for
switching back and forth.

On Thu, Jul 30, 2020 at 9:39 AM Márton Balassi <balassi.mar...@gmail.com> wrote:


Hi All,

Thanks for the write up and starting the discussion. I am in favor of
unifying the APIs the way described in the FLIP and deprecating the DataSet
API. I am looking forward to the detailed discussion of the changes
necessary.

Best,
Marton

On Wed, Jul 29, 2020 at 12:46 PM Aljoscha Krettek <aljos...@apache.org> wrote:


Hi Everyone,

my colleagues (in cc) and I would like to propose this FLIP for
discussion. In short, we want to reduce the number of APIs that we have
by deprecating the DataSet API. This is a big step for Flink, that's why
I'm also cross-posting this to the User Mailing List.

FLIP-131: http://s.apache.org/FLIP-131

I'm posting the introduction of the FLIP below but please refer to the
document linked above for the full details:

--
Flink provides three main SDKs/APIs for writing Dataflow Programs: Table
API/SQL, the DataStream API, and the DataSet API. We believe that this
is one API too many and propose to deprecate the DataSet API in favor of
the Table API/SQL and the DataStream API. Of course, this is easier said
than done, so in the following, we will outline why we think that having
too many APIs is detrimental to the project and community. We will then
describe how we can enhance the Table API/SQL and the DataStream API to
subsume the DataSet API's functionality.

In this FLIP, we will not describe all the technical details of how the
Table API/SQL and DataStream will be enhanced. The goal is to achieve
consensus on the idea of deprecating the DataSet API. There will have to
be follow-up FLIPs that describe the necessary changes for the APIs that
we maintain.
--

Please let us know if you have any concerns or comments. Also, please
keep discussion to this ML thread instead of commenting in the Wiki so
that we can have a consistent view of the discussion.

Best,
Aljoscha





--

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng









[DISCUSS] FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API)

2020-07-29 Thread Aljoscha Krettek

Hi Everyone,

my colleagues (in cc) and I would like to propose this FLIP for 
discussion. In short, we want to reduce the number of APIs that we have 
by deprecating the DataSet API. This is a big step for Flink, that's why 
I'm also cross-posting this to the User Mailing List.


FLIP-131: http://s.apache.org/FLIP-131

I'm posting the introduction of the FLIP below but please refer to the 
document linked above for the full details:


--
Flink provides three main SDKs/APIs for writing Dataflow Programs: Table 
API/SQL, the DataStream API, and the DataSet API. We believe that this 
is one API too many and propose to deprecate the DataSet API in favor of 
the Table API/SQL and the DataStream API. Of course, this is easier said 
than done, so in the following, we will outline why we think that having 
too many APIs is detrimental to the project and community. We will then 
describe how we can enhance the Table API/SQL and the DataStream API to 
subsume the DataSet API's functionality.


In this FLIP, we will not describe all the technical details of how the 
Table API/SQL and DataStream will be enhanced. The goal is to achieve 
consensus on the idea of deprecating the DataSet API. There will have to 
be follow-up FLIPs that describe the necessary changes for the APIs that 
we maintain.

--

Please let us know if you have any concerns or comments. Also, please 
keep discussion to this ML thread instead of commenting in the Wiki so 
that we can have a consistent view of the discussion.


Best,
Aljoscha


Re: AllwindowStream and RichReduceFunction

2020-07-27 Thread Aljoscha Krettek

I think that should work with an aggregate() instead of reduce().
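
For illustration, a minimal sketch of how the reduce could be expressed as
an AggregateFunction on the countWindowAll (types taken from the code
below; the pairwise merge is the same as in the original reduce()). Since
the window's aggregate function cannot be a RichFunction either, the
non-serializable SerDe is created lazily instead of in open():

public class SketchesStringAggregate implements
    AggregateFunction<Tuple2<byte[], byte[]>, Tuple2<byte[], byte[]>, Tuple2<byte[], byte[]>> {

  // created lazily because there is no open() available here
  private transient ArrayOfItemsSerDe<String> serDe;

  @Override
  public Tuple2<byte[], byte[]> createAccumulator() {
    return null; // the first element simply becomes the accumulator
  }

  @Override
  public Tuple2<byte[], byte[]> add(Tuple2<byte[], byte[]> value, Tuple2<byte[], byte[]> acc) {
    return acc == null ? value : merge(acc, value);
  }

  @Override
  public Tuple2<byte[], byte[]> getResult(Tuple2<byte[], byte[]> acc) {
    return acc;
  }

  @Override
  public Tuple2<byte[], byte[]> merge(Tuple2<byte[], byte[]> t1, Tuple2<byte[], byte[]> t2) {
    if (serDe == null) {
      serDe = new ArrayOfStringsSerDe();
    }
    // same pairwise merge as in the original reduce()
    final HllSketch hll1 = HllSketch.heapify(Memory.wrap(t1.f0));
    final HllSketch hll2 = HllSketch.heapify(Memory.wrap(t2.f0));
    final Union union = new Union(hll1.getLgConfigK());
    union.update(hll1);
    union.update(hll2);
    final byte[] hllSketchBytes = union.getResult().toCompactByteArray();

    final ItemsSketch<String> s1 = ItemsSketch.getInstance(Memory.wrap(t1.f1), serDe);
    final ItemsSketch<String> s2 = ItemsSketch.getInstance(Memory.wrap(t2.f1), serDe);
    final byte[] itemSketchBytes = s1.merge(s2).toByteArray(serDe);
    return new Tuple2<>(hllSketchBytes, itemSketchBytes);
  }
}

// usage: input.countWindowAll(windowSize).aggregate(new SketchesStringAggregate())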

Best,
Aljoscha

On 24.07.20 17:02, Flavio Pompermaier wrote:

In my reduce function I want to compute some aggregation on the sub-results
of a map-partition (that I tried to migrate from DataSet to DataStream
without success).
The original code was something like:

  input.mapPartition(new RowToStringSketches(sketchMapSize)) //
 .reduce(new SketchesStringReducer()) //
 .map(new SketchesStringToStatsPojo(colIndex, topK));

I asked about the simulation of the mapPartition function in the streaming
env in another thread in the mailing list [1] because I was not able to
test it..it seems that the program was exiting before be able to process
anything..
So I gave up on replacing DataSet with DataStream API for the moment..it
seems that there are too many things still to migrate.
Btw, this is the reduce function:

public class SketchesStringReducer extends RichReduceFunction<Tuple2<byte[], byte[]>> {
   private static final long serialVersionUID = 1L;

   private transient ArrayOfItemsSerDe<String> serDe;

   @Override
   public void open(Configuration parameters) throws Exception {
     this.serDe = new ArrayOfStringsSerDe();
   }

   @Override
   public Tuple2<byte[], byte[]> reduce(Tuple2<byte[], byte[]> t1, Tuple2<byte[], byte[]> t2)
       throws Exception {
     // merge HLL
     final HllSketch hll1 = HllSketch.heapify(Memory.wrap(t1.f0));
     final HllSketch hll2 = HllSketch.heapify(Memory.wrap(t2.f0));
     final Union union = new Union(hll1.getLgConfigK());
     union.update(hll1);
     union.update(hll2);
     final byte[] hllSketchBytes = union.getResult().toCompactByteArray();

     // merge Item
     final ItemsSketch<String> s1 = ItemsSketch.getInstance(Memory.wrap(t1.f1), serDe);
     final ItemsSketch<String> s2 = ItemsSketch.getInstance(Memory.wrap(t2.f1), serDe);
     final byte[] itemSketchBytes = s1.merge(s2).toByteArray(serDe);
     return new Tuple2<>(hllSketchBytes, itemSketchBytes);
   }
}

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Is-there-a-way-to-use-stream-API-with-this-program-td36715.html#a36767

On Mon, Jul 20, 2020 at 6:32 PM Aljoscha Krettek 
wrote:


What are you trying to do in the ReduceFunction? Without knowing the
code, maybe an aggregate(AggregateFunction) is the solution.

Best,
Aljoscha

On 20.07.20 18:03, Flavio Pompermaier wrote:

Thanks Aljosha for the reply. So what can I do in my reduce function that
contains transient variables (i.e. not serializable)?

On Mon, Jul 20, 2020 at 4:38 PM Aljoscha Krettek 
wrote:


Hi Flavio,

the reason is that under the covers the ReduceFunction will be used as
the ReduceFunction of a ReducingState. And those cannot be rich
functions because we cannot provide all the required context "inside"
the state backend.

You can see how the ReduceFunction is used to create a
ReducingStateDescriptor here:



https://github.com/apache/flink/blob/0c43649882c831c1ec88f4e33d8a59b1cbf5f2fe/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java#L300


Best,
Aljoscha

On 16.07.20 16:28, Flavio Pompermaier wrote:

Hi to all,
I'm trying to apply a rich reduce function after a countWindowAll but

Flink

says
"ReduceFunction of reduce can not be a RichFunction. Please use
reduce(ReduceFunction, WindowFunction) instead."

Is there any good reason for this? Or am I doing something wrong?

Best,
Flavio













Re: GenericData cannot be cast to type scala.Product

2020-07-24 Thread Aljoscha Krettek
For anyone following this: the discussion is happening on the Jira 
issue: https://issues.apache.org/jira/browse/FLINK-18478


Best,
Aljoscha

On 23.07.20 15:32, Georg Heiler wrote:

Hi,

as a follow up to https://issues.apache.org/jira/browse/FLINK-18478 I now
face a class cast exception.
The reproducible example is available at
https://gist.github.com/geoHeil/5a5a4ae0ca2a8049617afa91acf40f89

I do not understand (yet) why such a simple example of reading Avro from a
Schema Registry and Kafka (in the scala API) is still causing problems.

java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record
cannot be cast to scala.Product

java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record
cannot be cast to scala.Product
 at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(
CaseClassSerializer.scala:32) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
 at org.apache.flink.streaming.runtime.tasks.
OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
 at org.apache.flink.streaming.runtime.tasks.
OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
 at org.apache.flink.streaming.runtime.tasks.
OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
 at org.apache.flink.streaming.api.operators.CountingOutput.collect(
CountingOutput.java:52) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
 at org.apache.flink.streaming.api.operators.CountingOutput.collect(
CountingOutput.java:30) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
 at org.apache.flink.streaming.api.operators.
StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:
104) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
 at org.apache.flink.streaming.api.operators.
StreamSourceContexts$NonTimestampContext.collectWithTimestamp(
StreamSourceContexts.java:111) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
 at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher
.emitRecordsWithTimestamps(AbstractFetcher.java:352)
~[flink-connector-kafka-base_2.11-1.11.1.jar:1.11.1]
 at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher
.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
~[flink-connector-kafka_2.11-1.11.1.jar:1.11.1]
 at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher
.runFetchLoop(KafkaFetcher.java:141) ~[flink-connector-kafka_2.11-1.11.1
.jar:1.11.1]
 at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
.run(FlinkKafkaConsumerBase.java:755) ~[flink-connector-kafka-base_2.11-1.11
.1.jar:1.11.1]
 at org.apache.flink.streaming.api.operators.StreamSource.run(
StreamSource.java:100) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
 at org.apache.flink.streaming.api.operators.StreamSource.run(
StreamSource.java:63) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
 at org.apache.flink.streaming.runtime.tasks.
SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
~[flink-dist_2.11-1.11.1.jar:1.11.1]

Best,
Georg





Re: AllwindowStream and RichReduceFunction

2020-07-20 Thread Aljoscha Krettek
What are you trying to do in the ReduceFunction? Without knowing the 
code, maybe an aggregate(AggregateFunction) is the solution.


Best,
Aljoscha

On 20.07.20 18:03, Flavio Pompermaier wrote:

Thanks Aljosha for the reply. So what can I do in my reduce function that
contains transient variables (i.e. not serializable)?

On Mon, Jul 20, 2020 at 4:38 PM Aljoscha Krettek 
wrote:


Hi Flavio,

the reason is that under the covers the ReduceFunction will be used as
the ReduceFunction of a ReducingState. And those cannot be rich
functions because we cannot provide all the required context "inside"
the state backend.

You can see how the ReduceFunction is used to create a
ReducingStateDescriptor here:

https://github.com/apache/flink/blob/0c43649882c831c1ec88f4e33d8a59b1cbf5f2fe/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java#L300

Best,
Aljoscha

On 16.07.20 16:28, Flavio Pompermaier wrote:

Hi to all,
I'm trying to apply a rich reduce function after a countWindowAll but

Flink

says
"ReduceFunction of reduce can not be a RichFunction. Please use
reduce(ReduceFunction, WindowFunction) instead."

Is there any good reason for this? Or am I doing something wrong?

Best,
Flavio









Re: Status of a job when a kafka source dies

2020-07-20 Thread Aljoscha Krettek

Hi,

Flink doesn't do any special failure-handling or retry logic, so it’s up 
to how the KafkaConsumer is configured via properties. In general Flink 
doesn’t try to be smart: when something fails, an exception will bubble 
up that will fail this execution of the job. If checkpoints are enabled 
this will trigger a restore, which is controlled by the restart strategy. 
If that eventually gives up, the job will go to “FAILED” and stop.


This is the relevant section of the docs: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/task_failure_recovery.html
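
To make that concrete, a small sketch (not from the original question) of
enabling checkpointing and a restart strategy so the job retries a few
times before it goes to FAILED:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// checkpointing enables automatic restarts in the first place
env.enableCheckpointing(60_000);

// retry up to 3 times with 10 seconds between attempts; after that the job goes to FAILED
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
    3, Time.of(10, TimeUnit.SECONDS)));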


Best,
Aljoscha

On 15.07.20 17:42, Nick Bendtner wrote:

Hi guys,
I want to know what is the default behavior of Kafka source when a kafka
cluster goes down during streaming. Will the job status go to failing or is
the exception caught and there is a back off before the source tries to
poll for more events ?


Best,
Nick.





Re: AllwindowStream and RichReduceFunction

2020-07-20 Thread Aljoscha Krettek

Hi Flavio,

the reason is that under the covers the ReduceFunction will be used as 
the ReduceFunction of a ReducingState. And those cannot be rich 
functions because we cannot provide all the required context "inside" 
the state backend.


You can see how the ReduceFunction is used to create a 
ReducingStateDescriptor here: 
https://github.com/apache/flink/blob/0c43649882c831c1ec88f4e33d8a59b1cbf5f2fe/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java#L300


Best,
Aljoscha

On 16.07.20 16:28, Flavio Pompermaier wrote:

Hi to all,
I'm trying to apply a rich reduce function after a countWindowAll but Flink
says
"ReduceFunction of reduce can not be a RichFunction. Please use
reduce(ReduceFunction, WindowFunction) instead."

Is there any good reason for this? Or am I doing something wrong?

Best,
Flavio





Re: map JSON to scala case class & off-heap optimization

2020-07-15 Thread Aljoscha Krettek

On 11.07.20 10:31, Georg Heiler wrote:

1) similarly to spark the Table API works on some optimized binary
representation
2) this is only available in the SQL way of interaction - there is no
programmatic API


yes it's available from SQL, but also the Table API, which is a 
programmatic declarative API, similar to Spark's Structured Streaming.




q1) I have read somewhere (I think in some Flink Forward presentations)
that the SQL API is not necessarily stable with regards to state - even
with small changes to the DAG (due to optimization). So does this also
/still apply to the table API? (I assume yes)


Yes, unfortunately this is correct. Because the Table API/SQL is 
declarative users don't have control over the DAG and the state that the 
operators have. Some work will happen on at least making sure that the 
optimizer stays stable between Flink versions or that we can let users 
pin a certain physical graph of a query so that it can be re-used across 
versions.



q2) When I use the DataSet/Stream (classical scala/java) API it looks like
I must create a custom serializer if I want to handle one/all of:

   - side-output failing records and not simply crash the job
   - as asked before automatic serialization to a scala (case) class


This is true, yes.


But I also read that creating the ObjectMapper (i.e. in Jackson terms)
inside the map function is not recommended. From Spark I know that there is
a map-partitions function, i.e. something where a database connection can
be created and then reused for the individua elements. Is a similar
construct available in Flink as well?


Yes, for this you can use "rich functions", which have an open()/close() 
method that allows initializing and re-using resources across 
invocations: 
https://ci.apache.org/projects/flink/flink-docs-master/dev/user_defined_functions.html#rich-functions
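
A minimal sketch of that pattern; the POJO is illustrative, not your
actual classes:

public class JsonToPojoMapper extends RichMapFunction<String, MyPojo> {

  private transient ObjectMapper mapper; // not serializable, so created in open()

  @Override
  public void open(Configuration parameters) {
    mapper = new ObjectMapper();
  }

  @Override
  public MyPojo map(String json) throws Exception {
    // MyPojo is a placeholder for your own class, e.g. a Tweet POJO
    return mapper.readValue(json, MyPojo.class);
  }
}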



Also, I have read a lot of articles and it looks like a lot of people
are using the String serializer and then manually parse the JSON which also
seems inefficient.
Where would I find an example for some Serializer with side outputs for
failed records as well as efficient initialization using some similar
construct to map-partitions?


I'm not aware of such examples, unfortunately.

I hope that at least some answers will be helpful!

Best,
Aljoscha


Re: flink take single element from stream

2020-07-10 Thread Aljoscha Krettek
I'm afraid limit() is not yet available on the Table API but you can use 
it via SQL, i.e. sth like "select * FROM (VALUES 'Hello', 'CIAO', 'foo', 
'bar') LIMIT 2;" works. You can execute that from the Table API via 
`TableEnvironment.executeSql()`.
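
For example, a sketch of running that same statement programmatically and
printing the result to the client:

TableEnvironment tableEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance().inStreamingMode().build());

TableResult result = tableEnv.executeSql(
    "SELECT * FROM (VALUES 'Hello', 'CIAO', 'foo', 'bar') LIMIT 2");
result.print();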


Best,
Aljoscha

On 09.07.20 17:53, Georg Heiler wrote:

How can I explore a stream in Flink interactively?

Spark has the concept of take/head to extract the first n elements of a
dataframe / table.

Is something similar available in Flink for a stream like:

val serializer = new JSONKeyValueDeserializationSchema(false)
val stream = senv.addSource(
 new FlinkKafkaConsumer(
   "tweets-raw-json",
   serializer,
   properties
 ).setStartFromEarliest() // TODO experiment with different start values
   )

stream.head/take

does not seem to be implemented.





Re: map JSON to scala case class & off-heap optimization

2020-07-10 Thread Aljoscha Krettek

Hi Georg,

I'm afraid the other suggestions are missing the point a bit. From your 
other emails it seems you want to use Kafka with JSON records together 
with the Table API/SQL. For that, take a look at [1] which describes how 
to define data sources for the Table API. Especially the Kafka and JSON 
sections should be relevant.


That first link I mentioned is for the legacy connector API. There is a 
newer API with slightly different properties which will allow us to do 
the kinds of optimization like working on binary data throughout the 
stack: [2]. Unfortunately, there is no programmatic API yet, you would 
have to use `TableEnvironment.executeSql()` to execute SQL DDL that 
defines your sources. There is a FLIP for adding the programmatic API: [3]


Best,
Aljoscha

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connect.html


[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/


[3] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API
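
As an illustration of [2], a sketch of such a DDL statement for a Kafka
topic with JSON records; the column names and addresses are made up for
the example:

TableEnvironment tEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance().inStreamingMode().build());

tEnv.executeSql(
    "CREATE TABLE tweets (" +
    "  tweet_id STRING," +
    "  lang STRING," +
    "  created_at STRING" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'tweets-raw-json'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'scan.startup.mode' = 'earliest-offset'," +
    "  'format' = 'json'" +
    ")");

// the table can then be queried like any other source
Table firstTweets = tEnv.sqlQuery("SELECT tweet_id, lang FROM tweets");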


On 10.07.20 05:01, Aaron Levin wrote:

Hi Georg, you can try using the circe library for this which has a way to
automatically generate JSON decoders for scala case classes.

As it was mentioned earlier, Flink does not come packaged with
JSON-decoding generators for Scala like spark does.

On Thu, Jul 9, 2020 at 4:45 PM Georg Heiler 
wrote:


Great. Thanks.
But would it be possible to automate this i.e. to have this work
automatically for the case class / product?

Am Do., 9. Juli 2020 um 20:21 Uhr schrieb Taher Koitawala <
taher...@gmail.com>:


The performant way would be to apply a map function over the stream and
then use the Jackson ObjectMapper to convert to scala objects. In flink
there is no API like Spark to automatically get all fields.

On Thu, Jul 9, 2020, 11:38 PM Georg Heiler 
wrote:


How can I use it with a scala case class?
If I understand it correctly for better performance the Object Mapper is
already initialized in each KafkaConsumer and returning ObjectNodes. So
probably I should rephrase to: how can I then map these to case classes
without handcoding it?  https://github.com/json4s/json4s or
https://github.com/FasterXML/jackson-module-scala both only seem to
consume strings.

Best,
Georg

Am Do., 9. Juli 2020 um 19:17 Uhr schrieb Taher Koitawala <
taher...@gmail.com>:


You can try the Jackson ObjectMapper library and that will get you from
json to object.

Regards,
Taher Koitawala

On Thu, Jul 9, 2020, 9:54 PM Georg Heiler 
wrote:


Hi,

I want to map a stream of JSON documents from Kafka to a scala
case-class. How can this be accomplished using the
JSONKeyValueDeserializationSchema?Is a manual mapping of object nodes
required?

I have a Spark background. There, such manual mappings usually are
discouraged. Instead, they offer a nice API (dataset API) to perform such a
type of assignment.
1) this is concise
2) it operates on sparks off-heap memory representations (tungsten) to
be faster

In Flink, instead, such off-heap optimizations seem not to be talked
much about (sorry if I miss something, I am a Flink newbie). Is there a
reason why these optimizations are not necessary in Flink?


How could I get the following example:
val serializer = new JSONKeyValueDeserializationSchema(false)
val stream = senv.addSource(
 new FlinkKafkaConsumer(
   "tweets-raw-json",
   serializer,
   properties
 ).setStartFromEarliest() // TODO experiment with different start
values
   )

to map to this Tweet class concisely, i.e. without manually iterating
through all the attribute fields and parsing the keys from the object node
tree.

final case class Tweet(tweet_id: Option[String], text: Option[String],
source: Option[String], geo: Option[String], place: Option[String], lang:
Option[String], created_at: Option[String], timestamp_ms: Option[String],
coordinates: Option[String], user_id: Option[Long], user_name:
Option[String], screen_name: Option[String], user_created_at:
Option[String], followers_count: Option[Long], friends_count: Option[Long],
user_lang: Option[String], user_location: Option[String], hashtags:
Option[Seq[String]])

Best,
Georg









Re: MalformedClassName for scala case class

2020-07-10 Thread Aljoscha Krettek

Hi,

could you please post the stacktrace with the exception and also let us 
know which Flink version you're using?


I have tried the following code and it works on 
master/flink-1.11/flink-1.10:


  case class Foo(lang: String, count: Int)
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val stenv = StreamTableEnvironment.create(senv)

    val source = senv.fromElements("Hello", "ciao")
    val mapped = source.map(e => { Foo(e, 13) })
    stenv.registerDataStream("foo", mapped)

    senv.execute()
  }

It's not exactly your code but pretty similar and I use the same case class.

Best,
Aljoscha

On 09.07.20 22:44, Georg Heiler wrote:

Hi,

why do I get a MalformedClassName exception when I register the stream as
a table?

val serializer = new JSONKeyValueDeserializationSchema(false)
val stream = senv.addSource(
 new FlinkKafkaConsumer(
   "tweets-raw-json",
   serializer,
   properties
 ).setStartFromEarliest() // TODO experiment with different start values
   )

case class Foo(lang: String, count: Int)
val r = stream
 .map(e => {
   Foo(e.get("value").get("lang").asText(), 1)
 })
 .keyBy(_.lang)
 .timeWindow(Time.seconds(10))
 .sum("count")
r.print()
stenv.registerDataStream("tweets_json", r)

Best,
Georg





Re: Task recovery?

2020-07-10 Thread Aljoscha Krettek

On 03.07.20 18:42, John Smith wrote:

If I understand correctly on June 23rd it suspended the jobs? So at that
point they would no longer show in the UI or be restarted?


Yes, that is correct, though in the logs it seems the jobs failed 
terminally on June 22nd:


2020-06-22 23:30:22,130 INFO 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job 
ba50a77608992097a98b250b87a08da0 reached globally terminal state FAILED.


What you can do in that case is restore the jobs from a savepoint or 
from a retained checkpoint. See 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints, 
you need to manually enable checkpoint retention.
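
A small sketch of what enabling retention looks like (interval and paths
are made up):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);

// keep the last checkpoint around when the job is cancelled or fails terminally
env.getCheckpointConfig().enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// restoring then works like resuming from a savepoint, e.g.:
//   bin/flink run -s hdfs:///checkpoints/<job-id>/chk-42 my-job.jar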


I hope that helps.

Best,
Aljoscha


Re: Avro from avrohugger still invalid

2020-07-03 Thread Aljoscha Krettek

Hi,

I don't think there's a workaround, except copying the code and manually 
fixing it. Did you check out my comment on the Jira issue and the new 
one I created?


Best,
Aljoscha

On 03.07.20 07:19, Georg Heiler wrote:

But would it be possible to somehow use AvroSerializer for now?

Best,
Georg

Am Do., 2. Juli 2020 um 23:44 Uhr schrieb Georg Heiler <
georg.kf.hei...@gmail.com>:


What is the suggested workaround for now?


Thanks!

Aljoscha Krettek  schrieb am Do. 2. Juli 2020 um
20:55:


Hi Georg,

unfortunately, it seems I only fixed the issue for AvroSerializer and
not for AvroDeserializationSchema. I created a new issue (which is a
clone of the old one) to track this [1]. The fix should be very simple
since it's the same issue.

Best,
Aljoscha

[1] https://issues.apache.org/jira/browse/FLINK-18478

On 01.07.20 09:11, Till Rohrmann wrote:

Hi Georg,

I'm pulling in Aljoscha who might know more about the problem you are
describing.

Cheers,
Till

On Mon, Jun 29, 2020 at 10:21 PM Georg Heiler <georg.kf.hei...@gmail.com> wrote:


Older versions of flink were incompatible with the Scala specific record
classes generated from AvroHugger.

https://issues.apache.org/jira/browse/FLINK-12501 Flink 1.10 apparently
is fixing this. I am currently using 1.10.1. However, still experience this
problem
https://stackoverflow.com/questions/62637009/flink-use-confluent-schema-registry-for-avro-serde
of:

AvroRuntimeException: Not a Specific class

What is still wrong here?

Best,

Georg













Re: Avro from avrohugger still invalid

2020-07-02 Thread Aljoscha Krettek

Hi Georg,

unfortunately, it seems I only fixed the issue for AvroSerializer and 
not for AvroDeserializationSchema. I created a new issue (which is a 
clone of the old one) to track this [1]. The fix should be very simple 
since it's the same issue.


Best,
Aljoscha

[1] https://issues.apache.org/jira/browse/FLINK-18478

On 01.07.20 09:11, Till Rohrmann wrote:

Hi Georg,

I'm pulling in Aljoscha who might know more about the problem you are
describing.

Cheers,
Till

On Mon, Jun 29, 2020 at 10:21 PM Georg Heiler 
wrote:


Older versions of flink were incompatible with the Scala specific record
classes generated from AvroHugger.

https://issues.apache.org/jira/browse/FLINK-12501 Flink 1.10 apparently
is fixing this. I am currently using 1.10.1. However, still experience this problem
problem
https://stackoverflow.com/questions/62637009/flink-use-confluent-schema-registry-for-avro-serde
of:

AvroRuntimeException: Not a Specific class

What is still wrong here?

Best,

Georg








Re: EventTimeSessionWindow firing too soon

2020-06-16 Thread Aljoscha Krettek
Did you look at the watermark metrics? Do you know what the current 
watermark is when the windows are firing? You could also get the current 
watermark when using a ProcessWindowFunction and also emit that in the 
records that you're printing, for debugging.
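
Something like this sketch would do; the Event type is yours, the output
format is made up:

public class DebugSessionWindowFunction
    extends ProcessWindowFunction<Event, String, String, TimeWindow> {

  @Override
  public void process(String sessionId, Context ctx, Iterable<Event> events, Collector<String> out) {
    long count = 0;
    for (Event ignored : events) {
      count++;
    }
    out.collect("session=" + sessionId
        + " window=[" + ctx.window().getStart() + ", " + ctx.window().getEnd() + ")"
        + " watermark=" + ctx.currentWatermark()
        + " events=" + count);
  }
}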


What is that TimestampAssigner you're using for your timestamp 
assigner/watermark extractor?


Best,
Aljoscha

On 16.06.20 14:10, Ori Popowski wrote:

Okay, so I created a simple stream (similar to the original stream), where
I just write the timestamps of each evaluated window to S3.
The session gap is 30 minutes, and this is one of the sessions:
(first-event, last-event, num-events)

11:23-11:23 11 events
11:25-11:26 51 events
11:28-11:29 74 events
11:31-11:31 13 events

Again, this is one session. How can we explain this? Why does Flink create
4 distinct windows within 8 minutes? I'm really lost here, I'd appreciate
some help.

On Tue, Jun 16, 2020 at 2:17 PM Ori Popowski  wrote:


Hi, thanks for answering.


I guess you consume from Kafka from the earliest offset, so you consume

historical data and Flink is catching-up.
Yes, it's what's happening. But Kafka is partitioned on sessionId, so skew
between partitions cannot explain it.
I think the only way it can happen is when when suddenly there's one event
with very late timestamp


Just to verify, if you do keyBy sessionId, do you check the gaps of

events from the same session?
Good point. sessionId is unique in this case, and even if it's not - every
single session suffers from this problem of early triggering so it's very
unlikely that all millions sessions within that hour had duplicates.

I'm suspecting that the fact I have two ProcessWindowFunctions one after
the other somehow causes this.
I deployed a version with one window function which just prints the
timestamps to S3 (to find out if I have event-time jumps) and suddenly it
doesn't trigger early (I'm running for 10 minutes and not a single event
has arrived to the sink)

On Tue, Jun 16, 2020 at 12:01 PM Rafi Aroch  wrote:


Hi Ori,

I guess you consume from Kafka from the earliest offset, so you consume
historical data and Flink is catching-up.

Regarding: *My event-time timestamps also do not have big gaps*

Just to verify, if you do keyBy sessionId, do you check the gaps of
events from the same session?

Rafi


On Tue, Jun 16, 2020 at 9:36 AM Ori Popowski  wrote:


So why is it happening? I have no clue at the moment.
My event-time timestamps also do not have big gaps between them that
would explain the window triggering.


On Mon, Jun 15, 2020 at 9:21 PM Robert Metzger 
wrote:


If you are using event time in Flink, it is disconnected from the real
world wall clock time.
You can process historical data in a streaming program as if it was
real-time data (potentially reading through (event time) years of data in a
few (wall clock) minutes)

On Mon, Jun 15, 2020 at 4:58 PM Yichao Yang <1048262...@qq.com> wrote:


Hi

I think it may be that you are using event time and the timestamp gap
between your events is bigger than 30 minutes; maybe you can check the
source data timestamps.

Best,
Yichao Yang

--
Sent from my iPhone


-- Original --
*From:* Ori Popowski 
*Date:* Mon,Jun 15,2020 10:50 PM
*To:* user 
*Subject:* Re: EventTimeSessionWindow firing too soon








Re: EventTimeSessionWindow firing too soon

2020-06-16 Thread Aljoscha Krettek
Sorry, I now saw that this thread diverged. My mail client didn't pick 
it up because someone messed up the subject of the thread.


On 16.06.20 14:06, Aljoscha Krettek wrote:

Hi,

What is the timescale of your data in Kafka? If you have data in there 
that spans more than ~30 minutes I would expect your windows to fire 
very soon after the job is started. Event time does not depend on a wall 
clock but instead advances with the time in the stream. As Flink 
advances through the data in Kafka so does event-time advance in step.


Does that explain your situation?

Best,
Aljoscha

On 15.06.20 16:49, Ori Popowski wrote:

I'm using Flink 1.10 on YARN, and I have a EventTimeSessionWindow with a
gap of 30 minutes.

But as soon as I start the job, events are written to the sink (I can see
them in S3) even though 30 minutes have not passed.

This is my job:

val stream = senv
   .addSource(new FlinkKafkaConsumer("…", compressedEventDeserializer,
properties))
   .filter(_.sessionId.nonEmpty)
   .flatMap(_ match { case (_, events) => events })
   .assignTimestampsAndWatermarks(new
TimestampExtractor[Event](Time.minutes(10)) {
 override def extractTimestamp(element: Event): Long =
event.sequence / 1000 // microseconds
   })
   .keyBy(_.sessionId)
   .window(EventTimeSessionWindows.withGap(Time.of(30, MINUTES)))
   .process(myProcessWindowFunction)

AsyncDataStream.unorderedWait(stream, myAsyncS3Writer, 30, SECONDS, 100)

Any idea why it's happening?







Re: EventTimeSessionWindow firing too soon

2020-06-16 Thread Aljoscha Krettek

Hi,

What is the timescale of your data in Kafka? If you have data in there 
that spans more than ~30 minutes I would expect your windows to fire 
very soon after the job is started. Event time does not depend on a wall 
clock but instead advances with the time in the stream. As Flink 
advances through the data in Kafka so does event-time advance in step.


Does that explain your situation?

Best,
Aljoscha

On 15.06.20 16:49, Ori Popowski wrote:

I'm using Flink 1.10 on YARN, and I have a EventTimeSessionWindow with a
gap of 30 minutes.

But as soon as I start the job, events are written to the sink (I can see
them in S3) even though 30 minutes have not passed.

This is my job:

val stream = senv
   .addSource(new FlinkKafkaConsumer("…", compressedEventDeserializer,
properties))
   .filter(_.sessionId.nonEmpty)
   .flatMap(_ match { case (_, events) => events })
   .assignTimestampsAndWatermarks(new
TimestampExtractor[Event](Time.minutes(10)) {
 override def extractTimestamp(element: Event): Long =
event.sequence / 1000 // microseconds
   })
   .keyBy(_.sessionId)
   .window(EventTimeSessionWindows.withGap(Time.of(30, MINUTES)))
   .process(myProcessWindowFunction)

AsyncDataStream.unorderedWait(stream, myAsyncS3Writer, 30, SECONDS, 100)

Any idea why it's happening?





Re: Does Flink support reading files or CSV files from java.io.InputStream instead of file paths?

2020-06-16 Thread Aljoscha Krettek

Hi Marco,

this is not possible since Flink is designed mostly to read files from a 
distributed filesystem, where paths are used to refer to those files. If 
you read from files on the classpath you could just use plain old Java 
code and won't need a distributed processing system such as Flink.


Best,
Aljoscha

On 16.06.20 06:46, Marco Villalobos wrote:


Does Flink support reading files or CSV files from java.io.InputStream instead 
of file paths?

I'd rather just store my file on the class path and load it with 
java.lang.ClassLoader#getResourceAsStream(String).

If there is a way, I'd appreciate an example.





Re: Improved performance when using incremental checkpoints

2020-06-16 Thread Aljoscha Krettek

Hi,

it might be that the operations that Flink performs on RocksDB during 
checkpointing will "poke" RocksDB somehow and make it clean up it's 
internal hierarchies of storage more. Other than that, I'm also a bit 
surprised by this.


Maybe Yun Tang will come up with another idea.

Best,
Aljoscha

On 16.06.20 12:42, nick toker wrote:

Hi,

We used both flink versions 1.9.1 and 1.10.1
We used rocksDB default configuration.
The streaming pipeline is very simple.

1. Kafka consumer
2. Process function
3. Kafka producer

The code of the process function is listed below:

private transient MapState testMapState;

@Override
public void processElement(Map value, Context ctx, Collector<Map> out) throws Exception {

    if (testMapState.isEmpty()) {
        testMapState.putAll(value);
        out.collect(value);
        testMapState.clear();
    }
}

We used the same code with ValueState and observed the same results.


BR,

Nick


‫בתאריך יום ג׳, 16 ביוני 2020 ב-11:56 מאת ‪Yun Tang‬‏ <‪myas...@live.com
‬‏>:‬


Hi Nick

It's really strange that performance could improve when checkpointing is
enabled.
In general, enabling checkpointing might bring a slight performance downside to
the whole job.

Could you give more details e.g. Flink version, configurations of RocksDB
and simple code which could reproduce this problem.

Best
Yun Tang
--
*From:* nick toker 
*Sent:* Tuesday, June 16, 2020 15:44
*To:* user@flink.apache.org 
*Subject:* Improved performance when using incremental checkpoints

Hello,

We are using RocksDB as the backend state.
At first we didn't enable the checkpoints mechanism.

We observed the following behaviour and we are wondering why ?

When using RocksDB *without* checkpointing the performance was
extremely bad.
And when we enabled checkpointing the performance improved by a
*factor of 10*.

Could you please explain if this behaviour is expected ?
Could you please explain why enabling the checkpoint significantly
improves the performance ?

BR,
Nick







Re: Best way to "emulate" a rich Partitioner with open() and close() methods ?

2020-06-09 Thread Aljoscha Krettek

Hi,

I agree with Robert that adding open/close support for partitioners 
would mean additional complexity in the code base. We're currently not 
thinking of supporting that.
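
For reference, a minimal sketch of the map + keyBy workaround discussed
further down in this thread. KuduPartitionLookup is a hypothetical
stand-in for the Kudu client's partitioning helper; the point is only that
the mapper's open()/close() give you the lifecycle hooks the partitioner
lacks:

public class AttachKuduPartition extends RichMapFunction<Row, Tuple2<Integer, Row>> {

  private transient KuduPartitionLookup lookup; // hypothetical wrapper around the Kudu client

  @Override
  public void open(Configuration parameters) throws Exception {
    lookup = KuduPartitionLookup.connect("kudu-master:7051"); // hypothetical call
  }

  @Override
  public Tuple2<Integer, Row> map(Row row) throws Exception {
    return Tuple2.of(lookup.partitionFor(row), row); // hypothetical call
  }

  @Override
  public void close() throws Exception {
    if (lookup != null) {
      lookup.close();
    }
  }
}

// usage: stream.map(new AttachKuduPartition()).keyBy(t -> t.f0).addSink(new KuduSink<>(...))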


Best,
Aljoscha

On 05.06.20 20:19, Arvid Heise wrote:

Hi Arnaud,

just to add up. The overhead of this additional map is negligible if you
enable object reuse [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html

On Tue, Jun 2, 2020 at 10:34 AM Robert Metzger  wrote:


I'm not 100% sure about this answer, that's why I'm CCing Aljoscha to
correct me if needed:

Partitioners are not regular operators (like a map or window), thus they
are not included in the regular Task lifecycle methods (of open() / map()
etc. / close(), with the proper error handling, task cancellation
mechanisms etc.). The custom partition function is called somewhere close
to the network stack.
It would be quite a lot of effort (and added complexity to the codebase)
to allow for rich partitioners. Given that custom partitioners are a rarely
used feature, it would not be justified to spend a lot of time for this
(there's also a good workaround available)


On Fri, May 29, 2020 at 2:46 PM LINZ, Arnaud 
wrote:


Hello,



Yes, that would definitely do the trick, with an extra mapper after keyBy
to remove the tuple so that it stays seamless. It’s less hacky than what I
was thinking of, thanks!

However, is there any plan in a future release to have rich partitioners
? That would avoid adding  overhead and “intermediate” technical info in
the stream payload.

Best,

Arnaud



*De :* Robert Metzger 
*Envoyé :* vendredi 29 mai 2020 13:10
*À :* LINZ, Arnaud 
*Cc :* user 
*Objet :* Re: Best way to "emulate" a rich Partitioner with open() and
close() methods ?



Hi Arnaud,



Maybe I don't fully understand the constraints, but what about

stream.map(new GetKuduPartitionMapper).keyBy(0).addSink(KuduSink());


The map(new GetKuduPartitionMapper) will be a regular RichMapFunction
with open() and close() where you can handle the connection with Kudu's
partitioning service.

The map will output a Tuple2 (or something nicer :) ),
then Flink shuffles your data correctly, and the sinks will process the
data correctly partitioned.



I hope that this is what you were looking for!



Best,

Robert



On Thu, May 28, 2020 at 6:21 PM LINZ, Arnaud 
wrote:

Hello,



I would like to upgrade the performance of my Apache Kudu Sink by using
the new “KuduPartitioner” of Kudu API to match Flink stream partitions with
Kudu partitions to lower the network shuffling.

For that, I would like to implement something like

stream.partitionCustom(new KuduFlinkPartitioner<>(…)).addSink(new
KuduSink(…)));

With KuduFlinkPartitioner an implementation of
org.apache.flink.api.common.functions.Partitioner
that internally makes use of the KuduPartitioner client tool of Kudu’s API.



However for that KuduPartitioner to work, it needs to open – and close at
the end – a connection to the Kudu table – obviously something that can’t
be done for each line. But there is no “AbstractRichPartitioner” with
open() and close() method that I can use for that (the way I use it in the
sink for instance).



What is the best way to implement this ?

I thought of ThreadLocals that would be initialized during the first call
to *int* partition(K key, *int* numPartitions);  but I won’t be able to
close() things nicely as I won’t be notified on job termination.



I thought of putting those static ThreadLocals inside a “Identity Mapper”
that would be called just prior the partition with something like :

stream.map(richIdentiyConnectionManagerMapper).partitionCustom(new
KuduFlinkPartitioner<>(…)).addSink(new KuduSink(…)));

with kudu connections initialized in the mapper open(), closed in the
mapper close(), and used  in the partitioner partition().

However It looks like an ugly hack breaking every coding principle, but
as long as the threads are reused between the mapper and the partitioner I
think that it should work.



Is there a better way to do this ?



Best regards,

Arnaud














Re: java.lang.AbstractMethodError when implementing KafkaSerializationSchema

2020-05-26 Thread Aljoscha Krettek
I think what might be happening is that you're mixing dependencies from 
the flink-sql-connector-kafka and the proper flink-connector-kafka that 
should be used with the DataStream API. Could that be the case?


Best,
Aljoscha

On 25.05.20 19:18, Piotr Nowojski wrote:

Hi,

It would be helpful if you could provide the full stack trace and tell us
which Flink version and which Kafka connector version you are using.

It sounds like either a dependency convergence error (mixing Kafka 
dependencies/various versions of flink-connector-kafka inside a single job/jar) 
or some shading issue. Can you check your project for such issues (`mvn 
dependency:tree` command [1]).

Also what’s a bit suspicious for me is the return type:


Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord;


I’m not sure, but I was not aware that we are shading Kafka dependency in our 
connectors? Are you manually shading something?

Piotrek

[1] 
https://maven.apache.org/plugins/maven-dependency-plugin/examples/resolving-conflicts-using-the-dependency-tree.html
 



On 22 May 2020, at 15:34, wangl...@geekplus.com.cn wrote:


public class MyKafkaSerializationSchema implements
    KafkaSerializationSchema<Tuple2<String, String>> {

  @Override
  public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, String> o,
      @Nullable Long aLong) {
    ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(o.f0,
        o.f1.getBytes(StandardCharsets.UTF_8));
    return record;
  }
}

FlinkKafkaProducer<Tuple2<String, String>> producer = new FlinkKafkaProducer<Tuple2<String, String>>(
    "default", new MyKafkaSerializationSchema(),
    prop2, Semantic.EXACTLY_ONCE);

But there's  error when runnng:

java.lang.AbstractMethodError: 
com.geekplus.flinketl.schema.MyKafkaSerializationSchema.serialize(Ljava/lang/Object;Ljava/lang/Long;)Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord;

Any suggestion on this?

Thanks,
Lei
wangl...@geekplus.com.cn 






Re: [EXTERNAL] Re: Memory growth from TimeWindows

2020-05-25 Thread Aljoscha Krettek
Just to double check: the issue was resolved by using a different GC? 
Because the default GC was too "lazy". ;-)


Best,
Aljoscha

On 21.05.20 18:09, Slotterback, Chris wrote:

For those who are interested or googling the mail archives in 8 months, the 
issue was garbage collection related.

The default 1.8 jvm garbage collector (parallel gc) was being lazy in its 
marking and collection phases and letting the heap build to a level that was 
causing memory exceptions and stalled tms. This app has a lot of state, and 
memory usage well above 10GB at times. The solution was moving to the G1 
collector which is very aggressive in its young generation collection by 
default, at the cost of some cpu usage and requires some tuning, but keeps the 
memory levels much more stable.

On 5/20/20, 9:05 AM, "Slotterback, Chris"  
wrote:

 What I've noticed is that heap memory ends up growing linearly with time 
indefinitely (past 24 hours) until it hits the roof of the allocated heap for 
the task manager, which leads me to believe I am leaking somewhere. All of my 
windows have an allowed lateness of 5 minutes, and my watermarks are pulled 
from time embedded in the records using 
BoundedOutOfOrdernessTimestampExtractors. My TumblingEventTimeWindows and 
SlidingEventTimeWindow all use AggregateFunctions, and my intervalJoins use 
ProcessJoinFunctions.

 I expect this app to use a significant amount of memory at scale due to 
the 288 5-minute intervals in 24 hours, and records being put in all 288 window 
states, and as the application runs for 24 hours memory would increase as all 
288(*unique key) windows build with incoming records, but then after 24 hours 
the memory should stop growing, or at least grow at a different rate?

 Also of note, we are using a FsStateBackend configuration, and plan to 
move to RocksDBStateBackend, but from what I can tell, this would only reduce 
memory and delay hitting the heap memory capacity, not stall it forever?

 Thanks
 Chris


     On 5/18/20, 7:29 AM, "Aljoscha Krettek"  wrote:

 On 15.05.20 15:17, Slotterback, Chris wrote:
 > My understanding is that while all these windows build their memory 
state, I can expect heap memory to grow for the 24 hour length of the 
SlidingEventTimeWindow, and then start to flatten as the t-24hr window frames 
expire and release back to the JVM. What is actually happening is when a constant 
data source feeds the stream, the heap memory profile grows linearly past the 24 
hour mark. Could this be a result of a misunderstanding of how the window’s memory 
states are kept, or is my assumption correct, and it is more likely I have a leak 
somewhere?

 Will memory keep growing indefinitely? That would indicate a bug? What
 sort of lateness/watermark settings do you have? What window function do
 you use? ProcessWindowFunction, or sth that aggregates?

 Side note: with sliding windows of 24h/5min you will have a "write
 amplification" of 24*60/5=288, each record will be in 288 windows, which
 will each be kept in separate state?

 Best,
 Aljoscha







Re: Performance impact of many open windows at the same time

2020-05-25 Thread Aljoscha Krettek

Hi,

I don't think this will immediately degrade performance. State is 
essentially stored in a HashMap (for the FileStateBackend) or RocksDB 
(for the RocksDB backend). If these data structures don't degrade with 
size then your performance also shouldn't degrade.


There are of course some effects that would come into play. For example 
if the data grows so much that it negatively affects GC this would of 
course have an effect, but that doesn't come from the data structures as 
they are.


I hope that helps!

Best,
Aljoscha

On 22.05.20 06:23, Tzu-Li (Gordon) Tai wrote:

Hi Joe,

The main effect this should have is more state to be kept until the windows
can be fired (and state purged).
This would of course increase the time it takes to checkpoint the operator.

I'm not sure if there will be significant runtime per-record impact caused
by how windows are bookkeeped in data structures in the WindowOperator,
maybe Aljoscha (cc'ed) can chime in here for anything.
If it is certain that these windows will never fire (until far into the
future) because the event-timestamps are in the first place corrupted, then
it might make sense to have a way to drop windows based on some criteria.
I'm not sure if that is supported in any way without triggers (since you
mentioned that those windows might not receive any data), again Aljoscha
might be able to provide more info here.

Cheers,
Gordon

On Thu, May 21, 2020 at 7:02 PM Joe Malt  wrote:


Hi all,

I'm looking into what happens when messages are ingested with timestamps
far into the future (e.g. due to corruption or a wrong clock at the sender).

I'm aware of the effect on watermarking, but another thing I'm concerned
about is the performance impact of the extra windows this will create.

If a Flink operator has many (perhaps hundreds or thousands) of windows
open but not receiving any data (and never firing), will this degrade
performance?

Thanks,
Joe







Re: "Fill in" notification messages based on event time watermark

2020-05-18 Thread Aljoscha Krettek
I think there is some confusion in this thread between the auto 
watermark interval and the interval (length) of an event-time window. 
Maybe clearing that up for everyone helps.


The auto watermark interval is the periodicity (in processing time) at 
which Flink asks the source (or a watermark generator) what the current 
watermark is. The source will keep track of the timestamps that it can 
"respond" to Flink when it asks. For example, if the auto watermark 
interval is set to 1 sec, Flink will update the watermark information 
every second. This doesn't mean, though, that the watermark advances 1 
sec in that time. If you're reading through some historic data the 
watermark could jump by hours in between those 1 second intervals. You 
can also think of this as the sampling interval for updating the current 
watermark.


The window size is independent of the auto watermark interval; you can 
have an arbitrary size here. The auto watermark interval only controls 
how frequently Flink will check and emit the contents of windows whose 
end timestamp is below the watermark.
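
To make the distinction concrete, a small sketch (the types and the window
length are illustrative): the watermark is re-evaluated every second of
processing time, while the event-time window itself spans five minutes:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// sampling interval for watermark updates (processing time)
env.getConfig().setAutoWatermarkInterval(1000);

stream
    .keyBy(record -> record.getKey())                      // hypothetical record type
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))  // window length, independent of the interval above
    .process(new MyWindowFunction());                      // hypothetical window function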


I hope that helps. If we're all clear we can look at the concrete 
problem again.


Best,
Aljoscha

On 30.04.20 12:46, Manas Kale wrote:

Hi Timo and Piotrek,
Thank you for the suggestions.
I have been trying to set up unit tests at the operator granularity, and
the blog post's testHarness examples certainly help a lot in this regard.

I understood my problem - an upstream session window operator can only
report the end of the session window when the watermark has passed
{lastObserverEvent + sessionTimeout}. However, my watermark was being
updated periodically without taking this into account. It seems I will have
to delay this notification operator's watermark by sessionTimeout.
Another complication is that this sessionTimeout is per-key, so I guess I
will have to implement a watermark assigner that extracts the delay period
from data (similar to DynamicEventTimeWindows).
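
For reference, a rough sketch of what I have in mind, using the pre-1.12 periodic-watermark interface (the Event type is a placeholder, and the delay can only be tracked per source instance, not truly per key):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

/** Hypothetical event type carrying its own timestamp and session timeout (both in millis). */
interface Event {
    long getTimestamp();
    long getSessionTimeout();
}

public class DelayAwareWatermarkAssigner implements AssignerWithPeriodicWatermarks<Event> {

    private long maxTimestamp = Long.MIN_VALUE;
    private long maxDelay = 0L;

    @Override
    public long extractTimestamp(Event element, long previousElementTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, element.getTimestamp());
        // Hold the watermark back by the largest session timeout seen so far.
        maxDelay = Math.max(maxDelay, element.getSessionTimeout());
        return element.getTimestamp();
    }

    @Override
    public Watermark getCurrentWatermark() {
        return maxTimestamp == Long.MIN_VALUE
                ? new Watermark(Long.MIN_VALUE)
                : new Watermark(maxTimestamp - maxDelay - 1);
    }
}

// usage: stream.assignTimestampsAndWatermarks(new DelayAwareWatermarkAssigner());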

Also, if I do implement such an assigner, would it be helpful to add it to
Flink? I am happy to contribute if so. Any other comments/observations are
also welcome!

Thank you all for the help,
Manas


On Wed, Apr 29, 2020 at 3:39 PM Piotr Nowojski  wrote:


Hi Manas,

Adding to the response from Timo, if you don’t have unit tests/integration
tests, I would strongly recommend setting them up, as it makes debugging
and testing easier. You can read how to do it for your functions and
operators here [1] and here [2].

Piotrek

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
[2]
https://flink.apache.org/news/2020/02/07/a-guide-for-unit-testing-in-apache-flink.html
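
To give a flavour of what those guides describe, a minimal test of a toy KeyedProcessFunction with the operator test harness could look roughly like this (it needs the Flink test utilities mentioned in the guides; all names here are illustrative):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Test;

public class TimeoutFunctionTest {

    /** Toy function: registers an event-time timer 1s after each element and emits when it fires. */
    static class TimeoutFunction extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {
        @Override
        public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) {
            ctx.timerService().registerEventTimeTimer(value.f1 + 1000L);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
            out.collect(ctx.getCurrentKey() + " timed out at " + timestamp);
        }
    }

    @Test
    public void timerFiresOnceWatermarkPasses() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<String, Tuple2<String, Long>, String> harness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new KeyedProcessOperator<>(new TimeoutFunction()),
                        value -> value.f0,
                        Types.STRING);
        harness.open();

        harness.processElement(new StreamRecord<>(Tuple2.of("k1", 100L), 100L));
        // Nothing is emitted until the watermark passes the timer at t=1100.
        harness.processWatermark(new Watermark(5_000L));

        System.out.println(harness.getOutput()); // contains the emitted record plus the watermark
        harness.close();
    }
}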

On 28 Apr 2020, at 18:45, Timo Walther  wrote:

Hi Manas,

Reg. 1: I would recommend using a debugger in your IDE and checking which
watermarks are travelling through your operators.

Reg. 2: All event-time operations are only performed once the watermark
has arrived from all parallel instances. So roughly speaking, in machine time
you can assume that the window is computed at watermark update intervals.
However, "what is computed" depends on the timestamps of your events and
how those are categorized into windows.

I hope this helps a bit.

Regards,
Timo

On 28.04.20 14:38, Manas Kale wrote:

Hi David and Piotrek,
Thank you both for your inputs.
I tried an implementation with the algorithm Piotrek suggested and David's
example. Although notifications are being generated with the watermark,
subsequent transition events are being received after the watermark has
crossed their timestamps. For example:
state1 @ 100
notification state1 @ 110
notification state1 @ 120
notification state1 @ 130   <- shouldn't have emitted this
state2 @ 125                <- watermark is > 125 at this stage
I think something might be subtly(?) wrong with how I have structured
upstream operators. The allowed lateness is 0 in the watermark assigner
upstream, and I generate watermarks every x seconds.
The operator that emits state transitions is constructed using the
TumblingWindow approach I described in the first e-mail (so that I can
compute at every watermark update). Note that I can use this approach for
state-transition-operator because it only wants to emit transitions, and
nothing in between.
So, two questions:
1. Any idea on what might be causing this incorrect watermark behaviour?
2. If I want to perform some computation only when the watermark updates,
is using a watermark-aligned EventTimeTumblingWindow (meaning
windowDuration = watermarkUpdateInterval) the correct way to do this?
Regards,
Manas
On Tue, Apr 28, 2020 at 2:16 AM David Anderson  wrote:
Following up on Piotr's outline, there's an example in the
documentation of how to use a KeyedProcessFunction to implement an
event-time tumbling window [1]. Perhaps that can help you get started.
Regards,
David
[1]

https://ci.apache.org/projects/flink/flink-docs-master/tutorials/event

Re: Memory growth from TimeWindows

2020-05-18 Thread Aljoscha Krettek

On 15.05.20 15:17, Slotterback, Chris wrote:

My understanding is that while all these windows build their memory state, I 
can expect heap memory to grow for the 24 hour length of the 
SlidingEventTimeWindow, and then start to flatten as the t-24hr window frames 
expire and release back to the JVM. What is actually happening is when a 
constant data source feeds the stream, the heap memory profile grows linearly 
past the 24 hour mark. Could this be a result of a misunderstanding of how the 
window’s memory states are kept, or is my assumption correct, and it is more 
likely I have a leak somewhere?


Will memory keep growing indefinitely? That would indicate a bug. What 
sort of lateness/watermark settings do you have? What window function do 
you use? ProcessWindowFunction, or something that aggregates?


Side note: with sliding windows of 24h/5min you will have a "write 
amplification" of 24*60/5 = 288: each record will be in 288 windows, each 
of which is kept in separate state.
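
To make that last question concrete: with an incrementally aggregating function each of those 288 windows only keeps a small accumulator instead of every record, which usually makes the difference for heap growth. A rough sketch (types and field meanings are only illustrative):

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SlidingAverageExample {

    /** Incrementally computes an average; only (sum, count) is stored per window. */
    public static class AverageAggregate
            implements AggregateFunction<Tuple2<String, Double>, Tuple2<Double, Long>, Double> {

        @Override
        public Tuple2<Double, Long> createAccumulator() {
            return Tuple2.of(0.0, 0L);
        }

        @Override
        public Tuple2<Double, Long> add(Tuple2<String, Double> value, Tuple2<Double, Long> acc) {
            return Tuple2.of(acc.f0 + value.f1, acc.f1 + 1);
        }

        @Override
        public Double getResult(Tuple2<Double, Long> acc) {
            return acc.f1 == 0 ? 0.0 : acc.f0 / acc.f1;
        }

        @Override
        public Tuple2<Double, Long> merge(Tuple2<Double, Long> a, Tuple2<Double, Long> b) {
            return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
        }
    }

    public static DataStream<Double> averages(DataStream<Tuple2<String, Double>> input) {
        return input
                .keyBy(value -> value.f0)
                .window(SlidingEventTimeWindows.of(Time.hours(24), Time.minutes(5)))
                .aggregate(new AverageAggregate());
    }
}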


Best,
Aljoscha




Re: Export user metrics with Flink Prometheus endpoint

2020-05-18 Thread Aljoscha Krettek
Now I see what you mean. I think you would have to somehow set up the 
Flink metrics system as a backend for opencensus. Then the metrics would 
be reported to the same system (Prometheus, in this case). In opencensus 
lingo, this would mean using a Flink-based Stats Exporter instead of the 
Prometheus backend directly.


I don't have experience developing opencensus Stats Exporters but 
working off the provided ones should work. Maybe have a look at the 
prometheus exporter [1] and go from there. The entrypoint for a Flink 
exporter would then probably be


FlinkStatsCollector.createAndRegister(getMetricGroup());
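
Independently of how the opencensus side is wired up, user-defined metrics can also be registered directly with Flink's metric group from any rich function; they are then exposed through whichever reporter is configured (Prometheus in your case). A minimal sketch, with illustrative names:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class CountingMapper extends RichMapFunction<String, String> {

    private transient Counter processedRecords;

    @Override
    public void open(Configuration parameters) {
        // Registered under the operator's metric group; picked up by the configured reporter.
        processedRecords = getRuntimeContext()
                .getMetricGroup()
                .counter("processedRecords");
    }

    @Override
    public String map(String value) {
        processedRecords.inc();
        return value;
    }
}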

Best,
Aljoscha

On 06.05.20 17:11, Eleanore Jin wrote:

Hi Aljoscha,

Thanks for the response, yes prometheus reporter is already enabled and I
am able to get the flink metrics from prometheus.

My question is more like, currently I am using opencensus library
<https://github.com/census-instrumentation/opencensus-java> to collect
application metrics, so is there an easy way to integrate these metrics with
the flink metrics endpoint.

Thanks!
Eleanore

On Wed, May 6, 2020 at 7:48 AM Aljoscha Krettek  wrote:


Hi,

that should be possible. Did you have a look at the documentation for
setting up a Prometheus metrics reporter:

https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html#prometheus-orgapacheflinkmetricsprometheusprometheusreporter
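
For reference, the setup from that page boils down to something like this in flink-conf.yaml (the port range is optional and only an example; the flink-metrics-prometheus jar has to be available to Flink):

metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9250-9260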

Best,
Aljoscha

On 06.05.20 03:02, Eleanore Jin wrote:

Hi all,

I just wonder is it possible to use Flink Metrics endpoint to allow
Prometheus to scrape user defined metrics?

Context:
In addition to Flink metrics, we also collect some application level
metrics using opencensus. And we run opencensus agent as side car in
kubernetes pod to collect metrics (opencensus agent talks to task manager
container via rpcs)

The issue with this approach is: it looks like the opencensus agent keeps
stale metrics, causing inaccurate metrics reporting, and this
project is not actively maintained anymore.

So I wonder if it is possible to use Flink metrics endpoint for user
defined metrics.

Thanks a lot!
Eleanore










Re: Casting from org.apache.flink.api.java.tuple.Tuple2 to scala.Product; using java and scala in flink job

2020-05-06 Thread Aljoscha Krettek

No, I think that should be all right.

On 06.05.20 16:57, Vishwas Siravara wrote:

Thanks, I figured that would be the case. I'm using the flink tuple type in
the map functions, so there is no casting required now. Can you think of
any downsides of using flink tuples in scala code, especially since the
flink tuple is in the java api package in flink?

Best,
Nick.

On Wed, May 6, 2020 at 9:52 AM Aljoscha Krettek  wrote:


Hi,

Flink will not do any casting between types. You either need to output
the correct (Scala) Tuple type from the deserialization schema or insert
a step (say a map function) that converts between the two types. The
Tuple2 type and the Scala tuple type, i.e. (foo, bar) have nothing in
common when it comes to the type system.

Best,
Aljoscha

On 06.05.20 01:42, Nick Bendtner wrote:

Hi guys,
In our flink job we use java source for deserializing a message from kafka
using a kafka deserializer. Signature is as below.


public class CustomAvroDeserializationSchema implements
  KafkaDeserializationSchema<Tuple2<EventMetaData, GenericRecord>>

The other parts of the streaming job are in scala. When data has to be
serialized I get this exception




java.lang.RuntimeException: org.apache.flink.api.java.tuple.Tuple2 cannot be cast to scala.Product
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)


Here is how I provide type info for serialization in the java
deserialization class:

@Override
public TypeInformation<Tuple2<EventMetaData, GenericRecord>> getProducedType() {
  return new TupleTypeInfo(TypeInformation.of(EventMetaData.class),
      new GenericRecordAvroTypeInfo(this.writer));
}

Here is how I add the kafka source in scala :

private[flink] def sourceType(
    deserialization: KafkaDeserializationSchema[(EventMetaData, GenericRecord)],
    properties: Properties): FlinkKafkaConsumer[(EventMetaData, GenericRecord)] = {
val consumer = new FlinkKafkaConsumer[(EventMetaData, GenericRecord)](
  source.asJava,
  deserialization,
  properties)
consumer
}

Any ideas/thoughts on how to interoperate between a java Tuple2 and a scala case
class? Also, we are using version 1.9.1 of flink-connector-kafka while the rest of
the cluster uses flink 1.7.2.

Best,
Nick.










Re: Casting from org.apache.flink.api.java.tuple.Tuple2 to scala.Product; using java and scala in flink job

2020-05-06 Thread Aljoscha Krettek

Hi,

Flink will not do any casting between types. You either need to output 
the correct (Scala) Tuple type from the deserialization schema or insert 
a step (say a map function) that converts between the two types. The 
Tuple2 type and the Scala tuple type, i.e. (foo, bar) have nothing in 
common when it comes to the type system.


Best,
Aljoscha

On 06.05.20 01:42, Nick Bendtner wrote:

Hi guys,
In our flink job we use java source for deserializing a message from kafka
using a kafka deserializer. Signature is as below.


public class CustomAvroDeserializationSchema implements
 KafkaDeserializationSchema<Tuple2<EventMetaData, GenericRecord>>

The other parts of the streaming job are in scala. When data has to be
serialized I get this exception




java.lang.RuntimeException: org.apache.flink.api.java.tuple.Tuple2 cannot be cast to scala.Product
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)

Here is how I provide type info for serialization in the java
deserialization class:

@Override
public TypeInformation<Tuple2<EventMetaData, GenericRecord>> getProducedType() {
  return new TupleTypeInfo(TypeInformation.of(EventMetaData.class),
      new GenericRecordAvroTypeInfo(this.writer));
}

Here is how I add the kafka source in scala :

private[flink] def sourceType(
   deserialization: KafkaDeserializationSchema[(EventMetaData, GenericRecord)],
   properties: Properties): FlinkKafkaConsumer[(EventMetaData, GenericRecord)] = {
   val consumer = new FlinkKafkaConsumer[(EventMetaData, GenericRecord)](
 source.asJava,
 deserialization,
 properties)
   consumer
}

Any ideas/thoughts on how to interoperate between a java Tuple2 and a scala case
class? Also, we are using version 1.9.1 of flink-connector-kafka while the rest of
the cluster uses flink 1.7.2.

Best,
Nick.





Re: MongoDB as a Sink;

2020-05-06 Thread Aljoscha Krettek

Hi,

yes, that is correct. You need to implement a SinkFunction. For getting 
started you can take a look at the Elasticsearch connector because 
Elasticsearch and MongoDB are roughly similar in terms of how you work 
with them, i.e. they are both key-value stores.
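
To sketch the general shape (the connection string, database, collection and element type below are placeholders, and a real implementation would want batching, retries and error handling along the lines of the Elasticsearch connector):

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.bson.Document;

/** Hypothetical element type produced by the upstream job. */
class SensorReading {
    public String sensorId;
    public double value;
    public long timestamp;
}

public class MongoSink extends RichSinkFunction<SensorReading> {

    private transient MongoClient client;
    private transient MongoCollection<Document> collection;

    @Override
    public void open(Configuration parameters) {
        // Connection string, database and collection names are placeholders.
        client = MongoClients.create("mongodb://localhost:27017");
        collection = client.getDatabase("iot").getCollection("readings");
    }

    @Override
    public void invoke(SensorReading reading, Context context) {
        // One document per record; a real sink would batch and handle failures.
        collection.insertOne(new Document("sensorId", reading.sensorId)
                .append("value", reading.value)
                .append("timestamp", reading.timestamp));
    }

    @Override
    public void close() {
        if (client != null) {
            client.close();
        }
    }
}

// usage: stream.addSink(new MongoSink());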


Best,
Aljoscha

On 06.05.20 02:36, Aissa Elaffani wrote:

Hello Guys,
I am looking for some help concerning my flink sink: I want the output to be
stored in a MongoDB database. As far as I know, there is no sink
connector for MongoDB, and I need to implement one by myself, and I don't
know how to do that. Can you please help me with this?




