Re: End-to-end lag spikes when closing a large number of panes

2024-03-27 Thread Robert Metzger
Hey Caio,

Your analysis of the problem sounds right to me, but I don't have a good
solution for you :(

> I’ve validated that CPU profiles show clearAllState using a significant
> amount of CPU.


Did you use something like async-profiler here? Do you have more info on
the breakdown of what used the CPU time?
Once we know that, there might be an opportunity to do such operations
async/lazily, or to fix something in the underlying platform (e.g. RocksDB
being slow, ...).
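For reference, a typical async-profiler invocation for a CPU flame graph of a
running TaskManager could look like this (paths and the PID are placeholders,
and the exact flags may vary between async-profiler versions):

    ./profiler.sh -e cpu -d 60 -f /tmp/taskmanager-cpu.html <taskmanager-pid>

The flame graph should then show where the time inside clearAllState goes
(e.g. state backend accesses vs. timer cleanup).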


On Thu, Mar 21, 2024 at 12:05 AM Caio Camatta via user <
user@flink.apache.org> wrote:

> Hey Asimansu,
>
> The inputDataStream is a KeyedStream, I forgot to mention that.
>
> Caio
>
> On Wed, Mar 20, 2024 at 6:56 PM Asimansu Bera 
> wrote:
>
>> Hello Caio,
>>
>> Based on the pseudocode, there is no keyed function present. Hence, the
>> window will not be processed in parallel. Please check again and respond.
>>
>> val windowDataStream =
>>   inputDataStream
>>     .window(TumblingEventTimeWindows of 1 hour)
>>     .trigger(custom trigger)
>>     .aggregate(
>>       preAggregator = custom AggregateFunction,
>>       windowFunction = custom ProcessWindowFunction
>>     )
>>
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/#keyed-vs-non-keyed-windows
>>
>> -A
>>
>>
>> On Wed, Mar 20, 2024 at 8:55 AM Caio Camatta via user <
>> user@flink.apache.org> wrote:
>>
>>> We run a large-scale Flink 1.16 cluster that uses windowed aggregations
>>> and we’re seeing lag spikes on window closure. I’m curious if others have
>>> encountered similar issues before and if anyone has suggestions for how to
>>> tackle this problem (other than simply increasing parallelism).
>>> Context
>>>
>>> Lag definition
>>>
>>> We define end-to-end lag as the delta between the time when the event
>>> was persisted in Kafka and the time when Flink finishes processing the
>>> event.
>>> Window operator definition
>>>
>>> The important parts (in pseudocode):
>>>
>>> val windowDataStream =
>>>   inputDataStream
>>>     .window(TumblingEventTimeWindows of 1 hour)
>>>     .trigger(custom trigger)
>>>     .aggregate(
>>>       preAggregator = custom AggregateFunction,
>>>       windowFunction = custom ProcessWindowFunction
>>>     )
>>>
>>> The custom trigger emits a TriggerResult.CONTINUE in onEventTime, i.e.
>>> we don’t run any user-defined logic at the end of the window. (This trigger
>>> only fires while the window is active via custom logic in onElement.)
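For illustration, a minimal sketch of a trigger with the behaviour described
above could look like the following (the firing condition in onElement is a
placeholder):

    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

    public class FireWhileActiveTrigger<T> extends Trigger<T, TimeWindow> {

        @Override
        public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) {
            // placeholder: custom logic decides when to FIRE while the window is active
            return shouldFire(element) ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            // no user-defined logic at the end of the window
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) {
            // nothing to clean up in this sketch
        }

        private boolean shouldFire(T element) {
            return true; // placeholder
        }
    }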
>>> Numbers
>>>
>>> Our Flink app processes ~3K events per second and I’ve calculated that
>>> there are around 200-300K panes to close per Task at the end of the 1-hour
>>> window. Our lag is fairly stable at a few hundred milliseconds during
>>> the window but spikes to 5-10 seconds when the window expires, which is a
>>> problem for us.
>>> The issue
>>>
>>> The magnitude of the lag spikes on window closure correlates with
>>>
>>>    - the size of the window (a 1-hour window has bigger spikes than a
>>>      5-minute window),
>>>    - the cardinality of the keys in the event stream,
>>>    - the number of events being processed per second.
>>>
>>> In other words, the more panes to close, the bigger the lag spike. I'm
>>> fairly sure that the lag is coming entirely from the WindowOperator’s
>>> clearAllState and I’ve validated that CPU profiles show clearAllState
>>> using a significant amount of CPU.
>>>
>>> Does this theory sound plausible? What could we do to minimize the
>>> effects of window clean-up? It would be nice to do it incrementally or
>>> asynchronously but I'm not sure if Flink provides this functionality today.
>>> Thanks,
>>> Caio
>>>
>>


Re: [Discuss] Creating an Apache Flink slack workspace

2022-05-12 Thread Robert Metzger
The Discourse forum is much more inviting and vibrant
>> > than a
>> > >>>> mailing list. Just from a tool perspective, discourse would have
>> the
>> > >>>> advantage of being Open Source and so we could probably self-host
>> it
>> > on an
>> > >>>> ASF machine. [1]
>> > >>>>
>> > >>>> When it comes to Slack, I definitely see the benefit of a dedicated
>> > >>>> Apache Flink Slack compared to ASF Slack. For example, we could
>> have
>> > more
>> > >>>> channels (e.g. look how many channels Airflow is using
>> > >>>> http://apache-airflow.slack-archives.org) and we could generally
>> > >>>> customize the experience more towards Apache Flink.  If we go for
>> > Slack,
>> > >>>> let's definitely try to archive it like Airflow did. If we do
>> this, we
>> > >>>> don't necessarily need infinite message retention in Slack itself.
>> > >>>>
>> > >>>> Cheers,
>> > >>>>
>> > >>>> Konstantin
>> > >>>>
>> > >>>> [1] https://github.com/discourse/discourse
>> > >>>>
>> > >>>>
>> > >>>> Am Di., 10. Mai 2022 um 10:20 Uhr schrieb Timo Walther <
>> > >>>> twal...@apache.org>:
>> > >>>>
>> > >>>>> I also think that a real-time channel is long overdue. The Flink
>> > >>>>> community in China has shown that such a platform can be useful
>> for
>> > >>>>> improving the collaboration within the community. The DingTalk
>> > channel of
>> > >>>>> 10k+ users collectively helping each other is great to see. It
>> could
>> > also
>> > >>>>> reduce the burden from committers for answering frequently asked
>> > questions.
>> > >>>>>
>> > >>>>> Personally, I'm a mailing list fan esp. when it comes to design
>> > >>>>> discussions. In my opinion, the dev@ mailing list should
>> definitely
>> > >>>>> stay where and how it is. However, I understand that users might
>> not
>> > want
>> > >>>>> to subscribe to a mailing list for a single question and get their
>> > mailbox
>> > >>>>> filled with unrelated discussions afterwards. Esp. in a company
>> > setting it
>> > >>>>> might not be easy to set up a dedicated email address for mailing
>> > lists and
>> > >>>>> setting up rules is also not convenient.
>> > >>>>>
>> > >>>>> It would be great if we could use the ASF Slack. We should find an
>> > >>>>> official, accessible channel. I would be open for the right tool.
>> It
>> > might
>> > >>>>> make sense to also look into Discourse or even Reddit? The latter
>> > would
>> > >>>>> definitely be easier to index by a search engine. Discourse is
>> > actually
>> > >>>>> made for modern real-time forums.
>> > >>>>>
>> > >>>>> Regards,
>> > >>>>> Timo
>> > >>>>>
>> > >>>>>
>> > >>>>> Am 10.05.22 um 09:59 schrieb David Anderson:
>> > >>>>>
>> > >>>>> Thank you @Xintong Song  for sharing the
>> > >>>>> experience of the Flink China community.
>> > >>>>>
>> > >>>>> I've become convinced we should give Slack a try, both for
>> discussions
>> > >>>>> among the core developers, and as a place where the community can
>> > reach out
>> > >>>>> for help. I am in favor of using the ASF slack, as we will need a
>> > paid
>> > >>>>> instance for this to go well, and joining it is easy enough (took
>> me
>> > about
>> > >>>>> 2 minutes). Thanks, Robert, for suggesting we go down this route.
>> > >>>>>
>> > >>>>> David
>> > >>>>>
>> > >>>>> On Tue, May 10, 2022 at 8:21 AM Robert Metzger <
>> rmetz...@apache.org>
>> > >>>>> wrote:
>> > >>>>>

Re: [Discuss] Creating an Apache Flink slack workspace

2022-05-10 Thread Robert Metzger
It seems that we'd have to use invite links on the Flink website for people
to join our Slack (1)
These links can be configured to have no time-expiration, but they will
expire after 100 guests have joined.
I guess we'd have to use a URL shortener (https://s.apache.org) that we
update once the invite link expires. It's not a nice solution, but it'll
work.


(1) https://the-asf.slack.com/archives/CBX4TSBQ8/p1652125017094159


On Mon, May 9, 2022 at 3:59 PM Robert Metzger  wrote:

> Thanks a lot for your answer. The onboarding experience to the ASF Slack
> is indeed not ideal:
> https://apisix.apache.org/docs/general/join#join-the-slack-channel
> I'll see if we can improve it
>
> On Mon, May 9, 2022 at 3:38 PM Martijn Visser 
> wrote:
>
>> As far as I recall you can't sign up for the ASF instance of Slack, you
>> can
>> only get there if you're a committer or if you're invited by a committer.
>>
>> On Mon, 9 May 2022 at 15:15, Robert Metzger  wrote:
>>
>> > Sorry for joining this discussion late, and thanks for the summary
>> Xintong!
>> >
>> > Why are we considering a separate slack instance instead of using the
>> ASF
>> > Slack instance?
>> > The ASF instance is paid, so all messages are retained forever, and
>> quite
>> > a few people are already on that Slack instance.
>> > There is already a #flink channel on that Slack instance, that we could
>> > leave as passive as it is right now, or put some more effort into it,
>> on a
>> > voluntary basis.
>> > We could add another #flink-dev channel to that Slack for developer
>> > discussions, and a private flink-committer and flink-pmc chat.
>> >
>> > If we are going that path, we should rework the "Community" and "Getting
>> > Help" pages and explain that the mailing lists are the "ground truth
>> tools"
>> > in Flink, and Slack is only there to facilitate faster communication,
>> but
>> > it is optional / voluntary (e.g. committers won't respond to DMs)
>> >
>> > All public #flink-* channels should be archived and google-indexable.
>> > I've asked Jarek from Airflow who's maintaining
>> > http://apache-airflow.slack-archives.org.
>> > If we can't use slack-archives.org, it would be nice to find some
>> > volunteers in the Flink community to hack a simple indexing tool.
>> > The indexing part is very important for me, because of some bad
>> > experiences with the Kubernetes community, where most of the advanced
>> > stuff is hidden in their Slack, and it took me a few weeks to find that
>> > goldmine of information.
>> >
>> > Overall, I see this as an experiment worth doing, but I would suggest
>> > revisiting it in 6 to 12 months: We should check if really all important
>> > decisions are mirrored to the right mailing lists, and that we get the
>> > benefits we hoped for (more adoption, better experience for users and
>> > developers), and that we can handle the concerns (DMs to developers,
>> > indexing).
>> >
>> >
>> >
>> >
>> >
>> > On Sat, May 7, 2022 at 12:22 PM Xintong Song 
>> > wrote:
>> >
>> >> Thanks all for the valuable feedback.
>> >>
>> >> It seems most people are overall positive about using Slack for dev
>> >> discussions, as long as they are properly reflected back to the MLs.
>> >> - We definitely need a code of conduct that clearly specifies what
>> people
>> >> should / should not do.
>> >> - Contributors pinging well-known reviewers / committers, I think that
>> >> also happens now on JIRA / GitHub. Personally, I'd understand a no-reply
>> >> as a "soft no". We may consider also putting that in the code of conduct.
>> >>
>> >> Concerning using Slack for user QAs, it seems the major concern is that we
>> >> may end up repeatedly answering the same questions from different
>> users,
>> >> due to lack of capacity for archiving and searching historical
>> >> conversations. TBH, I don't have a good solution for the archivability
>> and
>> >> searchability. I investigated some tools like Zapier [1], but none of
>> them
>> >> seems suitable for us. However, I'd like to share 2 arguments.
>> >> - The purpose of Slack is to make the communication more efficient? By
>> >> *efficient*, I mean saving time for both question askers and helpers
>> with
>> >> instance messa

Re: [Discuss] Creating an Apache Flink slack workspace

2022-05-09 Thread Robert Metzger
Thanks a lot for your answer. The onboarding experience to the ASF Slack is
indeed not ideal:
https://apisix.apache.org/docs/general/join#join-the-slack-channel
I'll see if we can improve it

On Mon, May 9, 2022 at 3:38 PM Martijn Visser 
wrote:

> As far as I recall you can't sign up for the ASF instance of Slack, you can
> only get there if you're a committer or if you're invited by a committer.
>
> On Mon, 9 May 2022 at 15:15, Robert Metzger  wrote:
>
> > Sorry for joining this discussion late, and thanks for the summary
> Xintong!
> >
> > Why are we considering a separate slack instance instead of using the ASF
> > Slack instance?
> > The ASF instance is paid, so all messages are retained forever, and quite
> > a few people are already on that Slack instance.
> > There is already a #flink channel on that Slack instance, that we could
> > leave as passive as it is right now, or put some more effort into it, on
> a
> > voluntary basis.
> > We could add another #flink-dev channel to that Slack for developer
> > discussions, and a private flink-committer and flink-pmc chat.
> >
> > If we are going that path, we should rework the "Community" and "Getting
> > Help" pages and explain that the mailing lists are the "ground truth
> tools"
> > in Flink, and Slack is only there to facilitate faster communication, but
> > it is optional / voluntary (e.g. committers won't respond to DMs)
> >
> > All public #flink-* channels should be archived and google-indexable.
> > I've asked Jarek from Airflow who's maintaining
> > http://apache-airflow.slack-archives.org.
> > If we can't use slack-archives.org, it would be nice to find some
> > volunteers in the Flink community to hack a simple indexing tool.
> > The indexing part is very important for me, because of some bad
> > experiences with the Kubernetes community, where most of the advanced
> > stuff is hidden in their Slack, and it took me a few weeks to find that
> > goldmine of information.
> >
> > Overall, I see this as an experiment worth doing, but I would suggest
> > revisiting it in 6 to 12 months: We should check if really all important
> > decisions are mirrored to the right mailing lists, and that we get the
> > benefits we hoped for (more adoption, better experience for users and
> > developers), and that we can handle the concerns (DMs to developers,
> > indexing).
> >
> >
> >
> >
> >
> > On Sat, May 7, 2022 at 12:22 PM Xintong Song 
> > wrote:
> >
> >> Thanks all for the valuable feedback.
> >>
> >> It seems most people are overall positive about using Slack for dev
> >> discussions, as long as they are properly reflected back to the MLs.
> >> - We definitely need a code of conduct that clearly specifies what
> people
> >> should / should not do.
> >> - Contributors pinging well-known reviewers / committers, I think that
> >> also happens now on JIRA / GitHub. Personally, I'd understand a no-reply
> >> as a "soft no". We may consider also putting that in the code of conduct.
> >>
> >> Concerning using Slack for user QAs, it seems the major concern is that we
> >> may end up repeatedly answering the same questions from different users,
> >> due to lack of capacity for archiving and searching historical
> >> conversations. TBH, I don't have a good solution for the archivability
> and
> >> searchability. I investigated some tools like Zapier [1], but none of
> them
> >> seems suitable for us. However, I'd like to share 2 arguments.
> >> - The purpose of Slack is to make the communication more efficient? By
> >> *efficient*, I mean saving time for both question askers and helpers
> with
> >> instant messages, file transmissions, even voice / video calls, etc.
> >> (Especially for cases where back and forth is needed, as David
> mentioned.)
> >> It does not mean questions that do not get enough attention on MLs are
> >> now
> >> guaranteed to be answered immediately. We can probably put that into the
> >> code of conduct, and kindly guide users to first search and initiate
> >> questions on MLs.
> >> - I'd also like to share some experience from the Flink China community.
> >> We
> >> have 3 DingTalk groups with 25k members in total (might be fewer, I didn't
> >> do
> >> deduplication), posting hundreds of messages daily. What I'm really
> >> excited
> >> about is that, there are way more interactions between users & users
> than
> &

Re: [Discuss] Creating an Apache Flink slack workspace

2022-05-09 Thread Robert Metzger
Sorry for joining this discussion late, and thanks for the summary Xintong!

Why are we considering a separate slack instance instead of using the ASF
Slack instance?
The ASF instance is paid, so all messages are retained forever, and quite a
few people are already on that Slack instance.
There is already a #flink channel on that Slack instance, that we could
leave as passive as it is right now, or put some more effort into it, on a
voluntary basis.
We could add another #flink-dev channel to that Slack for developer
discussions, and a private flink-committer and flink-pmc chat.

If we are going that path, we should rework the "Community" and "Getting
Help" pages and explain that the mailing lists are the "ground truth tools"
in Flink, and Slack is only there to facilitate faster communication, but
it is optional / voluntary (e.g. committers won't respond to DMs)

All public #flink-* channels should be archived and google-indexable.
I've asked Jarek from Airflow who's maintaining
http://apache-airflow.slack-archives.org.
If we can't use slack-archives.org, it would be nice to find some
volunteers in the Flink community to hack a simple indexing tool.
The indexing part is very important for me, because of some bad experiences
with the Kubernetes community, where most of the advanced stuff is hidden
in their Slack, and it took me a few weeks to find that goldmine of
information.

Overall, I see this as an experiment worth doing, but I would suggest
revisiting it in 6 to 12 months: We should check if really all important
decisions are mirrored to the right mailing lists, and that we get the
benefits we hoped for (more adoption, better experience for users and
developers), and that we can handle the concerns (DMs to developers,
indexing).





On Sat, May 7, 2022 at 12:22 PM Xintong Song  wrote:

> Thanks all for the valuable feedback.
>
> It seems most people are overall positive about using Slack for dev
> discussions, as long as they are properly reflected back to the MLs.
> - We definitely need a code of conduct that clearly specifies what people
> should / should not do.
> - Contributors pinging well-known reviewers / committers, I think that also
> happens now on JIRA / GitHub. Personally, I'd understand a no-reply as a
> "soft no". We may consider also putting that in the code of conduct.
>
> Concerning using Slack for user QAs, it seems the major concern is that we
> may end up repeatedly answering the same questions from different users,
> due to lack of capacity for archiving and searching historical
> conversations. TBH, I don't have a good solution for the archivability and
> searchability. I investigated some tools like Zapier [1], but none of them
> seems suitable for us. However, I'd like to share 2 arguments.
> - The purpose of Slack is to make the communication more efficient. By
> *efficient*, I mean saving time for both question askers and helpers with
> instant messages, file transmissions, even voice / video calls, etc.
> (Especially for cases where back and forth is needed, as David mentioned.)
> It does not mean questions that do not get enough attention on MLs are now
> guaranteed to be answered immediately. We can probably put that into the
> code of conduct, and kindly guide users to first search and initiate
> questions on MLs.
> - I'd also like to share some experience from the Flink China community. We
> have 3 DingTalk groups with 25k members in total (might be fewer, I didn't
> deduplicate), posting hundreds of messages daily. What I'm really excited
> about is that, there are way more interactions between users & users than
> between users & developers. Users are helping each other, sharing
> experiences, sending screenshots / log files / documentations and solving
> problems together. We developers seldom get pinged unless we proactively
> join the conversations. The DingTalk groups are way more active compared
> to the user-zh@ ML, which I'd attribute to the improvement of interaction
> experiences. Admittedly, there are questions being repeatedly asked &
> answered, but TBH I don't think that compares to the benefit of a
> self-driven user community. I'd really love to see if we can bring such
> success to the global English-speaking community.
>
> Concerning StackOverflow, it is definitely worth more attention from the
> community. Thanks for the suggestion / reminder, Piotr & David. I think
> Slack and StackOverflow are probably not mutually exclusive.
>
> Thank you~
>
> Xintong Song
>
>
> [1] https://zapier.com/
>
>
>
> On Sat, May 7, 2022 at 9:50 AM Jingsong Li  wrote:
>
> > Most of the open source communities I know have set up their Slack
> > channels, such as Apache Iceberg [1], Apache Druid [2], etc.
> > So I think Slack is worth trying.
> >
> > David is right, there are some cases that need back-and-forth
> > communication, and Slack will be more effective there.
> >
> > But back to the question, ultimately it's about whether there are
> > enough core 

Re: Flink serialization errors at a batch job

2022-05-09 Thread Robert Metzger
Hi,

I suspect that this error is not caused by Flink code (because our
serializer stack is fairly stable, there would be more users reporting such
issues if it was a bug in Flink).
In my experience, these issues are caused by broken serializer
implementations (e.g. a serializer being used by multiple threads causing
issues; or a serializer somehow not being deterministic).

Maybe there's a bug in the "com.spotify.scio.coders.*" code? Have you
checked if these errors are known there?

On Tue, May 3, 2022 at 11:31 PM Yunus Olgun  wrote:

> Hi,
>
> We're running a large Flink batch job and sometimes it throws
> serialization errors in the middle of the job. It is always the same
> operator but the error can be different. Then the following attempts work.
> Or sometimes the attempts get exhausted, and then we retry the job.
>
> The job is basically reading a list of filenames, downloading them from
> GCS, doing a groupBy- reduce and then writing it. The error happens at the
> reducing operator.
>
> We use Flink 1.13.6 and Beam 2.35.
>
> 1 - Do you know what may be going wrong here or how to debug it further?
> 2 - Attempts require reading all data again. Is there any way to speed up
> the recovery in cases like this?
>
> Thanks,
>
> >> Example stacktrace 1:
>
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce 
> (GroupReduce at groupByKey@{xxx}' , caused an error: 
> java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error 
> obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due 
> to an exception: Serializer consumed more bytes than the record had. This 
> indicates broken serialization. If you are using custom serialization types 
> (Value or Writable), check their serialization methods. If you are using a 
> Kryo-serialized type, check the corresponding Kryo serializer.
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:492)
>   at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:360)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>   at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.flink.util.WrappingRuntimeException: 
> java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error 
> obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due 
> to an exception: Serializer consumed more bytes than the record had. This 
> indicates broken serialization. If you are using custom serialization types 
> (Value or Writable), check their serialization methods. If you are using a 
> Kryo-serialized type, check the corresponding Kryo serializer.
>   at 
> org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:260)
>   at 
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1227)
>   at 
> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:105)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:484)
>   ... 4 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.RuntimeException: Error obtaining the sorted input: Thread 
> 'SortMerger Reading Thread' terminated due to an exception: Serializer 
> consumed more bytes than the record had. This indicates broken serialization. 
> If you are using custom serialization types (Value or Writable), check their 
> serialization methods. If you are using a Kryo-serialized type, check the 
> corresponding Kryo serializer.
>   at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown 
> Source)
>   at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source)
>   at 
> org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:257)
>   ... 7 more
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: 
> Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer 
> consumed more bytes than the record had. This indicates broken serialization. 
> If you are using custom serialization types (Value or Writable), check their 
> serialization methods. If you are using a Kryo-serialized type, check the 
> corresponding Kryo serializer.
>   at 
> org.apache.flink.runtime.operators.sort.ExternalSorter.lambda$getIterator$1(ExternalSorter.java:254)
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniExceptionally(Unknown 
> Source)
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(Unknown
>  Source)
>   at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown 
> Source)
>   at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown
>  Source)
>   at 
> org.apache.flink.runtime.operators.sort.ExternalSorterBuilder.lambda$doBuild$1(ExternalSorterBuilder.java:392)
>   at 
> 

Re: Practical guidance with Scala and Flink >= 1.15

2022-05-09 Thread Robert Metzger
Hi Salva,
my somewhat wild guess (because I'm not very involved with the Scala
development on Flink): I would stick with option 1 for now. It should be
easier now for the Flink community to support Scala versions past 2.12
(because we no longer need to worry about Scala 2.12+ support for Flink's
internal dependencies such as Akka).
An argument against supporting newer Scala versions is that I'm not aware
of anybody currently working on Flink with Scala in general.

On Fri, May 6, 2022 at 6:37 PM Salva Alcántara 
wrote:

> I've always used Scala in the context of Flink. Now that Flink 1.15 has
> become Scala-free, I wonder what is the best (most practical) route for me
> moving forward. These are my options:
>
> 1. Keep using Scala 2.12 for the years to come (and upgrade to newer
> versions when the community has come up with something). How long is Flink
> expected to support Scala 2.12?
>
> 2. Upgrade to Scala 2.13 or Scala 3 and use the Java API directly (without
> any Scala-specific wrapper/API). How problematic will that be, especially
> regarding type information & scala-specific serializers? I hate those
> "returns" (type hints) in the Java API...
>
> 3. Switch to Java, at least for the time being...
>
> To be clear, I have a strong preference for Scala over Java, but I'm
> trying to look at the "grand scheme of things" here, and be pragmatic. I
> guess I'm not alone here, and that many people are indeed evaluating the
> same pros & cons. Any feedback will be much appreciated.
>
> Thanks in advance!
>


Re: WatermarkStrategy for IngestionTime

2022-04-05 Thread Robert Metzger
Hi,

IngestionTime is usually used when the records don't have a proper event
time associated with them, but the job has a long topology and users want to
preserve the (time) order of events as they arrive in the system.
In that sense, you can use the regular event time watermark strategies also
for ingestion time.
AFAIK, ingestion time is rarely used in practice.
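As a hedged sketch, ingestion-time-like behaviour can be emulated with an
event-time strategy that assigns the wall-clock time when the record enters
the job ("MyEvent" and "source" are placeholders):

    WatermarkStrategy<MyEvent> ingestionTimeLike =
        WatermarkStrategy.<MyEvent>forMonotonousTimestamps()
            .withTimestampAssigner((event, previousTimestamp) -> System.currentTimeMillis());

    DataStream<MyEvent> withIngestionTime =
        source.assignTimestampsAndWatermarks(ingestionTimeLike);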

On Tue, Apr 5, 2022 at 12:10 AM Xinbin Huang  wrote:

> Hi,
>
> Since *TimeCharacteristic,* is deprecated.
>
> AFAIK,
> - TimeCharacteristic*.*ProcessingTime -> WatermarkStrategy.noWatermarks()
> - TimeCharacteristic*.*EventTime ->
> WatermarkStrategy.forBoundedOutOfOrderness()
> - TimeCharacteristic*.*IngestionTime -> ???
>
> Do we have a built-in *WatermarkStrategy *equivalent for the purpose?
>
> Thanks
> Bin
>
>


Re: Flink metric

2022-04-05 Thread Robert Metzger
Hi,

Multiple records are in the system at the same time because Flink buffers
records in various components for efficiency reasons. That's why an
individual record might have a latency of ~100ms while Flink is processing
many more messages in parallel.
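As a rough sanity check (using the numbers from the question): with a
throughput of ~300 records/second and a per-record latency of ~100 ms,
Little's law gives about 300/s * 0.1 s = 30 records in flight at any point in
time, so the two measurements are consistent once pipelining and buffering
are taken into account.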


On Tue, Apr 5, 2022 at 12:54 PM long ha dac  wrote:

> Hi,
> I'm trying to work with monitoring a job on Flink. Just a simple process:
> a data source from kafka joining with a data stream table (load from
> database).
> I've already enabled the latency tracking like the instruction from:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/metrics/#end-to-end-latency-tracking
> What I want to ask here is: The latency mean of the operator is about
> 100ms however the throughput is about 300 records/second (this value I got
> from the metric of the operator: 300 records/second for both records in and
> records out). What I understand about the latency tracking is: this value
> only measured the waiting time, so normally it should be less than the real
> latency of the operator. So how could it process 300 records/second (about
> 3ms for 1 message) with the latency tracked mean is 100ms ?
>
> Thanks in advance.
>


Re: BigQuery connector debugging

2022-04-05 Thread Robert Metzger
Hi Matt,

At first glance your code looks fine. I guess you'll need to follow the
codepaths more with the debugger.
Have you made sure that "reachedEnd()" returns false?
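For context, the runtime drives an InputFormat with a loop that is
conceptually similar to the simplified sketch below ("reuse" and the record
type are placeholders), so if reachedEnd() returns true right after the format
is opened, nextRecord() is never invoked:

    format.open(split);
    while (!format.reachedEnd()) {
        Row record = format.nextRecord(reuse);  // only called while reachedEnd() is false
        if (record != null) {
            collect(record);
        }
    }
    format.close();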


On Tue, Apr 5, 2022 at 9:42 AM Matthew Brown  wrote:

> Hi all,
>
> I'm attempting to build a Table API connector for BigQuery using the
> BigQuery Storage API (
> https://cloud.google.com/bigquery/docs/reference/storage).
>
> I've got a base structure built out at
> https://github.com/mnbbrown/bigquery-connector
> There's a couple of things I have to do yet like correcting the mapping
> between the BigQuery avro schema and flink TypeInformation, and add a test
> suite.
>
> I've added it to an internal project I'm working on and "nextRecord" on
> the InputFormat is never called. I can see open/close,
> openInputFormat/closeInputFormat, etc being called correctly.
>
> Does anybody have any debugging tips?
>
> Thanks,
> Matt.
>
> --
> --
> AU: +61 459 493 730
> UK: +44 7927 618921
> @mnbbrown
>


Re: Example for Jackson JsonNode Kafka serialization schema

2022-01-28 Thread Robert Metzger
Hi Oran,

as you've already suggested, you could just use a (flat)map function that
takes an ObjectNode and outputs a string.
In the mapper, you can do whatever you want in case of an invalid object:
logging about it, discarding it, writing an "error json string", writing to
a side output stream, ...
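For example, a minimal sketch of such a mapper (here it simply drops invalid
objects; the class name is a placeholder, and you could log or emit to a side
output in the catch block instead):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ObjectNode;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class ObjectNodeToJsonString extends RichFlatMapFunction<ObjectNode, String> {

        private transient ObjectMapper mapper;

        @Override
        public void open(Configuration parameters) {
            mapper = new ObjectMapper();
        }

        @Override
        public void flatMap(ObjectNode value, Collector<String> out) {
            try {
                out.collect(mapper.writeValueAsString(value));
            } catch (Exception e) {
                // invalid object: drop it (or log / route to a side output here)
            }
        }
    }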


On Tue, Jan 25, 2022 at 12:38 PM Oran Shuster 
wrote:

> In the documentation we have an example on how to implement
> deserialization from bytes to Jackson ObjectNode objects
> - JSONKeyValueDeserializationSchema
>
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/
>
> However, there is no example on the other direction: Taking an
> ObjectNode/JsonNode (or just any POJO class) and using Jackson to serialize
> it to string
>
> You can write a simple schema like so
>
>
> public class JSONKafkaSerializationSchema implements
> KafkaSerializationSchema<JsonNode> {
> private final ObjectMapper objectMapper = new ObjectMapper();
>
> @Override
> public ProducerRecord<byte[], byte[]> serialize(JsonNode element,
> @Nullable Long timestamp) {
> String topic = getTargetTopic(element);
>
> byte[] value;
>
> try {
> value = objectMapper.writeValueAsBytes(element);
> return new ProducerRecord<>(topic, value);
> } catch (JsonProcessingException e) {
> return null;
> }
> }
>
> private String getTargetTopic(JsonNode jsonNode) {
> return jsonNode.get("topic").asText();
> }
> }
>
> But this raises a question - what should we do when serialization fails?
> If the input class is a simple POJO, then Jackson should always succeed in
> converting to bytes but that's not 100% guaranteed.
> In case of failures, can we return null and the record will be discarded?
> Null values are discarded in the case of the deserialization schema, from
> the documentation - "Returns: The deserialized message as an object (null
> if the message cannot be deserialized)."
> If this is not possible, what is the proper way to serialize Jackson
> objects into bytes in Flink? It's possible to convert everything to String
> before the Kafka producer, but then any logic to determine the topic we need
> to send to will need to deserialize the string again.
>


Re: Upgrade to 1.14.3

2022-01-28 Thread Robert Metzger
Hi Sweta,

yes, you cannot run a Flink job compiled against Flink 1.13 against a
1.14 cluster. But if you are only using stable APIs of Flink, you should be
able to compile your job with the 1.14 dependencies without touching the
code.

See also:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/upgrading/

> - How do I upgrade to 1.14.3 cluster without losing running apps' state?
> I have even tried doing a savepoint that did not revive the job.

Using a savepoint created with a job compiled against Flink 1.13, and
restoring it with a job compiled against 1.14 is the recommended approach.


On Tue, Jan 25, 2022 at 3:37 PM Sweta Kalakuntla 
wrote:

> Hi Ingo,
>
> So basically, I cannot deploy an older version of flink job in 1.14.3
> flink cluster, is it?
>
> Thanks,
> Sweta
>
> On Tue, Jan 25, 2022 at 4:02 AM Ingo Bürk  wrote:
>
>> Hi Sweta,
>>
>> there was a non-compatible change to SourceReaderContext#metricGroup in
>> the 1.14.x release line; I assume this is what you are seeing.
>>
>> Did you make sure to update the connector (and any other) dependencies
>> as well?
>>
>>
>> Best
>> Ingo
>>
>> On 25.01.22 05:36, Sweta Kalakuntla wrote:
>> > Hi,
>> >
>> > We are on Flink 1.13.3 and trying to upgrade the cluster to version 1.14.3.
>> > I see that the job (on 1.13.3) is unable to start up because it says
>> > it couldn't find the metrics group (inside the FlinkKafkaConsumer class).
>> >
>> > - can I deploy 1.13.3 job on 1.14.3 cluster?
>> > - can I deploy 1.14.3 job on 1.13.3 cluster?
>> > - How do I upgrade to 1.14.3 cluster without losing running apps'
>> state?
>> > I have even tried doing a savepoint that did not revive the job.
>> >
>> > Thank you,
>> > Sweta
>>
>


Re: IllegalArgumentException: URI is not hierarchical error when initializating jobmanager in cluster

2022-01-28 Thread Robert Metzger
Hi Javier,

I suspect that TwitterServer is using some classloading / dependency
injection / service loading "magic" that is causing this.
I would try to find out, either by attaching a remote debugger (should be
possible when executing in cluster mode locally) or by adding log
statements in the code, what the URI it's trying to load looks like.

On the cluster, Flink is using separate classloaders for the base flink
system, and the user code (as opposed to executing in the IDE, where
everything is loaded from the same loader). Check out this page and try out
the config arguments:
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/
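If it does turn out to be a classloading issue, a possible starting point
would be experimenting with these settings in flink-conf.yaml (the package
pattern below is only an example):

    # load everything parent-first (coarse, but a quick way to test the theory)
    classloader.resolve-order: parent-first

    # or keep child-first loading and only pull selected packages into the parent:
    # classloader.parent-first-patterns.additional: com.twitter.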



On Wed, Jan 26, 2022 at 4:13 AM Javier Vegas  wrote:

> I am porting a Scala service to Flink in order to make it more scalable
> via running it in a cluster. All my Scala services extends a base Service
> class that extends TwitterServer (
> https://github.com/twitter/twitter-server/blob/develop/server/src/main/scala/com/twitter/server/TwitterServer.scala)
> and that base class contains a lot of logic about resource initialization,
> logging, stats and error handling, monitoring, etc that I want to keep
> using in my class. I ported my logic to Flink sources and sinks, and
> everything worked fine when I ran my class in single JVM mode either from
> sbt or my IDE, Flink's jobmanager and taskmanagers start and run my app.
> But when I try to run my application in cluster mode, when launching my
> class with "./bin/standalone-job.sh start --job-classname" the jobmanager
> runs into a "IllegalArgumentException: URI is not hierarchical" error on
> initialization, apparently because TwitterServer is trying to load
> something from the class path (see attached full log).
>
> Is there anything I can do to run a class that extends TwitterServer in a
> Flink cluster? I have tried making my class not extend it and it worked
> fine, but I really want to keep using all the common infraestructure logic
> that I have in my base class that extends TwitterServer.
>
> Thanks!
>


Re: Duplicate job submission error

2022-01-28 Thread Robert Metzger
Hi Parag,

it seems that you are submitting a job with the same job id multiple times.
An easy fix would be generating a new job id each time you are submitting
the job.

To debug this: check out the Flink jobmanager logs, there are log messages
for every job submission.


On Thu, Jan 27, 2022 at 9:16 AM Parag Somani  wrote:

> Hello All,
>
> While deploying on one of our environments, we encountered a CrashLoopBackOff
> of the jobmanager pod.
> Env: K8s
> Flink: 1.14.2
>
> Could you suggest how we can troubleshoot this and possibly handle it?
>
>
> exception snippet as follows:
>
> 2022-01-27 06:58:07.326 ERROR 44 --- [lt-dispatcher-4]
> c.b.a.his.service.FlinkExecutorService   : Failed to execute job
>
> org.apache.flink.util.FlinkException: Failed to execute job 'events rates
> calculation'.
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2056)
> ~[flink-streaming-java_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:137)
> ~[flink-clients_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
> ~[flink-clients_2.12-1.14.2.jar:1.14.2]
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1917)
> ~[flink-streaming-java_2.12-1.14.2.jar:1.14.2]
> at
> com.bmc.ade.his.service.FlinkExecutorService.init(FlinkExecutorService.java:37)
> ~[health-service-1.0.00.jar:1.0.00]
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method) ~[na:na]
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> ~[na:na]
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[na:na]
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> ~[na:na]
> at
> org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:389)
> ~[spring-beans-5.3.10.jar:5.3.10]
> at
> org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:333)
> ~[spring-beans-5.3.10.jar:5.3.10]
> at
> org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:157)
> ~[spring-beans-5.3.10.jar:5.3.10]
> at
> org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:440)
> ~[spring-beans-5.3.10.jar:5.3.10]
> at
> org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1796)
> ~[spring-beans-5.3.10.jar:5.3.10]
> at
> org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:620)
> ~[spring-beans-5.3.10.jar:5.3.10]
> at
> org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:542)
> ~[spring-beans-5.3.10.jar:5.3.10]
> at
> org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335)
> ~[spring-beans-5.3.10.jar:5.3.10]
> at
> org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
> ~[spring-beans-5.3.10.jar:5.3.10]
> at
> org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333)
> ~[spring-beans-5.3.10.jar:5.3.10]
> at
> org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208)
> ~[spring-beans-5.3.10.jar:5.3.10]
> at
> org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:944)
> ~[spring-beans-5.3.10.jar:5.3.10]
> at
> org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:918)
> ~[spring-context-5.3.10.jar:5.3.10]
> at
> org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:583)
> ~[spring-context-5.3.10.jar:5.3.10]
> at
> org.springframework.boot.SpringApplication.refresh(SpringApplication.java:754)
> ~[spring-boot-2.5.5.jar:2.5.5]
> at
> org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:434)
> ~[spring-boot-2.5.5.jar:2.5.5]
> at
> 

Re: Inaccurate checkpoint trigger time

2022-01-28 Thread Robert Metzger
Hi Paul,

where are you storing your checkpoints, and what's their size?

IIRC, Flink won't trigger a new checkpoint before the old ones have been
cleaned up, and if your checkpoints are large and stored on S3, it can take
a while to clean them up (especially with the Hadoop S3 plugin; using the
Presto S3 plugin is faster).
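For reference, switching to the Presto-based S3 filesystem is roughly a matter
of enabling the plugin in the Flink distribution and pointing the checkpoint
directory at it (the bucket name is a placeholder):

    mkdir -p ./plugins/s3-fs-presto
    cp ./opt/flink-s3-fs-presto-*.jar ./plugins/s3-fs-presto/

and in flink-conf.yaml:

    # s3p:// explicitly selects the Presto implementation if both S3 plugins are loaded
    state.checkpoints.dir: s3p://my-bucket/checkpoints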




On Thu, Jan 27, 2022 at 10:56 AM Paul Lam  wrote:

> Hi Yun,
>
> Sorry for the late reply. I finally found some time to investigate this
> problem further. I upgraded the job to 1.14.0, but it’s still the same.
>
> I’ve checked the debug logs, and I found that ZooKeeper notifies the watched
> event of checkpoint ID changes very late [1]. Each time a checkpoint
> finishes, it takes minutes before the ZooKeeper client notices that the
> checkpoint ID has changed.
>
> I suspect the checkpoint coordinator is blocking on incrementing the
> checkpoint ID on ZooKeeper [2]. But no luck so far; there aren't many
> relevant logs that can help me prove that.
>
> What do you think of this? Thanks a lot!
>
> [1] https://gist.github.com/link3280/5072a054a43b40ba28891837a8fdf995
> [2]
> https://github.com/apache/flink/blob/90e850301e672fc0da293abc55eb446f7ec68ffa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L743
>
> Best,
> Paul Lam
>
> 2021年11月23日 16:49,Paul Lam  写道:
>
> Hi Yun,
>
> Thanks a lot for your pointers! I’ll try it out as you suggested and then
> get back to you.
>
> Best,
> Paul Lam
>
> 2021年11月23日 16:32,Yun Tang  写道:
>
> Hi Paul,
>
> This is really weird, from what I know, flink-1.11.0 has a problem of
> handling min-pause time [1] and this should be resolved in flink-1.12.1.
>
> Could you open the debug log level for org.apache.flink.runtime.checkpoint
> and use jmap or byteman to get the field value
> of CheckpointCoordinator#lastCheckpointCompletionRelativeTime, 
> CheckpointRequestDecider#minPauseBetweenCheckpoints
> and SystemClock#relativeTimeMillis in method 
> CheckpointRequestDecider#nextTriggerDelayMillis
> [2] to see any unexpected behavior.
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-18856
> [2]
> https://github.com/apache/flink/blob/90e850301e672fc0da293abc55eb446f7ec68ffa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java#L182
>
>
> Best
> Yun Tang
>
> --
> *From:* Paul Lam 
> *Sent:* Tuesday, November 23, 2021 14:35
> *To:* user 
> *Subject:* Inaccurate checkpoint trigger time
>
> Hi,
>
> Recently I’ve noticed a job has nondeterministic checkpoint trigger time.
>
> The job is using Flink 1.12.1 with FsStateBackend and has a parallelism of
> 650. It was configured to trigger a checkpoint every 150 seconds with
> 0 pause time and no concurrent checkpoints. However there’re obvious errors
> in the checkpoint trigger times, as the actual interval may vary from 30
> seconds to 6 minutes.
>
> The jobmanager logs look good, and no error logs are found. Some of the
> output is as follows:
>
> 2021-11-23 13:51:46,438 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 1446 for job f432b8d90859db54f7a79ff29a563ee4 (47142264825 bytes
> in 22166 ms).
> 2021-11-23 13:57:21,021 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 1447 (type=CHECKPOINT) @ 1637647040653 for job
> f432b8d90859db54f7a79ff29a563ee4.
> 2021-11-23 13:57:43,761 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 1447 for job f432b8d90859db54f7a79ff29a563ee4 (46563195101 bytes
> in 21813 ms).
> 2021-11-23 13:59:09,387 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 1448 (type=CHECKPOINT) @ 1637647149157 for job
> f432b8d90859db54f7a79ff29a563ee4.
> 2021-11-23 13:59:31,370 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 1448 for job f432b8d90859db54f7a79ff29a563ee4 (45543757702 bytes
> in 20354 ms).
> 2021-11-23 14:06:37,916 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 1449 (type=CHECKPOINT) @ 1637647597704 for job
> f432b8d90859db54f7a79ff29a563ee4.
> 2021-11-23 14:07:03,157 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 1449 for job f432b8d90859db54f7a79ff29a563ee4 (45662471025 bytes
> in 23779 ms).
> 2021-11-23 14:07:05,838 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 1450 (type=CHECKPOINT) @ 1637647625640 for job
> f432b8d90859db54f7a79ff29a563ee4.
> 2021-11-23 14:07:30,748 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 1450 for job f432b8d90859db54f7a79ff29a563ee4 (46916136024 bytes
> in 22998 ms).
> 2021-11-23 14:13:09,089 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 1451 (type=CHECKPOINT) @ 1637647988831 for job
> f432b8d90859db54f7a79ff29a563ee4.
> 2021-11-23 14:13:38,411 INFO
> 

Re: Determinism of interval joins

2022-01-28 Thread Robert Metzger
Instead of using `reinterpretAsKeyedStream`, can you use keyBy and see if
the behavior becomes deterministic?
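Concretely, that would mean building the join directly on keyBy instead of
reinterpretAsKeyedStream, reusing the names from your snippet (sketch only):

    joinedStream = leftStream
        .keyBy(keySelector2)
        .intervalJoin(rightStream.keyBy(keySelector1))
        .between(Time.minutes(-10L), Time.milliseconds(0L))
        .lowerBoundExclusive()
        .process(new IntervalJoinFunction(...));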

On Thu, Jan 27, 2022 at 9:49 PM Alexis Sarda-Espinosa <
alexis.sarda-espin...@microfocus.com> wrote:

> I'm not sure if the issue in [1] is relevant since it mentions the Table
> API, but it could be. Since stream1 and stream2 in my example have a long
> chain of operators behind, I presume they might "run" at very different
> paces.
>
> Oh and, in the context of my unit tests, watermarks should be
> deterministic, the input file is sorted, and the watermark strategies
> should essentially behave like the monotonous generator.
>
> [1] https://issues.apache.org/jira/browse/FLINK-24466
>
> Regards,
> Alexis.
>
> --
> *From:* Alexis Sarda-Espinosa 
> *Sent:* Thursday, January 27, 2022 1:30 PM
> *To:* user@flink.apache.org 
> *Subject:* Determinism of interval joins
>
>
> Hi everyone,
>
>
>
> I’m seeing a lack of determinism in unit tests when using an interval
> join. I am testing with both Flink 1.11.3 and 1.14.3 and the relevant bits
> of my pipeline look a bit like this:
>
>
>
> keySelector1 = …
>
> keySelector2 = …
>
>
>
> rightStream = stream1
>
>   .flatMap(…)
>
>   .keyBy(keySelector1)
>
>   .assignTimestampsAndWatermarks(strategy1)
>
>
>
> leftStream = stream2
>
>   .keyBy(keySelector2)
>
>   .assignTimestampsAndWatermarks(strategy2)
>
>
>
> joinedStream = DataStreamUtils.reinterpretAsKeyedStream(leftStream,
> keySelector2)
>
>   .intervalJoin(DataStreamUtils.reinterpretAsKeyedStream(rightStream,
> keySelector1))
>
>   .between(Time.minutes(-10L), Time.milliseconds(0L))
>
>   .lowerBoundExclusive()
>
>   .process(new IntervalJoinFunction(…))
>
>
>
> ---
>
>
>
> In my tests, I have a bounded source that loads demo data from a file and
> simulates the stream with a sink that collects results in memory. In the
> specific case of my IntervalJoinFunction, I'm seeing that it's called a
> different number of times in a non-deterministic way: sometimes I see 14
> calls to its processElement() method, other times 8, other times none at all
> and my output is empty; I count this by checking my logs with some tracing.
>
>
>
> Does anyone know why this is? Maybe I’m doing something wrong,
> particularly with reinterpretAsKeyedStream.
>
>
>
> Regards,
>
> Alexis.
>
>
>


Re: Flink native k8s integration vs. operator

2022-01-20 Thread Robert Metzger
Hi Alexis,

The usage of Custom Resource Definitions (CRDs). The main reason given to
> me was that such resources are global (for a given cluster) and that is not
> desired. I know that ultimately a CR based on a CRD can be scoped to a
> specific namespace, but customer is king…


I don't think this restriction applies to many organizations. K8s operators
are the de facto standard for deploying all kinds of software. There are
quite a few projects that used to just have a Helm chart and are now
switching over to providing operators, because operators provide a much
better experience.
If you have more specifics on this concern that is relevant for the Flink
community, I'd like to hear that.


Kubernetes Service Accounts (SAs) with roles to create deployments/pods.
> This one is more understandable, particularly after the whole log4j
> debacle. Roles that manage solely deployment.scale subresources would be
> acceptable though.


This requirement is not strictly needed to deploy Flink on K8s. Only with
the native K8s integration of Flink do you need to give the Flink JVM a role
that allows creating other pods.
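For context, a minimal sketch of the extra permissions could look roughly like
the Role below (names and namespace are placeholders; the exact resource list
depends on the Flink version and on whether Kubernetes HA is used), bound to
the JobManager's service account via a RoleBinding:

    apiVersion: rbac.authorization.k8s.io/v1
    kind: Role
    metadata:
      name: flink-native-k8s
      namespace: flink
    rules:
      - apiGroups: [""]
        resources: ["pods", "configmaps"]
        verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]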


Best,
Robert

On Tue, Jan 18, 2022 at 5:18 PM Alexis Sarda-Espinosa <
alexis.sarda-espin...@microfocus.com> wrote:

> Hi everyone,
>
>
>
> Since I see this is getting some traction, I’d like to add a couple
> things. I had been developing a Kubernetes controller for Flink as a Proof
> of Concept at my company; I called it Flork because it was to be a Flink
> Orchestrator for Kubernetes. In the end, we will most likely not use this
> controller due to security concerns that were communicated to me. These
> concerns stem from the fact that our product would be used by customers in
> their own Kubernetes clusters, and many customers don’t want:
>
>
>
> - The usage of Custom Resource Definitions (CRDs). The main reason given
> to me was that such resources are global (for a given cluster) and that is
> not desired. I know that ultimately a CR based on a CRD can be scoped to a
> specific namespace, but customer is king…
>
>
>
> - Kubernetes Service Accounts (SAs) with roles to create deployments/pods.
> This one is more understandable, particularly after the whole log4j
> debacle. Roles that manage solely deployment.scale subresources would be
> acceptable though.
>
>
>
> I mention these in case they prove to be relevant for others in the
> current context. For us, it means we have to stick with something like
> standalone Kubernetes + reactive/adaptive.
>
>
>
> Nevertheless, the PoC I had was already functional and, while I would have
> to request permission to contribute the code to the community, it might be
> useful for these efforts. However, I’d first ask if there is actually
> interest in this code, considering these are some of the “features” it
> currently has:
>
>
>
> * The CRD relies on the Pod Template support included in Flink itself. As
> such, some of the fields in the CRD are “vanilla” pod specs, and the schema
> reflects that because it embeds a flattened version of the schema from [1].
> I’d also have a basic Helm chart ready.
>
>
>
> * The code is written in a mixture of Java and Kotlin, and is built with
> Gradle. I made heavy use of Kotlin Coroutines to implement some of the core
> logic in a non-blocking way.
>
>
>
> * The code already supports High Availability by leveraging Kubernetes
> leases and the corresponding helpers in Fabric8’s client.
>
>
>
> * The main deployment logic is delegated to Flink’s own flink-kubernetes
> module [2]. Nevertheless, my build shadows all the fabric8 classes and
> service definitions embedded in said module, so that the rest of the code
> can use other kubernetes-client versions independently.
>
>
>
> * The controller handles savepoint creation for redeployments after CR
> changes, e.g. upgrades. This would also work after controller fail-over
> with/without HA.
>
>
>
> * The code supports some extension for custom container images: classes
> defined in META-INF/services/ files are called as decorators for Flink’s
> conf file and/or the pod specs defined in the CR, and they could be copied
> to the image on top of an official base version.
>
>
>
> * A deployment mode without CRD could be supported --- I have some code
> that can run on top of the core controller and allows “embedding” a CR in a
> Config Map key. The translation between the CM and the core controller code
> is then done transparently.
>
>
>
> * I have a module that integrates the code with Inversion of Control
> containers such as Spring. I only used javax annotations (soon to be
> jakarta), so it’s not tied to Spring.
>
>
>
> Something I haven’t considered at all in my code is ingress for Flink’s UI.
>
>
>
> Let me know what you think.
>
>
>
> [1]
> https://github.com/kubernetes/kubernetes/blob/master/api/openapi-spec/swagger.json
>
> [2] https://github.com/apache/flink/tree/master/flink-kubernetes
>
>
>
> Regards,
>
> Alexis.
>
>
>
> *From:* Gyula Fóra 
> *Sent:* Montag, 17. Januar 2022 

Re: Flink (DataStream) in Kubernetes

2022-01-18 Thread Robert Metzger
Hi Jessy,

Which approach is suitable for a standalone deployment in Kubernetes? Do we
> have some best practises for running Flink applications on K8s ?


I would deploy Flink in Application Mode using the standalone K8s
deployment:
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/

We are planning to connect the source S1, S2 and S3 using Union Operator.
> And these sources have different parallelism settings, equal to the
> available kafka partitions. And the downstream process function has the
> same parallelism as the real-time kafka source S1. Is it a good idea to
> apply union on streams with different parallelisms ?.


I recommend just starting with a global parallelism that is the same for
all operators. Only if you see performance issues should you start
incrementally tweaking the parallelism of individual operators.
It is not a problem to have different parallelisms for the union operator.

All events required the entire rules for processing the data, hence keeping
> this in rocksdb is not possible. Is it a good approach to keep a large
> state in broadcast-state?.


I would not consider 740 MB a large state. That easily fits into memory.

Is there any relation between incremental checkpoints and maximum number of
> completed checkpoints (state.checkpoints.num-retained) ?


I don't think so.

Will the entire state be checkpointed every time irrespective of the delta
> between the checkpoints if I have enabled incremental checkpoints for my
> rocksdb state backend and set the maximum number of completed checkpoints
> to 1 ?


No, Flink will create incremental checkpoints.

On Tue, Jan 18, 2022 at 2:41 PM Jessy Ping 
wrote:

> Hi Team,
> Any insights for below mail will be helpful.
>
> Thanks
> Jessy
>
> On Fri, Jan 14, 2022, 11:09 PM Jessy Ping 
> wrote:
>
>> Hi Team,
>>
>> We are planning to run the below pipeline as a standalone Flink
>> application cluster on kubernetes. It will be better if the community can
>> share their insights regarding the below questions.
>>
>> [image: image.png]
>> We can describe the pipeline as follows,
>>
>>1. Combine the realtime streams from S1, enrichment data from S2 and
>>S3 using Union Operator. Partition the stream based on value1 for keeping
>>the enrichment data locally available.
>>2. Broadcast the rules to process the data from S4.
>>3. Connect the above two streams(1&2) and process the real time
>>events from S1 using the enrichment data from S2 and S3 stored in rocksDB
>>state as per the rules stored in broadcast state inside the keyed 
>> broadcast
>>process function.
>>4. Produce the transformed results to a Kafka Sink.
>>
>> Note: Kafka Source S1 has 32 partitions. Suppose we have 1 million
>> distinct keys and expect 10k events/s from S1.
>>
>> Approach 1: Application cluster with 16 task managers. Each task manager
>> has 2 slots and 2 CPUs.
>> Approach 2: Application cluster with 2 task managers. Each task manager
>> has 16 slots and 16 CPUs.
>>
>> *Questions*
>>
>>- Which approach is suitable for a standalone deployment in
>>Kubernetes? Do we have some best practises for running Flink applications
>>on K8s ?
>>- We are planning to connect the source S1, S2 and S3 using Union
>>Operator. And these sources have different parallelism settings, equal to
>>the available kafka partitions. And the downstream process function has 
>> the
>>same parallelism as the real-time kafka source S1. Is it a good idea to
>>apply union on streams with different parallelisms ?.
>>- The size of the broadcast state is around 20mb, so the checkpoint
>>size of the broadcast state will be 740mb ( maximum parallelism * size, 
>> 32*
>>20 ). All events required the entire rules for processing the data, hence
>>keeping this in rocksdb is not possible. Is it a good approach to keep a
>>large state in broadcast-state?.
>>- Is it a good practice to use a singleton pattern in Flink to create
>>a local cache of the rules inside the open method of process function ?. 
>> If
>>data losses due to restart i can repopulate the data using an external
>>call. Can I keep these kinds of local caches(created inside open method)
>>safely for the entire lifetime of a particular pod/task manager ?
>>- Is there any relation between incremental checkpoints and maximum
>>number of completed checkpoints (state.checkpoints.num-retained) ?
>>- Will the entire state be checkpointed every time irrespective of
>>the delta between the checkpoints if I have enabled incremental 
>> checkpoints
>>for my rocksdb state backend and set the maximum number of completed
>>checkpoints to 1 ?
>>
>> Thanks
>> Jessy
>>
>>


Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

2021-12-07 Thread Robert Metzger
Hi Ayush,

I couldn't find the documentation you've mentioned. Can you send me a link
to it?

On Tue, Dec 7, 2021 at 10:40 AM Ayush Chauhan 
wrote:

> Hi,
>
> Can you please let me know the alternatives of isEndOfStream() as now
> according to docs this method will no longer be used to determine the end
> of the stream.
>
> --
>  Ayush Chauhan
>  Data Platform
>  [image: mobile-icon]  +91 9990747111
>
>
> This email is intended only for the person or the entity to whom it is
> addressed. If you are not the intended recipient, please delete this email
> and contact the sender.
>


Re: Customize Kafka client (module.yaml)

2021-12-07 Thread Robert Metzger
Hi Jérémy,

In my understanding of the StateFun docs, you need to pass custom
properties using "ingress.spec.properties".
For example:

ingresses:
  - ingress:
      meta:
        type: io.statefun.kafka/ingress
        id: project.A/input
      spec:
        properties:
          max.request.size: 11000

(or the nested variant?)



On Tue, Dec 7, 2021 at 4:31 PM Jérémy Albrecht  wrote:

> Hi All,
>
> I encounter a blocking problem linked to exchanging messages between
> Stateful functions.
> The context is: I am sending a very large payload from a Stateful Function
> to a Kafka topic. I am blocked by the Kafka client (I think) because here
> is the output of the statefun-manager container:
> Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The
> message is 6660172 bytes when serialized which is larger than the maximum
> request size you have configured with the max.request.size configuration.
>
> Now if I take a look at the documentation (
> https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/io/apache-kafka/)
>  they
> refer to the Confluent doc to customize the configuration of the Kafka
> client. It is unclear on how to implement this into the module.yaml file. I
> tried several ways:
>
> ingresses:
>   - ingress:
>   meta:
> type: io.statefun.kafka/ingress
> id: project.A/input
>   spec:
> max:
>   request:
> size: 104857600
> max.request.size: 11000
> message:
>   max:
> bytes: 104857600
> address: kafka:9092
> consumerGroupId: my-consumer-group
> startupPosition:
>   type: earliest
> topics:
>   - topic: entry # used for retrop-compatibility, to be removed in 
> next release
> valueType: project.A/Message
> targets:
>   - project.redacted/Entry
>
>
> None of the above solutions seems to be working.
> Does anyone have the ability to clarify what I am not doing correctly ?
>
> Thanks in advance,
> Jérémy
>


Re: Which issue or commit should i merge in flink-1.13.3 for buffer debloating?

2021-12-07 Thread Robert Metzger
Hi,

I guess all the commits mentioned in all the subtasks of this ticket will
give you the feature: https://issues.apache.org/jira/browse/FLINK-23451

However, I'm pretty sure that you can't just cherry-pick such a big feature
into an older Flink version.

I would rather try to fix the connector so that you can upgrade to 1.14.
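
Once you are on 1.14, enabling the feature should (if I remember the option
names correctly) only require something like this in flink-conf.yaml:

taskmanager.network.memory.buffer-debloat.enabled: true
# optional: target time for consuming the buffered in-flight data
taskmanager.network.memory.buffer-debloat.target: 1s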

On Wed, Dec 8, 2021 at 4:07 AM vtygoss  wrote:

> Hi community!
>
>
> Because of the limitation of connector, i couldn't upgrade apache flink
> from version 1.13.3 to version 1.14.0. But I really need the important
> feature of buffer debloating in 1.14.0 for heavy checkpoint at
> backpressure.
>
>
> So which issue or commit should i merge in flink-1.13.3 for buffer
> debloating?
>
>
> - [FLINK-24189] Perform buffer debloating per single gate
>
> - PR-17024 mentioned in  https://issues.apache.org/jira/browse/FLINK-23973
>
>
> Thanks for your any reply or suggestion.
>
>
> Best Regards!
>


Re: Weird operator ID check restore from checkpoint fails

2021-12-07 Thread Robert Metzger
811d3b279c8b26ed99ff0883b7630242 is the operator id.
If I'm not mistaken, running the job graph generation (e.g. the main
method) in DEBUG log level will show you all the IDs generated. This should
help you map this ID to your code.
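
Independent of that, I would recommend assigning explicit uids to all
stateful operators so the mapping stays stable across job changes. A rough
sketch (source, sink, key extractor and function names are placeholders):

env.addSource(kafkaSource).uid("kafka-source")
   .keyBy(record -> record.getKey())
   .process(new MyStatefulFunction()).uid("my-stateful-function")
   .addSink(mySink).uid("my-sink");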

On Wed, Dec 8, 2021 at 7:52 AM Dan Hill  wrote:

> Nothing changed (as far as I know).  It's the same binary and the same
> args.  It's Flink v1.12.3.  I'm going to switch away from auto-gen uids and
> see if that helps.  The job randomly started failing to checkpoint.  I
> cancelled the job and started it from the last successful checkpoint.
>
> I'm confused why `811d3b279c8b26ed99ff0883b7630242` is used and not the
> auto-generated uid.  That seems like a bug.
>
> On Tue, Dec 7, 2021 at 10:40 PM Robert Metzger 
> wrote:
>
>> Hi Dan,
>>
>> When restoring a savepoint/checkpoint, Flink is matching the state for
>> the operators based on the uuid of the operator. The exception says that
>> there is some state that doesn't match any operator. So from Flink's
>> perspective, the operator is gone.
>> Here is more information:
>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#assigning-operator-ids
>>
>>
>> Somehow something must have changed in your job: Did you change the Flink
>> version?
>>
>> Hope this helps!
>>
>> On Wed, Dec 8, 2021 at 5:49 AM Dan Hill  wrote:
>>
>>> I'm restoring the job with the same binary and same flags/args.
>>>
>>> On Tue, Dec 7, 2021 at 8:48 PM Dan Hill  wrote:
>>>
>>>> Hi.  I noticed this warning has "operator
>>>> 811d3b279c8b26ed99ff0883b7630242" in it.  I assume this should be an
>>>> operator uid or name.  It looks like something else.  What is it?  Is
>>>> something corrupted?
>>>>
>>>>
>>>> org.apache.flink.runtime.client.JobInitializationException: Could not 
>>>> instantiate JobManager.
>>>>at 
>>>> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:494)
>>>>at 
>>>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>>>>at 
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>at 
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.IllegalStateException: Failed to rollback to 
>>>> checkpoint/savepoint 
>>>> s3a://my-flink-state/checkpoints/ce9b90eafde97ca4629c13936c34426f/chk-626. 
>>>> Cannot map checkpoint/savepoint state for operator 
>>>> 811d3b279c8b26ed99ff0883b7630242 to the new program, because the operator 
>>>> is not available in the new program. If you want to allow to skip this, 
>>>> you can set the --allowNonRestoredState option on the CLI.
>>>>at 
>>>> org.apache.flink.runtime.checkpoint.Checkpoints.throwNonRestoredStateException(Checkpoints.java:226)
>>>>at 
>>>> org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:190)
>>>>at 
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1627)
>>>>at 
>>>> org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:362)
>>>>at 
>>>> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:292)
>>>>at 
>>>> org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:249)
>>>>at 
>>>> org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:133)
>>>>at 
>>>> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:111)
>>>>at 
>>>> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:345)
>>>>at 
>>>> org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:330)
>>>>at 
>>>> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:95)
>>>>at 
>>>> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:39)
>>>>at 
>>>> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.(JobManagerRunnerImpl.java:162)
>>>>at 
>>>> org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:86)
>>>>at 
>>>> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:478)
>>>>... 4 more
>>>>
>>>>


Re: Job Listener not working as expected

2021-12-07 Thread Robert Metzger
Hi Puneet,

Are you submitting the Flink jobs using the "/bin/flink" command line tool
to a cluster in session mode?
Maybe the command line tool just submits the job to the cluster in a "fire
and forget" fashion, which is why the listeners fire immediately.
Can you try to use "env.executeAsync()" instead of "execute()"? (Sorry, I
don't have time right now to experiment with this myself). In either case,
the command line tool needs to stay connected to the cluster to listen to
the job status.
What probably works is using the Application Mode, instead of Session Mode.
In AppMode, the main() method runs on the cluster.
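
For reference, a rough sketch of the listener registration I have in mind
(not tested; notifySlack(...) stands in for your existing Slack helper):

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.registerJobListener(new JobListener() {
    @Override
    public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
        // called once the job has been handed over to the cluster
        notifySlack("submitted " + (jobClient == null ? "n/a" : jobClient.getJobID()));
    }

    @Override
    public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
        // in my understanding this only fires in a client that stays attached
        // and waits for the job result (e.g. a blocking execute())
        notifySlack("finished: " + (throwable == null ? result : throwable));
    }
});

// ... build the pipeline ...

env.execute("my-streaming-job"); // or env.executeAsync(...) for the experiment above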

Best,
Robert


On Wed, Dec 8, 2021 at 4:55 AM Puneet Duggal 
wrote:

> Hi,
>
> I have registered a job listener which notifies slack with JobId on
> successful submission. Also it notifies slack on successful/failed
> Execution. Now this job listener is working as expected when running on
> local IDE  , but somehow behaving unexpectedly when running on a cluster
> i.e. both *onJobSubmitted *and *onJobExecuted *are being called
> simultaneously on submitting a real time data streaming job. Currently,
> jobs are being deployed in session mode.
>
> Thanks,
> Puneet Duggal
>


Re: Weird operator ID check restore from checkpoint fails

2021-12-07 Thread Robert Metzger
Hi Dan,

When restoring a savepoint/checkpoint, Flink is matching the state for the
operators based on the uuid of the operator. The exception says that there
is some state that doesn't match any operator. So from Flink's perspective,
the operator is gone.
Here is more information:
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#assigning-operator-ids


Somehow something must have changed in your job: Did you change the Flink
version?

Hope this helps!

On Wed, Dec 8, 2021 at 5:49 AM Dan Hill  wrote:

> I'm restoring the job with the same binary and same flags/args.
>
> On Tue, Dec 7, 2021 at 8:48 PM Dan Hill  wrote:
>
>> Hi.  I noticed this warning has "operator
>> 811d3b279c8b26ed99ff0883b7630242" in it.  I assume this should be an
>> operator uid or name.  It looks like something else.  What is it?  Is
>> something corrupted?
>>
>>
>> org.apache.flink.runtime.client.JobInitializationException: Could not 
>> instantiate JobManager.
>>  at 
>> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:494)
>>  at 
>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>  at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.IllegalStateException: Failed to rollback to 
>> checkpoint/savepoint 
>> s3a://my-flink-state/checkpoints/ce9b90eafde97ca4629c13936c34426f/chk-626. 
>> Cannot map checkpoint/savepoint state for operator 
>> 811d3b279c8b26ed99ff0883b7630242 to the new program, because the operator is 
>> not available in the new program. If you want to allow to skip this, you can 
>> set the --allowNonRestoredState option on the CLI.
>>  at 
>> org.apache.flink.runtime.checkpoint.Checkpoints.throwNonRestoredStateException(Checkpoints.java:226)
>>  at 
>> org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:190)
>>  at 
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1627)
>>  at 
>> org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:362)
>>  at 
>> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:292)
>>  at 
>> org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:249)
>>  at 
>> org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:133)
>>  at 
>> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:111)
>>  at 
>> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:345)
>>  at 
>> org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:330)
>>  at 
>> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:95)
>>  at 
>> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:39)
>>  at 
>> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.(JobManagerRunnerImpl.java:162)
>>  at 
>> org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:86)
>>  at 
>> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:478)
>>  ... 4 more
>>
>>


Re: [ANNOUNCE] Flink mailing lists archive service has migrated to Apache Archive service

2021-09-30 Thread Robert Metzger
@Matthias Pohl : I've also been annoyed by this 30
days limit, but I'm not aware of a way to globally change the default. I
would ask in #asfinfra in the asf slack.

On Thu, Sep 30, 2021 at 12:19 PM Till Rohrmann  wrote:

> Thanks for the hint with the managed search engines Matthias. I think this
> is quite helpful.
>
> Cheers,
> Till
>
> On Wed, Sep 15, 2021 at 4:27 PM Matthias Pohl 
> wrote:
>
> > Thanks Leonard for the announcement. I guess that is helpful.
> >
> > @Robert is there any way we can change the default setting to something
> > else (e.g. greater than 0 days)? Only having the last month available as
> a
> > default is kind of annoying considering that the time setting is quite
> > hidden.
> >
> > Matthias
> >
> > PS: As a workaround, one could use the gte=0d parameter which is encoded
> in
> > the URL (e.g. if you use managed search engines in Chrome or Firefox's
> > bookmark keywords:
> > https://lists.apache.org/x/list.html?user@flink.apache.org:gte=0d:%s).
> > That
> > will make all posts available right-away.
> >
> > On Mon, Sep 6, 2021 at 3:16 PM JING ZHANG  wrote:
> >
> > > Thanks Leonard for driving this.
> > > The information is helpful.
> > >
> > > Best,
> > > JING ZHANG
> > >
> > > Jark Wu  于2021年9月6日周一 下午4:59写道:
> > >
> > >> Thanks Leonard,
> > >>
> > >> I have seen many users complaining that the Flink mailing list doesn't
> > >> work (they were using Nabble).
> > >> I think this information would be very helpful.
> > >>
> > >> Best,
> > >> Jark
> > >>
> > >> On Mon, 6 Sept 2021 at 16:39, Leonard Xu  wrote:
> > >>
> > >>> Hi, all
> > >>>
> > >>> The mailing list archive service Nabble Archive was broken at the end
> > of
> > >>> June, the Flink community has migrated the mailing lists archives[1]
> to
> > >>> Apache Archive service by commit[2], you can refer [3] to know more
> > mailing
> > >>> lists archives of Flink.
> > >>>
> > >>> Apache Archive service is maintained by ASF thus the stability is
> > >>> guaranteed, it’s a web-based mail archive service which allows you to
> > >>> browse, search, interact, subscribe, unsubscribe, etc. with mailing
> > lists.
> > >>>
> > >>> Apache Archive service shows mails of the last month by default, you
> > can
> > >>> specify the date range to browse, search the history mails.
> > >>>
> > >>>
> > >>> Hope it would be helpful.
> > >>>
> > >>> Best,
> > >>> Leonard
> > >>>
> > >>> [1] The Flink mailing lists in Apache archive service
> > >>> dev mailing list archives:
> > >>> https://lists.apache.org/list.html?d...@flink.apache.org
> > >>> user mailing list archives :
> > >>> https://lists.apache.org/list.html?user@flink.apache.org
> > >>> user-zh mailing list archives :
> > >>> https://lists.apache.org/list.html?user...@flink.apache.org
> > >>> [2]
> > >>>
> >
> https://github.com/apache/flink-web/commit/9194dda862da00d93f627fd315056471657655d1
> > >>> [3] https://flink.apache.org/community.html#mailing-lists
> > >>
> > >>
> >
>


Re: Support ARM architecture

2021-09-22 Thread Robert Metzger
Hi,

AFAIK the only real blocker for ARM support was a RocksDB binary for ARM.
This has been resolved and is scheduled to be released with 1.14.0:
https://issues.apache.org/jira/browse/FLINK-13598

If you have an ARM machine available, you could even help the community in
the release verification process and test the current release candidate:
https://lists.apache.org/thread.html/r21b181aff057d17c81839190e4e413574e07f36eeb8e59799392de2e%40%3Cdev.flink.apache.org%3E


On Wed, Sep 22, 2021 at 7:37 PM Patrick Angeles 
wrote:

> Hey all,
>
> Trying to follow FLINK-13448. Seems like all the subtasks, save for one on
> documentation, are completed... does this mean there will be an arm64
> binary available in the next release (1.14)?
>


Re: Many S3V4AuthErrorRetryStrategy warn logs while reading/writing from S3

2021-09-22 Thread Robert Metzger
Hey Andreas,

This could be related to
https://github.com/apache/hadoop/pull/110/files#diff-0a2e55a2f79ea4079eb7b77b0dc3ee562b383076fa0ac168894d50c80a95131dR950

I guess in Flink this would be

s3.endpoint: your-endpoint-hostname

Where your-endpoint-hostname is a region-specific endpoint, which you can
probably look up from the S3 docs.
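
For example, if your bucket lives in eu-central-1 (just a guess; please
double-check the exact hostname in the AWS endpoint list), that would be:

s3.endpoint: s3.eu-central-1.amazonaws.com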


On Wed, Sep 22, 2021 at 7:07 PM Hailu, Andreas  wrote:

> Hi,
>
>
>
> When reading/writing to and from S3 using the flink-fs-s3-hadoop plugin on
> 1.11.2, we observe a lot of these WARN log statements in the logs:
>
>
>
> *WARN  S3V4AuthErrorRetryStrategy - Attempting to re-send the request to
> s3.amazonaws.com  with AWS V4 authentication. To
> avoid this warning in the future, please use region-specific endpoint to
> access buckets located in regions that require V4 signing.*
>
>
>
> The applications complete successfully which is great, but I’m not sure
> what the root of the error is and I’m hesitant to silence it through our
> logging configurations. I saw something that looks similar here[1]. Is
> there a way for us to similarly have Flink’s AWS S3 client to use V4
> strategy to begin with?
>
>
>
> [1]
> https://stackoverflow.com/questions/39513518/aws-emr-writing-to-kms-encrypted-s3-parquet-files
>
>
>
> 
>
>
>
> *Andreas Hailu*
>
> *Data Lake Engineering *| Goldman Sachs & Co.
>
>
>
> --
>
> Your Personal Data: We may collect and process information about you that
> may be subject to data protection laws. For more information about how we
> use and disclose your personal data, how we protect your information, our
> legal basis to use your information, your rights and who you can contact,
> please refer to: www.gs.com/privacy-notices
>


Re: flink rest endpoint creation failure

2021-09-22 Thread Robert Metzger
Hi,

Yes, "rest.bind-port" seems to be set to "35485" on the JobManager
instance. Can you double check the configuration that is used by Flink?
The jobManager is also printing the effective configuration on start up.
You'll probably see the value there as well.
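
If both JobManagers can indeed land on the same host, you could also
configure a port range instead of a single port (as far as I know,
"rest.bind-port" accepts ranges), for example:

rest.bind-port: 50100-50200

Flink should then pick the first free port in that range.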


On Wed, Sep 22, 2021 at 6:48 PM Curt Buechter  wrote:

> Hi,
> I'm getting an error that happens randomly when starting a flink
> application.
>
> For context, this is running in YARN on AWS. This application is one that
> converts from the Table API to the Stream API, so two flink
> applications/jobmanagers are trying to start up. I think what happens is
> that the rest api port is chosen, and is the same for both of the flink
> apps. If YARN chooses two different instances for the two task managers,
> they each work fine and start their rest api on the same port on their own
> respective machine. But, if YARN chooses the same instance for both job
> managers, they both try to start up the rest api on the same port on the
> same machine, and I get the error.
>
> Here is the error:
>
> 2021-09-22 15:47:27,724 ERROR 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Could not 
> start cluster entrypoint YarnJobClusterEntrypoint.
> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to 
> initialize the cluster entrypoint YarnJobClusterEntrypoint.
>   at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:212)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:600)
>  [flink-dist_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:99)
>  [flink-dist_2.12-1.13.2.jar:1.13.2]
> Caused by: org.apache.flink.util.FlinkException: Could not create the 
> DispatcherResourceManagerComponent.
>   at 
> org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:275)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:250)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   at java.security.AccessController.doPrivileged(Native Method) 
> ~[?:1.8.0_282]
>   at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_282]
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
>  ~[hadoop-common-3.2.1-amzn-3.jar:?]
>   at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   ... 2 more
> Caused by: java.net.BindException: Could not start rest endpoint on any port 
> in port range 35485
>   at 
> org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:234)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:172)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:250)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   at java.security.AccessController.doPrivileged(Native Method) 
> ~[?:1.8.0_282]
>   at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_282]
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
>  ~[hadoop-common-3.2.1-amzn-3.jar:?]
>   at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>   ... 2 more
>
>
> And, here is part of the log from the other job manager, which successfully 
> started its rest api on the same port, just a few seconds earlier:
>
>
> 2021-09-22 15:47:20,690 INFO  
> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Rest 
> endpoint listening at ip-10-1-2-137.ec2.internal:35485
> 2021-09-22 15:47:20,691 INFO  
> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - 
> http://ip-10-1-2-137.ec2.internal:35485 was granted leadership with 
> 

Re: Unbounded Kafka Source

2021-09-22 Thread Robert Metzger
Hi,

What happens if you do not set any boundedness on the KafkaSource?
For a DataStream job in streaming mode, the Kafka source should be
unbounded.

From reading the code, it seems that setting setUnbounded(latest) should not
trigger the behavior you mention ... but the Flink docs [1] are not clearly
written here: they say that you can make a Kafka source bounded by calling
"setUnbounded", which is odd, because "setUnbounded" should not make
something bounded.
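
For reference, this is roughly what I would try first (a sketch;
FluentdRecord stands in for whatever record type your
FluentdRecordDeserializer produces):

KafkaSource<FluentdRecord> source = KafkaSource.<FluentdRecord>builder()
    .setBootstrapServers(kafkaServer)
    .setTopics("fluentd")
    .setGroupId("fluentd-reader")                 // a non-empty group id
    .setStartingOffsets(OffsetsInitializer.latest())
    .setDeserializer(new FluentdRecordDeserializer())
    // note: no setBounded()/setUnbounded() at all -> continuous, unbounded mode
    .build();

DataStream<FluentdRecord> stream =
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "fluentd-source");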

Are there any log messages from the Source that can give us any hints?

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#boundedness

On Wed, Sep 22, 2021 at 5:37 PM Robert Cullen  wrote:

> I have an unbounded kafka source that has records written to it every
> second.  Instead of the job waiting to process the new messages it closes.
> How do I keep the stream open?
>
> KafkaSource dataSource = KafkaSource
> .builder()
> .setBootstrapServers(kafkaServer)
> .setTopics(Arrays.asList("fluentd"))
> .setGroupId("")
> .setDeserializer(new FluentdRecordDeserializer())
> //.setStartingOffsets(OffsetsInitializer.earliest())
> //.setBounded(OffsetsInitializer.latest())
> .setUnbounded(OffsetsInitializer.latest())
> .build();
>
>
>
>
> --
> Robert Cullen
> 240-475-4490
>


Re: Flink Performance Issue

2021-09-22 Thread Robert Metzger
Hi Kamaal,

I would first suggest understanding the performance bottleneck, before
applying any optimizations.

Idea 1: Are your CPUs fully utilized?
If yes, good: then scaling up will probably help.
If not, there is another inefficiency somewhere else.

Idea 2: How fast can you get the data into your job, without any processing?
You can measure this by submitting a simple Flink job that just reads the
data and writes it to a discarding sink. Either disable the operator
chaining to get metrics for the records per second, or add a custom mapper
in between that measures the throughput.
Ideally you see that you can read all your data in a few seconds; if
not, there is a problem getting your data in.

Idea 3: Is your I/O fully utilized? (If you are checkpointing to RocksDB,
the disk can dramatically slow you down.)
Idea 4: Are you under high memory pressure, and your JVMs are spending most
of their cycles garbage collecting?

My bet is that you are not getting data into your cluster as fast as you
think (Idea 2).
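
If it helps, here is a minimal sketch of the Idea 2 test, reusing the names
from your snippet below (DiscardingSink comes from
org.apache.flink.streaming.api.functions.sink):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.disableOperatorChaining();   // so the source reports its own numRecordsOutPerSecond

env.addSource(new FlinkKafkaConsumer<>(
        topicsProperties.getProperty(Common.CGM_STREAM_TOPIC),
        new ObjectNodeJsonDeSerializerSchema(),
        kafkaConnectProperties))
    .setParallelism(Common.FORTY_FIVE)
    .addSink(new DiscardingSink<>());

env.execute("ingest-throughput-test");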


On Wed, Sep 22, 2021 at 12:05 PM Mohammed Kamaal <
mohammed.kamaa...@gmail.com> wrote:

> Hi Arvid,
>
> The throughput has decreased further after I removed all the rebalance().
> The performance has decreased from 14 minutes for 20K messages to 20
> minutes for 20K messages.
>
> Below are the tasks that the flink application is performing. I am using
> keyBy and Window operation. Do you think am I making any mistake here or
> the way I am performing the keyBy or Window operation needs to be
> corrected?.
>
> //Add Source
> StreamExecutionEnvironment streamenv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> initialStreamData = streamenv.addSource(new
> FlinkKafkaConsumer<>(topicsProperties.getProperty(Common.CGM_STREAM_TOPIC),
> new *ObjectNodeJsonDeSerializerSchema()*,
> kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);
>
> DataStream cgmStreamData = initialStreamData.keyBy(value ->
> value.findValue("PERSON_ID").asText())
> .flatMap(new *SgStreamingTask()*).setParallelism(Common.FORTY_FIVE);
>
> DataStream artfctOverlapStream = cgmStreamData.keyBy(new
> CGMKeySelector()).countWindow(2, 1)
> .apply(new *ArtifactOverlapProvider()*
> ).setParallelism(Common.FORTY_FIVE).rebalance();
>
> DataStream streamWithSgRoc = artfctOverlapStream.keyBy(new
> CGMKeySelector()).countWindow(7, 1)
> .apply(new *SgRocProvider()*
> ).setParallelism(Common.FORTY_FIVE).rebalance();
>
> DataStream cgmExcursionStream =
> streamWithSgRoc.keyBy(new CGMKeySelector())
> .countWindow(Common.THREE, Common.ONE).apply(new
> *CGMExcursionProviderStream()*
> ).setParallelism(Common.FORTY_FIVE).rebalance();
>
> //Add Sink
> cgmExcursionStream.addSink(new FlinkKafkaProducer(
> topicsProperties.getProperty(Common.CGM_EVENT_TOPIC), new
> CGMDataCollectorSchema(),
> kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);
>
> *Implementation classes:-*
>
> //deserialize the json message received
> *ObjectNodeJsonDeSerializerSchema* implements
> KeyedDeserializationSchema{
> public ObjectNode deserialize(byte[] messageKey, byte[] message, String
> topic, int partition, long offset);
> }
>
> //Flapmap to check each message and apply validation
> public class *SgStreamingTask* extends RichFlatMapFunction CGM> {
> void flatMap(ObjectNode streamData, Collector out);
> }
>
> //persist three state variables and apply business logic
> public class *ArtifactOverlapProvider* extends RichFlatMapFunction Tuple2>
> implements WindowFunction {
> public void apply(String key, GlobalWindow window, Iterable values,
> Collector out);
> }
>
> //Apply business logic
> public class *SgRocProvider* implements WindowFunction GlobalWindow>{
> public void apply(String key, GlobalWindow window, Iterable values,
> Collector out);
> }
>
> //persist 3 state variables and apply business logic
> public class *CGMExcursionProviderStream* extends
> RichFlatMapFunction>
> implements WindowFunction{
> public void apply(String key, GlobalWindow window, Iterable values,
> Collector out);
>
> }
>
> Thanks
> Kamaal
>
>
> On Mon, Sep 6, 2021 at 9:57 PM Arvid Heise  wrote:
>
>> Hi Mohammed,
>>
>> something is definitely wrong in your setup. You can safely say that you
>> can process 1k records per second and core with Kafka and light processing,
>> so you shouldn't even need to go distributed in your case.
>>
>> Do you perform any heavy computation? What is your flatMap doing? Are you
>> emitting lots of small records from one big record?
>>
>> Can you please remove all rebalance and report back? Rebalance is
>> counter-productive if you don't exactly know that you need it.
>>
>> On Thu, Sep 2, 2021 at 1:36 PM Mohammed Kamaal <
>> mohammed.kamaa...@gmail.com> wrote:
>>
>>> Hi Fabian,
>>>
>>> Just an update,
>>>
>>> Problem 2:-
>>> 
>>> Caused by: org.apache.kafka.common.errors.NetworkException
>>> It is resolved. It was because we exceeded the number of allowed
>>> partitions for the kafka cluster (AWS MSK cluster). Have deleted
>>> unused topics 

Re: Documentation for deep diving into flink (data-streaming) job restart process

2021-09-10 Thread Robert Metzger
Thanks for the log.

From the partial log that you shared with me, my assumption is that some
external resource manager is shutting down your cluster. Multiple
TaskManagers are disconnecting, and finally the job switches into the
failed state.
It seems that you are not stopping only one TaskManager, but all of them.

Why are you restarting a TaskManager?
How are you deploying Flink?

On Fri, Sep 10, 2021 at 12:46 AM Puneet Duggal 
wrote:

> Hi,
>
> Please find attached logfile regarding job not getting restarted on
> another task manager once existing task manager got restarted.
>
> Just FYI - We are using Fixed Delay Restart (5 times, 10s delay)
>
> On Thu, Sep 9, 2021 at 4:29 PM Robert Metzger  wrote:
>
>> Hi Puneet,
>>
>> Can you provide us with the JobManager logs of this incident? Jobs should
>> not disappear, they should restart on other Task Managers.
>>
>> On Wed, Sep 8, 2021 at 3:06 PM Puneet Duggal 
>> wrote:
>>
>>> Hi,
>>>
>>> So for past 2-3 days i have been looking for documentation which
>>> elaborates how flink takes care of restarting the data streaming job. I
>>> know all the restart and failover strategies but wanted to know how
>>> different components (Job Manager, Task Manager etc) play a role while
>>> restarting the flink data streaming job.
>>>
>>> I am asking this because recently in production.. when i restarted a
>>> task manger, all the jobs running on it, instead of getting restarted,
>>> disappeared. Within flink UI, couldn't tack those jobs in completed jobs as
>>> well. Logs also couldnt provide me with good enough information.
>>>
>>> Also if anyone can tell me what is the role of /tmp/executionGraphStore
>>> folder in Job Manager machine.
>>>
>>> Thanks
>>>
>>>
>>>


Re: Issue while creating Hive table from Kafka topic

2021-09-09 Thread Robert Metzger
mpl.invoke0(Native Method) at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498) at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> at
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
> at
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
> at
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102)
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
> ... 7 more
>
> On Thu, Sep 9, 2021 at 5:36 PM Robert Metzger  wrote:
>
>> Can you share the full stack trace, not just a part of it?
>>
>> On Thu, Sep 9, 2021 at 1:43 PM Harshvardhan Shinde <
>> harshvardhan.shi...@oyorooms.com> wrote:
>>
>>> Hi,
>>>
>>> I added the dependencies while trying to resolve the same issue, thought
>>> I was missing them.
>>>
>>> Thanks
>>>
>>> On Thu, Sep 9, 2021 at 4:26 PM Robert Metzger 
>>> wrote:
>>>
>>>> Hey,
>>>>
>>>> Why do you have these dependencies in your pom?
>>>>
>>>> 
>>>> 
>>>> org.apache.kafka
>>>> kafka-clients
>>>> 2.8.0
>>>> 
>>>>
>>>> 
>>>> org.apache.kafka
>>>> kafka_2.12
>>>> 2.8.0
>>>> 
>>>>
>>>>
>>>> They are not needed for using the Kafka connector of Flink (the flink
>>>> kafka connector dependencies pulls the required dependencies)
>>>>
>>>>
>>>> On Thu, Sep 9, 2021 at 12:02 PM Harshvardhan Shinde <
>>>> harshvardhan.shi...@oyorooms.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'm trying a simple flink job that reads data from a kafka topic and
>>>>> creates a Hive table.
>>>>>
>>>>> I'm following the steps from here
>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/hive/overview/#connecting-to-hive>
>>>>> .
>>>>>
>>>>> Here's my code:
>>>>>
>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>> import org.apache.flink.table.api.Table;
>>>>> import org.apache.flink.table.api.TableEnvironment;
>>>>> import org.apache.flink.table.catalog.hive.HiveCatalog;
>>>>>
>>>>> EnvironmentSettings settings = 
>>>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>>>> TableEnvironment tableEnv = TableEnvironment.create(settings);
>>>>>
>>>>> String name= "myhive";
>>>>> String defaultDatabase = "harsh_test";
>>>>> String hiveConfDir = "/etc/hive/conf";
>>>>>
>>>>> HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
>>>>> tableEnv.registerCatalog(name, hive);
>>>>>
>>>>> // set the HiveCatalog as the current catalog of the session
>>>>> tableEnv.useCatalog(name);
>>>>>
>>>>> tableEnv.executeSql("CREATE TABLE IF NOT EXISTS transactions (\n" +
>>>>>   "  `created_at` TIMESTAMP(3) METADATA FROM 'timestamp',\n" +
>>>>>   "  `partition` BIGINT METADATA VIRTUAL,\n" +
>>>>>   "  `offset` BIGINT METADATA VIRTUAL,\n" +
>>>>>   "account_id  BIGINT,\n" +
>>>>>   "amount  BIGINT,\n" +
>>>>>   "transaction_time TIMESTAMP(3),\n" +
>>>>>   "WATERMARK FOR transaction_time AS transaction_time - INTERVAL 
>>>>> '5' SECOND\n" +
>>>>>   ") WITH (\n" +
>>>>>   "'connector' = 'kafka',\n" +

Re: Job crashing with RowSerializer EOF exception

2021-09-09 Thread Robert Metzger
Hi Yuval,

EOF exceptions during serialization are usually an indication that some
serializer in the serializer chain is broken.
What data type are you serializing? Does it include some type serialized
by a custom serializer, or Kryo, ...?

On Thu, Sep 9, 2021 at 4:35 PM Yuval Itzchakov  wrote:

> Hi,
>
> Flink 1.13.2
> Scala 2.12.7
>
> Running an app in production, I'm running into the following exception
> that frequently fails the job:
>
> switched from RUNNING to FAILED with failure cause: java.io.IOException:
> Can't get next record for channel InputChannelInfo{gateIdx=0,
> inputChannelIdx=2}\n\tat
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:98)\n\tat
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)\n\tat
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:96)\n\tat
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)\n\tat
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)\n\tat
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)\n\tat
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)\n\tat
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)\n\tat
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)\n\tat
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)\n\tat
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)\n\tat
> java.base/java.lang.Thread.run(Thread.java:829)\nCaused by:
> java.io.EOFException\n\tat
> org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329)\n\tat
> org.apache.flink.types.StringValue.readString(StringValue.java:781)\n\tat
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)\n\tat
> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)\n\tat
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:345)\n\tat
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:72)\n\tat
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:191)\n\tat
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)\n\tat
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)\n\tat
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)\n\tat
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:93)\n\tat
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:95)\n\t...
> 11
>
> Deserialization logic for the rows seems to be failing with an EOF
> exception. Any help on the best way to debug this or try to get more info
> would be great.
>
> Thanks.
> --
> Best Regards,
> Yuval Itzchakov.
>


Re: Issue while creating Hive table from Kafka topic

2021-09-09 Thread Robert Metzger
Can you share the full stack trace, not just a part of it?

On Thu, Sep 9, 2021 at 1:43 PM Harshvardhan Shinde <
harshvardhan.shi...@oyorooms.com> wrote:

> Hi,
>
> I added the dependencies while trying to resolve the same issue, thought I
> was missing them.
>
> Thanks
>
> On Thu, Sep 9, 2021 at 4:26 PM Robert Metzger  wrote:
>
>> Hey,
>>
>> Why do you have these dependencies in your pom?
>>
>> 
>> 
>> org.apache.kafka
>> kafka-clients
>> 2.8.0
>> 
>>
>> 
>> org.apache.kafka
>> kafka_2.12
>> 2.8.0
>> 
>>
>>
>> They are not needed for using the Kafka connector of Flink (the flink
>> kafka connector dependencies pulls the required dependencies)
>>
>>
>> On Thu, Sep 9, 2021 at 12:02 PM Harshvardhan Shinde <
>> harshvardhan.shi...@oyorooms.com> wrote:
>>
>>> Hi,
>>>
>>> I'm trying a simple flink job that reads data from a kafka topic and
>>> creates a Hive table.
>>>
>>> I'm following the steps from here
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/hive/overview/#connecting-to-hive>
>>> .
>>>
>>> Here's my code:
>>>
>>> import org.apache.flink.table.api.EnvironmentSettings;
>>> import org.apache.flink.table.api.Table;
>>> import org.apache.flink.table.api.TableEnvironment;
>>> import org.apache.flink.table.catalog.hive.HiveCatalog;
>>>
>>> EnvironmentSettings settings = 
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>> TableEnvironment tableEnv = TableEnvironment.create(settings);
>>>
>>> String name= "myhive";
>>> String defaultDatabase = "harsh_test";
>>> String hiveConfDir = "/etc/hive/conf";
>>>
>>> HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
>>> tableEnv.registerCatalog(name, hive);
>>>
>>> // set the HiveCatalog as the current catalog of the session
>>> tableEnv.useCatalog(name);
>>>
>>> tableEnv.executeSql("CREATE TABLE IF NOT EXISTS transactions (\n" +
>>>   "  `created_at` TIMESTAMP(3) METADATA FROM 'timestamp',\n" +
>>>   "  `partition` BIGINT METADATA VIRTUAL,\n" +
>>>   "  `offset` BIGINT METADATA VIRTUAL,\n" +
>>>   "account_id  BIGINT,\n" +
>>>   "amount  BIGINT,\n" +
>>>   "transaction_time TIMESTAMP(3),\n" +
>>>   "WATERMARK FOR transaction_time AS transaction_time - INTERVAL 
>>> '5' SECOND\n" +
>>>   ") WITH (\n" +
>>>   "'connector' = 'kafka',\n" +
>>>   "'topic' = 'flink-stream-table',\n" +
>>>   "'properties.bootstrap.servers' = ':9092',\n" +
>>>   "   'scan.startup.mode' = 'earliest-offset',\n" +
>>>   "'format'= 'csv'\n" +
>>>   ")");
>>>
>>> Table table = tableEnv.sqlQuery("Select * from transactions");
>>> table.execute().print();
>>>
>>> The code builds successfully, but I'm getting the following runtime
>>> error:
>>>
>>> Caused by: java.util.concurrent.CompletionException:
>>> java.lang.NoClassDefFoundError:
>>> org/apache/kafka/common/serialization/ByteArrayDeserializer at
>>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>>> at
>>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>>> at
>>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
>>> ..
>>>
>>> Here are my pom.xml file contents:
>>>
>>> 
>>> http://maven.apache.org/POM/4.0.0; 
>>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
>>>  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
>>> http://maven.apache.org/xsd/maven-4.0.0.xsd;>
>>> 4.0.0
>>>
>>> com.harsh.test
>>> harsh-flink-test
>>> 1.0-SNAPSHOT
>>> jar
>>>
>>> Flink Quickstart Job
>>> http://www.myorganization.org
>>>
>>> 
>>>

Re: Allocation-preserving scheduling and task-local recovery

2021-09-09 Thread Robert Metzger
Hi,
From my understanding of the code [1], the task scheduling first considers
the state location and only falls back to the evenly-spread-out scheduling
strategy afterwards. So local recovery should take preference over the
evenly-spread-out strategy.

If you can easily test it, I would still recommend removing the
"cluster.evenly-spread-out-slots" strategy, just to make sure my
understanding is really correct.

I don't think that's the case, but just to make sure: you are only
restarting a single TaskManager, right? The other TaskManagers keep
running? (AFAIK the state information is lost if a TaskManager restarts.)

Sorry that I don't have a real answer here (yet). Is there anything
suspicious in the JobManager or TaskManager logs?


[1]
https://github.com/apache/flink/blob/release-1.10/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java

On Wed, Sep 8, 2021 at 9:44 PM Xiang Zhang  wrote:

> We also have this configuration set in case it makes any difference when
> allocation tasks: cluster.evenly-spread-out-slots.
>
> On 2021/09/08 18:09:52, Xiang Zhang  wrote:
> > Hello,
> >
> > We have an app running on Flink 1.10.2 deployed in standalone mode. We
> > enabled task-local recovery by setting both
> *state.backend.local-recovery *and
> > *state.backend.rocksdb.localdir*. The app has over 100 task managers and
> 2
> > job managers (active and passive).
> >
> > This is what we have observed. When we restarted a task manager, all
> tasks
> > got canceled (due to the default failover configuration
> > <
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/execution/task_failure_recovery/#failover-strategies
> >).
> > Then these tasks were re-distributed among the task manager (e.g. some
> > tasks manager have more slots used than before restart). This caused all
> > task managers to download state from remote storage all over again.
> >
> > The same thing happened when we restarted a job manager. The job manager
> > failed over to the passive one successfully, however all tasks were
> > canceled and reallocated among the task managers again.
> >
> > My understanding is that if task-local recovery is enabled, Flink will
> try
> > to enable sticky assignment of tasks to previous task managers they run
> on.
> > This doesn't seem to be the case. My question is how we can enable
> > this allocation-preserving
> > scheduling
> > <
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/large_state_tuning/#allocation-preserving-scheduling
> >
> > when Flink handles failures.
> >
> > Thanks,
> > Xiang
> >
>


Re: Documentation for deep diving into flink (data-streaming) job restart process

2021-09-09 Thread Robert Metzger
Hi Puneet,

Can you provide us with the JobManager logs of this incident? Jobs should
not disappear, they should restart on other Task Managers.

On Wed, Sep 8, 2021 at 3:06 PM Puneet Duggal 
wrote:

> Hi,
>
> So for past 2-3 days i have been looking for documentation which
> elaborates how flink takes care of restarting the data streaming job. I
> know all the restart and failover strategies but wanted to know how
> different components (Job Manager, Task Manager etc) play a role while
> restarting the flink data streaming job.
>
> I am asking this because recently in production.. when i restarted a task
> manger, all the jobs running on it, instead of getting restarted,
> disappeared. Within flink UI, couldn't tack those jobs in completed jobs as
> well. Logs also couldnt provide me with good enough information.
>
> Also if anyone can tell me what is the role of /tmp/executionGraphStore
> folder in Job Manager machine.
>
> Thanks
>
>
>


Re: Issue while creating Hive table from Kafka topic

2021-09-09 Thread Robert Metzger
Hey,

Why do you have these dependencies in your pom?

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.8.0</version>
</dependency>


They are not needed for using the Kafka connector of Flink (the
flink-connector-kafka dependency already pulls in the required Kafka client
dependencies).


On Thu, Sep 9, 2021 at 12:02 PM Harshvardhan Shinde <
harshvardhan.shi...@oyorooms.com> wrote:

> Hi,
>
> I'm trying a simple flink job that reads data from a kafka topic and
> creates a Hive table.
>
> I'm following the steps from here
> 
> .
>
> Here's my code:
>
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableEnvironment;
> import org.apache.flink.table.catalog.hive.HiveCatalog;
>
> EnvironmentSettings settings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> TableEnvironment tableEnv = TableEnvironment.create(settings);
>
> String name= "myhive";
> String defaultDatabase = "harsh_test";
> String hiveConfDir = "/etc/hive/conf";
>
> HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
> tableEnv.registerCatalog(name, hive);
>
> // set the HiveCatalog as the current catalog of the session
> tableEnv.useCatalog(name);
>
> tableEnv.executeSql("CREATE TABLE IF NOT EXISTS transactions (\n" +
>   "  `created_at` TIMESTAMP(3) METADATA FROM 'timestamp',\n" +
>   "  `partition` BIGINT METADATA VIRTUAL,\n" +
>   "  `offset` BIGINT METADATA VIRTUAL,\n" +
>   "account_id  BIGINT,\n" +
>   "amount  BIGINT,\n" +
>   "transaction_time TIMESTAMP(3),\n" +
>   "WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' 
> SECOND\n" +
>   ") WITH (\n" +
>   "'connector' = 'kafka',\n" +
>   "'topic' = 'flink-stream-table',\n" +
>   "'properties.bootstrap.servers' = ':9092',\n" +
>   "   'scan.startup.mode' = 'earliest-offset',\n" +
>   "'format'= 'csv'\n" +
>   ")");
>
> Table table = tableEnv.sqlQuery("Select * from transactions");
> table.execute().print();
>
> The code builds successfully, but I'm getting the following runtime error:
>
> Caused by: java.util.concurrent.CompletionException:
> java.lang.NoClassDefFoundError:
> org/apache/kafka/common/serialization/ByteArrayDeserializer at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
> at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
> ..
>
> Here are my pom.xml file contents:
>
> 
> http://maven.apache.org/POM/4.0.0; 
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
>  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
> http://maven.apache.org/xsd/maven-4.0.0.xsd;>
> 4.0.0
>
> com.harsh.test
> harsh-flink-test
> 1.0-SNAPSHOT
> jar
>
> Flink Quickstart Job
> http://www.myorganization.org
>
> 
> UTF-8
> 1.13.2
> 1.8
> 2.3.6
> 2.12
> ${java.version}
> ${java.version}
> 
>
> 
> 
> apache.snapshots
> Apache Development Snapshot Repository
> 
> https://repository.apache.org/content/repositories/snapshots/
> 
> false
> 
> 
> true
> 
> 
> 
>
> 
> 
> 
> 
> org.apache.flink
> flink-java
> ${flink.version}
> 
> 
> org.apache.flink
> 
> flink-streaming-java_${scala.binary.version}
> ${flink.version}
> 
>
> 
>
> 
>
> 
> 
> org.apache.flink
> 
> flink-connector-kafka_${scala.binary.version}
> ${flink.version}
> 
>
> 
> org.apache.flink
> 
> flink-table-api-java-bridge_${scala.binary.version}
> ${flink.version}
> 
>
> 
> org.apache.flink
> 
> flink-table-planner-blink_${scala.binary.version}
> ${flink.version}
> 
>
> 
> 
> org.apache.flink
> flink-table-planner_2.12
> 1.13.2
> 
>
>
> 
> 
> org.apache.flink
> 
> flink-connector-hive_${scala.binary.version}
> ${flink.version}
> 
>
> 
> 
> org.apache.hive
> hive-exec
> ${hive.version}
> 
>
> 
> 
> javax.servlet
> javax.servlet-api
> 

Re: Job manager crash

2021-09-09 Thread Robert Metzger
Is the Kubernetes API server you are using particularly busy? Maybe these
issues occur because the server is overloaded?

"Triggering checkpoint 2193 (type=CHECKPOINT) @ 1630681482667 for job
."
"Completed checkpoint 2193 for job  (474
bytes in 195 ms)."
"Triggering checkpoint 2194 (type=CHECKPOINT) @ 1630681492667 for job
."
"Completed checkpoint 2194 for job  (474
bytes in 161 ms)."
"Renew deadline reached after 60 seconds while renewing lock ConfigMapLock:
myNs - myJob-dispatcher-leader (1bcda6b0-8a5a-4969-b9e4-2257c4478572)"
"Stopping SessionDispatcherLeaderProcess."

At some point, the leader election mechanism in fabric8 seems to give up.


On Tue, Sep 7, 2021 at 10:05 AM mejri houssem 
wrote:

> hello,
>
> Here's other logs of the latest jm crash.
>
>
> Le lun. 6 sept. 2021 à 14:18, houssem  a écrit :
>
>> hello,
>>
>> I have three jobs running on my kubernetes cluster and each job has his
>> own cluster id.
>>
>> On 2021/09/06 03:28:10, Yangze Guo  wrote:
>> > Hi,
>> >
>> > The root cause is not the "java.lang.NoClassDefFound" error. The job has been
>> > running but could not edit the config map
>> > "myJob--jobmanager-leader", and it
>> > seems it finally got disconnected from the API server. Is there another job
>> > with the same cluster id (myJob)?
>> >
>> > I would also pull in Yang Wang.
>> >
>> > Best,
>> > Yangze Guo
>> >
>> > On Mon, Sep 6, 2021 at 10:10 AM Caizhi Weng 
>> wrote:
>> > >
>> > > Hi!
>> > >
>> > > There is a message saying "java.lang.NoClassDefFound Error:
>> org/apache/hadoop/hdfs/HdfsConfiguration" in your log file. Are you
>> accessing HDFS in your job? If yes, it seems that your Flink distribution or
>> your cluster is missing Hadoop classes. Please make sure that there are
>> Hadoop jars in the lib directory of Flink, or that your cluster has set the
>> HADOOP_CLASSPATH environment variable.
>> > >
>> > > mejri houssem  wrote on Sat, Sep 4, 2021 at 12:15 AM:
>> > >>
>> > >>
>> > >> Hello ,
>> > >>
>> > >> I have been facing a JM crash lately. I am deploying a Flink application
>> cluster on kubernetes.
>> > >>
>> > >> When I install my chart using Helm, everything works fine, but after
>> some time the JM starts to crash
>> > >>
>> > >> and then it eventually gets deleted after 5 restarts.
>> > >>
>> > >> flink version: 1.12.5 (upgraded recently from 1.12.2)
>> > >> HA mode : k8s
>> > >>
>> > >> Here's the full log of the JM attached file.
>> >
>>
>


Re: custom flink image error

2021-08-04 Thread Robert Metzger
Hey Joshua,

Can you first check whether the Docker image you've built is valid by running
it locally on your machine?

I would recommend putting the s3 filesystem jars into the plugins [1]
directory to avoid classloading issues.
Also, you don't need to build custom images if you want to use built-in
plugins [2].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/filesystems/plugins/
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#using-plugins
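
For reference, a minimal sketch of the plugins-based image (jar names and paths
are assumptions based on the 1.13.1 image layout, not a tested Dockerfile):

```
FROM apache/flink:1.13.1-scala_2.11
# Each filesystem goes into its own sub-directory under /opt/flink/plugins so it
# is loaded by an isolated plugin classloader instead of the main lib classpath.
RUN mkdir -p /opt/flink/plugins/s3-fs-hadoop /opt/flink/plugins/s3-fs-presto
ADD ./flink-s3-fs-hadoop-1.13.1.jar /opt/flink/plugins/s3-fs-hadoop/
ADD ./flink-s3-fs-presto-1.13.1.jar /opt/flink/plugins/s3-fs-presto/
```

As far as I know, the stock image can also enable these bundled plugins without a
custom build via the ENABLE_BUILT_IN_PLUGINS environment variable, which is what
[2] describes.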

On Wed, Aug 4, 2021 at 3:06 PM Joshua Fan  wrote:

> Hi All
> I want to build a custom flink image to run on k8s, below is my Dockerfile
> content:
>
>> FROM apache/flink:1.13.1-scala_2.11
>> ADD ./flink-s3-fs-hadoop-1.13.1.jar /opt/flink/lib
>> ADD ./flink-s3-fs-presto-1.13.1.jar /opt/flink/lib
>>
> I just put the s3 fs dependencies into {flink home}/lib, and then I built
> the image and pushed it to the repo.
>
> When I submit the Flink session from the custom image, an error is
> reported: "exec /docker-entrypoint.sh failed: Exec format error".
>
> I googled a lot, but found no useful information.
>
> Thanks for your help.
>
> Yours sincerely
> Joshua
>


Re: Topic assignment across Flink Kafka Consumer

2021-08-04 Thread Robert Metzger
Hi Prasanna,

How are you checking the assignment of Kafka partitions to the consumers?

The FlinkKafkaConsumer doesn't have a rebalance() method, this is a generic
concept of the DataStream API. Is it possible that you are
somehow partitioning your data in your Flink job, and this is causing the
data distribution issues you are observing?


On Wed, Aug 4, 2021 at 4:00 PM Prasanna kumar 
wrote:

> Robert
>
> When we apply a rebalance method to the kafka consumer, it is assigning
> partitions of various topics evenly.
>
> But my only concern is that the rebalance method might have a performance
> impact .
>
> Thanks,
> Prasanna.
>
>
> On Wed, Aug 4, 2021 at 5:55 PM Prasanna kumar <
> prasannakumarram...@gmail.com> wrote:
>
>> Robert,
>>
>> Flink version 1.12.2.
>> Flink connector Kafka version 2.12.
>>
>> The partitions are assigned equally if we are reading from a single topic.
>>
>> Our use case is to read from multiple topics [topics via a regex pattern]; we
>> use 6 topics and 1 partition per topic for this job.
>>
>> In this case, a few of the Kafka consumer tasks are not allocated any topics.
>>
>> Thanks,
>> Prasanna.
>>
>> On Tue, 20 Jul 2021, 17:44 Robert Metzger,  wrote:
>>
>>> Hi Prasanna,
>>> which Flink version and Kafka connector are you using? (the
>>> "KafkaSource" or "FlinkKafkaConsumer"?)
>>>
>>> The partition assignment for the FlinkKafkaConsumer is defined here:
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java#L27-L43
>>>
>>>
>>> I assume all your topics have one partition only. Still, the
>>> "startIndex" should be determined based on the hash of the topic name. My
>>> only explanation is that you're unlucky with the distribution of the hashes.
>>> If this leads to performance issues, consider using topics with multiple
>>> partitions, changing the names of the topics, or increasing the parallelism of
>>> your consumer.
>>>
>>>
>>>
>>>
>>> On Tue, Jul 20, 2021 at 7:53 AM Prasanna kumar <
>>> prasannakumarram...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> We have a Flink job reading from multiple Kafka topics based on a regex
>>>> pattern.
>>>>
>>>> What we have found out is that the topics are not shared between the
>>>> kafka consumers in an even manner .
>>>>
>>>> Example if there are 8 topics and 4 kafka consumer operators . 1
>>>> consumer is assigned 6 topics , 2 consumers assigned 1 each and the last
>>>> consumer is not assigned at all.
>>>>
>>>> This leads to inadequate usage of the resources.
>>>>
>>>> I could not find any setting/configuration which would make them as
>>>> even as possible.
>>>>
>>>> Let me know if there's a way to do the same.
>>>>
>>>> Thanks,
>>>> Prasanna.
>>>>
>>>


Re: Savepoint class refactor in 1.11 causing restore from 1.9 savepoint to fail

2021-08-04 Thread Robert Metzger
Hi Weston,

Oh indeed, you are right! I quickly tried restoring a 1.9 savepoint on a
1.11 runtime and it worked. So in principle this seems to be supported.

I'm including Timo in this thread; he has a lot of experience with the
serializers.

On Tue, Aug 3, 2021 at 6:59 PM Weston Woods  wrote:

> Robert,
>
>
>
> Thanks for your reply. How should I interpret the savepoint
> compatibility table here
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/upgrading/#compatibility-table
> if a 1.9 savepoint cannot be restored into a 1.11 runtime?
>
>
>
>
>
>
>
> *From: *Robert Metzger 
> *Date: *Tuesday, August 3, 2021 at 11:52 AM
> *To: *Weston Woods 
> *Cc: *"user@flink.apache.org" 
> *Subject: *Re: Savepoint class refactor in 1.11 causing restore from 1.9
> savepoint to fail
>
>
>
> Hi Weston,
>
> I have never looked into the savepoint migration code paths myself, but I
> know that savepoint migration across multiple versions is not supported
> (1.9 can only migrate to 1.10, not 1.11). We have test coverage for these
> migrations, and I would be surprised if this "Savepoint" class migration is
> not covered in these tests.
>
>
>
> Have you tried upgrading from 1.9 to 1.10, and then from 1.10 to 1.11?
>
>
>
> On Fri, Jul 30, 2021 at 11:53 PM Weston Woods  wrote:
>
> I am unable to restore a 1.9 savepoint into a 1.11 runtime for the very
> interesting reason that the Savepoint class was renamed and repackaged
> between those two releases.   Apparently a Kryo serializer has that class
> registered in the 1.9 runtime. I can’t think of a good reason for that
> class to be registered with Kryo; none of the job operators reference any
> such thing.   Yet there it is causing the following exception and
> preventing upgrade to a new runtime.
>
>


Re: Checkpoints fail when trying to copy from Kafka to S3 since "some tasks have been finished"

2021-08-03 Thread Robert Metzger
Hi Svend,
I'm a bit confused by this statement:

* In streaming mode, with checkpointing but removing the `setBounded()` on the
> kafka source yields the same result


My expectation would be that the source runs forever, if it is not bounded.
Are you sure this error message is not coming from another task?


On Sat, Jul 31, 2021 at 11:23 AM Svend  wrote:

> Hi everyone,
>
> I'm failing to write a simple Flink 1.13.1 application with the DataStream
> that reads from kafka and writes to parquet.
>
> My intention is to run this as a job that reads what is currently in
> kafka, shuts down when reaching current end of each partition and picks up
> from there next time it's started.
>
> I've tried several variations, I can't get anything to work properly:
>
> * In streaming mode, enabling checkpoint and setting the kafkaSource to
> bounded (see code sample below), the application fails to perform
> checkpoint complaining about:
>
> "Checkpoint Timer Failed to trigger checkpoint for job ... since some
> tasks of job ... has been finished"
>
> => no parquet part gets written, no checkpoint gets written and no kafka
> offset gets committed
>
> * In streaming mode, with checkpointing but removing the `setBounded()` on the
> kafka source yields the same result
>
> * I also tried in batch mode, removing the checkpoint, switching the
> StreamingFileSink for a FileSink and using Kafka's "
> auto.commit.interval.ms" => in that case I'm getting some parquet output
> and kafka offsets are committed, but the application shuts down before
> flushing the offset of what has been read, s.t. the latest kafka events
> will be read again at the next start.
>
> This all sounds very basic, I see other people do this kind of thing
> (recently, [1]), and I was really expecting the combination of
> KafkaSource with StreamingFileSink and checkpointing to work, all those are
> streaming concepts. Hopefully I'm doing something wrong?
>
> [1]
> http://mail-archives.apache.org/mod_mbox/flink-user/202106.mbox/browser
>
> Thanks a lot in advance,
>
> Svend
>
> ```
> // I'm testing this by launching the app in an IDE
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().registerTypeWithKryoSerializer(DynamicMessage.class,
> new DynamicProtobufKryoSerializer(params));
>
> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
> env.enableCheckpointing(1000);
>
> env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");
>
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>
> KafkaSource kafkaSource =
> KafkaSource.builder()
> .setBootstrapServers("localhost:9092")
> .setTopics("some-topic")
> .setGroupId("my-group")
> .setValueOnlyDeserializer(new DynamicProtobufFlinkSchema())
>
> .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
> .setBounded(OffsetsInitializer.latest())
> .build();
>
> StreamingFileSink parquetSink = StreamingFileSink
> .forBulkFormat(
> Path.fromLocalFile(new
> File("/tmp/job-output/some-topic.parquet")),
> new ParquetWriterFactory<>((out) -> new
> ParquetDynamicMessageWriterBuilder(out, params).build()))
>.withRollingPolicy(OnCheckpointRollingPolicy.build())
> .build();
>
> env
> .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka
> Source")
> .sinkTo(parquetSink);
>
> env.execute("my-job");
> ```
>


Re: Cleaning old incremental checkpoint files

2021-08-03 Thread Robert Metzger
Hi Robin,

Let's say you have two checkpoints #1 and #2, where #1 has been created by
an old version of your job, and #2 has been created by the new version.
When can you delete #1?
In #1, there's a directory "/shared" that contains data that is also used
by #2, because of the incremental nature of the checkpoints.

You can not delete the data in the /shared directory, as this data is
potentially still in use.
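
For orientation, a retained checkpoint directory in this setup typically has
roughly the following layout (paths are illustrative):

```
s3://<bucket>/<checkpoint-dir>/<job-id>/
    chk-42/      metadata and exclusive state of checkpoint #42
    chk-43/
    shared/      SST files that may be referenced by several incremental checkpoints
    taskowned/   state owned by the task managers
```

The chk-<n> directories belong to individual checkpoints, while shared/ is exactly
the part that can still be referenced by checkpoints of the newer job.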

I know this is only a partial answer to your question. I'll try to find out
more details and extend my answer later.


On Thu, Jul 29, 2021 at 2:31 PM Robin Cassan 
wrote:

> Hi all!
>
> We've happily been running a Flink job in production for a year now, with
> the RocksDB state backend and incremental retained checkpointing on S3. We
> often release new versions of our jobs, which means we cancel the running
> one and submit another while restoring the previous jobId's last retained
> checkpoint.
>
> This works fine, but we also need to clean old files from S3 which are
> starting to pile up. We are wondering two things:
> - once the newer job has restored the older job's checkpoint, is it safe
> to delete it? Or will the newer job's checkpoints reference files from the
> older job, in which case deleting the old checkpoints might cause errors
> during the next restore?
> - also, since all our state has a 7 days TTL, is it safe to set a 7 or 8
> days retention policy on S3 which would automatically clean old files, or
> could we still need to retain files older than 7 days even with the TTL?
>
> Don't hesitate to ask me if anything is not clear enough!
>
> Thanks,
> Robin
>


Re: Savepoint class refactor in 1.11 causing restore from 1.9 savepoint to fail

2021-08-03 Thread Robert Metzger
Hi Weston,
I have never looked into the savepoint migration code paths myself, but I
know that savepoint migration across multiple versions is not supported
(1.9 can only migrate to 1.10, not 1.11). We have test coverage for these
migrations, and I would be surprised if this "Savepoint" class migration is
not covered in these tests.

Have you tried upgrading from 1.9 to 1.10, and then from 1.10 to 1.11?

On Fri, Jul 30, 2021 at 11:53 PM Weston Woods  wrote:

> I am unable to restore a 1.9 savepoint into a 1.11 runtime for the very
> interesting reason that the Savepoint class was renamed and repackaged
> between those two releases.   Apparently a Kryo serializer has that class
> registered in the 1.9 runtime. I can’t think of a good reason for that
> class to be registered with Kryo; none of the job operators reference any
> such thing.   Yet there it is causing the following exception and
> preventing upgrade to a new runtime.
>
>
>
> Caused by: java.lang.IllegalStateException: Missing value for the key
> 'org.apache.flink.runtime.checkpoint.savepoint.Savepoint'
> at
> org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:190)
> ~[flink-dist_2.11-1.11.3.jar:1.11.3]
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.restoreSerializer(KryoSerializerSnapshot.java:86)
> ~[flink-dist_2.11-1.11.3.jar:1.11.3]
>
>
>
> There doesn’t seem to be any way to unregister a class from Kryo.   And
> the mechanism for dealing with missing classes looks to me like it has
> never worked as advertised. Instead of registering a dummy class for a
> missing class name, a null gets registered, leading to the exception
> which prevents restoring the savepoint.   The code that returns a null
> instead of a dummy is here  -
> https://github.com/apache/flink/blob/e8cfe6701b9768d1f1fe4488640cba5f9b42d73f/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshotData.java#L263
>
>
>
> Resulting in this log.
>
>
>
> 2021-07-27 18:38:11,703 WARN
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData
> [] - Cannot find registered class
> org.apache.flink.runtime.checkpoint.savepoint.Savepoint for Kryo
> serialization in classpath; using a dummy class as a placeholder.
> java.lang.ClassNotFoundException:
> org.apache.flink.runtime.checkpoint.savepoint.Savepoint
>
>
>
> One way or another I need to be able to restore a 1.9 savepoint into
> 1.11.   Perhaps the Kryo registration needs to be cleansed from wherever it
> is lurking in the 1.9 savepoint,  or an effective dummy needs to be
> substituted when reading it into 1.11.
>
>
>
> Has anyone else encountered this problem, or have any advice to offer?
>
>
>
>
>
>
>
>
>
>
>
>
>


Re: Repository for all mailing list

2021-08-03 Thread Robert Metzger
Hey,

generally, the mailing lists (and JIRA) are indexed by search engines, in
particular Google. As long as you have a specific enough search string
(such as an exception message), you should find past problems and solutions.

You can also download the entire Flink mailing list archives. For example
the dev@ emails from January 2019 are here:
https://mail-archives.apache.org/mod_mbox/flink-dev/201901.mbox.

You can actually also analyze the data with Flink ;)
--> https://github.com/ververica/lab-flink-repository-analytics


On Tue, Aug 3, 2021 at 10:54 AM Dipanjan Mazumder  wrote:

> Hi ,
>
> I was wondering: so many problems and their solutions have been discussed
> in these mails; do we have any JIRA issue repo where all these problems and
> their solutions are maintained, so that any user can check the issues in
> the repo to see whether their problem has already been discussed and has a
> solution?
>
> I believe that because these mails are directed to specific addresses, they must
> be recorded somewhere. If that is the case, where can we find them?
>
> Regards
> Dipanjan
>


Re: Issue with writing to Azure blob storage using StreamingFileSink and FileSink

2021-08-03 Thread Robert Metzger
Hey Sudhanva,
Have you configured IntelliJ to include dependencies with "Provided" Scope
when executing your main method?

I also noticed that you are using Flink 1.13.1 and 1.13.0 in your pom. It's
probably not an issue in this case, but it can cause problems.

On Fri, Jul 30, 2021 at 10:29 AM Sudhanva Purushothama <
sudha...@coffeebeans.io> wrote:

> Hello,
>  I have been trying to use StreamingFileSink to write Parquet files
> to Azure blob storage, and I am getting the following error. I did see in the
> ticket https://issues.apache.org/jira/browse/FLINK-17444 that support for
> StreamingFileSink is not yet provided.
>
> I am trying to run Flink in IntelliJ. I created the project using the
> Maven archetype as mentioned in the docs and added the flink-azure-fs-hadoop
> dependency.
> Kindly let me know what I am missing. Also, is there boilerplate code I
> can refer to?
> Thank you.
> Regards,
> Sudhanva
>
>


Re: Support for Microseconds in Avro Deserialization

2021-08-02 Thread Robert Metzger
Hey Joe,
thanks a lot for reaching out regarding this.
I have no explanation for why this exists, but since there's no ticket
about this yet, I filed one:
https://issues.apache.org/jira/browse/FLINK-23589
I also pinged some committers who can hopefully provide some
additional context.

I would propose to continue the discussion in Jira!

Thanks again!


On Mon, Aug 2, 2021 at 3:17 PM Joseph Lorenzini 
wrote:

> Hi all,
>
>
>
> The avro specification supports microseconds and reviewing the source code
> in org.apache.avro.LogicalTypes seems to indicate microsecond support.
> However, the conversion code in flink (see
> org.apache.flink.formats.avro.typeutils.AvroSchemaConverter#convertToSchema)
> has this check:
>
>
>
> if (precision <= 3) {
>
> avroLogicalType = LogicalTypes.timestampMillis();
>
> } else {
>
> throw new IllegalArgumentException(
>
> "Avro does not support TIMESTAMP type "
>
> + "with precision: "
>
> + precision
>
> + ", it only supports precision less
> than 3.");
>
> }
>
>
>
> So it seems that flink only supports managing avro timestamps with at most
> millisecond precision. Does someone have a brief explanation about why this
> limitation exists? Depending on how complicated it is, I’d be willing to
> submit a PR to add that support in.
>
>
>
> Thanks,
>
> Joe
>


Re: OOM Metaspace after multiple jobs

2021-07-20 Thread Robert Metzger
Hi Alexis,

I hope I'm not stating the obvious, but have you checked this documentation
page:
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/debugging/debugging_classloading/#unloading-of-dynamically-loaded-classes-in-user-code
In particular the shutdown hooks we've introduced in Flink 1.13 could be
helpful for you (this is an example of how we use the hooks with the
Kinesis connector, which also produced leaks:
https://github.com/apache/flink/pull/14372/files)

Also, check this out:
https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks

(FYI: I won't be able to follow up on this thread for a while, because I'm
going on vacation soon)


On Fri, Jul 16, 2021 at 9:24 AM Alexis Sarda-Espinosa <
alexis.sarda-espin...@microfocus.com> wrote:

> Since I'm running in a container, I was able to copy some of the jars to
> Flink's lib folder. When it comes to gRPC, I don't know if there's any
> other good option due to possible issues with ThreadLocals:
> https://github.com/grpc/grpc-java/issues/8309
>
>
>
> Even then, I’m not sure that’s a complete solution. I added a class (in
> the lib folder) that logs loaded/unloaded class counts with
> ClassLoadingMXBean, and even though the number of classes loaded increases
> more slowly with each job, it still increases. In a heapdump I took before
> moving jars to /lib, I could see multiple instances (one per job, it seems)
> of some of my job’s classes (e.g. sources), and their GC roots were the
> Flink User Class Loader. I haven’t figured out why they would remain across
> different jobs.
>
>
>
> Regards,
>
> Alexis.
>
>
> --
>
> *From:* Alexis Sarda-Espinosa 
> *Sent:* Thursday, July 8, 2021 12:51 AM
> *To:* user@flink.apache.org 
> *Subject:* Re: OOM Metaspace after multiple jobs
>
>
>
> I now see there have been problems with this in the past:
>
>
>
> https://issues.apache.org/jira/browse/FLINK-16142
>
> https://issues.apache.org/jira/browse/FLINK-19005
>
>
>
> I actually use both JDBC and gRPC, so it seems this could indeed be an
> issue for me. Does anyone know if I can ensure my classes get cleaned up?
> In this scenario only my jobs would be running in the cluster, so I can
> have a bit more control.
>
>
>
> Regards,
>
> Alexis.
>
>
> --
>
> *From:* Alexis Sarda-Espinosa 
> *Sent:* Thursday, July 8, 2021 12:14 AM
> *To:* user@flink.apache.org 
> *Subject:* OOM Metaspace after multiple jobs
>
>
>
> Hello,
>
>
>
> I am currently testing a scenario where I would run the same job multiple
> times in a loop with different inputs each time. I am testing with a local
> Flink cluster v1.12.4. I initially got an OOM - Metaspace error, so I
> increased the corresponding memory in the TM's JVM (to 512m), but it still
> fails sometimes.
>
>
>
> I found this issue that talked about Python jobs:
> https://issues.apache.org/jira/browse/FLINK-20333, but there is a comment
> there saying that it would also affect Java jobs. The commit linked there
> seems to be concerned with Python only. Was this also fixed in 1.12.0 for
> Java?
>
>
>
> Is there anything I could do to force a more thorough class loader cleanup
> after each call to execute() ?
>
>
>
> Regards,
>
> Alexis.
>
>
>
>
>


Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

2021-07-20 Thread Robert Metzger
+1 to this change!

When I was working on the reactive mode blog post [1] I also ran into this
issue, leading to a poor "out of the box" experience when scaling down.
For my experiments, I've chosen a timeout of 8 seconds, and the cluster has
been running for 76 days (so far) on Kubernetes.
I also consider this change somewhat low-risk, because we can provide a
quick fix for people running into problems.

[1] https://flink.apache.org/2021/05/06/reactive-mode.html


On Fri, Jul 16, 2021 at 7:05 PM Till Rohrmann  wrote:

> Hi everyone,
>
> Since Flink 1.5 we have the same heartbeat timeout and interval default
> values that are defined as heartbeat.timeout: 50s and heartbeat.interval:
> 10s. These values were mainly chosen to compensate for lengthy GC pauses
> and blocking operations that were executed in the main threads of Flink's
> components. Since then, there were quite some advancements wrt the JVM's
> GCs and we also got rid of a lot of blocking calls that were executed in
> the main thread. Moreover, a long heartbeat.timeout causes long recovery
> times in case of a TaskManager loss because the system can only properly
> recover after the dead TaskManager has been removed from the scheduler.
> Hence, I wanted to propose to change the timeout and interval to:
>
> heartbeat.timeout: 15s
> heartbeat.interval: 3s
>
> Since there is no perfect solution that fits all use cases, I would really
> like to hear from you what you think about it and how you configure these
> heartbeat options. Based on your experience we might actually come up with
> better default values that allow us to be resilient but also to detect
> failed components fast. FLIP-185 can be found here [1].
>
> [1] https://cwiki.apache.org/confluence/x/GAoBCw
>
> Cheers,
> Till
>


Re: Some question of RocksDB state backend on ARM os

2021-07-20 Thread Robert Metzger
I guess this is unlikely, because nobody is working on the mentioned
tickets.

I mentioned your request in the ticket, to raise awareness again, but I
would still consider it unlikely.

On Tue, Jul 20, 2021 at 1:57 PM Wanghui (HiCampus) 
wrote:

> Hi Robert:
>
>  Thank you for your reply.
>
> Will this feature be released in version 1.14?
>
> Best,
>
> Hui
>
> *From:* Robert Metzger [mailto:rmetz...@apache.org]
> *Sent:* July 20, 2021, 19:45
> *To:* Wanghui (HiCampus) 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Some question of RocksDB state backend on ARM os
>
>
>
> The RocksDB version provided by Flink does not currently run on ARM.
>
>
>
> However, there are some efforts / hints:
> - https://stackoverflow.com/a/44573013/568695
>
> - https://issues.apache.org/jira/browse/FLINK-13448
>
> - https://issues.apache.org/jira/browse/FLINK-13598
>
>
>
> I would recommend voting and commenting on
> https://issues.apache.org/jira/browse/FLINK-13598 to raise awareness.
>
>
>
> On Tue, Jul 20, 2021 at 5:26 AM Wanghui (HiCampus) 
> wrote:
>
> Hi all:
>
>When I use RocksDB  as state backend on an aarch64 system, the
> following error occurs:
>
> 1.  Does the aarch64 system not support rocksdb?
>
> 2.  If not, is there a support plan for later versions of flink?
>
> Caused by: java.lang.Exception: Exception while creating
> StreamOperatorStateContext.
>
>  at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222)
>
>  at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>
>  at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
>
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)
>
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
>
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
>
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>
>  at java.lang.Thread.run(Thread.java:748)
>
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> state backend for
> KeyedProcessOperator_ae33e81d863e4093619373d1e1f77012_(1/1) from any of the
> 1 provided restore options.
>
>  at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>
>  at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:335)
>
>  at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:148)
>
>  ... 9 more
>
> Caused by: java.io.IOException: Could not load the native RocksDB library
>
>  at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:948)
>
>  at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:489)
>
>  at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:319)
>
>  at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>
>  at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>
>  ... 11 more
>
> Caused by: java.lang.UnsatisfiedLinkError:
> /tmp/rocksdb-lib-bd8659305e92a27fac27481baf57897b/librocksdbjni-linux64.so:
> /tmp/rocksdb-lib-bd8659305e92a27fac27481baf57897b/librocksdbjni-linux64.so:
> cannot open shared object file: No such file or directory (Possible cause:
> can't load AMD 64-bit .so on a AARCH64-bit platform)
>
>  at java.lang.ClassLoader$NativeLibrary.load(Native Method)
>
>  at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1934)
>
>  at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1817)
>
>  at java.lang.Runtime.load0(Runtime.java:810)
>
>  at java.lang.System.load(System.java:1088)
>
>  at
> org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
>
>  at
> org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)
>
>  at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:923)
>
>  ... 15 more
>
> Best regards
>
> Hui Wang
>


Re: Topic assignment across Flink Kafka Consumer

2021-07-20 Thread Robert Metzger
Hi Prasanna,
which Flink version and Kafka connector are you using? (the "KafkaSource"
or "FlinkKafkaConsumer"?)

The partition assignment for the FlinkKafkaConsumer is defined here:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionAssigner.java#L27-L43


I assume all your topics have one partition only. Still, the "startIndex"
should be determined based on the hash of the topic name. My only
explanation is that you're unlucky with the distribution of the hashes.
If this leads to performance issues, consider using topics with multiple
partitions, changing the names of the topics, or increasing the parallelism of
your consumer.
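
For reference, the linked assigner boils down to roughly the following (a
simplified sketch in plain Java, not the exact Flink code):

```
// Which consumer subtask reads a given partition: the "startIndex" is derived
// from the topic name's hash, and partitions of the same topic are then spread
// round-robin starting from that index.
static int assign(String topic, int partition, int numParallelSubtasks) {
    int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
    return (startIndex + partition) % numParallelSubtasks;
}
```

With single-partition topics this means each topic lands entirely on the subtask
chosen by its topic-name hash, and nothing prevents several topic names from
hashing to the same subtask.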




On Tue, Jul 20, 2021 at 7:53 AM Prasanna kumar <
prasannakumarram...@gmail.com> wrote:

> Hi,
>
> We have a Flink job reading from multiple Kafka topics based on a regex
> pattern.
>
> What we have found out is that the topics are not shared between the Kafka
> consumers in an even manner.
>
> For example, if there are 8 topics and 4 Kafka consumer operators: 1
> consumer is assigned 6 topics, 2 consumers are assigned 1 each, and the last
> consumer is not assigned any at all.
>
> This leads to inadequate usage of the resources.
>
> I could not find any setting/configuration which would make them as even
> as possible.
>
> Let me know if there's a way to do the same.
>
> Thanks,
> Prasanna.
>


Re: Some question of RocksDB state backend on ARM os

2021-07-20 Thread Robert Metzger
The RocksDB version provided by Flink does not currently run on ARM.

However, there are some efforts / hints:
- https://stackoverflow.com/a/44573013/568695
- https://issues.apache.org/jira/browse/FLINK-13448
- https://issues.apache.org/jira/browse/FLINK-13598

I would recommend voting and commenting on
https://issues.apache.org/jira/browse/FLINK-13598 to raise awareness.

On Tue, Jul 20, 2021 at 5:26 AM Wanghui (HiCampus) 
wrote:

> Hi all:
>
>When I use RocksDB  as state backend on an aarch64 system, the
> following error occurs:
>
> 1.  Does the aarch64 system not support rocksdb?
>
> 2.  If not, is there a support plan for later versions of flink?
>
> Caused by: java.lang.Exception: Exception while creating
> StreamOperatorStateContext.
>
>  at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222)
>
>  at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>
>  at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
>
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)
>
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
>
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
>
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>
>  at java.lang.Thread.run(Thread.java:748)
>
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> state backend for
> KeyedProcessOperator_ae33e81d863e4093619373d1e1f77012_(1/1) from any of the
> 1 provided restore options.
>
>  at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>
>  at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:335)
>
>  at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:148)
>
>  ... 9 more
>
> Caused by: java.io.IOException: Could not load the native RocksDB library
>
>  at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:948)
>
>  at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:489)
>
>  at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:319)
>
>  at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>
>  at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>
>  ... 11 more
>
> Caused by: java.lang.UnsatisfiedLinkError:
> /tmp/rocksdb-lib-bd8659305e92a27fac27481baf57897b/librocksdbjni-linux64.so:
> /tmp/rocksdb-lib-bd8659305e92a27fac27481baf57897b/librocksdbjni-linux64.so:
> cannot open shared object file: No such file or directory (Possible cause:
> can't load AMD 64-bit .so on a AARCH64-bit platform)
>
>  at java.lang.ClassLoader$NativeLibrary.load(Native Method)
>
>  at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1934)
>
>  at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1817)
>
>  at java.lang.Runtime.load0(Runtime.java:810)
>
>  at java.lang.System.load(System.java:1088)
>
>  at
> org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
>
>  at
> org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)
>
>  at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:923)
>
>  ... 15 more
>
> Best regards
>
> Hui Wang
>


Re: Flink RocksDB Performance

2021-07-20 Thread Robert Metzger
Your understanding of the problem is correct -- the serialization cost is
the reason for the high CPU usage.

What you can also try to optimize is the serializers you are using (by
using data types that are efficient to serialize). See also this blog post:
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
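
If you want to check whether some of your types silently fall back to the slow
generic Kryo path, one option (a sketch for a test run, not a production setting)
is to disable generic types so the job fails fast instead:

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Throws as soon as a data type would need the generic Kryo fallback instead of a
// dedicated POJO/tuple serializer, which makes expensive types easy to spot.
env.getConfig().disableGenericTypes();
```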

On Fri, Jul 16, 2021 at 12:02 PM Vijay Bhaskar 
wrote:

> Yes, absolutely. Unless we need a very large state on the order of GBs, RocksDB is
> not required. RocksDB is good only because the filesystem backend is very bad at
> large state. In other words, the filesystem backend performs much better than
> RocksDB up to GBs of state; after that it degrades compared to RocksDB. It's not
> that RocksDB is performing better.
>
> Regards
> Bhaskar
>
> On Fri, Jul 16, 2021 at 3:24 PM Zakelly Lan  wrote:
>
>> Hi Li Jim,
>> Filesystem performs much better than rocksdb (by multiple times), but it
>> is only suitable for small states. Rocksdb will consume more CPU on
>> background tasks, cache management, serialization/deserialization and
>> compression/decompression. In most cases, performance of the Rocksdb will
>> meet the need.
>> For tuning, please check
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/large_state_tuning/#tuning-rocksdb
>> Moreover, you could leverage some tools such as the async-profiler(
>> https://github.com/jvm-profiling-tools/async-profiler) to figure out
>> which part consumes the most CPU.
>>
>> On Fri, Jul 16, 2021 at 3:19 PM Li Jim  wrote:
>>
>>> Hello everyone,
>>> I am using the Flink 1.13.1 CEP library and doing some pressure testing.
>>> My message rate is about 16000 records per second.
>>> I find that it can't process more than 16000 records per second because
>>> the CPU cost is up to 100% (say 800%, because I allocated 8 vcores to a
>>> taskmanager).
>>> I tried switching to the filesystem state backend; it got faster and the CPU cost went down.
>>> I understand this may be because of the serialization/deserialization cost in
>>> rocksdb, but for some reason we must use rocksdb as the state backend.
>>> Any suggestions to optimize this?
>>>
>>>
>>>
>>>
>>>


Re: Subpar performance of temporal joins with RocksDB backend

2021-07-20 Thread Robert Metzger
Are you using remote disks for RocksDB? (I guess that's EBS on AWS.) Afaik
there are usually limitations wrt the IOPS you can perform.

I would generally recommend measuring where the bottleneck is coming from.
It could be that your CPUs are at 100%, then adding more machines / cores
will help (make sure that all CPU cores are in use by setting the
parallelism >= cores). -- with the rocksdb statebackend, Flink needs to
serialize all records. That's not necessary with the heap backend, because
the data is on the heap.
Or your bottleneck is the EBS / disk storage, where the bandwidth / IOPS is
at its limit.


On Mon, Jul 19, 2021 at 4:22 PM Adrian Bednarz 
wrote:

> Thanks Maciej, I think this has helped a bit. We are now at 2k/3k eps on a
> single node. Now, I just wonder if this isn't too slow for a single node
> and such a simple query.
>
> On Sat, Jul 10, 2021 at 9:28 AM Maciej Bryński  wrote:
>
>> Could you please set 2 configuration options:
>> - state.backend.rocksdb.predefined-options =
>> SPINNING_DISK_OPTIMIZED_HIGH_MEM
>> - state.backend.rocksdb.memory.partitioned-index-filters = true
>>
>> Regards,
>> Maciek
>>
>> On Sat, 10 Jul 2021 at 08:54, Adrian Bednarz 
>> wrote:
>> >
>> > I didn’t tweak any RocksDB knobs. The only thing we did was to increase
>> managed memory to 12gb which was supposed to help RocksDB according to the
>> documentation. The rest stays at the defaults. Incremental checkpointing
>> was enabled as well but it made no difference in performance if we disabled
>> it.
>> >
>> > On Fri, 9 Jul 2021 at 20:43, Maciej Bryński  wrote:
>> >>
>> >> Hi Adrian,
>> >> Could you share your state backend configuration ?
>> >>
>> >> Regards,
>> >> Maciek
>> >>
>> >> On Fri, 9 Jul 2021 at 19:09, Adrian Bednarz 
>> wrote:
>> >> >
>> >> > Hello,
>> >> >
>> >> > We are experimenting with lookup joins in Flink 1.13.0.
>> Unfortunately, we unexpectedly hit significant performance degradation when
>> changing the state backend to RocksDB.
>> >> >
>> >> > We performed tests with two tables: fact table TXN and dimension
>> table CUSTOMER with the following schemas:
>> >> >
>> >> > TXN:
>> >> >  |-- PROD_ID: BIGINT
>> >> >  |-- CUST_ID: BIGINT
>> >> >  |-- TYPE: BIGINT
>> >> >  |-- AMOUNT: BIGINT
>> >> >  |-- ITEMS: BIGINT
>> >> >  |-- TS: TIMESTAMP(3) **rowtime**
>> >> >  |-- WATERMARK FOR TS: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS
>> >> >
>> >> > CUSTOMER:
>> >> >  |-- ID: BIGINT
>> >> >  |-- STATE: BIGINT
>> >> >  |-- AGE: BIGINT
>> >> >  |-- SCORE: DOUBLE
>> >> >  |-- PRIMARY KEY: ID
>> >> >
>> >> > And the following query:
>> >> > select state, sum(amount) from txn t JOIN customer FOR SYSTEM_TIME
>> AS OF t.ts ON t.cust_id = customer.id group by state, TUMBLE(t.ts,
>> INTERVAL '1' SECOND)
>> >> >
>> >> > In our catalog, we reconfigured the customer table so that the
>> watermark is set to infinity on that side of the join. We generate data in
>> a round robin fashion (except for timestamp that grows with a step of 1 ms).
>> >> >
>> >> > We performed our experiments on a single c5.4xlarge machine with
>> heap and managed memory size set to 12gb with a blackhole sink. With 2 000
>> 000 fact records and 100 000 dimension records, a job with heap backend
>> finishes in 5 seconds whereas RocksDB executes in 1h 24m. For 400 000
>> dimension records it doesn't grow significantly but goes up to 1h 36m (the
>> job processes more records after all).
>> >> >
>> >> > We also checked what would happen if we reduced the amount of
>> customer ids to 1. Our expectation was that RocksDB will not offload
>> anything to disk anymore so the performance should be comparable with heap
>> backend. It was executed in 10 minutes.
>> >> >
>> >> > Is this something anybody experienced or something to be expected?
>> Of course, we assumed RocksDB to perform slower but 300 eps is below our
>> expectations.
>> >> >
>> >> > Thanks,
>> >> > Adrian
>> >>
>> >>
>> >>
>> >> --
>> >> Maciek Bryński
>>
>>
>>
>> --
>> Maciek Bryński
>>
>


Re: multiple jobs in same flink app

2021-06-23 Thread Robert Metzger
Thanks a lot for checking again. I just started Flink in Application mode
with a jar that contains two "executeAsync" submissions, and indeed two
jobs are running.
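
For illustration, a jar with two submissions could look roughly like this (a
sketch only; sources and sinks are placeholders, not the actual test job):

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

public class TwoJobsMain {
    public static void main(String[] args) throws Exception {
        // First job: executeAsync() returns immediately, so main() continues.
        StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();
        env1.fromSequence(1, 1_000_000).addSink(new DiscardingSink<>());
        env1.executeAsync("Job 1");

        // Second job: a separate topology submitted as its own job.
        StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();
        env2.fromSequence(1, 1_000_000).addSink(new DiscardingSink<>());
        env2.executeAsync("Job 2");
    }
}
```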

I think the problem in your case is that you are using High Availability (I
guess, because there are log statements from the
ZooKeeperLeaderRetrievalService). As you can see from the documentation [1]:

The Application Mode allows for multi-execute() applications but
> High-Availability is not supported in these cases. High-Availability in
> Application Mode is only supported for single-execute() applications.


See also: https://issues.apache.org/jira/browse/FLINK-19358

Sorry again that I gave you invalid information in my first answer.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/overview/




On Wed, Jun 23, 2021 at 8:50 AM Qihua Yang  wrote:

> Hi Robert,
>
> But I saw the Flink docs say that application mode can run multiple jobs. Or did I
> misunderstand it?
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/
>
>
>
> *Compared to the Per-Job mode, the Application Mode allows the submission of 
> applications consisting of multiple jobs. The order of job execution is not 
> affected by the deployment mode but by the call used to launch the job. Using 
> execute(), which is blocking, establishes an order and it will lead to the 
> execution of the "next" job being postponed until "this" job finishes. Using 
> executeAsync(), which is non-blocking, will lead to the "next" job starting 
> before "this" job finishes.*
>
>
> On Tue, Jun 22, 2021 at 11:13 PM Robert Metzger 
> wrote:
>
>> Hi Qihua,
>>
>> Application Mode is meant for executing one job at a time, not multiple
>> jobs on the same JobManager.
>> If you want to do that, you need to use session mode, which allows
>> managing multiple jobs on the same JobManager.
>>
>> On Tue, Jun 22, 2021 at 10:43 PM Qihua Yang  wrote:
>>
>>> Hi Arvid,
>>>
>>> Do you know if I can start multiple jobs for a single flink application?
>>>
>>> Thanks,
>>> Qihua
>>>
>>>
>>> On Thu, Jun 17, 2021 at 12:11 PM Qihua Yang  wrote:
>>>
>>>> Hi,
>>>>
>>>> I am using application mode.
>>>>
>>>> Thanks,
>>>> Qihua
>>>>
>>>> On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise  wrote:
>>>>
>>>>> Hi Qihua,
>>>>>
>>>>> Which execution mode are you using?
>>>>>
>>>>> On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang  wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Thank you for your reply. What I want is a Flink app that has multiple jobs,
>>>>>> where each job manages a stream. Currently our Flink app has only 1 job that
>>>>>> manages multiple streams.
>>>>>> I did try env.executeAsync(), but it still doesn't work. From the log,
>>>>>> when the second executeAsync() was called, it shows " *Job
>>>>>>  was recovered successfully.*"
>>>>>> It looks like the second executeAsync() recovered the first job instead of starting
>>>>>> a second job.
>>>>>>
>>>>>> Thanks,
>>>>>> Qihua
>>>>>>
>>>>>>
>>>>>> On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise  wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> env.execute("Job 1"); is a blocking call. You either have to use
>>>>>>> executeAsync or use a separate thread to submit the second job. If Job 1
>>>>>>> finishes then this would also work by having sequential execution.
>>>>>>>
>>>>>>> However, I think what you actually want to do is to use the same env
>>>>>>> with 2 topologies and 1 single execute like this.
>>>>>>>
>>>>>>> StreamExecutionEnvironment env =
>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>> DataStream stream1 = env.addSource(new
>>>>>>> SourceFunction());
>>>>>>> stream1.addSink(new FlinkKafkaProducer());
>>>>>>> DataStream stream2 = env.addSource(new
>>>>>>> SourceFunction());
>>>>>>> stream2.addSink(new DiscardingSink<>());
>>>>>>> env.execute("Job 1+2");
>>&g

Re: multiple jobs in same flink app

2021-06-23 Thread Robert Metzger
Hi Qihua,

Application Mode is meant for executing one job at a time, not multiple
jobs on the same JobManager.
If you want to do that, you need to use session mode, which allows managing
multiple jobs on the same JobManager.

On Tue, Jun 22, 2021 at 10:43 PM Qihua Yang  wrote:

> Hi Arvid,
>
> Do you know if I can start multiple jobs for a single flink application?
>
> Thanks,
> Qihua
>
>
> On Thu, Jun 17, 2021 at 12:11 PM Qihua Yang  wrote:
>
>> Hi,
>>
>> I am using application mode.
>>
>> Thanks,
>> Qihua
>>
>> On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise  wrote:
>>
>>> Hi Qihua,
>>>
>>> Which execution mode are you using?
>>>
>>> On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang  wrote:
>>>
 Hi,

 Thank you for your reply. What I want is a Flink app that has multiple jobs,
 where each job manages a stream. Currently our Flink app has only 1 job that
 manages multiple streams.
 I did try env.executeAsync(), but it still doesn't work. From the log,
 when the second executeAsync() was called, it shows " *Job
  was recovered successfully.*"
 It looks like the second executeAsync() recovered the first job instead of starting a
 second job.

 Thanks,
 Qihua


 On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise  wrote:

> Hi,
>
> env.execute("Job 1"); is a blocking call. You either have to use
> executeAsync or use a separate thread to submit the second job. If Job 1
> finishes then this would also work by having sequential execution.
>
> However, I think what you actually want to do is to use the same env
> with 2 topologies and 1 single execute like this.
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream stream1 = env.addSource(new
> SourceFunction());
> stream1.addSink(new FlinkKafkaProducer());
> DataStream stream2 = env.addSource(new
> SourceFunction());
> stream2.addSink(new DiscardingSink<>());
> env.execute("Job 1+2");
>
> On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang  wrote:
>
>> Hi,
>> Does anyone know how to run multiple jobs in the same Flink application?
>> I did a simple test. The first job was started; I did see the log
>> message. But I didn't see the second job start, even though I saw its log
>> message.
>>
>> public void testJobs() throws Exception {
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStream stream1 = env.addSource(new
>> SourceFunction());
>> stream1.addSink(new FlinkKafkaProducer());
>> printf("### first job");
>> env.execute("Job 1");
>>
>> env = StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStream stream2 = env.addSource(new
>> SourceFunction());
>> stream2.addSink(new DiscardingSink<>());
>> printf("### second job");
>> env.execute("Job 2");
>> }
>>
>> Here is the log:
>> ### first job
>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  -
>> Job  is submitted.
>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  -
>> Submitting Job with JobId=.
>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>> Received JobGraph submission  (job1).
>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>> Submitting job  (job1).
>>
>> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>> Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting
>> execution of job job1 () under job master
>> id b03cde9dc2aebdb39c46cda4c2a94c07.
>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting
>> scheduling with scheduling strategy
>> [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]
>> INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job
>> job1 () switched from state CREATED to
>> RUNNING.
>>
>> ### second job
>> WARN  com.doordash.flink.common.utils.LoggerUtil  - Class -
>> IndexWriter : ### second job
>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Resolved
>> ResourceManager address, beginning registration
>> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>> Starting ZooKeeperLeaderRetrievalService
>> /leader//job_manager_lock.
>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
>> Registering job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp://
>> flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job
>> .
>> 

Re: Flink parameter configuration does not take effect

2021-06-17 Thread Robert Metzger
Hi Jason,

I hope you don't mind that I brought back the conversation to the user@
mailing list, so that others can benefit from the information as well.

Thanks a lot for sharing your use case. I personally believe that Flink
should support invocations like "flink run -m yarn-cluster
xxx.FlinkStreamSQLDDLJob flink-stream-sql-ddl-1.0.0.jar ./config.json".
There is no fundamental reason why this can not be supported.

The Javadoc about tableEnv.getConfig() mentions that the config is only
about the "runtime behavior":
https://github.com/apache/flink/blob/master/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java#L1151
... but I see how this is not clearly defined.

As a short-term fix, I've proposed to clarify in the configuration table
which options are cluster vs job configurations:
https://issues.apache.org/jira/browse/FLINK-22257.

But in the long term, we certainly need to improve the user experience.
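
As a rough illustration of the current split (a sketch; the class name and values
are made up): job-scoped settings such as checkpointing can be set on the
execution environment, while cluster-scoped settings such as
taskmanager.memory.managed.fraction have to be provided when the cluster is
started.

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ConfigScopeExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Job-scoped: takes effect when set programmatically.
        env.enableCheckpointing(60_000);

        // Cluster-scoped: setting taskmanager.memory.managed.fraction here has no
        // effect; pass it at submission time, e.g.
        //   flink run -m yarn-cluster -yD taskmanager.memory.managed.fraction=0.6 ...
        // or put it into flink-conf.yaml.

        env.fromSequence(1, 100).print();
        env.execute("config-scope-example");
    }
}
```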


On Wed, Jun 16, 2021 at 3:31 PM Jason Lee  wrote:

> Dear Robert,
>
> For tasks running on the cluster, some parameter configurations are
> global, but some parameter configurations need to be customized, such as
> some memory settings of TaskManager. For tasks with different state sizes,
> we need to configure different parameters. These parameters should not  be
> configured in flink-config.yaml. But for the current Flink, these
> parameters cannot be configured through StreamExecutionEnvironment, and
> some parameters are not effective if set through StreamTableEnvironment.
>
> At the same time, Configuration is immutable after the task is started,
> which is correct, but I think some global parameters should also be
> specified in StreamExecutionEnvironment. At present, some parameters of
> checkpoint are also set globally, but they can be set through
> "StreamExecutionEnvironment .getCheckpointConfig().set()", then why can't
> the parameters of TaskManager's memory be set in this way? I think that
> setting the global parameters by "flink run -yD" is the same as setting by
> "StreamExecutionEnvironment". I am not sure if I understand it correctly.
>
> I agree with you. I think we need to specify in the configuration of the
> official document that those parameters are best configured in
> flink-config.yaml. Those parameters can be modified in
> "StreamExecutionEnvironment", and those can only be passed through others
> Modified in the way. I think the document will be clearer for users.
>
> Best,
> Jason
> JasonLee1781
> jasonlee1...@163.com
>
>
> On 06/16/2021 21:04,Jason Lee 
> wrote:
>
> Dear Robert,
>
> Thanks for your answer
>
> Our Flink SQL task is deployed through Per job.
>
> We provide our users with a platform for developing Flink SQL tasks. We
> will write the user's SQL code and configuration parameters into a
> Config.json file. At the same time, we develop a Flink Jar task at the
> bottom to actually execute the user's SQL through the command line. To
> perform this task, for example, the following is our instruction to start a
> Flink SQL task: "flink run -m yarn-cluster xxx.FlinkStreamSQLDDLJob
> flink-stream-sql-ddl-1.0.0.jar ./config.json". In order to facilitate the
> user's personalized configuration parameters, we want to set user
> configuration parameters in the execution environment of the
> FlinkStreamSQLDDLJob class that we have implemented, such as the
> "taskmanager.memory.managed.fraction" parameter, but it is currently
> impossible to configure through the Flink execution environment These
> parameters, because they are not effective, can only be configured by flink
> run -yD.
>
> I think the configuration in the official document states that those
> parameters cannot be set through
> "StreamTableEnvironment.getConfig.getConfiguration().set()", but can only
> be set through flink run -yD or configured in flink-conf.yaml. If the
> current document does not explain it, it will not take effect if you use
> the "StreamTableEnvironment.getConfig.getConfiguration().set()" method to
> set some parameters. In order to increase the use of personalized
> configuration parameters for users, I think these instructions can appear
> in the Configuration of the official document.
>
> Best,
> Jason
>
> JasonLee1781
> jasonlee1...@163.com
>

Re: Resource Planning

2021-06-17 Thread Robert Metzger
Hi,

since your state (150gb) seems to fit into memory (700gb), I would
recommend trying the HashMapStateBackend:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/#the-hashmapstatebackend
(unless you know that your state size is going to increase a lot soon).
But I guess you'll have a nice performance improvement.
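
For completeness, the switch looks roughly like this on Flink 1.13 (a sketch; the
checkpoint path is a placeholder):

```
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Keep the working state on the JVM heap instead of RocksDB ...
env.setStateBackend(new HashMapStateBackend());
// ... and still write checkpoints to durable storage.
env.getCheckpointConfig().setCheckpointStorage("s3://<bucket>/checkpoints");
```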

At the moment I have no idea where else to look for the issue you are
describing, but it seems that there are a few things for you to try out to
optimize the resource allocation.

On Wed, Jun 16, 2021 at 7:23 PM Rommel Holmes 
wrote:

> Hi, Xintong and Robert
>
> Thanks for the reply.
>
> The checkpoint size for our job is 10-20GB since we are doing incremental
> checkpointing; if we do a savepoint, it can be as big as 150GB.
>
> 1) We will try to make the Flink instances bigger.
> 2) Thanks for the pointer, we will take a look.
>
> 3) We do have CPU and memory monitoring. When there is backpressure, the CPU
> load increases from 25% to 50% with a more spiky shape, but it is not at 100%.
> As for memory, we monitored (Heap.Committed - Heap.Used) per host; when
> backpressure happened, the memory on the host was still around 500MB.
>
> What we observed is that when backpressure happened, the read-state-time
> slowness occurred on one of the hosts, across different task managers on
> that host. The read state time (a metric we create and measure ourselves) on that
> host shoots up from 0.x ms to 40-60 ms.
>
> We also observed that when this happens, the running compaction time for
> RocksDB on that host gets longer, from 1 minute to over 2 minutes. Other
> hosts are still at around 1 minute.
>
> We also observed that when this happens, the size of the active and unflushed
> immutable memtables metric increased, but not as fast as before the
> backpressure.
>
> I can provide more context if you are interested. We are still debugging
> on this issue.
>
> Rommel
>
>
>
>
>
> On Wed, Jun 16, 2021 at 4:25 AM Robert Metzger 
> wrote:
>
>> Hi Thomas,
>>
>> My gut feeling is that you can use the available resources more
>> efficiently.
>>
>> What's the size of a checkpoint for your job (you can see that from the
>> UI)?
>>
>> Given that your cluster has an aggregate of 64 * 12 = 768gb of memory
>> available, you might be able to do everything in memory (I might be off by
>> a few terabytes here, it all depends on your state size ;) )
>>
>> 1. In my experience, it is usually more efficient to have a few large
>> Flink instances than many small ones. Maybe try to run 12 TaskManagers (or
>> 11 to make the JM fit) with 58gb of memory (the JM can stick to the 7gb)
>> and see how Flink behaves.
>>
>> 2. I'd say it's a try and see process, with a few educated guesses. Maybe
>> check out this:
>> https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines
>> to get some inspiration for making some "back of the napkin" calculations
>> on the sizing requirements.
>>
>> 3. Do you have some monitoring of CPU / memory / network usage in place?
>> It would be interesting to see what the metrics look like when
>> everything is ok vs when the job is backpressured.
>>
>> Best,
>> Robert
>>
>>
>> On Wed, Jun 16, 2021 at 3:56 AM Xintong Song 
>> wrote:
>>
>>> Hi Thomas,
>>>
>>> It would be helpful if you can provide the jobmanager/taskmanager logs,
>>> and gc logs if possible.
>>>
>>> Additionally, you may consider to monitor the cpu/memory related metrics
>>> [1], see if there's anything abnormal when the problem is observed.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/metrics.html
>>>
>>>
>>>
>>> On Wed, Jun 16, 2021 at 8:11 AM Thomas Wang  wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm trying to see if we have been given enough resources (i.e. CPU and
>>>> memory) to each task node to perform a deduplication job. Currently, the
>>>> job is not running very stable. What I have been observing is that after a
>>>> couple of days run, we will suddenly see backpressure happen on one
>>>> arbitrary ec2 instance in the cluster and when that happens, we will have
>>>> to give up the current state and restart the job with an empty state. We
>>>> can no longer take savepoint as it would timeout after 10 minutes, which is
>>>> understandable.
>>>>
>>>> Additional Observations

Re: RocksDB CPU resource usage

2021-06-17 Thread Robert Metzger
If you are able to execute your job locally as well (with enough data), you
can also run it with a profiler and see the CPU cycles spent on
serialization (you can also use RocksDB locally)
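
For example, with async-profiler this could look roughly like the following
(a sketch only; pid, paths and the exact flags depend on your async-profiler
version):

    jps                                        # find the local TaskManager / MiniCluster JVM pid
    ./profiler.sh -e cpu -d 60 -f /tmp/flink-cpu.html <pid>

If serialization dominates, the serializer classes should show up high in the
resulting flame graph.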

On Wed, Jun 16, 2021 at 3:51 PM Padarn Wilson  wrote:

> Thanks Robert. I think it would be easy enough to test this hypothesis by
> making the same comparison with some simpler state inside the aggregation
> window.
>
> On Wed, 16 Jun 2021, 7:58 pm Robert Metzger,  wrote:
>
>> Depending on the datatypes you are using, seeing 3x more CPU usage seems
>> realistic.
>> Serialization can be quite expensive. See also:
>> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
>> Maybe it makes sense to optimize there a bit.
>>
>> On Tue, Jun 15, 2021 at 5:23 PM JING ZHANG  wrote:
>>
>>> Hi Padarn,
>>> After switch stateBackend from filesystem to rocksdb, all reads/writes
>>> from/to backend have to go through de-/serialization to retrieve/store the
>>> state objects, this may cause more cpu cost.
>>> But I'm not sure it is the main reason leads to 3x CPU cost in your job.
>>> To find out the reason, we need more profile on CPU cost, such as Flame
>>> Graphs. BTW, starting with Flink 1.13, Flame Graphs are natively supported
>>> in Flink[1].
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/flame_graphs/
>>>
>>> Best,
>>> JING ZHANG
>>>
>>> Padarn Wilson wrote on Tuesday, June 15, 2021 at 5:05 PM:
>>>
>>>> Hi all,
>>>>
>>>> We have a job that we just enabled rocksdb on (instead of file
>>>> backend), and see that the CPU usage is almost 3x greater (we had to
>>>> increase taskmanagers 3x to get it to run).
>>>>
>>>> I don't really understand this, is there something we can look at to
>>>> understand why CPU use is so high? Our state mostly consists of aggregation
>>>> windows.
>>>>
>>>> Cheers,
>>>> Padarn
>>>>
>>>


Re: Flink PrometheusReporter support for HTTPS

2021-06-16 Thread Robert Metzger
It seems like the PrometheusReporter doesn't support HTTPS.

The Flink reporter seems to be based on the HttpServer prometheus client. I
wonder if using the servlet client would allow us to add HTTPS support:
https://github.com/prometheus/client_java/blob/master/simpleclient_servlet/src/main/java/io/prometheus/client/exporter/MetricsServlet.java
/ https://github.com/prometheus/client_java#http
Running the Servlet inside an SSL enabled Jetty should do the trick.
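
Very roughly, and completely untested, that idea could look like this (Jetty
9.4 + simpleclient_servlet; keystore path, password and port are placeholders,
and wiring it into the reporter lifecycle / Flink's own CollectorRegistry is
left out):

    import io.prometheus.client.CollectorRegistry;
    import io.prometheus.client.exporter.MetricsServlet;
    import org.eclipse.jetty.server.*;
    import org.eclipse.jetty.servlet.ServletContextHandler;
    import org.eclipse.jetty.servlet.ServletHolder;
    import org.eclipse.jetty.util.ssl.SslContextFactory;

    public class HttpsMetricsServer {
        public static void main(String[] args) throws Exception {
            Server server = new Server();

            // TLS setup: keystore location and password are placeholders
            SslContextFactory.Server ssl = new SslContextFactory.Server();
            ssl.setKeyStorePath("/path/to/keystore.jks");
            ssl.setKeyStorePassword("changeit");

            HttpConfiguration httpsConfig = new HttpConfiguration();
            httpsConfig.addCustomizer(new SecureRequestCustomizer());

            ServerConnector connector = new ServerConnector(
                    server,
                    new SslConnectionFactory(ssl, "http/1.1"),
                    new HttpConnectionFactory(httpsConfig));
            connector.setPort(9249);
            server.addConnector(connector);

            // Expose the default Prometheus registry under /metrics
            ServletContextHandler context = new ServletContextHandler();
            context.setContextPath("/");
            context.addServlet(
                    new ServletHolder(new MetricsServlet(CollectorRegistry.defaultRegistry)),
                    "/metrics");
            server.setHandler(context);

            server.start();
            server.join();
        }
    }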

If this is reliable, you could consider contributing this back to Flink.

On Sun, Jun 13, 2021 at 4:27 PM Ashutosh Uttam 
wrote:

> Hi Austin,
>
> I am deploying Flink on K8s with multiple Job Manager pods (For HA)  &
> Task Manager pods.
>
> Each JobManager & Task Manager are running an PrometheusReporter instance
> and using Prometheus’ service discovery support for Kubernetes to discover
> all pods (Job Manager & Task Manager) and expose the container as targets
>
> Please let me know if a reverse proxy can work on this deployment as we
> have multiple JMs & TMs and cannot use static scrape targets
>
> Regards,
> Ashutosh
>
> On Sun, Jun 13, 2021 at 2:25 AM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hi Ashutosh,
>>
>> How are you deploying your Flink apps? Would running a reverse proxy like
>> Nginx or Envoy that handles the HTTPS connection work for you?
>>
>> Best,
>> Austin
>>
>> On Sat, Jun 12, 2021 at 1:11 PM Ashutosh Uttam 
>> wrote:
>>
>>> Hi All,
>>>
>>> Does PrometheusReporter provide support for HTTPS?. I couldn't find any
>>> information in flink documentation.
>>>
>>> Is there any way we can achieve the same?
>>>
>>> Thanks & Regards,
>>> Ashutosh
>>>
>>>
>>>


Re: Confusions and suggestions about Configuration

2021-06-16 Thread Robert Metzger
Note to others on this mailing list. This email has also been sent with the
subject "Flink parameter configuration does not take effect" to this list.
I replied there, let's also discuss there.

On Tue, Jun 15, 2021 at 7:39 AM Jason Lee  wrote:

> Hi  everyone,
>
> When I was researching and using Flink recently, I found that the official
> documentation on how to configure parameters is confusing, mainly as
> follows:
>
> 1. In the Configuration module of the official document, the description
> and default value of each parameter are introduced. This is very clear, but
> it does not describe how those parameters can be configured. For example,
> when we use and develop Flink SQL tasks, we usually need to configure
> different parameters for each Flink SQL task, such as
> taskmanager.memory.managed.fraction, but from the documentation I can only
> learn that it can be configured through the flink-conf.yaml file, while the
> parameter can also be configured through flink run -yD
> taskmanager.memory.managed.fraction=0.45.
> I feel that this method can be described in the official document.
>
> 2. In addition, we usually use a DDL Jar package to execute Flink SQL
> tasks, but we found that some parameters are set by
> StreamTableEnvironment.getConfig().getConfiguration().setXXX(key, value).
> These parameters cannot take effect. For example,
> taskmanager.memory.managed.fraction cannot take effect if the parameter is
> set in the above way (the Note in TableConfig in the source code is as
> follows: Because options are read at different point in time when
> performing operations, it is recommended to set configuration options early
> after instantiating a table environment. ). And
> StreamExecutionEnvironment.getConfiguration() is protected, which leads to
> some parameters that cannot be set through the api. I feel that this is not
> reasonable. Because sometimes, we want to configure different parameters
> for different tasks in the form of Configuration.setxxx(key, value) in the
> api, instead of just configuring parameters through flink run -yD.
>
> In summary, for some normal tasks we can use the default parameter
> configuration, but for some tasks that require personalized configuration,
> especially Flink SQL tasks, I have a few suggestions on the use of
> configuration:
>
> 1. In the official document, I think it is necessary to add instructions
> on how to configure these parameters. For example, it can be configured not
> only in flink-conf.yaml, but also in the running command through flink run
> -yD, or in whatever other modes the parameters can be configured.
>
> 2. Regarding the api, I think that
> StreamTableEnvironment.getConfig().getConfiguration().setXXX(key, value)
> configures parameters in this way. It should be separately explained, which
> parameters are not effective if configured in this way, otherwise, Some
> parameters configured in this way will not take effect, which will cause
> confusion for users.
>
> 3. Questions about StreamExecutionEnvironment.getConfiguration() being
> protected. Will the community develop in later versions? Is there any
> effective way for users to set some parameters in the api and make them
> effective, such as configuring the taskmanager.memory.managed.fraction
> parameter.
>
> Regarding some of the above issues, maybe I did not describe them clearly
> enough, or I did not understand the problem clearly; I hope to get a reply
> from the community.
>
> Best,
> Jason
> JasonLee1781
> jasonlee1...@163.com
>
> 
>
>


Re: Save state on a CoGroupFunction and recover it after a failure

2021-06-16 Thread Robert Metzger
Hi Felipe,

Which data source are you using?

> Then, in the MyCoGroupFunction there are only events of stream02

Are you storing events in your state?

> Is this the case where I have to use RichCoGroupFunction and save the
state by implementing the CheckpointedFunction?

If you want your state to be persisted with each checkpoint, and recovered
after a failure, yes.
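
In case it is useful, here is a generic skeleton of what that combination
looks like (type parameters, names and the buffered list are placeholders,
not your actual job logic). Also note that the elements buffered by the
window itself are already part of Flink's checkpointed state, so depending on
where the data is actually lost this may not be needed at all:

    import org.apache.flink.api.common.functions.RichCoGroupFunction;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.util.Collector;

    import java.util.ArrayList;
    import java.util.List;

    public class MyCoGroupFunction
            extends RichCoGroupFunction<String, String, String>
            implements CheckpointedFunction {

        // anything kept here across invocations is what should survive a failure
        private final List<String> buffer = new ArrayList<>();
        private transient ListState<String> checkpointedBuffer;

        @Override
        public void coGroup(Iterable<String> first, Iterable<String> second,
                            Collector<String> out) throws Exception {
            // ... your join logic ...
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            checkpointedBuffer.clear();
            checkpointedBuffer.addAll(buffer);
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            checkpointedBuffer = context.getOperatorStateStore()
                    .getListState(new ListStateDescriptor<>("buffer", String.class));
            if (context.isRestored()) {
                for (String element : checkpointedBuffer.get()) {
                    buffer.add(element);
                }
            }
        }
    }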

On Tue, Jun 15, 2021 at 6:18 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi,
>
> I have a problem on my stream pipeline where the events on a
> CoGroupFunction are not restored after the application crashes. The
> application is like this:
>
> stream01.coGroup(stream02)
> .where(...).equalTo(...)
> .window(TumblingEventTimeWindows.of(1 minute))
> .apply(new MyCoGroupFunction())
> .process(new MyProcessFunction())
> .sink(new MySinkFunction)
>
> The checkpoint is configured to 20 seconds and the window is of 1 minute.
> I follow this sequence to reproduce the error:
> 1 - send 6 events to stream01
> 2 - after 25 seconds I send an event to make the application crash
> 3 - at this meantime the application recovers
> 4 - after 25 seconds I send 6 events to stream02
>
> Then, in the MyCoGroupFunction there are only events of stream02. Is this
> the case where I have to use RichCoGroupFunction and save the state by
> implementing the CheckpointedFunction? I am confused because
> the CoGroupFunction.coGroup() method is called only when the Window closes
> and then I see the output stream events of this operator. That is when
> the Collector.collect() is called.
>
> What I think is that the events are held in memory and when the window
> closes the CoGroupFunction.coGroup() is called. So I have to snapshot the
> state in an operator before the CoGroupFunction. Is that correct? In case
> anyone have a toy example of it (CoGroupFunction with Checkpoint and
> testing it in a unit test) could you please send me the link?
>
> Thanks,
> Felipe
>
>


Re: TypeInfo issue with Avro SpecificRecord

2021-06-16 Thread Robert Metzger
Thanks a lot for sharing the solution on the mailing list and in the
ticket.

On Tue, Jun 15, 2021 at 11:52 AM Patrick Lucas 
wrote:

> Alright, I figured it out—it's very similar to FLINK-13703, but instead of
> having to do with immutable fields, it's due to use of the Avro Gradle
> plugin option `gettersReturnOptional`.
>
> With this option, the generated code uses Optional for getters, but it's
> particularly useful with the option `optionalGettersForNullableFieldsOnly`.
> The presence of Optional-returning getters causes Flink's POJO analyzer to
> return null.
>
> I didn't run into this previously because I used both options and never had
> nullable fields in my schemas!
>
> I don't suppose this would be considered a bug, but I'll leave a comment
> on the above issue.
>
> --
> Patrick Lucas
>
>
> On Mon, Jun 14, 2021 at 5:06 PM Patrick Lucas 
> wrote:
>
>> Hi,
>>
>> I have read [1] when it comes to using Avro for serialization, but I'm
>> stuck with a mysterious exception when Flink is doing type resolution.
>> (Flink 1.13.1)
>>
>> Basically, I'm able to use a SpecificRecord type in my source, but I am
>> unable to use different SpecificRecord types later in the pipeline, getting
>> an exception "Expecting type to be a PojoTypeInfo" from AvroTypeInfo[2].
>>
>> Let's say I have a schema "Foo" with one field "foo" of type "Bar", and
>> schema "Bar" with one field "message" of type "string". My input data is a
>> single Foo record of the form {"foo": {"message": "hi"}}.
>>
>> This works:
>>
>> env.fromElements(myInput).print();
>>
>> But this does not:
>>
>> env.fromElements(myInput).map(foo -> (Bar) foo.getFoo()).print();
>>
>> (nor does it work if I use a full MapFunction)
>>
>> Does anyone know what I might be running into here? If necessary, I can
>> put together a full reproducer.
>>
>> [1]
>> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro
>> [2]
>> https://github.com/apache/flink/blob/release-1.13.1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java#L72
>>
>> Thanks,
>> Patrick
>>
>


Re: RocksDB CPU resource usage

2021-06-16 Thread Robert Metzger
Depending on the datatypes you are using, seeing 3x more CPU usage seems
realistic.
Serialization can be quite expensive. See also:
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
Maybe it makes sense to optimize there a bit.
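
One quick way to check whether you are falling back to Kryo (usually the
slowest option) is to disallow generic types during development -- just a
sketch:

    // Fails fast if any type would be serialized with Kryo's generic serializer
    env.getConfig().disableGenericTypes();

If that fails, the exception tells you which type is the offender, and you can
turn it into a proper POJO, register a custom serializer, or switch to
Avro/Row types instead.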

On Tue, Jun 15, 2021 at 5:23 PM JING ZHANG  wrote:

> Hi Padarn,
> After switching the stateBackend from filesystem to rocksdb, all reads/writes
> from/to the backend have to go through de-/serialization to retrieve/store the
> state objects, and this may cause more CPU cost.
> But I'm not sure it is the main reason that leads to the 3x CPU cost in your job.
> To find out the reason, we need more profiling of the CPU cost, such as Flame
> Graphs. BTW, starting with Flink 1.13, Flame Graphs are natively supported
> in Flink[1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/flame_graphs/
>
> Best,
> JING ZHANG
>
> Padarn Wilson wrote on Tuesday, June 15, 2021 at 5:05 PM:
>
>> Hi all,
>>
>> We have a job that we just enabled rocksdb on (instead of file backend),
>> and see that the CPU usage is almost 3x greater (we had to increase
>> taskmanagers 3x to get it to run).
>>
>> I don't really understand this, is there something we can look at to
>> understand why CPU use is so high? Our state mostly consists of aggregation
>> windows.
>>
>> Cheers,
>> Padarn
>>
>


Re: Please advise bootstrapping large state

2021-06-16 Thread Robert Metzger
Hi Marco,

The DataSet API will not run out of memory, as it spills to disk if the
data doesn't fit anymore.
Load is distributed by partitioning data.

Giving you advice depends a bit on the use-case. I would explore two major
options:
a) reading the data from postgres using Flink's SQL JDBC connector [1] (a
minimal DDL sketch is below). 200 GB is not much data: a 1gb network link
needs ~30 minutes to transfer that (125 megabytes / second).
b) Using the DataSet API and state processor API. I would first try to see
how much effort it is to read the data using the DataSet API (could be less
convenient than the Flink SQL JDBC connector).

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/jdbc/
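
For option a), the table definition could look roughly like this (table name,
columns, URL and credentials are placeholders):

    CREATE TABLE enrichment_source (
      id BIGINT,
      payload STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:postgresql://<host>:5432/<database>',
      'table-name' = 'my_table',
      'username' = '...',
      'password' = '...'
    );

You can then read it with a plain SELECT and either bootstrap state directly
from that stream or write it out in a format that is convenient for the state
processor API.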


On Wed, Jun 16, 2021 at 6:50 AM Marco Villalobos 
wrote:

> I must bootstrap state from postgres (approximately 200 GB of data) and I
> notice that the state processor API requires the DataSet API in order to
> bootstrap state for the Stream API.
>
> I wish there was a way to use the SQL API and use a partitioned scan, but
> I don't know if that is even possible with the DataSet API.
>
> I never used the DataSet API, and I am unsure how it manages memory, or
> distributes load, when handling large state.
>
> Would it run out of memory if I map data from a JDBCInputFormat into a
> large DataSet and then use that to bootstrap state for my stream job?
>
> Any advice on how I should proceed with this would be greatly appreciated.
>
> Thank you.
>


Re: Flink SQL as DSL for flink CEP

2021-06-16 Thread Robert Metzger
Hi Dipanjan,

Using Flink SQL's MATCH_RECOGNIZE operator is certainly a good idea if you
are looking for a non-programmatic way to do CEP with Flink.
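
To give a flavour of it, a small sketch (table, columns and the pattern are
made up):

    SELECT *
    FROM user_actions
      MATCH_RECOGNIZE (
        PARTITION BY user_id
        ORDER BY action_time
        MEASURES
          A.action_time AS start_time,
          C.action_time AS end_time
        ONE ROW PER MATCH
        AFTER MATCH SKIP PAST LAST ROW
        PATTERN (A B* C)
        DEFINE
          A AS A.action = 'login',
          C AS C.action = 'purchase'
      ) AS T;

Since the pattern is just SQL text, you can keep such queries outside your
code and submit them dynamically, which sounds like what you are after.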

On Wed, Jun 16, 2021 at 6:44 AM Dipanjan Mazumder  wrote:

> Hi,
>
> Can we say that Flink SQL is a kind of DSL overlay on Flink CEP? I mean, I
> need a DSL for Flink CEP, so that I can decouple the CEP rules from code and
> pass them dynamically to be applied on different data streams. Flink CEP
> doesn't have any DSL implementation, so can Flink SQL be used for that
> purpose, given that Flink SQL integrates with Flink CEP and uses Flink CEP
> underneath for data processing? If that is the case, can we use Flink SQL as
> a streaming CEP DSL?
>
>
> Regards
> Dipanjan
>


Re: S3 + Parquet credentials issue

2021-06-16 Thread Robert Metzger
Thanks for the logs.

The OK job seems to read from "s3a://test-bucket/", while the KO job reads
from "s3a://bucket-test/". Could it be that you are just trying to access
the wrong bucket?

What I also found interesting from the KO Job TaskManager is this log
message:

Caused by: java.net.NoRouteToHostException: No route to host (Host
unreachable)
at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:1.8.0_171]
at
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
~[?:1.8.0_171]
at
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
~[?:1.8.0_171]
at
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
~[?:1.8.0_171]
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[?:1.8.0_171]
at java.net.Socket.connect(Socket.java:589) ~[?:1.8.0_171]
at sun.net.NetworkClient.doConnect(NetworkClient.java:175) ~[?:1.8.0_171]
at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
~[?:1.8.0_171]
at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
~[?:1.8.0_171]
at sun.net.www.http.HttpClient.<init>(HttpClient.java:242) ~[?:1.8.0_171]
at sun.net.www.http.HttpClient.New(HttpClient.java:339) ~[?:1.8.0_171]
at sun.net.www.http.HttpClient.New(HttpClient.java:357) ~[?:1.8.0_171]
at
sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
~[?:1.8.0_171]
at
sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1199)
~[?:1.8.0_171]
at
sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
~[?:1.8.0_171]
at
sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
~[?:1.8.0_171]
at
com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:52)
~[blob_p-575afa7acc2fe3049b65534303a189df3afe9895-6c71352c89388f6a3754b9b72482e6d2:?]
at
com.amazonaws.internal.EC2ResourceFetcher.doReadResource(EC2ResourceFetcher.java:80)
~[blob_p-575afa7acc2fe3049b65534303a189df3afe9895-6c71352c89388f6a3754b9b72482e6d2:?]

Does your env allow access to all AWS resources?
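
That stack trace looks like the AWS SDK is falling through its credential
chain to the EC2 instance metadata endpoint, which your environment apparently
cannot reach. If you want to rule that out, you could try pinning the
credentials provider explicitly in flink-conf.yaml (an untested guess; the
keys follow the hadoop-aws naming):

    fs.s3a.aws.credentials.provider: org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
    fs.s3a.access.key: <access-key>
    fs.s3a.secret.key: <secret-key>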

On Tue, Jun 15, 2021 at 7:12 PM Angelo G.  wrote:

> Thank you Svend  and Till for your help.
>
> Sorry for the the late response.
>
> I'll try to give more information about the issue:
>
> I've not worked exactly in the situation you described, although I've had
>> to configure S3 access from a Flink application recently and here are a
>> couple of things I learnt along the way:
>> * You should normally not need to include flink-s3-fs-hadoop nor
>> hadoop-mapreduce-client-core in your classpath but should rather make
>> flink-s3-fs-hadoop available to Flink by putting it into the plugins
>> folder. The motivation for that is that this jar is a fat jar containing a
>> lot of hadoop and aws classes, s.t. including it in your classpath quickly
>> leads to conflicts. The plugins folder is associated with a separate
>> classpath, with helps avoiding those conflicts.
>>
> *Following your advice I've left these dependencies out of the pom.
> Thank you for the explanation.*
>
>> * Under the hood, Fink is using the hadoop-aws library to connect to s3
>> => the documentation regarding how to configure it, and especially security
>> accesses, is available in [1]
>>
> *In our case, connection to S3 should be made via access/secret key pair. *
>
>> * Ideally, when running on AWS, your code should not be using
>> BasicAWSCredentialsProvider, but instead the application should assume a
>> role, which you associate with some IAM permission.  If that's your case,
>> the specific documentation for that situation is in [2]. If you're running
>> some test locally on your laptop, BasicAWSCredentialsProvider with some
>> key id and secret pointing to a dev account may be appropriate.
>>
> *Yes, the Flink documentation notes that IAM is the recommended way to
> access S3, but I am forced to use secret/access keys. I'm not indicating in
> flink-conf.yaml which credentials provider to use; the
> BasicAWSCredentialsProvider seems to be the default provider for Flink. But
> as we will see, this message is shown only when trying to read the Parquet
> format. Other formats pose no problem.*
>
>> * As I understand it, any configuration entry in flink.yaml that starts
>> with "fs.s3a" is forwarded by Flink to hadoop-aws (I found that info in
>> [3]) => by reading documentation in [1] and [2] you might be able to figure
>> out which parameters are relevant to your case, which you can then set with
>> the mechanism just mentioned. For example, in my case, I simply add this to
>> flink.yaml:
>
> *My flink-conf.yaml is as follows:*
>
> taskmanager.memory.process.size: 1728m
> taskmanager.numberOfTaskSlots: 1
> parallelism.default: 1
> jobmanager.execution.failover-strategy: region
> fs.s3a.path-style: true
> fs.s3a.region: eu-west-3
> fs.s3a.bucket.testbucket.access.key: 
> fs.s3a.bucket.testbucket.secret.key: 
>
>
>> what Svend has written is very good advice. 

Re: Resource Planning

2021-06-16 Thread Robert Metzger
Hi Thomas,

My gut feeling is that you can use the available resources more efficiently.

What's the size of a checkpoint for your job (you can see that from the
UI)?

Given that your cluster has an aggregate of 64 * 12 = 768gb of memory
available, you might be able to do everything in memory (I might be off by
a few terabytes here, it all depends on your state size ;) )

1. In my experience, it is usually more efficient to have a few large Flink
instances than many small ones. Maybe try to run 12 TaskManagers (or 11 to
make the JM fit) with 58gb of memory (the JM can stick to the 7gb) and see
how Flink behaves (a sketch of the command is below).

2. I'd say it's a try and see process, with a few educated guesses. Maybe
check out this:
https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines
to get some inspiration for making some "back of the napkin" calculations
on the sizing requirements.

3. Do you have some monitoring of CPU / memory / network usage in place?
It would be interesting to see what the metrics look like when everything
is ok vs when the job is backpressured.
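
For point 1, the yarn-session invocation could then look something like this
(the slot count is a guess and depends on your job):

    export HADOOP_CLASSPATH=`hadoop classpath`
    /usr/lib/flink/bin/yarn-session.sh -jm 7g -tm 58g -s 8 --detached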

Best,
Robert


On Wed, Jun 16, 2021 at 3:56 AM Xintong Song  wrote:

> Hi Thomas,
>
> It would be helpful if you can provide the jobmanager/taskmanager logs,
> and gc logs if possible.
>
> Additionally, you may consider to monitor the cpu/memory related metrics
> [1], see if there's anything abnormal when the problem is observed.
>
> Thank you~
>
> Xintong Song
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/metrics.html
>
>
>
> On Wed, Jun 16, 2021 at 8:11 AM Thomas Wang  wrote:
>
>> Hi,
>>
>> I'm trying to see if we have been given enough resources (i.e. CPU and
>> memory) to each task node to perform a deduplication job. Currently, the
>> job is not running very stable. What I have been observing is that after a
>> couple of days run, we will suddenly see backpressure happen on one
>> arbitrary ec2 instance in the cluster and when that happens, we will have
>> to give up the current state and restart the job with an empty state. We
>> can no longer take savepoint as it would timeout after 10 minutes, which is
>> understandable.
>>
>> Additional Observations
>>
>> When the backpressure happens, we see an increase in our state read time
>> (we are measuring it using a custom metric) from about 0.1 milliseconds to
>> 40-60 milliseconds on that specific problematic ec2 instance. We tried to
>> reboot that ec2 instance, so that the corresponding tasks would be assigned
>> to a different ec2 instance, but the problem persists.
>>
>> However, I’m not sure if this read time increase is a symptom or the
>> cause of the problem.
>>
>> Background about this deduplication job:
>>
>> We are making sessionization with deduplication on an event stream by a
>> session key that is embedded in the event. The throughput of the input
>> stream is around 50k records per second. The after-aggregation output is
>> around 8k records per second.
>>
>> We are currently using RocksDb-backend state with SSD support and in the
>> state, we are storing session keys with a TTL of 1 week. Based on the
>> current throughput, this could become really huge. I assume RocksDB would
>> flush to the disc as needed, but please correct me if I am wrong.
>>
>> Information about the cluster:
>>
>> I'm running on an AWS EMR cluster with 12 ec2 instances (r5d.2xlarge).
>> I'm using Flink 1.11.2 in Yarn session mode. Currently there is only 1 job
>> running in the Yarn session.
>>
>> Questions:
>>
>> 1. Currently, I'm starting the yarn session w/ 7g memory on both the Task
>> Manager and and the Job Manager, so that each Yarn container could get 1
>> CPU. Is this setting reasonable based on your experience?
>>
>> Here is the command I used to start the Yarn cluster:
>>
>> export HADOOP_CLASSPATH=`hadoop classpath` &&
>> /usr/lib/flink/bin/yarn-session.sh -jm 7g -tm 7g --detached
>>
>> 2. Is there a scientific way to tell what's the right amount of resources
>> I should give to an arbitrary job? Or is this a try and see kinda process?
>>
>> 3. Right now, I'm suspecting resources caused the job to run unstably,
>> but I'm not quite sure. Any other potential causes here? How should I debug
>> from here if resources are not the issue? Is there a way to detect memory
>> leaks?
>>
>> Thanks in advance!
>>
>> Thomas
>>
>>


Re: Flink parameter configuration does not take effect

2021-06-16 Thread Robert Metzger
Hi Jason,

How are you deploying your Flink SQL tasks? (are you using
per-job/application clusters, or a session cluster? )

I agree that the configuration management is not optimal in Flink. By
default, I would recommend assuming that all configuration parameters are
cluster settings, which require a cluster restart. Very few options (mostly
those listed in the "Execution" section) are job settings, which can be set
for each job.
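
For a cluster-level option like taskmanager.memory.managed.fraction that means
setting it at deployment time, e.g. (a sketch based on the command you
mentioned; the jar name is a placeholder):

    flink run -m yarn-cluster \
      -yD taskmanager.memory.managed.fraction=0.45 \
      your-sql-ddl-job.jar ./config.json

while job-level options (for example the table.exec.* options) can be set per
job on the Table/StreamExecutionEnvironment.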

Would it help if the table of configuration options in the documentation
tagged each option with a "Cluster" or "Job" option type?
Secondly, the API should probably only expose an immutable Configuration
object, if the configuration is effectively immutable. I believe the option
to set configuration on the (Stream)(Table)Environment is mostly there for
local execution of Flink.


2. I agree, the docs are incomplete here (probably another symptom of the
fact that the whole configuration management in Flink is not optimal). I
see what I can do to improve the situation.

3. Except for local execution (everything runs in one JVM), I don't think
we'll add support for this anytime soon. Some of the cluster configuration
parameters just have to be global (like memory management), as they apply
to all jobs executed on a cluster.


This ticket could be related to your problems:
https://issues.apache.org/jira/browse/FLINK-21065

Let us know how you are deploying your Flink jobs, this will shed some more
light on the discussion!

Best,
Robert


On Wed, Jun 16, 2021 at 4:27 AM Jason Lee  wrote:

> Hi  everyone,
>
> When I was researching and using Flink recently, I found that the official
> documentation on how to configure parameters is confusing, and when I set
> the parameters in some ways, they do not take effect. Mainly as follows:
>
> we usually use a DDL Jar package to execute Flink SQL tasks, but we found
> that some parameters are set by
> StreamTableEnvironment.getConfig().getConfiguration().setXXX(key, value).
> These parameters cannot take effect. For example,
> taskmanager.memory.managed.fraction cannot take effect if the parameter is
> set in the above way (the Note in TableConfig in the source code is as
> follows: Because options are read at different point in time when
> performing operations, it is recommended to set configuration options early
> after instantiating a table environment. ). And
> StreamExecutionEnvironment.getConfiguration() is protected, which leads to
> some parameters that cannot be set through the api. I feel that this is not
> reasonable. Because sometimes, we want to configure different parameters
> for different tasks in the form of Configuration.setxxx(key, value) in the
> api, instead of just configuring parameters through flink run -yD or
> flink-conf.yaml.
>
> In the Configuration module of the official document, the description and
> default value of each parameter are introduced. There is no relevant
> introduction about the parameter setting method in the official document
> Configuration module. I think this is not friendly enough for users,
> especially users who want to personalize some parameters. I feel that this
> method can be described in the official document.
>
> In summary, for some normal tasks we can use the default parameter
> configuration, but for some tasks that require personalized configuration,
> especially Flink SQL tasks, I have a few suggestions on the use of
> configuration:
>
> 1. Regarding the api, I think that
> StreamTableEnvironment.getConfig().getConfiguration().setXXX(key, value)
> configures parameters in this way. It should be separately explained, which
> parameters are not effective if configured in this way, otherwise, Some
> parameters configured in this way will not take effect, which will cause
> confusion for users.
>
> 2. In the official document, I think it is necessary to add instructions
> on how to configure these parameters. For example, it can be configured not
> only in flink-conf.yaml, but also in the running command through flink run
> -yD, or in whatever other modes the parameters can be configured.
>
> 3. Questions about StreamExecutionEnvironment.getConfiguration() being
> protected. Will the community develop in later versions? Is there any
> effective way for users to set some parameters in the api and make them
> effective, such as configuring the taskmanager.memory.managed.fraction
> parameter.
>
> Regarding some of the above issues, and why the parameter settings do not
> take effect: maybe I did not describe them clearly enough, or I did not
> understand the problem clearly. I hope to get a reply from, and discuss
> with, the community.
>
> Best,
> Jason
>
> 李闯
> jasonlee1...@163.com
>
> 

Re: flink job exception analysis (netty related, readAddress failed. connection timed out)

2021-06-16 Thread Robert Metzger
Hi Yidan,
it seems that the attachment did not make it through the mailing list. Can
you copy-paste the text of the exception here or upload the log somewhere?



On Wed, Jun 16, 2021 at 9:36 AM yidan zhao  wrote:

> Attachment is the exception stack from flink's web-ui. Does anyone
> have also met this problem?
>
> Flink1.12 - Flink1.13.1.  Standalone Cluster, include 30 containers,
> each 28G mem.
>


Re: flink job exception analysis (netty related, readAddress failed. connection timed out)

2021-06-16 Thread Robert Metzger
Hi Yidan,
it seems that the attachment did not make it through the mailing list. Can
you copy-paste the text of the exception here or upload the log somewhere?



On Wed, Jun 16, 2021 at 9:36 AM yidan zhao  wrote:

> Attachment is the exception stack from flink's web-ui. Does anyone
> have also met this problem?
>
> Flink1.12 - Flink1.13.1.  Standalone Cluster, include 30 containers,
> each 28G mem.
>


Re: [VOTE] Release 1.13.1, release candidate #1

2021-05-28 Thread Robert Metzger
+1 (binding)

- Tried out reactive mode from the scala 2.11 binary locally (with scale
up & stop with savepoint)
- reviewed website update
- randomly checked a jar file in the staging repo (flink-python jar looks
okay (I just checked superficially))




On Fri, May 28, 2021 at 5:16 AM Leonard Xu  wrote:

>
> +1 (non-binding)
>
> - verified signatures and hashes
> - built from source code with scala 2.11 succeeded
> - started a cluster, WebUI was accessible, ran a window word count job, no
> suspicious log output
> - ran some SQL jobs in SQL Client, the queries result is expected
> - the web PR looks good
>
> Best,
> Leonard
>
>
> > On May 28, 2021, at 10:25, Xingbo Huang wrote:
> >
> > +1 (non-binding)
> >
> > - verified checksums and signatures
> > - built from source code
> > - check apache-flink source/wheel package content
> > - run python udf job
> >
> > Best,
> > Xingbo
> >
> > Dawid Wysakowicz (dwysakow...@apache.org) wrote on Thursday, May 27, 2021 at 9:45 PM:
> > +1 (binding)
> >
> > - verified signatures and checksums
> > - built from sources and run an example, quickly checked Web UI
> > - checked diff of pom.xml and NOTICE files from 1.13.0, there were no
> > version changes
> > - checked the updated licenses of javascript dependencies
> > Best,
> >
> > Dawid
> >
> > On 26/05/2021 11:15, Matthias Pohl wrote:
> >> Hi Dawid,
> >> +1 (non-binding)
> >>
> >> Thanks for driving this release. I checked the following things:
> >> - downloaded and build source code
> >> - verified checksums
> >> - double-checked diff of pom files between 1.13.0 and 1.13.1-rc1
> >> - did a visual check of the release blog post
> >> - started cluster and ran jobs (WindowJoin and WordCount); nothing
> >> suspicious found in the logs
> >> - verified change FLINK-22866 manually whether the issue is fixed
> >>
> >> Best,
> >> Matthias
> >>
> >> On Tue, May 25, 2021 at 3:33 PM Dawid Wysakowicz <
> dwysakow...@apache.org> 
> >> wrote:
> >>
> >>> Hi everyone,
> >>> Please review and vote on the release candidate #1 for the version
> 1.13.1,
> >>> as follows:
> >>> [ ] +1, Approve the release
> >>> [ ] -1, Do not approve the release (please provide specific comments)
> >>>
> >>>
> >>> The complete staging area is available for your review, which includes:
> >>> * JIRA release notes [1],
> >>> * the official Apache source release and binary convenience releases
> to be
> >>> deployed to dist.apache.org  [2], which are
> signed with the key with
> >>> fingerprint 31D2DD10BFC15A2D [3],
> >>> * all artifacts to be deployed to the Maven Central Repository [4],
> >>> * source code tag "release-1.13.1-rc1" [5],
> >>> * website pull request listing the new release and adding announcement
> >>> blog post [6].
> >>>
> >>> The vote will be open for at least 72 hours. It is adopted by majority
> >>> approval, with at least 3 PMC affirmative votes.
> >>>
> >>> Best,
> >>> Dawid
> >>>
> >>> [1]
> >>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350058
> >>> [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.13.1-rc1/
> >>> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> >>> [4]
> >>> https://repository.apache.org/content/repositories/orgapacheflink-1422/
> >>> [5] https://github.com/apache/flink/tree/release-1.13.1-rc1
> >>> [6] https://github.com/apache/flink-web/pull/448
> >>>
>
>


Re: Choice of time characteristic and performance

2021-05-22 Thread Robert Metzger
Hi Bob,

if you don't need any time characteristics, go with processing time.
Ingestion time will call System.currentTimeMillis() on every incoming
record, which is a somewhat expensive call.
Event time (and ingestion time) will attach a long field to each record,
making the records 8 bytes larger and the serialization a bit more involved.
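
In code that boils down to something like this (a sketch; note that
setStreamTimeCharacteristic is deprecated in 1.12+, and an auto-watermark
interval of 0 disables periodic watermark generation):

    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    env.getConfig().setAutoWatermarkInterval(0);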

Hope this helps!

On Fri, May 14, 2021 at 11:54 PM Bob Tiernay  wrote:

> I was wondering if the choice of time characteristic (ingestion, processing
> or event time) makes a difference to the performance of a job that isn't
> using windowing or process functions. For example, in such a job is it
> advisable to disable auto watermarking and use the default? Or is that, in
> combination with an explicit choice of one characteristic, more optimal?
>
> More generally it would be good to know how this choice affects a job.
>
> Anyone have any details or empirical evidence about this?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Stop command failure

2021-05-22 Thread Robert Metzger
Hi,
can you provide the jobmanager log of that run? It seems that the operation
timed out. The JobManager log will help us gain some insights into the
root cause.

On Tue, May 18, 2021 at 1:42 PM V N, Suchithra (Nokia - IN/Bangalore) <
suchithra@nokia.com> wrote:

> Hi,
>
>
>
> Stop command is failing with below error with apache flink 1.12.3 version.
> Could you pls help.
>
>
>
> log":"[Flink-RestClusterClient-IO-thread-2]
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel
> Force-closing a channel whose registration task was not accepted by an
> event loop: [id: 0x4fb1c35c]"}
>
> java.util.concurrent.RejectedExecutionException: event executor terminated
>
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:926)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:353)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:346)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:828)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:818)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.register(AbstractChannel.java:471)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>
> at
> org.apache.flink.shaded.netty4.io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:87)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>
> at
> org.apache.flink.shaded.netty4.io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:81)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>
> at
> org.apache.flink.shaded.netty4.io.netty.channel.MultithreadEventLoopGroup.register(MultithreadEventLoopGroup.java:86)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>
> at
> org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:323)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>
> at
> org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:155)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>
> at
> org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:139)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>
> at
> org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:123)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>
> at
> org.apache.flink.runtime.rest.RestClient.submitRequest(RestClient.java:421)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>
> at
> org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:344)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>
> at
> org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:258)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>
> at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$null$23(RestClusterClient.java:777)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>
> at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
> [?:?]
>
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> [?:?]
>
> at
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
> [?:?]
>
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:649)
> [?:?]
>
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
> [?:?]
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> [?:?]
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> [?:?]
>
> at java.lang.Thread.run(Thread.java:829) [?:?]
>
> {"type":"log","host":"${env:CLOG_HOST}","level":"ERROR","systemid":"${env:CLOG_SYSTEMID}","system":"${env:CLOG_SYSTEM}","time":"2021-05-18T10:32:04.934Z","timezone":"UTC","log":"[Flink-RestClusterClient-IO-thread-2]
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution
> Failed to submit a listener notification task. Event loop shut down?"}
>
> java.util.concurrent.RejectedExecutionException: event executor terminated
>
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:926)
> ~[flink-dist_2.11-1.12.3.jar:1.12.3]
>
> at
> 

Re: Fastest way for decent lookup JOIN?

2021-05-22 Thread Robert Metzger
Hi Theo,

Since you are running Flink locally it would be quite easy to attach a
profiler to Flink to see where most of the CPU cycles are burned (or: check
if you are maybe IO bound?) .. this could provide us with valuable data on
deciding for the next steps.

On Tue, May 18, 2021 at 5:26 PM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:

> Hi there,
>
> I have the following (probably very common) usecase: I have some lookup
> data ( 100 million records ) which change only slowly (in the range of some
> thousands per day). My event stream is in the order of tens of billions
> events per day and each event needs to be enriched from the 100 million
> lookup source. For the JOIN, I don't need any event time related stuff,
> just the newest version at the time of enrichment shall be taken into
> account.
>
> As a user used to the DataStream API but unfamiliar with SQL API, I built
> a small MVP. I used a connected stream and put the enrichment data into
> keyed (heap)state. My RAM is enough to hold all the data in memory (once in
> prod at least). I first streamed in all 100 million records, then I started
> the performance measurement by streaming in just 3 million events to be
> enriched against the 100 million records. I was a bit stunned that the
> enrichment of all events took about 40 seconds on my local machine. I built
> up a similar MVP in Spark where I put the 100 million records into a
> (pre-partitioned on the JOIN column) hive table, the 3 million test events
> into a parquetfile and then run an outer join which also took about 40
> seconds on my local machine (consuming only 16GB of RAM). I somehow
> expected Flink to be much faster as I hold the enrichment data already in
> memory (state) and at least on the localhost, there is no real networking
> involved.
>
> I then thought about the problems with the DataStream API: My 100 million
> events are read from an uncompressed CSV file which is 25GB in size.
> Deserialized to Java POJOs, I guess the POJOs would take 100GB heap space.
> [Actually, I run the tests in Spark with all 100million records and this
> Flink test with only 20 Million records due to too much memory used, so the
> 100GB is an estimation from 20 million records taking 20GB heap space].
> When I stopped parsing my enrichment data to POJOs but extracted only the
> enrichment (join) attribute and kept the remaining part of the data as a
> simple string, the java heap taken was only about 25GB again for all
> 100million records. Not only that, my enrichment JOIN now took only 30
> seconds to complete all records. My thought now is: I probably shouldn't
> use DataStream API with Java POJOs here, but Flink SQL API with "Row"
> classes? I remember I once read some blog with how Flink internally
> optimizes its data strucutres and can reuse certain stuff when using SQL
> API and so on.
>
> Before I am going to try out several variants now, my question is: What do
> you think is the fastest/most efficient way to enrich slowly changing data
> with the latest version (Processing time temporal table JOIN) [When memory
> isn't a big problem once deployed to the cluster]? Do you recommend to use
> the SQL API? With which type of JOIN? (Processing time temporal table?) and
> hold enrichment table fully in Flink managed memory (Can I express this via
> SQL API?) or do I need to use some external "LookupTableSource"? Once I run
> my application in the cluster, I suspect a "LookupTableSource" to introduce
> some communication overhead vs. querying Flink State directly? If you
> recommend DataStream API to be used: Should I read via SQL connectors and
> work with "Rows" in state? What kind of performance tunings should I take
> into account here (reuseObjects, disableChaining, ...)?
>
> Best regards
> Theo
>


Re: Parallelism in Production: Best Practices

2021-05-22 Thread Robert Metzger
Hi Yaroslav,

My recommendation is to go with the 2nd pattern you've described, but I
only have limited insights into real world production workloads.

Besides the parallelism configuration, I also recommend looking into slot
sharing groups, and maybe disabling operator chaining.
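
On the DataStream API that is just a matter of tagging the operators, e.g.
(a sketch; the function and sink names are made up):

    stream
      .map(new HeavyTransformation())
          .slotSharingGroup("heavy")   // isolate this operator in its own slots
          .disableChaining()           // force it into a separate task
      .addSink(new MySink())
          .setParallelism(4);          // per-operator parallelism override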
I'm pretty sure some of Flink's large production users have shared
information about this in past Flink Forward talks .. but it is difficult
to find answers (unless you spend a lot of time on YouTube).

Best,
Robert


On Wed, May 19, 2021 at 8:01 PM Yaroslav Tkachenko <
yaroslav.tkache...@shopify.com> wrote:

> Hi everyone,
>
> I'd love to learn more about how different companies approach specifying
> Flink parallelism. I'm specifically interested in real, production
> workloads.
>
> I can see a few common patterns:
>
> - Rely on default parallelism, scale by changing parallelism for the whole
> pipeline. I guess it only works if the pipeline doesn't have obvious
> bottlenecks. Also, it looks like the new reactive mode makes specifying
> parallelism for an operator obsolete (
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/elastic_scaling/#configuration
> )
>
> - Rely on default parallelism for most of the operators, but override it
> for some. For example, it doesn't make sense for a Kafka source to have
> parallelism higher than the number of partitions it consumes. Some custom
> sinks could choose lower parallelism to avoid overloading their
> destinations. Some transformation steps could choose higher parallelism to
> distribute the work better, etc.
>
> - Don't rely on default parallelism and configure parallelism
> explicitly for each operator. This requires very good knowledge of each
> operator in the pipeline, but it could lead to very good performance.
>
> Is there a different pattern that I miss? What do you use? Feel free to
> share any resources.
>
> If you do specify it explicitly, what do you think about the reactive
> mode? Will you use it?
>
> Also, how often do you change parallelism? Do you set it once and forget
> once the pipeline is stable? Do you keep re-evaluating it?
>
> Thanks.
>


Re: Savepoint/checkpoint confusion

2021-05-22 Thread Robert Metzger
Hi Igor,

In my understanding, checkpoints are managed by the system (Flink decides
when to create and delete them), while savepoints are managed by the user
(they decide when to create and delete them).
Indeed, only checkpoints can be incremental (if that feature is enabled).

>  it's made on-demand and the state backend can be changed (since 1.13).
Is this correct?

Yes

On Thu, May 20, 2021 at 4:46 PM Igor Basov  wrote:

> Hey Robert,
> Thanks for the answer! But then I guess the only difference between
> savepoints and checkpoints is that checkpoints are structurally state
> dependent and can be incremental, but otherwise they are functionally
> equivalent. So functionally savepoint can be considered a full checkpoint
> which provides 2 additional benefits: it's made on-demand and the state
> backend can be changed (since 1.13). Is this correct?
>
> On Thu, 20 May 2021 at 05:35, Robert Metzger  wrote:
>
>> Hey Igor,
>>
>> 1) yes, reactive mode indeed does the same.
>> 2) No, HA mode is only storing some metadata in ZK about the leadership
>> and latest checkpoints, but the checkpoints themselves are the same. They
>> should be usable for a changed job graph (if the state matches the
>> operators by setting the UUIDs [1]).
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/production_ready/#set-uuids-for-all-operators
>>
>>
>> On Fri, May 7, 2021 at 10:13 PM Igor Basov  wrote:
>>
>>> Hello,
>>> I got confused about usage of savepoints and checkpoints in different
>>> scenarios.
>>> I understand that checkpoints' main purpose is fault tolerance, they are
>>> more lightweight and don't support changing job graph, parallelism or state
>>> backend when restoring from them, as mentioned in the latest 1.13 docs:
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/checkpoints/#difference-to-savepoints
>>>
>>> At the same time:
>>> 1) Reactive scaling mode (in 1.13) uses checkpoints exactly for that -
>>> rescaling.
>>> 2) There are use cases like here:
>>>
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/What-happens-when-a-job-is-rescaled-td39462.html
>>> where people seem to be using retained checkpoints instead of savepoints
>>> to do manual job restarts with rescaling.
>>> 3) There are claims like here:
>>>
>>> https://lists.apache.org/thread.html/4299518f4da2810aa88fe6b21f841880b619f3f8ac264084a318c034%40%3Cuser.flink.apache.org%3E
>>> that in HA setup JobManager is able to restart from a checkpoint even if
>>> operators are added/removed or parallelism is changed (in this case I'm not
>>> sure if the checkpoints used by HA JM in `high-availability.storageDir` is
>>> the same thing as usual checkpoints).
>>>
>>> So I guess the questions are:
>>> 1) Can retained checkpoints be safely used for manual restarting and
>>> rescaling a job?
>>> 2) Are checkpoints made by HA JM structurally different from the usual
>>> ones? Can they be used to restore a job with a changed job graph?
>>>
>>> Thank you,
>>> Igor
>>>
>>>
>>


Re: Savepoint/checkpoint confusion

2021-05-20 Thread Robert Metzger
Hey Igor,

1) yes, reactive mode indeed does the same.
2) No, HA mode is only storing some metadata in ZK about the leadership and
latest checkpoints, but the checkpoints themselves are the same. They should be
usable for a changed job graph (if the state matches the operators by
setting the UUIDs [1]).

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/production_ready/#set-uuids-for-all-operators
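
For reference, assigning the UIDs is a one-liner per stateful operator
(a sketch with made-up names):

    stream
      .keyBy(record -> record.getKey())
      .process(new MyDedupFunction()).uid("dedup-operator")
      .addSink(new MySink()).uid("my-sink");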


On Fri, May 7, 2021 at 10:13 PM Igor Basov  wrote:

> Hello,
> I got confused about usage of savepoints and checkpoints in different
> scenarios.
> I understand that checkpoints' main purpose is fault tolerance, they are
> more lightweight and don't support changing job graph, parallelism or state
> backend when restoring from them, as mentioned in the latest 1.13 docs:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/checkpoints/#difference-to-savepoints
>
> At the same time:
> 1) Reactive scaling mode (in 1.13) uses checkpoints exactly for that -
> rescaling.
> 2) There are use cases like here:
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/What-happens-when-a-job-is-rescaled-td39462.html
> where people seem to be using retained checkpoints instead of savepoints
> to do manual job restarts with rescaling.
> 3) There are claims like here:
>
> https://lists.apache.org/thread.html/4299518f4da2810aa88fe6b21f841880b619f3f8ac264084a318c034%40%3Cuser.flink.apache.org%3E
> that in HA setup JobManager is able to restart from a checkpoint even if
> operators are added/removed or parallelism is changed (in this case I'm not
> sure if the checkpoints used by HA JM in `high-availability.storageDir` is
> the same thing as usual checkpoints).
>
> So I guess the questions are:
> 1) Can retained checkpoints be safely used for manual restarting and
> rescaling a job?
> 2) Are checkpoints made by HA JM structurally different from the usual
> ones? Can they be used to restore a job with a changed job graph?
>
> Thank you,
> Igor
>
>


Re: taskmanager initialization failed

2021-05-17 Thread Robert Metzger
Hi Suchithra,

this is very likely a version mixup: Can you make sure all jars in your
classpath are Flink 1.11.1?

On Mon, May 17, 2021 at 2:05 PM V N, Suchithra (Nokia - IN/Bangalore) <
suchithra@nokia.com> wrote:

> Hi,
>
>
>
> With flink 1.11.1 version, taskmanager initialization is failing with
> below error. Could you please help to debug the issue.
>
>
>
> log":"[main] org.apache.flink.runtime.io.network.netty.NettyConfig
> NettyConfig [server address: /0.0.0.0, server port: 4121, ssl enabled:
> false, memory segment size (bytes): 32768, transport type: AUTO, number of
> server threads: 4 (manual), number of client threads: 4 (manual), server
> connect backlog: 0 (use Netty's default), client connect timeout (sec):
> 120, send/receive buffer size (bytes): 0 (use Netty's default)]"}
>
> log":"[main] org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> TaskManager initialization failed."}
>
> java.lang.NoSuchFieldError: FORCE_PARTITION_RELEASE_ON_CONSUMPTION
>
> at
> org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration.fromConfiguration(NettyShuffleEnvironmentConfiguration.java:221)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>
> at
> org.apache.flink.runtime.io.network.NettyShuffleServiceFactory.createShuffleEnvironment(NettyShuffleServiceFactory.java:61)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>
> at
> org.apache.flink.runtime.io.network.NettyShuffleServiceFactory.createShuffleEnvironment(NettyShuffleServiceFactory.java:49)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerServices.createShuffleEnvironment(TaskManagerServices.java:373)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerServices.fromConfiguration(TaskManagerServices.java:270)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManager(TaskManagerRunner.java:375)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.(TaskManagerRunner.java:157)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:305)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.lambda$runTaskManagerSecurely$2(TaskManagerRunner.java:320)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>
> at java.security.AccessController.doPrivileged(Native Method)
> ~[?:?]
>
> at javax.security.auth.Subject.doAs(Subject.java:423) ~[?:?]
>
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
> ~[flink-shaded-hadoop-2-uber-2.6.5-7.0.jar:2.6.5-7.0]
>
> at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerSecurely(TaskManagerRunner.java:319)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:297)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>
>
>
> Thanks,
>
> Suchithra
>


Re: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: IOException: 1 time,

2021-05-05 Thread Robert Metzger
Hi Ragini,

Since this exception is coming from the HBase client, I assume the issue has
nothing to do with Flink directly.
I would recommend carefully studying the HBase client configuration
parameters. Maybe set up a simple Java application that "hammers" data into
HBase at a maximum rate to understand the impact of different combinations of
configuration parameters.
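
For illustration, a rough (untested) sketch of such a load generator could look
like the following. The table name "test_table", column family "cf" and the
tuning values are made-up placeholders you would adapt to your setup:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseHammer {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Client-side knobs worth experimenting with (values are placeholders):
        conf.setInt("hbase.client.retries.number", 10);
        conf.setLong("hbase.client.write.buffer", 8 * 1024 * 1024);
        try (Connection connection = ConnectionFactory.createConnection(conf);
             BufferedMutator mutator =
                     connection.getBufferedMutator(TableName.valueOf("test_table"))) {
            long start = System.currentTimeMillis();
            for (int i = 0; i < 1_000_000; i++) {
                Put put = new Put(Bytes.toBytes("row-" + i));
                put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v-" + i));
                mutator.mutate(put); // buffered writes, flushed in the background
            }
            mutator.flush();
            System.out.println("Took " + (System.currentTimeMillis() - start) + " ms");
        }
    }
}

Running this directly against the cluster should tell you how much headroom the
region servers have, independent of Flink.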

Best,
Robert

On Tue, May 4, 2021 at 5:05 AM Ragini Manjaiah 
wrote:

> Hi,
> One of my Flink applications needs to get and put records from HBase for
> every event while processing in real time. When there are few events, the
> application processes them without any issues. When the number of events
> increases, we start hitting the exception mentioned below. Can these
> exceptions bring down the throughput and start to build lag? What are the
> parameters we can tune on the HBase/Flink side to overcome this exception? We
> are seeing a minimum of 7000 hits/sec to HBase when the load is normal. The
> HBase table has 3 region servers.
>
>
> org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 
> action: IOException: 1 time,
>   at 
> org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.makeException(AsyncProcess.java:258)
>   at 
> org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.access$2000(AsyncProcess.java:238)
>   at 
> org.apache.hadoop.hbase.client.AsyncProcess.waitForAllPreviousOpsAndReset(AsyncProcess.java:1817)
>   at 
> org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:240)
>   at 
> org.apache.hadoop.hbase.client.BufferedMutatorImpl.flush(BufferedMutatorImpl.java:190)
>   at org.apache.hadoop.hbase.client.HTable.flushCommits(HTable.java:1434)
>   at org.apache.hadoop.hbase.client.HTable.put(HTable.java:1018)
>   at org......xx(xxx.java:202)
>   at 
> org......xxx.xxx(xxx.java:144)
>   at 
> org......xxx.(x.java:30)
>   at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:745)
>
>


Re: Interacting with flink-jobmanager via CLI in separate pod

2021-05-05 Thread Robert Metzger
l.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
> Caused by: 
> org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException: 
> connection timed out: /10.43.0.1:30081
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:261)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>     at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
> ngleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) 
> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
>
>
> On Wed, May 5, 2021 at 6:59 AM Robert Metzger  wrote:
>
>> Hi,
>> can you check the client log in the "log/" directory?
>> The Flink client will try to access the K8s API server to retrieve the
>> endpoint of the jobmanager. For that, the pod needs to have permissions
>> (through a service account) to make such calls to K8s. My hope is that the
>> logs or previous messages are giving an indication into what Flink is
>> trying to do.
>> Can you also try running on DEBUG log level? (should be the
>> log4j-cli.properties file).
>>
>>
>>
>> On Tue, May 4, 2021 at 3:17 PM Robert Cullen 
>> wrote:
>>
>>> I have a flink cluster running in kubernetes, just the basic
>>> installation with one JobManager and two TaskManagers. I want to interact
>>> with it via command line from a separate container ie:
>>>
>>> root@flink-client:/opt/flink# ./bin/flink list --target 
>>> kubernetes-application -Dkubernetes.cluster-id=job-manager
>>>
>>> How do you interact in the same kubernetes instance via CLI (Not from
>>> the desktop)?  This is the exception:
>>>
>>> 
>>>  The program finished with the following exception:
>>>
>>> java.lang.RuntimeException: 
>>> org.apache.flink.client.deployment.ClusterRetrieveException: Could not get 
>>> the rest endpoint of job-manager
>>> at 
>>> org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$0(KubernetesClusterDescriptor.java:103)
>>> at 
>>> org.apache.flink.kubernetes.KubernetesClusterDescriptor.retrieve(KubernetesClusterDescriptor.java:145)
>>> at 
>>> org.apache.flink.kubernetes.KubernetesClusterDescriptor.retrieve(KubernetesClusterDescriptor.java:67)
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1001)
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.list(CliFrontend.java:427)
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1060)
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>>> at 
>>> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>>> Caused by: org.apache.flink.client.deployment.ClusterRetrieveException: 
>>> Could not get the rest endpoint of job-manager
>>> ... 9 more
>>> root@flink-client:/opt/flink#
>>>
>>> --
>>> Robert Cullen
>>> 240-475-4490
>>>
>>
>
> --
> Robert Cullen
> 240-475-4490
>


Re: Interacting with flink-jobmanager via CLI in separate pod

2021-05-05 Thread Robert Metzger
Hi,
can you check the client log in the "log/" directory?
The Flink client will try to access the K8s API server to retrieve the
endpoint of the jobmanager. For that, the pod needs to have permissions
(through a service account) to make such calls to K8s. My hope is that the
logs or earlier messages give an indication of what Flink is trying to do.
Can you also try running on DEBUG log level? (The relevant file should be
log4j-cli.properties.)



On Tue, May 4, 2021 at 3:17 PM Robert Cullen  wrote:

> I have a flink cluster running in kubernetes, just the basic installation
> with one JobManager and two TaskManagers. I want to interact with it via
> command line from a separate container ie:
>
> root@flink-client:/opt/flink# ./bin/flink list --target 
> kubernetes-application -Dkubernetes.cluster-id=job-manager
>
> How do you interact in the same kubernetes instance via CLI (Not from the
> desktop)?  This is the exception:
>
> 
>  The program finished with the following exception:
>
> java.lang.RuntimeException: 
> org.apache.flink.client.deployment.ClusterRetrieveException: Could not get 
> the rest endpoint of job-manager
> at 
> org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$0(KubernetesClusterDescriptor.java:103)
> at 
> org.apache.flink.kubernetes.KubernetesClusterDescriptor.retrieve(KubernetesClusterDescriptor.java:145)
> at 
> org.apache.flink.kubernetes.KubernetesClusterDescriptor.retrieve(KubernetesClusterDescriptor.java:67)
> at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1001)
> at org.apache.flink.client.cli.CliFrontend.list(CliFrontend.java:427)
> at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1060)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> at 
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: org.apache.flink.client.deployment.ClusterRetrieveException: Could 
> not get the rest endpoint of job-manager
> ... 9 more
> root@flink-client:/opt/flink#
>
> --
> Robert Cullen
> 240-475-4490
>


Re: [ANNOUNCE] Apache Flink 1.13.0 released

2021-05-04 Thread Robert Metzger
Thanks a lot to everybody who has contributed to the release, in particular
the release managers for running the show!


On Tue, May 4, 2021 at 8:54 AM Konstantin Knauf 
wrote:

> Thank you Dawid and Guowei! Great job everyone :)
>
> On Mon, May 3, 2021 at 7:11 PM Till Rohrmann  wrote:
>
>> This is great news. Thanks a lot for being our release managers Dawid and
>> Guowei! And also thanks to everyone who has made this release possible :-)
>>
>> Cheers,
>> Till
>>
>> On Mon, May 3, 2021 at 5:46 PM vishalovercome 
>> wrote:
>>
>>> This is a very big release! Many thanks to the flink developers for their
>>> contributions to making Flink as good a framework that it is!
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>
>
> --
>
> Konstantin Knauf | Head of Product
>
> +49 160 91394525
>
>
> Follow us @VervericaData Ververica 
>
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton
> Wehner
>


Re: Correctly serializing "Number" as state in ProcessFunction

2021-04-26 Thread Robert Metzger
Quick comment on the Kryo type registration and the messages you are
seeing: the messages are expected. They say that the type is not serialized
with Flink's POJO serializer and that we are falling back to Kryo instead.
Since you are registering the concrete subclasses of Number that you actually
use (Integer, Double), you'll get better performance (or at least less CPU
load) out of Kryo. So if you want to keep using Kryo, you are doing
everything right (and you generally won't be able to use Flink's POJO
serializer for Number types).
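
For reference, the Java equivalent of that registration is roughly the
following sketch (the INFO message from the TypeExtractor will still show up;
it is informational only):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Register the concrete Number subclasses that are actually used, so Kryo can
// refer to them by a registered id instead of writing out the full class name.
env.getConfig().registerKryoType(Integer.class);
env.getConfig().registerKryoType(Double.class);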

On Fri, Apr 23, 2021 at 7:07 PM Miguel Araújo 
wrote:

> Thanks for your replies. I agree this is a somewhat general problem.
> I posted it here as I was trying to register the valid subclasses in Kryo
> but I couldn't get the message to go away, i.e., everything worked
> correctly but there was the complaint that GenericType serialization was
> being used.
>
> This is how I was registering these types:
>
> env.getConfig.registerKryoType(classOf[java.lang.Integer])
> env.getConfig.registerKryoType(classOf[java.lang.Double])
>
> and this is the message I got on every event:
>
> flink-task-manager_1  | 2021-04-23 16:48:29.274 [Processor Function 1
> (1/2)#0] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - No
> fields were detected for class java.lang.Number so it cannot be used as a
> POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance.
>
> In the meanwhile, I've changed my approach to reuse a protobuf type I
> already had as part of my input event.
>
> Once again, thanks for your replies because they gave me the right
> perspective.
>
>
>
> Arvid Heise  escreveu no dia quarta, 21/04/2021 à(s)
> 18:26:
>
>> Hi Miguel,
>>
>> as Klemens said this is a rather general problem independent of Flink:
>> How do you map Polymorphism in serialization?
>>
>> Flink doesn't have an answer on its own, as it's discouraged (A Number
>> can have arbitrary many subclasses: how do you distinguish them except by
>> classname? That adds a ton of overhead.). The easiest solution in your case
>> is to convert ints into double.
>> Or you use Kryo which dictionary encodes the classes and also limits the
>> possible subclasses.
>>
>> On Tue, Apr 20, 2021 at 11:13 AM Klemens Muthmann <
>> klemens.muthm...@cyface.de> wrote:
>>
>>> Hi,
>>>
>>> I guess this is more of a Java Problem than a Flink Problem. If you want
>>> it quick and dirty you could implement a class such as:
>>>
>>> public class Value {
>>> private boolean isLongSet = false;
>>> private long longValue = 0L;
>>> private boolean isIntegerSet = false;
>>> private int intValue = 0;
>>>
>>>public Value(final long value) {
>>>setLong(value);
>>>}
>>>
>>> public void setLong(final long value) |
>>> longValue = value;
>>> isLongSet = true;
>>>}
>>>
>>>public long getLong() {
>>>if(isLongSet) {
>>>return longValue
>>>}
>>>}
>>>
>>>// Add same methods for int
>>>// to satisfy POJO requirements you will also need to add a
>>> no-argument constructor as well as getters and setters for the boolean flags
>>> }
>>>
>>> I guess a cleaner solution would be possible using a custom Kryo
>>> serializer as explained here:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/custom_serializers.html
>>>
>>> Regards
>>>   Klemens
>>>
>>>
>>>
>>> > Am 20.04.2021 um 10:34 schrieb Miguel Araújo <
>>> miguelaraujo...@gmail.com>:
>>> >
>>> > Hi everyone,
>>> >
>>> > I have a ProcessFunction which needs to store different number types
>>> for different keys, e.g., some keys need to store an integer while others
>>> need to store a double.
>>> >
>>> > I tried to use java.lang.Number as the type for the ValueState, but I
>>> got the expected "No fields were detected for class java.lang.Number so it
>>> cannot be used as a POJO type and must be processed as GenericType."
>>> >
>>> > I have the feeling that this is not the right approach, but the exact
>>> type to be stored is only known at runtime which makes things a bit
>>> trickier. Is there a way to register these classes correctly, or Is it
>>> preferable to use different ValueState's for different types?
>>> >
>>> > Thanks,
>>> > Miguel
>>>
>>>


Re: Flink missing Kafka records

2021-04-26 Thread Robert Metzger
Hi Dan,

Can you describe under which conditions you are missing records (after a
machine failure, after a Kafka failure, after taking and restoring from a
savepoint, ...).
Are many records missing? Are "the first records" or the "latest records"
missing? Any individual records missing, or larger blocks of data?

I don't think that there's a bug in Flink or the Kafka connector. Maybe it's
just a configuration or system design issue.


On Sun, Apr 25, 2021 at 9:56 AM Dan Hill  wrote:

> Hi!
>
> Have any other devs noticed issues with Flink missing Kafka records with
> long-running Flink jobs?  When I re-run my Flink job and start from the
> earliest Kafka offset, Flink processes the events correctly.  I'm using
> Flink v1.11.1.
>
> I have a simple job that takes records (Requests) from Kafka and
> serializes them to S3.  Pretty basic.  No related issues in the text logs.
> I'm hoping I just have a configuration issue.  I'm guessing idleness is
> working in a way that I'm not expecting.
>
> Any ideas?
> - Dan
>
>
> void createLogRequestJob(StreamExecutionEnvironment env) throws Exception {
>
>   Properties kafkaSourceProperties =
> getKafkaSourceProperties("logrequest");
>
>   SingleOutputStreamOperator rawRequestInput = env.addSource(
>
> new FlinkKafkaConsumer(getInputRequestTopic(),
> getProtoDeserializationSchema(Request.class), kafkaSourceProperties))
>
>   .uid("source-request")
>
>   .name("Request")
>
>   .assignTimestampsAndWatermarks(
>
>
> WatermarkStrategy.forBoundedOutOfOrderness(maxOutOfOrderness).withIdleness(Duration.ofMinutes(1)));
>
>
>   executeLogRequest(rawRequestInput);
>
>   env.execute("log-request");
>
> }
>
>
> void executeLogRequest(SingleOutputStreamOperator
> rawRequestInput) {
>
>   AvroWriterFactory factory =
> getAvroWriterFactory(Request.class);
>
>   rawRequestInput.addSink(StreamingFileSink
>
>   .forBulkFormat(new Path(getS3OutputDirectory(), "raw/request"),
> factory)
>
>   .withBucketAssigner(new DateHourBucketAssigner(request ->
> request.getTiming().getEventApiTimestamp()))
>
>   .withRollingPolicy(OnCheckpointRollingPolicy.build())
>
>   .withOutputFileConfig(createOutputFileConfig())
>
>   .build())
>
> .uid("sink-s3-raw-request")
>
> .name("S3 Raw Request");
>
> }
>
>
>
>


Re: The wrong Options of Kafka Connector, will make the cluster can not run any job

2021-04-26 Thread Robert Metzger
Thanks a lot for your message. This could be a bug in Flink. It seems that
the archival of the execution graph is failing because some classes are
unloaded.

What I observe from your stack traces is that some classes are loaded from
flink-dist_2.11-1.11.2.jar, while other classes are loaded from
template-common-jar-0.0.1. Maybe Flink is closing the usercode classloader,
and this is causing the exception during the archival of the execution
graph. Can you make sure that the core Flink classes are only in your
classpath once (in flink-dist), and the template-common-jar-0.0.1 doesn't
contain the runtime Flink classes? (for example by setting the Flink
dependencies to provided when using the maven-shade-plugin).

For the issue while submitting the job, I cannot provide any further help,
because you haven't posted the exception that occurred in the REST handler.
Could you post that exception here as well?

Best wishes,
Robert



On Sun, Apr 25, 2021 at 2:44 PM chenxuying  wrote:

> environment:
>
> flinksql 1.12.2
>
> k8s session mode
>
> description:
>
> I got the following error log when my Kafka connector port was wrong:
>
> >
>
> 2021-04-25 16:49:50
>
> org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms
> expired before the position for partition filebeat_json_install_log-3 could
> be determined
>
> >
>
>
> I got the following error log when my Kafka connector IP was wrong:
>
> >
>
> 2021-04-25 20:12:53
>
> org.apache.flink.runtime.JobException: Recovery is suppressed by
> NoRestartBackoffTimeStrategy
>
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
>
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
>
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
>
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
>
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
>
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
>
> at
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>
> at
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
>
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout
> expired while fetching topic metadata
>
> >
>
>
> When the job was cancelled, there was the following error log:
>
> >
>
> 2021-04-25 08:53:41,115 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job
> v2_ods_device_action_log (fcc451b8a521398b10e5b86153141fbf) switched from
> state CANCELLING to CANCELED.
>
> 2021-04-25 08:53:41,115 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Stopping
> checkpoint coordinator for job 

Re: kafka consumers partition count and parallelism

2021-04-26 Thread Robert Metzger
Hey Prashant,
the Kafka Consumer parallelism is constrained by the number of partitions
the topic(s) have. If you have configured the Kafka Consumer in Flink with
a parallelism of 100, but your topic has only 20 partitions, 80 consumer
instances in Flink will be idle.
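
As a small sketch (the topic name, schema and the 20-partition count are
made-up assumptions, and "env" / "kafkaProps" are the usual execution
environment and consumer properties):

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
        "my-topic", new SimpleStringSchema(), kafkaProps);

DataStream<String> stream = env
        .addSource(consumer)
        // Match the source parallelism to the partition count so no subtasks sit idle.
        .setParallelism(20)
        .name("kafka-source");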

On Mon, Apr 26, 2021 at 2:54 AM Prashant Deva  wrote:

> Can a Kafka Consumer Source have more tasks run in parallel than the
> number of partitions for the topic it is the source of? Or is the max
> parallelism of the source constrained by max partitions of the topic?
>
>
>
>


Re: Checkpoint error - "The job has failed"

2021-04-26 Thread Robert Metzger
Hi Dan,

can you provide me with the JobManager logs to take a look as well? (This
will also tell me which Flink version you are using)



On Mon, Apr 26, 2021 at 7:20 AM Dan Hill  wrote:

> My Flink job failed to checkpoint with a "The job has failed" error.  The
> logs contained no other recent errors.  I keep hitting the error even if I
> cancel the jobs and restart them.  When I restarted my jobmanager and
> taskmanager, the error went away.
>
> What error am I hitting?  It looks like there is bad state that lives
> outside the scope of a job.
>
> How often do people restart their jobmanagers and taskmanager to deal with
> errors like this?
>


Re: Running Apache Flink 1.12 on Apple Silicon M1 MacBook Pro

2021-04-17 Thread Robert Metzger
Thanks a lot for the logs. I filed a ticket to track the issue:
https://issues.apache.org/jira/browse/FLINK-22331
I hope somebody with M1 hardware will soon have time to look into it.

On Fri, Apr 16, 2021 at 11:02 AM Klemens Muthmann <
klemens.muthm...@cyface.de> wrote:

> Hi,
>
> I’ve appended you two log files. One is from a run without the Rosetta 2
> compatibility layer while the other is with. As I said it would be great if
> everything works without Rosetta 2, but at first it might be sufficient to
> make it work with the compatibility layer.
>
> Regards
>  Klemens
>
>
> Am 15.04.2021 um 21:29 schrieb Robert Metzger :
>
> Hi,
>
> a DEBUG log of the client would indeed be nice.
> Can you adjust this file:
>
> conf/log4j-cli.properties
>
> to the following contents: (basically TRACE logging with netty logs
> enabled)
>
>
>
> 
> #  Licensed to the Apache Software Foundation (ASF) under one
> #  or more contributor license agreements.  See the NOTICE file
> #  distributed with this work for additional information
> #  regarding copyright ownership.  The ASF licenses this file
> #  to you under the Apache License, Version 2.0 (the
> #  "License"); you may not use this file except in compliance
> #  with the License.  You may obtain a copy of the License at
> #
> #  http://www.apache.org/licenses/LICENSE-2.0
> #
> #  Unless required by applicable law or agreed to in writing, software
> #  distributed under the License is distributed on an "AS IS" BASIS,
> #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> #  See the License for the specific language governing permissions and
> # limitations under the License.
>
> 
>
> # Allows this configuration to be modified at runtime. The file will be
> checked every 30 seconds.
> monitorInterval=30
>
> rootLogger.level = TRACE
> rootLogger.appenderRef.file.ref = FileAppender
>
> # Log all infos in the given file
> appender.file.name = FileAppender
> appender.file.type = FILE
> appender.file.append = false
> appender.file.fileName = ${sys:log.file}
> appender.file.layout.type = PatternLayout
> appender.file.layout.pattern = %d{-MM-dd HH:mm:ss,SSS} %-5p %-60c %x -
> %m%n
>
> # Log output from org.apache.flink.yarn to the console. This is used by the
> # CliFrontend class when using a per-job YARN cluster.
> logger.yarn.name = org.apache.flink.yarn
> logger.yarn.level = INFO
> logger.yarn.appenderRef.console.ref = ConsoleAppender
> logger.yarncli.name = org.apache.flink.yarn.cli.FlinkYarnSessionCli
> logger.yarncli.level = INFO
> logger.yarncli.appenderRef.console.ref = ConsoleAppender
> logger.hadoop.name = org.apache.hadoop
> logger.hadoop.level = INFO
> logger.hadoop.appenderRef.console.ref = ConsoleAppender
>
> # Make sure hive logs go to the file.
> logger.hive.name = org.apache.hadoop.hive
> logger.hive.level = INFO
> logger.hive.additivity = false
> logger.hive.appenderRef.file.ref = FileAppender
>
> # Log output from org.apache.flink.kubernetes to the console.
> logger.kubernetes.name = org.apache.flink.kubernetes
> logger.kubernetes.level = INFO
> logger.kubernetes.appenderRef.console.ref = ConsoleAppender
>
> appender.console.name = ConsoleAppender
> appender.console.type = CONSOLE
> appender.console.layout.type = PatternLayout
> appender.console.layout.pattern = %d{-MM-dd HH:mm:ss,SSS} %-5p %-60c
> %x - %m%n
>
> # suppress the warning that hadoop native libraries are not loaded
> (irrelevant for the client)
> logger.hadoopnative.name = org.apache.hadoop.util.NativeCodeLoader
> logger.hadoopnative.level = OFF
>
> # Suppress the irrelevant (wrong) warnings from the Netty channel handler
> #logger.netty.name =
> org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
> #logger.netty.level = OFF
>
>
>
> And then submit a job locally, and send me the respective log file
> (containing the "client" string in the file name).
>
> Thanks a lot, and stay healthy through the pandemic!
>
> Best,
> Robert
>
>
> On Thu, Apr 15, 2021 at 9:12 PM Klemens Muthmann <
> klemens.muthm...@cyface.de> wrote:
>
>> Hi,
>>
>> Since kindergarten time is shortened due to the pandemic, I only get four
>> hours of work into each day and I am supposed to do eight. So unfortunately
>> I will not be able to develop a fix at the moment. -.- I am happy to
>> provide any debug log you need or test adaptations and provide fixes as
>> pull requests. But I will sadl

Re: Question about state processor data outputs

2021-04-16 Thread Robert Metzger
Hi,
I assumed you are using the DataStream API, because you mentioned the
streaming sink. But you also mentioned the state processor API (which I
ignored a bit).

I wonder why you are using the state processor API. Couldn't the streaming
job that created the state also write it to files using the
StreamingFileSink?

If you want to stick to the DataSet API, then I guess you have to implement
a custom (File)OutputFormat.
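
In case the streaming route is an option for you, a custom bucket assigner on
the StreamingFileSink gets you per-key output. This is only a rough sketch
(the input type, path and names are made up), and note that it produces one
directory per key (e.g. .../key1/part-0-0) rather than literally a key1.txt
file:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// Assumes a DataStream<Tuple2<String, String>> of (key, value) records.
StreamingFileSink<Tuple2<String, String>> keyedSink = StreamingFileSink
        .forRowFormat(new Path("s3://my-bucket/keyed-output"),
                new SimpleStringEncoder<Tuple2<String, String>>("UTF-8"))
        .withBucketAssigner(new BucketAssigner<Tuple2<String, String>, String>() {
            @Override
            public String getBucketId(Tuple2<String, String> element,
                                      BucketAssigner.Context context) {
                return element.f0; // one bucket (directory) per key
            }

            @Override
            public SimpleVersionedSerializer<String> getSerializer() {
                return SimpleVersionedStringSerializer.INSTANCE;
            }
        })
        .build();
// then: yourKeyValueStream.addSink(keyedSink);

SimpleStringEncoder just writes element.toString(); you would plug in your own
Encoder to get the "key value" lines from your example.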


On Fri, Apr 16, 2021 at 5:37 AM Chen-Che Huang  wrote:

> Hi Robert,
>
> Thanks for your code. It's really helpful!
>
> However, with the readKeyedState API of the state processor, we get a DataSet
> for our data instead of a DataStream, and it seems the DataSet doesn't support
> the StreamingFileSink (there is no addSink method like on DataStream). If that
> is the case, I need to transform the DataSet into a DataStream. I'm not sure
> that's doable based on
> https://www.alibabacloud.com/blog/deep-insights-into-flink-sql-flink-advanced-tutorials_596628.
> If it's doable, then I'll be able to solve our problem with applying
> streamfilesink to the transformed dataset.
>
> Best wishes,
> Chen-Che Huang
>
> On 2021/04/15 19:23:43, Robert Metzger  wrote:
> > Hey Chen-Che Huang,
> >
> > I guess the StreamingFileSink is what you are looking for. It is
> documented
> > here:
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
> > I drafted a short example (that is not production ready), which does
> > roughly what you are asking for:
> > https://gist.github.com/rmetzger/7d5dbdaa118c63f5875c8c9520cc311d
> >
> > Hope this helps!
> >
> > Best,
> > Robert
> >
> >
> > On Thu, Apr 15, 2021 at 11:33 AM Chen-Che Huang 
> wrote:
> >
> > > Hi all,
> > >
> > > We're going to use state processor to make our keyedstate data to be
> > > written to different files based on the keys. More specifically, we
> want
> > > our data to be written to files key1.txt, key2.txt, ..., and keyn.txt
> where
> > > the value with the same key is stored in the same file. In each file,
> the
> > > data may be stored as follows. As far as I know, I need to implement
> my own
> > > Sink (org.apache.flink.streaming.api.functions.sink.RichSinkFunction)
> to
> > > meet the requirement. However, I wonder is there a native way to
> achieve
> > > this without implementing my own Sink because using official solution
> is
> > > usually more efficient and reliable than doing it by myself.  Many
> thanks
> > > for any comment.
> > >
> > > key1.txt
> > > key1 value11
> > > key1 value21
> > > key1 value31
> > >
> > > key2.txt
> > > key2 value21
> > > key2 value22
> > > key2 value23
> > >
> > > Best wishes,
> > > Chen-Che Huang
> > >
> >
>


Re: Running Apache Flink 1.12 on Apple Silicon M1 MacBook Pro

2021-04-15 Thread Robert Metzger
Hi,

a DEBUG log of the client would indeed be nice.
Can you adjust this file:

conf/log4j-cli.properties

to the following contents: (basically TRACE logging with netty logs enabled)



#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.


# Allows this configuration to be modified at runtime. The file will be
checked every 30 seconds.
monitorInterval=30

rootLogger.level = TRACE
rootLogger.appenderRef.file.ref = FileAppender

# Log all infos in the given file
appender.file.name = FileAppender
appender.file.type = FILE
appender.file.append = false
appender.file.fileName = ${sys:log.file}
appender.file.layout.type = PatternLayout
appender.file.layout.pattern = %d{-MM-dd HH:mm:ss,SSS} %-5p %-60c %x -
%m%n

# Log output from org.apache.flink.yarn to the console. This is used by the
# CliFrontend class when using a per-job YARN cluster.
logger.yarn.name = org.apache.flink.yarn
logger.yarn.level = INFO
logger.yarn.appenderRef.console.ref = ConsoleAppender
logger.yarncli.name = org.apache.flink.yarn.cli.FlinkYarnSessionCli
logger.yarncli.level = INFO
logger.yarncli.appenderRef.console.ref = ConsoleAppender
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.hadoop.appenderRef.console.ref = ConsoleAppender

# Make sure hive logs go to the file.
logger.hive.name = org.apache.hadoop.hive
logger.hive.level = INFO
logger.hive.additivity = false
logger.hive.appenderRef.file.ref = FileAppender

# Log output from org.apache.flink.kubernetes to the console.
logger.kubernetes.name = org.apache.flink.kubernetes
logger.kubernetes.level = INFO
logger.kubernetes.appenderRef.console.ref = ConsoleAppender

appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{-MM-dd HH:mm:ss,SSS} %-5p %-60c %x
- %m%n

# suppress the warning that hadoop native libraries are not loaded
(irrelevant for the client)
logger.hadoopnative.name = org.apache.hadoop.util.NativeCodeLoader
logger.hadoopnative.level = OFF

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
#logger.netty.name =
org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
#logger.netty.level = OFF



And then submit a job locally, and send me the respective log file
(containing the "client" string in the file name).

Thanks a lot, and stay healthy through the pandemic!

Best,
Robert


On Thu, Apr 15, 2021 at 9:12 PM Klemens Muthmann 
wrote:

> Hi,
>
> Since kindergarten time is shortened due to the pandemic, I only get four
> hours of work into each day and I am supposed to do eight. So unfortunately
> I will not be able to develop a fix at the moment. -.- I am happy to
> provide any debug log you need or test adaptations and provide fixes as
> pull requests. But I will sadly have no time to do any research into the
> problem. :( So for now I guess I will be using one of our Linux servers to
> test the Flink Pipelines until Silicon is supported.
>
> Nevertheless, thanks for your answer. If there is anything I can provide
> you to narrow down the problem, I am happy to help.
>
> Regards
>  Klemens
>
> Am 15.04.2021 um 20:59 schrieb Robert Metzger :
>
> Hey Klemens,
>
> I'm sorry that you are running into this. Looks like you are the first (of
> probably many people) who use Flink on a M1 chip.
>
> If you are up for it, we would really appreciate a fix for this issue, as
> a contribution to Flink.
> Maybe you can distill the problem into an integration test, so that you
> can look into fixes right from your IDE. (It seems that the RestClient is
> causing the problems. The client is used by the command line interface to
> upload the job to the cluster (that's not happening when executing the job
> from the IDE))
> My first guess is that a newer netty version might be required? Or maybe
> there's some DEBUG log output that's helpful in understanding the issue?
>
>
>
>
> On Tue, Apr 1

Re: Question about state processor data outputs

2021-04-15 Thread Robert Metzger
Hey Chen-Che Huang,

I guess the StreamingFileSink is what you are looking for. It is documented
here:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
I drafted a short example (that is not production ready), which does
roughly what you are asking for:
https://gist.github.com/rmetzger/7d5dbdaa118c63f5875c8c9520cc311d

Hope this helps!

Best,
Robert


On Thu, Apr 15, 2021 at 11:33 AM Chen-Che Huang  wrote:

> Hi all,
>
> We're going to use state processor to make our keyedstate data to be
> written to different files based on the keys. More specifically, we want
> our data to be written to files key1.txt, key2.txt, ..., and keyn.txt where
> the value with the same key is stored in the same file. In each file, the
> data may be stored as follows. As far as I know, I need to implement my own
> Sink (org.apache.flink.streaming.api.functions.sink.RichSinkFunction) to
> meet the requirement. However, I wonder is there a native way to achieve
> this without implementing my own Sink because using official solution is
> usually more efficient and reliable than doing it by myself.  Many thanks
> for any comment.
>
> key1.txt
> key1 value11
> key1 value21
> key1 value31
>
> key2.txt
> key2 value21
> key2 value22
> key2 value23
>
> Best wishes,
> Chen-Che Huang
>


Re: Running Apache Flink 1.12 on Apple Silicon M1 MacBook Pro

2021-04-15 Thread Robert Metzger
Hey Klemens,

I'm sorry that you are running into this. Looks like you are the first (of
probably many people) to use Flink on an M1 chip.

If you are up for it, we would really appreciate a fix for this issue, as a
contribution to Flink.
Maybe you can distill the problem into an integration test, so that you can
look into fixes right from your IDE. (It seems that the RestClient is
causing the problems. The client is used by the command line interface to
upload the job to the cluster (that's not happening when executing the job
from the IDE))
My first guess is that a newer netty version might be required? Or maybe
there's some DEBUG log output that's helpful in understanding the issue?




On Tue, Apr 13, 2021 at 5:34 PM Klemens Muthmann 
wrote:

> Hi,
>
> I've just tried to run the basic example for Apache Flink
> 
>  on
> an Apple Mac Pro with the new M1 Processor. I only need this for
> development purposes. The actual thing is going to run on a Linux server
> later on, so I would not mind if it only runs using the Rosetta
> compatibility layer. Unfortunately it failed with the following Stack Trace:
>
> flink-1.12.2 ./bin/flink run examples/streaming/WordCount.jar
> Executing WordCount example with default input data set.
> Use --input to specify file input.
> Printing result to stdout. Use --output to specify output path.
> WARNING: An illegal reflective access operation has occurred
> WARNING: Illegal reflective access by 
> org.apache.flink.api.java.ClosureCleaner 
> (file:/Users/muthmann/Development/Flink/flink-1.12.2/lib/flink-dist_2.11-1.12.2.jar)
>  to field java.lang.String.value
> WARNING: Please consider reporting this to the maintainers of 
> org.apache.flink.api.java.ClosureCleaner
> WARNING: Use --illegal-access=warn to enable warnings of further illegal 
> reflective access operations
> WARNING: All illegal access operations will be denied in a future release
>
> 
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: Failed to execute job 'Streaming WordCount'.
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
> at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
> at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> at 
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job 
> 'Streaming WordCount'.
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1918)
> at 
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
> at 
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
> at 
> org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:97)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
> ... 8 more
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
> submit JobGraph.
> at 
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:400)
> at 
> java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
> at 
> java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
> at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at 
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
> at 
> 

Re: Flink Hadoop config on docker-compose

2021-04-15 Thread Robert Metzger
Hi,

I'm not aware of any known issues with Hadoop and Flink on Docker.

I also tried what you are doing locally, and it seems to work:

flink-jobmanager| 2021-04-15 18:37:48,300 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Starting
StandaloneSessionClusterEntrypoint.
flink-jobmanager| 2021-04-15 18:37:48,338 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Install
default filesystem.
flink-jobmanager| 2021-04-15 18:37:48,375 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Install
security context.
flink-jobmanager| 2021-04-15 18:37:48,404 INFO
 org.apache.flink.runtime.security.modules.HadoopModule   [] - Hadoop
user set to flink (auth:SIMPLE)
flink-jobmanager| 2021-04-15 18:37:48,408 INFO
 org.apache.flink.runtime.security.modules.JaasModule [] - Jaas
file will be created as /tmp/jaas-811306162058602256.conf.
flink-jobmanager| 2021-04-15 18:37:48,415 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
Initializing cluster services.

Here's my code:

https://gist.github.com/rmetzger/0cf4ba081d685d26478525bf69c7bd39

Hope this helps!

On Wed, Apr 14, 2021 at 5:37 PM Flavio Pompermaier 
wrote:

> Hi everybody,
> I'm trying to set up reading from HDFS using docker-compose and Flink
> 1.11.3.
> If I pass 'env.hadoop.conf.dir' and 'env.yarn.conf.dir'
> using FLINK_PROPERTIES (under environment section of the docker-compose
> service) I see in the logs the following line:
>
> "Could not find Hadoop configuration via any of the supported method"
>
> If I'm not wrong, this means that the HADOOP_CONF_DIR is actually not
> generated by the run scripts.
> Indeed, If I add HADOOP_CONF_DIR and YARN_CONF_DIR (always under
> environment section of the docker-compose service) I don't see that line.
>
> Is this the expected behavior?
>
> Below is the relevant docker-compose service I use (I've removed the content
> of HADOOP_CLASSPATH because it is too long, and I didn't include the
> taskmanager, which is similar):
>
> flink-jobmanager:
> container_name: flink-jobmanager
> build:
>   context: .
>   dockerfile: Dockerfile.flink
>   args:
> FLINK_VERSION: 1.11.3-scala_2.12-java11
> image: 'flink-test:1.11.3-scala_2.12-java11'
> ports:
>   - "8091:8081"
>   - "8092:8082"
> command: jobmanager
> environment:
>   - |
> FLINK_PROPERTIES=
> jobmanager.rpc.address: flink-jobmanager
> rest.port: 8081
> historyserver.web.port: 8082
> web.upload.dir: /opt/flink
> env.hadoop.conf.dir: /opt/hadoop/conf
> env.yarn.conf.dir: /opt/hadoop/conf
>   - |
> HADOOP_CLASSPATH=...
>   - HADOOP_CONF_DIR=/opt/hadoop/conf
>   - YARN_CONF_DIR=/opt/hadoop/conf
> volumes:
>   - 'flink_shared_folder:/tmp/test'
>   - 'flink_uploads:/opt/flink/flink-web-upload'
>   - 'flink_hadoop_conf:/opt/hadoop/conf'
>   - 'flink_hadoop_libs:/opt/hadoop-3.2.1/share'
>
>
> Thanks in advance for any support,
> Flavio
>


Re: Flink Taskmanager failure recovery and large state

2021-04-06 Thread Robert Metzger
Hey Yaroslav,

GCS is a somewhat popular filesystem that should work fine with Flink.

It seems that the initial scale of a bucket is 5000 read requests per
second (https://cloud.google.com/storage/docs/request-rate), and your job is
probably hitting roughly that rate (depending on how fast your job restarts
in the restart loop).

You could try to tweak the GCS configuration parameters, such as increasing
"fs.gs.http.read-timeout" and "fs.gs.http.max.retry". (see
https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md
for all available options)


The "ExecutionGraphException: The execution attempt
6f3651a49344754a1e7d1fb20cf2cba3 was not found." error looks weird, but it
should not cause the restarts.


On Fri, Apr 2, 2021 at 2:15 AM Guowei Ma  wrote:

> Hi, Yaroslav
>
> AFAIK Flink does not retry if downloading the checkpoint from storage
> fails. On the other hand, the FileSystem already has this retry mechanism,
> so I think there is no need for Flink to retry.
> I am not very sure, but from the log it seems that the GCS retry is
> interrupted for some reason. So I think we could get more insight if we
> could find the first failure cause.
>
> Best,
> Guowei
>
>
> On Fri, Apr 2, 2021 at 12:07 AM Yaroslav Tkachenko <
> yaroslav.tkache...@shopify.com> wrote:
>
>> Hi Guowei,
>>
>> I thought Flink can support any HDFS-compatible object store like the
>> majority of Big Data frameworks. So we just added
>> "flink-shaded-hadoop-2-uber" and "gcs-connector-latest-hadoop2"
>> dependencies to the classpath, after that using "gs" prefix seems to be
>> possible:
>>
>> state.checkpoints.dir: gs:///flink-checkpoints
>> state.savepoints.dir: gs:///flink-savepoints
>>
>> And yes, I noticed that retries logging too, but I'm not sure if it's
>> implemented on the Flink side or the GCS connector side? Probably need to
>> dive deeper into the source code. And if it's implemented on the GCS
>> connector side, will Flink wait for all the retries? That's why I asked
>> about the potential timeout on the Flink side.
>>
>> The JM log doesn't have much besides from what I already posted. It's
>> hard for me to share the whole log, but the RocksDB initialization part can
>> be relevant:
>>
>> 16:03:41.987 [cluster-io-thread-3] INFO
>>  org.apache.flink.runtime.jobmaster.JobMaster - Using job/cluster config to
>> configure application-defined state backend:
>> RocksDBStateBackend{checkpointStreamBackend=File State Backend
>> (checkpoints: 'gs:///flink-checkpoints', savepoints:
>> 'gs:///flink-savepoints', asynchronous: TRUE, fileStateThreshold:
>> 1048576), localRocksDbDirectories=[/rocksdb],
>> enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4,
>> writeBatchSize=2097152}
>> 16:03:41.988 [cluster-io-thread-3] INFO
>>  org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using
>> predefined options: FLASH_SSD_OPTIMIZED.
>> 16:03:41.988 [cluster-io-thread-3] INFO
>>  org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using
>> application-defined options factory:
>> DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=4,
>> state.backend.rocksdb.block.blocksize=16 kb,
>> state.backend.rocksdb.block.cache-size=64 mb}}.
>> 16:03:41.988 [cluster-io-thread-3] INFO
>>  org.apache.flink.runtime.jobmaster.JobMaster - Using application-defined
>> state backend: RocksDBStateBackend{checkpointStreamBackend=File State
>> Backend (checkpoints: 'gs:///flink-checkpoints', savepoints:
>> 'gs:///flink-savepoints', asynchronous: TRUE, fileStateThreshold:
>> 1048576), localRocksDbDirectories=[/rocksdb],
>> enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4,
>> writeBatchSize=2097152}
>>
>> Thanks!
>>
>> On Thu, Apr 1, 2021 at 2:30 AM Guowei Ma  wrote:
>>
>>> Hi, Yaroslav
>>>
>>> AFAIK there is no official GCS FileSystem support in Flink. Did you
>>> implement the GCS support yourself?
>>> Would you like to share the whole log of the JM?
>>>
>>> BTW: From the following log I think the implementation has already some
>>> retry mechanism.
>>> >>> Interrupted while sleeping before retry. Giving up after 1/10
>>> retries for
>>> 'gs:///flink-checkpoints/150a406a50d20e1ee77422d25ef28d
>>>
>>> Best,
>>> Guowei
>>>
>>>
>>> On Thu, Apr 1, 2021 at 12:50 PM Yaroslav Tkachenko <
>>> yaroslav.tkache...@shopify.com> wrote:
>>>
 Hi everyone,

 I'm wondering if people have experienced issues with Taskmanager
 failure recovery when dealing with a lot of state.

 I'm using 1.12.0 on Kubernetes, RocksDB backend with GCS for savepoints
 and checkpoints. ~150 task managers with 4 slots each.

 When I run a pipeline without much state and kill one of the
 taskmanagers, it takes a few minutes to recover (I see a few restarts), but
 eventually when a new replacement taskmanager is registered with the
 jobmanager things go back to healthy.

 But when I run a pipeline with a lot of state (1TB+) and kill one of
 

Re: Checkpoint timeouts at times of high load

2021-04-06 Thread Robert Metzger
It could very well be that your job gets stuck in a restart loop for some
reason. Can you either post the full TaskManager logs here, or try to figure
out yourself why the first checkpoint that timed out did so?
Backpressure or blocked operators are a common cause for this. In your case,
it could very well be that the Kafka producer is not confirming the
checkpoint because of the Kafka transactions. If backpressure is causing
this, consider enabling unaligned checkpoints. It could also be the case that
the Kafka transactions are too slow, causing backpressure and checkpoint
timeouts.
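
If you want to experiment with unaligned checkpoints, it is a one-line change
on the checkpoint config (sketch only; requires Flink 1.11+ and exactly-once
checkpointing, and reuses the "checkpointInterval" variable from your snippet
below):

env.enableCheckpointing(checkpointInterval);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Persist in-flight buffers as part of the checkpoint so barriers are not
// stuck behind backpressured data.
env.getCheckpointConfig().enableUnalignedCheckpoints();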



On Mon, Apr 5, 2021 at 9:57 AM Geldenhuys, Morgan Karl <
morgan.geldenh...@tu-berlin.de> wrote:

> Thank you for the information. I have a feeling this has more to do with
> EXACTLY_ONCE Kafka producers and transactions not playing nice with
> checkpoints, causing a timeout. The jobs seem to fail and hit this
> restart-and-fail loop. Looking at the logs, the taskmanager logs grow very
> large with the same messages repeating over and over again. I've attached a
> file for this. The two lines that give me pause are:
>
>
>
> *Closing the Kafka producer with timeoutMillis = 0 ms. *
>
> *Proceeding to force close the producer since pending requests could not
> be completed within timeout 0 ms.*
>
>
> I'm not really sure which timeout this is but it looks like there is a
> timeout loop happening here.
>
>
> The Kafka producer has been configured as such (the transaction timeout
> has been set on the kafka server to match the producer):
>
>
> Properties kafkaProducerProps = new Properties();
> kafkaProducerProps.setProperty("bootstrap.servers", brokerList);
> kafkaProducerProps.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
> "360");
> kafkaProducerProps.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
>  "5");
> kafkaProducerProps.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
> UUID.randomUUID().toString());
> kafkaProducerProps.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
> "true");
> kafkaProducerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, "500");
> kafkaProducerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
> kafkaProducerProps.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, 
> "33554432");
> kafkaProducerProps.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 
> "3");
> kafkaProducerProps.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
> "12");
>
> FlinkKafkaProducer myProducer =
> new FlinkKafkaProducer<>(
> producerTopic,
> (KafkaSerializationSchema) (value, aLong) -> {
> return new ProducerRecord<>(producerTopic, value.getBytes());
> },
> kafkaProducerProps,
> Semantic.EXACTLY_ONCE,
> 10);
>
>
> And checkpoints have been configured as such:
>
>
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> // configuring RocksDB state backend to use HDFS
> String backupFolder = props.getProperty("hdfs.backupFolder");
> StateBackend backend = new RocksDBStateBackend(backupFolder, true);
> env.setStateBackend(backend);
> // start a checkpoint based on supplied interval
> env.enableCheckpointing(checkpointInterval);
> // set mode to exactly-once (this is the default)
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> // make sure progress happen between checkpoints
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(checkpointInterval);
> // checkpoints have to complete within two minute, or are discarded
> env.getCheckpointConfig().setCheckpointTimeout(38);
> //env.getCheckpointConfig().setTolerableCheckpointFailureNumber();
> // no external services which could take some time to respond, therefore 1
> // allow only one checkpoint to be in progress at the same time
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> // enable externalized checkpoints which are deleted after job cancellation
> env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
>
>
> Additionally, each taskmanager has been configured with 4GB of memory,
> there is a sliding window of 10 seconds with a slide of 1 second, and the
> cluster setup is using flink native.
>
>
> Any hints would be much appreciated!
>
>
> Regards,
>
> M.
>
>
> --
> *From:* Guowei Ma 
> *Sent:* 01 April 2021 14:19
> *To:* Geldenhuys, Morgan Karl
> *Cc:* user
> *Subject:* Re: Checkpoint timeouts at times of high load
>
> Hi,
> I think there are many reasons that could lead to the checkpoint timeout.
> Would you like to share some detailed information of checkpoint? For
> example, the detailed checkpoint information from the web.[1]  And which
> Flink version do you use?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/checkpoint_monitoring.html
>
> Best,
> Guowei
>
>
> On Thu, Apr 1, 2021 at 4:33 PM Geldenhuys, Morgan Karl <
> 

Re: Question about checkpoints and savepoints

2021-03-29 Thread Robert Metzger
Mh, did you also check the TaskManager logs?
I'm not aware of any known issues (past or present) in that direction; the
code paths for checkpoints and savepoints are fairly similar when it comes to
storing the data.

You could also try to run Flink on DEBUG log level, maybe that reveals
something?!


On Fri, Mar 26, 2021 at 1:37 PM Robert Cullen  wrote:

> Here’s a snippet from the logs, there are no errors in the logs
>
> 2021-03-23 13:11:52,247 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - 
> 
> 2021-03-23 13:11:52,249 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  
> Preconfiguration:
> 2021-03-23 13:11:52,249 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
>
> JM_RESOURCE_PARAMS extraction logs:
> jvm_params: -Xmx2097152000 -Xms2097152000 -XX:MaxMetaspaceSize=268435456
> logs: INFO  [] - Loading configuration property: jobmanager.rpc.address, 
> flink-jobmanager
> INFO  [] - Loading configuration property: taskmanager.numberOfTaskSlots, 4
> INFO  [] - Loading configuration property: blob.server.port, 6124
> INFO  [] - Loading configuration property: jobmanager.rpc.port, 6123
> INFO  [] - Loading configuration property: taskmanager.rpc.port, 6122
> INFO  [] - Loading configuration property: queryable-state.proxy.ports, 6125
> INFO  [] - Loading configuration property: jobmanager.memory.heap.size, 2000m
> INFO  [] - Loading configuration property: taskmanager.memory.task.heap.size, 
> 2000m
> INFO  [] - Loading configuration property: taskmanager.memory.managed.size, 
> 3000m
> INFO  [] - Loading configuration property: parallelism.default, 2
> INFO  [] - Loading configuration property: state.backend, filesystem
> INFO  [] - Loading configuration property: state.checkpoints.dir, 
> s3://flink/checkpoints
> INFO  [] - Loading configuration property: state.savepoints.dir, 
> s3://flink/savepoints
> INFO  [] - Loading configuration property: s3.endpoint, 
> http://cmdaa-minio:9000
> INFO  [] - Loading configuration property: s3.path-style-access, true
> INFO  [] - Loading configuration property: s3.path.style.access, true
> INFO  [] - Loading configuration property: s3.access-key, cmdaa123
> INFO  [] - Loading configuration property: s3.secret-key, **
> INFO  [] - Final Master Memory configuration:
> INFO  [] -   Total Process Memory: 2.587gb (2777561320 bytes)
> INFO  [] - Total Flink Memory: 2.078gb (2231369728 bytes)
> INFO  [] -   JVM Heap: 1.953gb (2097152000 bytes)
> INFO  [] -   Off-heap: 128.000mb (134217728 bytes)
> INFO  [] - JVM Metaspace:  256.000mb (268435456 bytes)
> INFO  [] - JVM Overhead:   264.889mb (277756136 bytes)
>
>
> On Fri, Mar 26, 2021 at 4:03 AM Robert Metzger 
> wrote:
>
>> Hi,
>>
>> has the "state.savepoints.dir" configuration key the same value as
>> "state.checkpoints.dir"?
>> If not, can you post your configuration keys, and the invocation how you
>> trigger a savepoint?
>> Have you checked the logs? Maybe there's an error message?
>>
>> On Thu, Mar 25, 2021 at 7:17 PM Robert Cullen 
>> wrote:
>>
>>> When I run a job on my Kubernetes session cluster only the checkpoint
>>> directories are created but not the savepoints. (Filesystem configured to
>>> S3 Minio)  Any ideas?
>>>
>>> --
>>> Robert Cullen
>>> 240-475-4490
>>>
>>
>
> --
> Robert Cullen
> 240-475-4490
>


Re: [BULK]Re: [SURVEY] Remove Mesos support

2021-03-29 Thread Robert Metzger
> >>>> >> From: Xintong Song <tonysong...@gmail.com>
> >>>> >> Date: Sunday, October 25, 2020 at 10:57 PM
> >>>> >> To: dev <d...@flink.apache.org>, user <user@flink.apache.org>
> >>>> >> Cc: Lasse Nedergaard <lassenedergaardfl...@gmail.com>, <p.nar...@criteo.com>
> >>>> >> Subject: Re: [SURVEY] Remove Mesos support
> >>>> >>
> >>>> >>
> >>>> >>
> >>>> >> Thanks for sharing the information with us, Piyush and Lasse.
> >>>> >>
> >>>> >>
> >>>> >>
> >>>> >> @Piyush
> >>>> >>
> >>>> >>
> >>>> >>
> >>>> >> Thanks for offering the help. IMO, there are currently several
> problems
> >>>> >> that make supporting Flink on Mesos challenging for us.
> >>>> >>
> >>>> >>   1.  Lack of Mesos experts. AFAIK, there are very few people (if
> not
> >>>> >> none) among the active contributors in this community that are
> familiar
> >>>> >> with Mesos and can help with development on this component.
> >>>> >>   2.  Absence of tests. Mesos does not provide a testing cluster,
> like
> >>>> >> `MiniYARNCluster`, making it hard to test interactions between
> Flink and
> >>>> >> Mesos. We have only a few very simple e2e tests running on Mesos
> deployed
> >>>> >> in a docker, covering the most fundamental workflows. We are not
> sure how
> >>>> >> well those tests work, especially against some potential corner
> cases.
> >>>> >>   3.  Divergence from other deployments. Because of 1 and 2, the new
> >>>> >> efforts (features, maintenance, refactors) tend to exclude Mesos if
> >>>> >> possible. When the new efforts have to touch the Mesos related
> components
> >>>> >> (e.g., changes to the common resource manager interfaces), we have
> to be
> >>>> >> very careful and make as few changes as possible, to avoid
> accidentally
> >>>> >> breaking anything that we are not familiar with. As a result, the
> component
> >>>> >> diverges a lot from other deployment components (K8s/Yarn), which
> makes it
> >>>> >> harder to maintain.
> >>>> >>
> >>>> >> It would be greatly appreciated if you can help with either of the
> above
> >>>> >> issues.
> >>>> >>
> >>>> >>
> >>>> >>
> >>>> >> Additionally, I have a few questions concerning your use cases at
> Criteo.
> >>>> >> IIUC, you are going to stay on Mesos for the foreseeable future,
> while
> >>>> >> keeping the Flink version up-to-date? What Flink version are you
> currently
> >>>> >> using? How often do you upgrade (e.g., every release)? Would you
> be good
> >>>> >> with keeping the Flink on Mesos component as it is (means that
> deployment
> >>>> >> and resource management improvements may not be ported to Mesos),
> while
> >>>> >> keeping other components up-to-date (e.g., improvements from
> programming
> >>>> >> APIs, operators, state backends, etc.)?
> >>>> >>
> >>>> >>
> >>>> >>
> >>>> >> Thank you~
> >>>> >>
> >>>> >> Xintong Song
> >>>> >>
> >>>> >>
> >>>> >>
> >>>> >>
> >>>> >>
> >>>> >> On Sat, Oct 24, 2020 at 2:48 AM Lasse Nedergaard <
> >>>> >> lassenedergaardfl...@gmail.com>
> >>>> >> wrote:
> >>>> >>
> >>>> >> Hi
> >>>> >>
> >>>> >>
> >>>> >>
> >>>> >> At Trackunit We have been using Mesos for long time but have now
> moved to
> >>>> >> k8s.
> >>>> >>
> >>>> >> Med venlig hilsen / Best regards
> >>>> >>
> >>&

Re: Hadoop is not in the classpath/dependencies

2021-03-26 Thread Robert Metzger
Hey Matthias,

Maybe the classpath contains the Hadoop libraries, but not the HDFS libraries?
The "DistributedFileSystem" class needs to be accessible to the
classloader. Can you check if that class is available?
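One quick way to check is to try loading that class explicitly on the same classpath the JobManager reports (a minimal sketch; the class and message names are illustrative):

```java
public class HdfsClassCheck {
    public static void main(String[] args) {
        try {
            // The HDFS client implementation that Flink's Hadoop filesystem wrapper loads
            // for the "hdfs" scheme; it usually ships in the hadoop-hdfs-client artifact,
            // not in hadoop-common.
            Class.forName("org.apache.hadoop.hdfs.DistributedFileSystem");
            System.out.println("DistributedFileSystem is visible on the classpath.");
        } catch (ClassNotFoundException e) {
            System.out.println("DistributedFileSystem is NOT visible: " + e);
        }
    }
}
```

If the class fails to load there, the wildcard classpath is probably pulling in the common Hadoop jars but not the HDFS client jars.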

Best,
Robert

On Thu, Mar 25, 2021 at 11:10 AM Matthias Seiler <
matthias.sei...@campus.tu-berlin.de> wrote:

> Hello everybody,
>
> I set up a Flink (1.12.1) and Hadoop (3.2.1) cluster on two machines.
> The job should store the checkpoints on HDFS like so:
> ```java
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(15000, CheckpointingMode.EXACTLY_ONCE);
> env.setStateBackend(new FsStateBackend("hdfs://node-1:9000/flink"));
> ```
>
> Unfortunately, the JobManager throws
> ```
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
> find a file system implementation for scheme 'hdfs'. The scheme is not
> directly supported by Flink and no Hadoop file system to support this
> scheme could be loaded. For a full list of supported file systems,
> please see
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
> // ...
> Caused by:
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is
> not in the classpath/dependencies.
> ```
> and I don't understand why.
>
> `echo $HADOOP_CLASSPATH` returns the path of Hadoop libraries with
> wildcards. Flink's JobManager prints the classpath, which includes
> specific packages from these Hadoop libraries. Besides that, Flink
> creates the state directories on HDFS, but no content.
>
> Thank you for any advice,
> Matthias
>
>

