Re: Query regarding ClassCastException

2022-03-28 Thread Nicolaus Weidner
Hi Mahima,

have you checked whether the problematic dependency (seems to be Avro) is
contained in Flink's /lib folder in your case? I would suggest checking on
both JobManagers and TaskManagers, since you say it's in the business logic.

Have you tried the suggestions mentioned in the docs? Those would be:
- Either make sure the dependency is only contained in the jar of your job,
and *not* in the Flink /lib folder
- Or make sure you do not use dynamic classloading at all
- Or try disabling inverted classloading to see if this changes something
- Or shade away the dependency

I don't have experience with Avro caching behavior; the first two
suggestions are probably more relevant there, according to the docs.
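For the third option, the relevant setting would be (in flink-conf.yaml; a
minimal sketch, please double-check against your version's docs):

# Resolve classes from Flink's /lib first instead of the user jar (default is child-first).
# Useful to test whether duplicate Avro classes on both classpaths cause the X-cannot-be-cast-to-X error.
classloader.resolve-order: parent-first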

Best,
Nico

On Fri, Mar 25, 2022 at 5:22 AM Mahima Agarwal 
wrote:

> Hi Team,
>
> Just a follow up mail regarding the query.
>
> Any leads would be appreciated.
>
> Thanks and Regards
> Mahima Agarwal
>
>
>
> On Mon, Mar 21, 2022 at 2:56 PM Mahima Agarwal 
> wrote:
>
>> Hi Team,
>>
>> I am getting a ClassCastException in my flink job's business logic when
>> using the below code to clone an object with newBuilder.
>>
>> MyClass outEvent = new MyClass();
>> outEvent = (MyClass) MyClass.newBuilder(inEvent).build();
>>
>> Exception : Caused by: java.lang.ClassCastException:
>> com.avro.mon.nfConsumerIdentification cannot be cast to
>> com.avro.mon.nfConsumerIdentification
>>
>> Also, in the Flink documentation I found a description of a similar problem -
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/debugging/debugging_classloading/#x-cannot-be-cast-to-x-exceptions
>>
>> But, as per the solution mentioned in the document we could not package
>> the avro jar in the job, we can only add the avro jar to the Flink /lib
>> folder.
>>
>> Is there any way we can clear the cache in Avro or any other way we can
>> solve this?
>>
>> Any leads would be appreciated.
>>
>> Thanks and Regards
>> Mahima Agarwal
>>
>


Re: Flink metrics via Prometheus or OpenTelemetry

2022-02-24 Thread Nicolaus Weidner
Hi Sigalit,

first of all, have you read the docs page on metrics [1], and in particular
the Prometheus section on metrics reporters [2]?
Apart from that, there is also a (somewhat older) blog post about
integrating Flink with Prometheus, including a link to a repo with example
code [3].
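As a starting point, a minimal reporter configuration sketch for
flink-conf.yaml (option names as in the 1.14 docs [2], please double-check
against your version):

# Expose Flink metrics on an HTTP endpoint that Prometheus can scrape
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
# Port range; each JobManager/TaskManager process on a host picks a free port from it
metrics.reporter.prom.port: 9250-9260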

Hope that helps to get you started!
Best,
Nico

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/metrics/#metrics
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/metric_reporters/#prometheus
[3] https://flink.apache.org/features/2019/03/11/prometheus-monitoring.html

On Wed, Feb 23, 2022 at 8:42 AM Sigalit Eliazov  wrote:

> Hello. I am looking for a way to expose Flink metrics via OpenTelemetry to
> the GCP Cloud Monitoring dashboard.
> Does anyone have experience with that?
>
> If it is not directly possible we thought about using Prometheus as a
> middleware. If you have experience with that I would appreciate any
> guidance.
>
> Thanks
>


Re: java.lang.Exception: Job leader for job id 0efd8681eda64b072b72baef58722bc0 lost leadership.

2022-02-24 Thread Nicolaus Weidner
Hi Jai,

Do writes to ValueStates/MapStates have a direct effect on the Flink
> State, or is the data buffered in between?
>

Writes to keyed state go directly to RocksDB. So there shouldn't be any
memory issues with buffers overflowing or similar. In general, more memory
should increase performance (larger cache sizes before having to write to
disk), but less memory shouldn't cause crashes.

Since the errors you encountered are not that specific, can you provide
full logs surrounding such incidents? There is not much to go on without
further info.

Best,
Nico

>


Re: Trouble sinking to Kafka

2022-02-23 Thread Nicolaus Weidner
Hi Marco,

The documentation kind of suggests this is the cause:
> https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kafka.html
>
> However, I think the documentation could benefit from a few examples and
> scenarios that can arise from ill-considered configurations.
>

Matthias already provided a nice explanation of the usage of Kafka
transactions in EXACTLY_ONCE mode (thanks Matthias!), so I just want to
comment on the docs: I think you are right that this gotcha could be
pointed out explicitly. In fact, it is pointed out in the 1.14 docs [1]
(last sentence of the EXACTLY_ONCE
description), so others obviously thought the same.
If you feel it should still be added to the 1.12 docs, feel free to open a
Jira ticket or even provide a PR yourself :-) [2].

Best,
Nico

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#fault-tolerance
[2] https://flink.apache.org/contributing/how-to-contribute.html


>


Re: Trouble sinking to Kafka

2022-02-23 Thread Nicolaus Weidner
Hi Marco,

I'm no expert on the Kafka producer, but I will try to help. [1] seems to
have a decent explanation of possible causes of the error you encountered.
Which leads me to two questions:


if (druidProducerTransactionMaxTimeoutMs > 0) {
>
>   properties.setProperty("transaction.max.timeout.ms", 
> Integer.toString(druidProducerTransactionMaxTimeoutMs));
>}
>if (druidProducerTransactionTimeoutMs > 0) {
>   properties.setProperty("transaction.timeout.ms", 
> Integer.toString(druidProducerTransactionTimeoutMs));
>}
>
>
Have you tried increasing the timeout settings, to see if transactions
timed out?


>properties.setProperty("transactional.id", "local.druid");
>
>
Do you use multiple producers (parallelism > 1)? It seems you always set
the same transactional.id, which I expect causes problems when you have
multiple producer instances (see "zombie fencing" in [2]). In that case,
just make sure they are unique.
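Just to illustrate the idea (a hypothetical, untested sketch - where exactly
you build the properties depends on your job), the point is simply that no
two producer instances end up with the same id:

// Hypothetical: append a suffix that differs per producer instance
// instead of the fixed "local.druid" used by all of them.
String transactionalId = "local.druid-" + java.util.UUID.randomUUID();
properties.setProperty("transactional.id", transactionalId);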

And one additional question: Does the error occur consistently, or only
occasionally?

Best,
Nico

[1]
https://stackoverflow.com/questions/53058715/what-is-reason-for-getting-producerfencedexception-during-producer-send
[2] https://stackoverflow.com/a/52304789

>


Re: java.lang.Exception: Job leader for job id 0efd8681eda64b072b72baef58722bc0 lost leadership.

2022-02-23 Thread Nicolaus Weidner
Hi Jai,

On Tue, Feb 22, 2022 at 9:19 PM Jai Patel 
wrote:

> It seems like the errors are similar to those discussed here:
> - https://issues.apache.org/jira/browse/FLINK-14316
> - https://cdmana.com/2020/11/20201116104527255b.html
>

I couldn't find any other existing issue apart from the one you already
linked. Just to be sure: Which Flink version are you using? Is it one where
the reported issue is fixed?

As for the issue itself, it looks like the connection between JobManager
and TaskManager was lost, though I can't tell why. Do you have full logs
from JobManager and TaskManager surrounding such an incident?


> When looking at the memory structure it looks like all memory is below
> 100% except for managed memory.  We have 9.1GB of managed memory for each
> of our 6 task managers and I estimate that our total Flink State is 600GB.
> Is it okay to run with that little memory for that much State?
>

Are you using RocksDB or HashMap state backend [1]? I assume it's RocksDB,
since with HashMapStateBackend, state size is limited by memory size (and
you are way above that). Did you check out the memory configuration
recommendations in the docs [2, 3]?
In principle (assuming RocksDB is used), I don't think the amount of memory
should be an issue (at least it shouldn't cause crashes). The logs would
help to understand what's happening.
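For reference, a minimal configuration sketch for a RocksDB setup with large
state (flink-conf.yaml; the values are placeholders, not recommendations):

state.backend: rocksdb
# Incremental checkpoints avoid re-uploading the full state on every checkpoint
state.backend.incremental: true
# Fraction of TaskManager process memory used as managed memory (RocksDB block cache, write buffers, ...)
taskmanager.memory.managed.fraction: 0.4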

Best,
Nico

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_tuning/#configure-memory-for-state-backends
[3]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/large_state_tuning/#tuning-rocksdb-memory


Re: Scala class ExecutionContext not found on my Java + Flink project

2021-11-26 Thread Nicolaus Weidner
Hi Felipe,

glad this helped and you found the problematic dependencies!

Best,
Nico

On Fri, Nov 26, 2021 at 12:32 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> hey Nico, you nailed it :tada:
>
> after doing "./mvnw dependency:tree | less" I saw scala 2.13 inside
> spring-kafka-test. I excluded it and the error was gone.
>
> <dependency>
>   <groupId>org.springframework.kafka</groupId>
>   <artifactId>spring-kafka-test</artifactId>
>   <exclusions>
>     <exclusion>
>       <groupId>com.fasterxml.jackson.module</groupId>
>       <artifactId>jackson-module-scala_2.13</artifactId>
>     </exclusion>
>     <exclusion>
>       <groupId>org.scala-lang.modules</groupId>
>       <artifactId>scala-collection-compat_2.13</artifactId>
>     </exclusion>
>     <exclusion>
>       <groupId>org.scala-lang.modules</groupId>
>       <artifactId>scala-java8-compat_2.13</artifactId>
>     </exclusion>
>     <exclusion>
>       <groupId>com.typesafe.scala-logging</groupId>
>       <artifactId>scala-logging_2.13</artifactId>
>     </exclusion>
>   </exclusions>
> </dependency>
>
> Thank you very much!
> Felipe
>
> *--*
> *-- Felipe Gutierrez*
> *-- skype: felipe.o.gutierrez*
>
>
> On Fri, Nov 26, 2021 at 12:08 PM Nicolaus Weidner <
> nicolaus.weid...@ververica.com> wrote:
>
>> Hi Felipe,
>>
>> can you check for Scala 2.13 dependencies on your classpath (parasitic
>> was only introduced in Scala 2.13)?  Not sure if that comes from Spring
>> Boot 2.6 or something else. If you use Flink 1.13 or lower, it will just
>> use whatever version is on the classpath, so you have to make sure that
>> only Scala 2.11 or 2.12 (depending on your Flink version) is on the
>> classpath, else clashes can occur.
>>
>> ... that said, even with Flink 1.14+, while the RPC system should not
>> suffer this problem anymore, other components may.
>>
>> Best,
>> Nico
>>
>> On Fri, Nov 26, 2021 at 11:21 AM Nicolaus Weidner <
>> nicolaus.weid...@ververica.com> wrote:
>>
>>> Hi Felipe,
>>>
>>> just a quick question to make sure: did you do a full rebuild of your
>>> project after changing the Spring boot version?
>>>
>>> Best,
>>> Nico
>>>
>>> On Thu, Nov 25, 2021 at 8:01 PM Felipe Gutierrez <
>>> felipe.o.gutier...@gmail.com> wrote:
>>>
>>>> Hi community,
>>>>
>>>> I am using Flink 1.11 + Java 8 and I was updating my application from
>>>> Spring boot 1 to spring boot 2.6. Then my Integration Test of Flink + Kafka
>>>> started giving me this error: "java.lang.NoClassDefFoundError:
>>>> scala/concurrent/ExecutionContext$parasitic$". The older version of spring
>>>> boot 1 I didn't have this error. The exact line on my INT test is where I
>>>> use:
>>>>
>>>> org.apache.flink.runtime.minicluster.TestingMiniCluster flink;
>>>> flink.start();
>>>>
>>>> I also have this line that I am thinking to replace. but I don't think
>>>> it is related
>>>> import scala.concurrent.duration.FiniteDuration;
>>>> private FiniteDuration timeout = new FiniteDuration(15L,
>>>> TimeUnit.SECONDS);
>>>>
>>>> I tried to add this dependency but it didn't make any difference.
>>>> <dependency>
>>>>   <groupId>org.scala-lang</groupId>
>>>>   <artifactId>scala-library</artifactId>
>>>>   <version>2.12.0</version>
>>>> </dependency>
>>>>
>>>> Does anybody have an idea of what I am missing?
>>>>
>>>> Full stack trace:
>>>> java.lang.NoClassDefFoundError:
>>>> scala/concurrent/ExecutionContext$parasitic$
>>>>
>>>> at
>>>> scala.concurrent.java8.FuturesConvertersImplCompat$.InternalCallbackExecutor(FuturesConvertersImplCompat.scala:7)
>>>> at
>>>> scala.compat.java8.FutureConverters$.toJava(FutureConverters.scala:72)
>>>> at
>>>> scala.compat.java8.FutureConverters$FutureOps$.toJava$extension(FutureConverters.scala:195)
>>>> at akka.pattern.Patterns$.ask(Patterns.scala:94)
>>>> at akka.pattern.Patterns.ask(Patterns.scala)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.SupervisorActor.startAkkaRpcActor(SupervisorActor.java:173)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcService.registerAkkaRpcActor(AkkaRpcService.java:293)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcService.startServer(AkkaRpcService.java:221)
>>>> at org.apache.flink.runtime.rpc.RpcEndpoint.<init>(RpcEndpoint.java:129)
>>>> at
>>>> org.apache.flink.runtime.metrics.dump.MetricQueryService.<init>(MetricQueryService.java:75)
>>>> at
>>>> org.apache.flink.runtime.metrics.dump.MetricQueryService.createMetricQueryService(MetricQueryService.java:252)
>>>> at
>>>> org.apache.flink.runtime.metrics.MetricRegistryImpl.startQueryService(MetricRegistryImpl.java:182)
>>>> at
>>>> org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:307)
>>>>
>>>> Thanks,
>>>> Felipe
>>>> *--*
>>>> *-- Felipe Gutierrez*
>>>> *-- skype: felipe.o.gutierrez*
>>>>
>>>


Re: Scala class ExecutionContext not found on my Java + Flink project

2021-11-26 Thread Nicolaus Weidner
Hi Felipe,

can you check for Scala 2.13 dependencies on your classpath (parasitic was
only introduced in Scala 2.13)?  Not sure if that comes from Spring Boot
2.6 or something else. If you use Flink 1.13 or lower, it will just use
whatever version is on the classpath, so you have to make sure that only
Scala 2.11 or 2.12 (depending on your Flink version) is on the classpath,
else clashes can occur.

... that said, even with Flink 1.14+, while the RPC system should not
suffer this problem anymore, other components may.

Best,
Nico

On Fri, Nov 26, 2021 at 11:21 AM Nicolaus Weidner <
nicolaus.weid...@ververica.com> wrote:

> Hi Felipe,
>
> just a quick question to make sure: did you do a full rebuild of your
> project after changing the Spring boot version?
>
> Best,
> Nico
>
> On Thu, Nov 25, 2021 at 8:01 PM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> Hi community,
>>
>> I am using Flink 1.11 + Java 8 and I was updating my application from
>> Spring boot 1 to spring boot 2.6. Then my Integration Test of Flink + Kafka
>> started giving me this error: "java.lang.NoClassDefFoundError:
>> scala/concurrent/ExecutionContext$parasitic$". The older version of spring
>> boot 1 I didn't have this error. The exact line on my INT test is where I
>> use:
>>
>> org.apache.flink.runtime.minicluster.TestingMiniCluster flink;
>> flink.start();
>>
>> I also have this line that I am thinking to replace. but I don't think it
>> is related
>> import scala.concurrent.duration.FiniteDuration;
>> private FiniteDuration timeout = new FiniteDuration(15L,
>> TimeUnit.SECONDS);
>>
>> I tried to add this dependency but it didn't make any difference.
>> <dependency>
>>   <groupId>org.scala-lang</groupId>
>>   <artifactId>scala-library</artifactId>
>>   <version>2.12.0</version>
>> </dependency>
>>
>> Does anybody have an idea of what I am missing?
>>
>> Full stack trace:
>> java.lang.NoClassDefFoundError:
>> scala/concurrent/ExecutionContext$parasitic$
>>
>> at
>> scala.concurrent.java8.FuturesConvertersImplCompat$.InternalCallbackExecutor(FuturesConvertersImplCompat.scala:7)
>> at scala.compat.java8.FutureConverters$.toJava(FutureConverters.scala:72)
>> at
>> scala.compat.java8.FutureConverters$FutureOps$.toJava$extension(FutureConverters.scala:195)
>> at akka.pattern.Patterns$.ask(Patterns.scala:94)
>> at akka.pattern.Patterns.ask(Patterns.scala)
>> at
>> org.apache.flink.runtime.rpc.akka.SupervisorActor.startAkkaRpcActor(SupervisorActor.java:173)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcService.registerAkkaRpcActor(AkkaRpcService.java:293)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcService.startServer(AkkaRpcService.java:221)
>> at org.apache.flink.runtime.rpc.RpcEndpoint.<init>(RpcEndpoint.java:129)
>> at
>> org.apache.flink.runtime.metrics.dump.MetricQueryService.<init>(MetricQueryService.java:75)
>> at
>> org.apache.flink.runtime.metrics.dump.MetricQueryService.createMetricQueryService(MetricQueryService.java:252)
>> at
>> org.apache.flink.runtime.metrics.MetricRegistryImpl.startQueryService(MetricRegistryImpl.java:182)
>> at
>> org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:307)
>>
>> Thanks,
>> Felipe
>> *--*
>> *-- Felipe Gutierrez*
>> *-- skype: felipe.o.gutierrez*
>>
>


Re: Scala class ExecutionContext not found on my Java + Flink project

2021-11-26 Thread Nicolaus Weidner
Hi Felipe,

just a quick question to make sure: did you do a full rebuild of your
project after changing the Spring boot version?

Best,
Nico

On Thu, Nov 25, 2021 at 8:01 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi community,
>
> I am using Flink 1.11 + Java 8 and I was updating my application from
> Spring boot 1 to spring boot 2.6. Then my Integration Test of Flink + Kafka
> started giving me this error: "java.lang.NoClassDefFoundError:
> scala/concurrent/ExecutionContext$parasitic$". The older version of spring
> boot 1 I didn't have this error. The exact line on my INT test is where I
> use:
>
> org.apache.flink.runtime.minicluster.TestingMiniCluster flink;
> flink.start();
>
> I also have this line that I am thinking to replace. but I don't think it
> is related
> import scala.concurrent.duration.FiniteDuration;
> private FiniteDuration timeout = new FiniteDuration(15L, TimeUnit.SECONDS);
>
> I tried to add this dependency but it didn't make any difference.
> <dependency>
>   <groupId>org.scala-lang</groupId>
>   <artifactId>scala-library</artifactId>
>   <version>2.12.0</version>
> </dependency>
>
> Does anybody have an idea of what I am missing?
>
> Full stack trace:
> java.lang.NoClassDefFoundError:
> scala/concurrent/ExecutionContext$parasitic$
>
> at
> scala.concurrent.java8.FuturesConvertersImplCompat$.InternalCallbackExecutor(FuturesConvertersImplCompat.scala:7)
> at scala.compat.java8.FutureConverters$.toJava(FutureConverters.scala:72)
> at
> scala.compat.java8.FutureConverters$FutureOps$.toJava$extension(FutureConverters.scala:195)
> at akka.pattern.Patterns$.ask(Patterns.scala:94)
> at akka.pattern.Patterns.ask(Patterns.scala)
> at
> org.apache.flink.runtime.rpc.akka.SupervisorActor.startAkkaRpcActor(SupervisorActor.java:173)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcService.registerAkkaRpcActor(AkkaRpcService.java:293)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcService.startServer(AkkaRpcService.java:221)
> at org.apache.flink.runtime.rpc.RpcEndpoint.<init>(RpcEndpoint.java:129)
> at
> org.apache.flink.runtime.metrics.dump.MetricQueryService.<init>(MetricQueryService.java:75)
> at
> org.apache.flink.runtime.metrics.dump.MetricQueryService.createMetricQueryService(MetricQueryService.java:252)
> at
> org.apache.flink.runtime.metrics.MetricRegistryImpl.startQueryService(MetricRegistryImpl.java:182)
> at
> org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:307)
>
> Thanks,
> Felipe
> *--*
> *-- Felipe Gutierrez*
> *-- skype: felipe.o.gutierrez*
>


Re: Will Flink lose some old Keyed State when changing the parallelism

2021-11-26 Thread Nicolaus Weidner
Hi,

to rescale, you should take a savepoint, stop the job, then restart from
the savepoint with your new desired parallelism. This way, no data will be
lost.
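A CLI sketch of that sequence (job id, paths and parallelism are placeholders):

# Stop the job and take a savepoint in one step
$ bin/flink stop --savepointPath s3://<bucket>/savepoints <job-id>

# Restart from the savepoint with the new parallelism, e.g. 5
$ bin/flink run -s s3://<bucket>/savepoints/savepoint-<xyz> -p 5 your-job.jar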

Best,
Nico

On Thu, Nov 25, 2021 at 10:53 AM 杨浩  wrote:

> Will Flink lose some old Keyed State when changing the parallelism, like 2
> -> 5, or 5->3?
>
>
>
>
>
>
>


Re: Possibility of supporting Reactive mode for native Kubernetes application mode

2021-11-03 Thread Nicolaus Weidner
Hi Fuyao,

I just wanted to say that the performance loss that you rightly suspected
when using savepoints (as opposed to checkpoints) may disappear with Flink
1.15. There should be no loss of functionality as far as checkpoints are
concerned.
I don't think the savepoint performance improvement goals are in the public
Flink Jira yet.

You are right about the default value of 1 checkpoint that is retained, but
you can change it if required:
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/checkpointing/#state-checkpoints-num-retained
Now that I think about it, you probably want to keep it at 1 intentionally
(I think that's what you implied) - so you can be sure that the single
checkpoint in the provided checkpoint directory is the latest retained
checkpoint. I haven't attempted something like this before, but it sounds
like it should work.
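For completeness, the relevant configuration sketch (flink-conf.yaml, option
names as in the linked docs):

# Keep completed checkpoints even after the job is cancelled
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
# Number of completed checkpoints to retain (default 1)
state.checkpoints.num-retained: 1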

Best,
Nico

On Tue, Nov 2, 2021 at 10:14 PM Fuyao Li  wrote:

> Hi David, Nicolaus,
>
>
>
> Thanks for the reply.
>
>
>
>1. For your first question, Yes. I want to use the checkpoint to stop
>and restart the application. I think this is similar to the Reactive mode
>strategy, right? (I don’t know the exact implementation behind the Reactive
>mode). From your description and Nicolaus reply, I guess this improvement
>for checkpoint will benefit both Reactive mode and this workflow I designed
>instead of breaking this proposal, right?
>
>
>- *For Nicolaus, after such change in 1.15, do you mean the checkpoint
>can’t be used to restart a job? If this is the case, maybe my proposal will
>not work after 1.15…*
>
> Please share the Jira link to this design if possible and correct my
> statement if I am wrong.
>
>
>
> Nicolaus’s suggestion of leveraging retained checkpoint is exactly what I
> am trying to describe in my 3-step solution.
>
>
>
> Quote from Nicolaus:
>
> “
>
> About your second question: You are right that taking and restoring from
> savepoints will incur a performance loss. They cannot be incremental, and
> cannot use native (low-level) data formats - for now. These issues are on
> the list of things to improve for Flink 1.15, so if the changes make it
> into the release, it may improve a lot.
>
> You can restore a job from a retained checkpoint (provided you configured
> retained checkpoints, else they are deleted on job cancellation), see [1]
> (right below the part you linked). It should be possible to rescale using a
> retained checkpoint, despite the docs suggesting otherwise (it was
> uncertain whether this guarantee should/can be given, so it was not stated
> in the docs. This is also expected to change in the future as it is a
> necessity for further reactive mode development).
>
>
>
> [1] 
> *https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#resuming-from-a-retained-checkpoint
> *
>
>
>
> ”
>
>
>
>1. For using Standalone Kubernetes problem. I have development a Flink 
> *native
>Kubernetes* operator upon
>https://github.com/wangyang0918/flink-native-k8s-operator
>
> 
>. Right now, this operator can basically achieve everything Flink CLI could
>do for session mode and application mode and more. This includes features
>like rescaling with savepoint (stop with savepoint and start from
>savepoint), stop with savepoint, submit/stop/cancel session jobs etc. All
>of these are automated through a unified Kubernetes CRD. For sake of time,
>I don’t want to write another operator for standalone k8s operator… As a
>result, I am seeking to add the reactive scaling function into this
>operator. Nevertheless, I really appreciate the work for reactive mode in
>standalone Kubernetes.
>
>
>
> Based on Nicolaus’s reply. I think if we configure the retain checkpoint
> policy. By default, I think only one checkpoint will be retained (please
> correct me if I am wrong) and we can capture the directory and rescale the
> application.
>
> See
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#retained-checkpoints
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#execution-checkpointing-externalized-checkpoint-retention
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#resuming-from-a-retained-checkpoint
>
>
>
>
>
>1. I removed the [EXTERNAL] tag in the email. This is something
>automatically added by the company’s email box. Sorry for the confusion.
>
>
>
>
>
> Best Regards,
>
> Fuyao
>
>
>
> *From: *David Morávek 
> *Date: *Tuesday, November 2, 2021 at 05:53
> *To: *Fuyao Li 
> *Cc: *user , Yang Wang ,
> Robert 

Re: NoClassDefFoundError for RMIServerImpl when running Flink with Scala 2.12 and Java 11

2021-11-02 Thread Nicolaus Weidner
Hi,

I tried building Flink 1.13 with the Scala 2.12 profile and running some of
the examples with Java 11, without encountering the issue you describe
(with or without HA). Can you give more details on how exactly you built
Flink (ideally the full command), and how you ran the job?

Best,
Nico

On Thu, Oct 28, 2021 at 10:42 AM L. C. Hsieh  wrote:

> Hi Flink developers,
>
> I encountered a weird error like follows. It only happens when I build
> Flink with Scala 2.12 profile and run with Java 11. No such error for Scala
> 2.11 + Java 8/11.
>
> Just search for relative info, but don't find any useful. Do you have any
> idea about this? Thanks.
>
> java.lang.NoClassDefFoundError: javax/management/remote/rmi/RMIServerImpl
> at
> org.apache.flink.runtime.management.JMXService.getPort(JMXService.java:76)
> ~[flink-dist_2.12-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.connectToResourceManager(TaskExecutor.java:1300)
> ~[flink-dist_2.12-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.tryConnectToResourceManager(TaskExecutor.java:1284)
> ~[flink-dist_2.12-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.reconnectToResourceManager(TaskExecutor.java:1279)
> ~[flink-dist_2.12-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.notifyOfNewResourceManagerLeader(TaskExecutor.java:1258)
> ~[flink-dist_2.12-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$800(TaskExecutor.java:181)
> ~[flink-dist_2.12-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerLeaderListener.lambda$notifyLeaderAddress$0(TaskExecutor.java:2150)
> ~[flink-dist_2.12-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>


Re: Possibility of supporting Reactive mode for native Kubernetes application mode

2021-11-02 Thread Nicolaus Weidner
Hi Fuyao,

About your second question: You are right that taking and restoring from
savepoints will incur a performance loss. They cannot be incremental, and
cannot use native (low-level) data formats - for now. These issues are on
the list of things to improve for Flink 1.15, so if the changes make it
into the release, it may improve a lot.
You can restore a job from a retained checkpoint (provided you configured
retained checkpoints, else they are deleted on job cancellation), see [1]
(right below the part you linked). It should be possible to rescale using a
retained checkpoint, despite the docs suggesting otherwise (it was
uncertain whether this guarantee should/can be given, so it was not stated
in the docs. This is also expected to change in the future as it is a
necessity for further reactive mode development).
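Resuming then works just like resuming from a savepoint, roughly (paths are
placeholders):

$ bin/flink run -s s3://<bucket>/checkpoints/<job-id>/chk-<n> -p <new-parallelism> your-job.jar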

Best,
Nico

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#resuming-from-a-retained-checkpoint

On Wed, Oct 27, 2021 at 11:56 PM Fuyao Li  wrote:

> Hello Community,
>
>
>
> I am checking the reactive mode for Flink deployment. I noticed that this
> is supported in Kubernetes environment, but only for standalone Kubernetes
> as of now. I have read some previous discussion threads regarding this
> issue. See [1][2][3][4][5][6].
>
>
>
> Question 1:
>
> It seems that due to some interface and design considerations [4]
> mentioned by Robert and Xintong and official doc[5], this feature is only
> for standalone k8s and it is not available for native Kubernetes now.
> However, I believe in theory, it is possible to be added to native
> Kubernetes, right? Will this be part of the future plan? If not, what is
> the restriction and is it a hard restriction?
>
>
>
> Question 2:
>
> I have built an native Kubernetes operator on top of Yang’s work [7]
> supporting various state transfers in native k8s application mode and
> session mode. Right now, I am seeking for adding some similar features like
> reactive scaling for native k8s. From my perspective, what I can do is to
> enable periodic savepoints and scale up/down based certain metrics we
> collect inside the Flink application. Some additional resource
> considerations need to be added to implement such feature, similar to the
> adaptive scheduler concept in [9][10] (I didn’t dive deep into that, I
> guess I just need to calculated the new TMs will be offered with sufficient
> k8s resources if the rescale happens?)
>
> I think as a user/operator, I am not supposed to be able to
> recover/restart a job from a checkpoint [8].
>
> I guess this might cause some performance loss since savepoints are more
> expensive and the Flink application must do both savepoint and checkpoint
> periodically… Is there any possible ways that user can also use checkpoints
> to restart and recover as a user? If Question 1 will be part of the future
> plan, I guess I won’t need much work here.
>
>
>
> Reference:
>
> [1] Reactive mode blog:
> https://flink.apache.org/2021/05/06/reactive-mode.html
>
> [2] example usage of reactive scaling:
> https://github.com/rmetzger/flink-reactive-mode-k8s-demo
>
> [3] FILP:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-159%3A+Reactive+Mode
>
> [4] Discussion thread:
> https://lists.apache.org/thread.html/ra688faf9dca036500f0445c55671e70ba96c70f942afe650e9db8374%40%3Cdev.flink.apache.org%3E
>
> [5] Flink doc:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/elastic_scaling/
>
> [6] Flink Jira: https://issues.apache.org/jira/browse/FLINK-10407\
> 
>
> [7] https://github.com/wangyang0918/flink-native-k8s-operator
>
> [8]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/checkpoints/#difference-to-savepoints
>
> [9]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management
>
> [10]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-160%3A+Adaptive+Scheduler
>
>
>
> Thanks,
>
> Fuyao
>


Re: Converting a Table to a DataStream[RowData] instead of DataStream[Row] with `toDataStream`

2021-11-02 Thread Nicolaus Weidner
Hi Yuval,

Can you try
toDataStream[RowData](tableSchema.toPhysicalRowDataType.bridgedTo(classOf[RowData]))?

Best regards,
Nico

On Thu, Oct 28, 2021 at 10:15 PM Yuval Itzchakov  wrote:

> Flink 1.14
> Scala 2.12.5
>
> Hi,
> I want to be able to convert a Table into a DataStream[RowData]. I need to
> do this since I already have specific infrastructure in place that knows
> how to serialize RowData into a JSON format. Previously when using
> toAppendStream[RowData], this worked fine.
>
> However, now with the new toDataStream API, I don't see a direct way to do
> this. Using toDataStream[RowData](tableSchema.toPhysicalRowDataType) still
> creates an internal OutputConversionOperator and yields a Row to the Kafka
> sink I'm trying to use.
>
> Would love some help.
>
> --
> Best Regards,
> Yuval Itzchakov.
>


Re: Snapshot method for custom keyed state checkpointing ?

2021-10-12 Thread Nicolaus Weidner
Hi Marc,

thanks for clarifying, I had misunderstood some parts.
Unfortunately, I don't think there is a way to update keyed state (for
multiple keys even) outside of a keyed context.

I will ask if someone else has an idea, but allow me to ask one
counter-question first: Did you actually run tests to verify that using the
custom state solution is more efficient than using Flink's keyed state
regularly (in the end, you would even have to include the state
synchronization in the performance test)? Efficient stateful stream
processing is one of the key features of Flink, and you are essentially
trying to override a specific piece of it with custom logic.

Best regards,
Nico

On Wed, Oct 6, 2021 at 5:50 PM Marc LEGER  wrote:

> Hello Nicolaus,
>
> Thank you for your quick feedback, sorry if I am not clear enough.
> Actually in the documented example, the state which is updated in the
> snapshotState method is an operator state and not a keyed state:
>
> public void initializeState(FunctionInitializationContext context) throws Exception {
>   [...]
>   countPerPartition = context.getOperatorStateStore().getOperatorState(
>       new ListStateDescriptor<>("perPartitionCount", Long.class));
>   [...]
> }
>
> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>   [...]
>   countPerPartition.add(localCount);
> }
>
>
> It seems that the method is then only called once per operator parallel
> task and not once per key.
> On my side I have two keyed states with same key (e.g., userId) in a
> CoFlatMapFunction:
>
>
>
>
> // Control state partitioned by userId
> private ValueState controlState;
> // Data state partitioned by userId coming from the ser/deserialization
> // of a custom system having a partitioned state
> private ValueState dataState;
>
> and I would like to do something like that to update dataState in a keyed
> context for every key and every checkpoint:
>
>
>
> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>   dataState.update(customSystem.getSnapshot(context.getKey());  // Not a keyed context here!
> }
>
> instead of saving dataState in the flatMap2 function for every received
> event:
>
>
> public void flatMap1(Control control, Collector out) {
>   controlState.update(control);
> }
>
>
>
>
>
>
>
>
>
> public void flatMap2(Event event, Collector out) {
>   // Perform some event transformations based on controlState
>   ProcessedEvent result = customSystem.process(controlState.value(), event);
>   // Save internal custom system state after processing: can be costly if high event throughput
>   dataState.update(customSystem.getSnapshot(controlState.value().getUserId());
>   // Output the processed event
>   out.collect(result);
> }
>
>
> So basically, I want to be able to synchronize the partitioned state of my
> custom system with the checkpoints done by Flink.
>
>
> Best Regards,
> Marc
>
> On Wed, Oct 6, 2021 at 12:11 PM Nicolaus Weidner <
> nicolaus.weid...@ververica.com> wrote:
>
>> Hi Marc,
>>
>> I think you can just use keyed state in a
>> CheckpointedFunction. FunctionInitializationContext gives you access to
>> both keyed state and operator state (your stream needs to be keyed, of
>> course). So you could just update your local custom state on regular
>> invocations and update keyed state on calls to snapshotState.
>> Check out the example in [1] where both types of state are used.
>>
>> Does that help? Not sure if I understood the problem correctly.
>>
>> Best regards,
>> Nico
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java#L74-L110
>>
>> On Tue, Oct 5, 2021 at 3:28 PM Marc LEGER  wrote:
>>
>>> Hello,
>>>
>>> Is there any method available in a RichFunction to be called by Flink
>>> with a keyed context each time a checkpoint is triggered please ?
>>>
>>> It seems that the CheckpointedFunction interface provides such a feature
>>> (snapshotState method) but only in case of operator state and it is called
>>> in a non-keyed context.
>>>
>>> Indeed, I am implementing a CoFlatMapFunction with:
>>> - a keyed state (state1) for a "control" stream (stream1) which is not
>>> often updated,
>>> - a keyed state (state2) for a "data" stream (stream2) with a high
>>> throughput and relying on a custom solution for internal state snapshot
>>> with some potential performance impact.
>>>
>>> Consequently, I don't want to trigger a state2 update for every event
>>> received in stream2 for efficiency reasons but rather update state2 based
>>> on checkpoints triggered by Flink.
>>>
>>> Best Regards,
>>> Marc
>>>
>>>


Re: Snapshot method for custom keyed state checkpointing ?

2021-10-06 Thread Nicolaus Weidner
Hi Marc,

I think you can just use keyed state in a
CheckpointedFunction. FunctionInitializationContext gives you access to
both keyed state and operator state (your stream needs to be keyed, of
course). So you could just update your local custom state on regular
invocations and update keyed state on calls to snapshotState.
Check out the example in [1] where both types of state are used.
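A condensed, untested sketch of that pattern (assuming the function runs on a
keyed stream; see [1] for the full version) - keyed state is updated in the
regular invocation, operator state is written in snapshotState:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class BufferingSink
        implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction {

    private transient ListState<Tuple2<String, Integer>> checkpointedState; // operator state
    private transient ValueState<Integer> countPerKey;                      // keyed state
    private final List<Tuple2<String, Integer>> bufferedElements = new ArrayList<>();

    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        // keyed context: updates state for the current key
        Integer current = countPerKey.value();
        countPerKey.update(current == null ? 1 : current + 1);
        bufferedElements.add(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // called once per parallel task on each checkpoint; operator state is written here
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedState = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>(
                        "buffered-elements",
                        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})));
        countPerKey = context.getKeyedStateStore().getState(
                new ValueStateDescriptor<>("count-per-key", Integer.class));
    }
}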

Does that help? Not sure if I understood the problem correctly.

Best regards,
Nico

[1]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java#L74-L110

On Tue, Oct 5, 2021 at 3:28 PM Marc LEGER  wrote:

> Hello,
>
> Is there any method available in a RichFunction to be called by Flink with
> a keyed context each time a checkpoint is triggered please ?
>
> It seems that the CheckpointedFunction interface provides such a feature
> (snapshotState method) but only in case of operator state and it is called
> in a non-keyed context.
>
> Indeed, I am implementing a CoFlatMapFunction with:
> - a keyed state (state1) for a "control" stream (stream1) which is not
> often updated,
> - a keyed state (state2) for a "data" stream (stream2) with a high
> throughput and relying on a custom solution for internal state snapshot
> with some potential performance impact.
>
> Consequently, I don't want to trigger a state2 update for every event
> received in stream2 for efficiency reasons but rather update state2 based
> on checkpoints triggered by Flink.
>
> Best Regards,
> Marc
>
>


Re: asynchronous IO question

2021-10-05 Thread Nicolaus Weidner
Hi Tom,

On Mon, Oct 4, 2021 at 10:42 PM tom yang  wrote:

> Hello,
>
>
>
> I recently ran into an issue with RichAsyncFunction and wanted to
> get some guidance from the community
>
>
>
> Please see snippet
>
>
>
> class AsyncFetchFromHttp extends RichAsyncFunction<String, Tuple2<String, String>> {
>
>     private transient AysncHttpClient client;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         client = new AysncHttpClient();
>     }
>
>     @Override
>     public void close() throws Exception {
>         client.close();
>     }
>
>     @Override
>     public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
>
>         // issue the asynchronous request, receive a future for result
>         CompletableFuture<HttpResponse<String>> future = httpClient
>                 .sendAsync(request, HttpResponse.BodyHandlers.ofString());
>
>         future.whenCompleteAsync((response, throwable) -> {
>             if (throwable != null) {
>                 resultFuture.completeExceptionally(throwable);
>             } else {
>                 if (resp.statusCode() == HttpStatus.SC_OK) {
>                     resultFuture.complete(Collections.singleton(new Tuple2<>(key, response.body())));
>                 } else if (resp.statusCode() == HttpStatus.SC_NOT_FOUND) {
>                     resultFuture.complete(Collections.emptyList());
>                 } else {
>                     resultFuture.completeExceptionally(new RuntimeException("Server processing error"));
>                 }
>             }
>         });
>     }
> }
>
>
>
> 1 . If the future completes exceptionally, ie resultFuture
> .completeExceptionally(throwable);
>
> does the input message get discarded?
>

Which input do you mean here, "request"? It is not defined in your snippet;
did it get lost when trimming unimportant parts?
By default, you will get only the contained throwable; you would have to
enrich it with the input if you want to retain it.

2. Should the request be made on a dedicated ExecutorService or is the
> forkpoolcommon sufficient?
>

I don't see a good reason in general here to use a separate thread pool for
the requests. They are async (not blocking), are part of your Flink job and
run on your Taskmanagers. Unless there is something special in your setup
that makes you suspect they block other tasks...


> 3. If the rest api service for example returns 404, should you complete
> with an empty collection or can you omit line 32 entirely?
>

This depends on your desired behavior: Do you want it to complete normally,
but without any results (this is your current state), or do you want it to
complete exceptionally?

Best regards,
Nico


>
>
> Thanks!
>
>
>
>
>


Re: Issues while upgrading from 1.12.1 to 1.14.0

2021-10-05 Thread Nicolaus Weidner
Hi Parag,

I am not so familiar with the setup you are using, but did you check out
[1]? Maybe the parameter
[--fromSavepoint /path/to/savepoint [--allowNonRestoredState]]
is what you are looking for?
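Based on the standalone-job command you are using, a hypothetical invocation
could look like this (savepoint path is a placeholder):

/docker-entrypoint.sh "standalone-job" \
  "--job-classname" "<your-main-class>" \
  "--fromSavepoint" "s3://<bucket>/savepoints/savepoint-<xyz>" \
  "--allowNonRestoredState" \
  ${args}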

Best regards,
Nico

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/docker/#application-mode-on-docker

On Tue, Oct 5, 2021 at 12:37 PM Parag Somani  wrote:

> Hello,
>
> We are currently using Apache flink 1.12.0 deployed on k8s cluster of 1.18
> with zk for HA. Due to certain vulnerabilities in container related with
> few jar(like netty-*, meso), we are forced to upgrade.
>
> While upgrading flink to 1.14.0, faced NPE,
> https://issues.apache.org/jira/browse/FLINK-23901?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel=17402570#comment-17402570
>
> To address it, I have followed steps
>
>1. savepoint creation
>2. Stop the job
>3. Restore from save point where i am facing challenge.
>
> For step #3 from above, i was able to restore from savepoint mainly
> because:
> "bin/flink run -s :savepointPath [:runArgs] "
> It majorly about restarting a jar file uploaded. As our application is
> based on k8s and running using docker, i was not able to restore it. And
> because of it, state of variables in accumulator got corrupted and i lost
> the data in one of env.
>
> My query is, what is the preferred way to restore from savepoint, if
> application is running on k8s using docker.
>
> We are using following command to run job manager:
>  /docker-entrypoint.sh "standalone-job" "-Ds3.access-key=
> ${AWS_ACCESS_KEY_ID}" "-Ds3.secret-key=${AWS_SECRET_ACCESS_KEY}"
> "-Ds3.endpoint=${AWS_S3_ENDPOINT}" "-Dhigh-availability.zookeeper.quorum=
> ${ZOOKEEPER_CLUSTER}" "--job-classname" ""  ${args}
>
> Thank you in advance...!
>
> --
> Regards,
> Parag Surajmal Somani.
>


Re: PyFlink job data stream to table conversion declareManagedMemory exception

2021-10-05 Thread Nicolaus Weidner
Hi Kamil,

On Tue, Oct 5, 2021 at 9:03 AM Kamil ty  wrote:

> Hello,
>
> I'm trying to run a pyflink job in cluster mode (with yarn). My job
> contains source and sink definitions using Table API which are converted to
> a datastream and back. Unfortunately I'm getting an unusual exception at:
> *table = t_env.from_data_stream(ds, 'user_id, first_name, last_name).*
>

Just to make sure: Is the missing quotation mark just a typo in your mail,
or your code (right before the closing bracket)?
*table = t_env.from_data_stream(ds, 'user_id, first_name, last_name['])*

Best regards,
Nico


Re: Python statefun - Context update

2021-09-21 Thread Nicolaus Weidner
Hi Jérémy,

objects are serialized when you store them in state. So when you retrieve
run_state from state, it is deserialized and you have a fresh instance.
Calling method_caller() then modifies this instance, but *not *the
serialized version stored in state.
In the second attempt you described, you modified the retrieved instance,
then stored the modified version in state (or rather a serialized form of
it) - which works as expected.

I don't think there is any way around explicitly storing any local changes
you made in state.

Best,
Nico

On Mon, Sep 20, 2021 at 5:22 PM Jérémy Albrecht 
wrote:

> Hi all !
>
> I am currently developing an environment that uses Flink Stateful
> functions in Python. The architecture is complex but here are the main
> points that needs to be understood to frame the problem I am facing.
> * functions.py contains several methods, one of them is handling protobuf
> messages and storing into context.storage a ValueSpec which is a python
> Object. This object contains classical types such as natives types, but
> also lists of named tuple and objects.
> Since pickle is not able to serialize objects and functions, I use dill to
> serialize this object and make a Flink-compatible type.
> * The object stored in the Context can be resumed as this:
> @dataclass
> class RunState:
> duration: int
> runner: Runner
> and runner is also an instance of the class Runner, defined as:
> class Runner:
> counter: int = 0
>
> def prepare_smth(self):
> self.counter = 10
>
> def add_to_counter():
> self.counter += 1
>
>return add_to_counter
>
> From the object stored in the state of the Flink function I so have access
> to the function definition *add_to_counter* and I can do something like:
> method_caller = ctx.storage.run_state.runner.prepare_smth()
> method_caller()
>
> -> from now on I expect the attribute counter from the Runner instance
> defined as runner in the Run State to have the value 11. What happens is
> that the value is modified inside the context of add_to_counter and
> prepare_smth, but the change is never reflected inside the object stored in
> the context. ctx.storage.state.runner.counter still equals to 0.
>
> * After some research with the debugger, in my opinion, the program tries
> to update the value but ctx.storage seems to be only updatable when I do a
> reassignement:
> e.g. ctx.storage.run_state.counter += 1 has NO effect, but
> rs = ctx.storage.run_state
> rs.counter +=1
> ctx.storage.run_state = rs has the expected result.
>
> If you have any clue about how to start or what to do about my problem I
> greatly appreciate the help !!
> Don't hesitate to ask me if anything is unclear :)
>
> Thanks,
> Jérémy
>
>


Re: Exception by flink kafka

2021-09-20 Thread Nicolaus Weidner
Hi Ragini,

On Fri, Sep 17, 2021 at 1:40 PM Ragini Manjaiah 
wrote:

> Later I started encountering org.apache.kafka.common.errors.TimeoutException:
> Failed to update metadata after 6 ms.
>

This message can have several causes. There may be network issues, your
Kafka configuration might be broken, or the broker could be overloaded, for
example. Did you see any messages reach the destination topic(s) or are
there none at all? See e.g. [1] or [2], where people discussed various
possible causes.


> I face this expectation intermittently and the jobs terminates.
>
> I am using FlinkKafkaProducer010 with these properties set
>
> producerConfig.setProperty(COMPRESSION_TYPE_CONFIG, CompressionType.LZ4.
> name);
> producerConfig.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> bootstrapAddress);
> producerConfig.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "
> 30");
> producerConfig.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "
> 200");
> producerConfig.setProperty(ProducerConfig.BATCH_SIZE_CONFIG,"52428800");
> producerConfig.setProperty(ProducerConfig.LINGER_MS_CONFIG, "900");
> producerConfig.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "524288000
> ");
> producerConfig.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "19
> ");
> producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "0");
> producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "2147483647");
> producerConfig.setProperty("security.protocol","SSL");
>

I am no expert in Kafka configuration, but I notice that you have a very
high  RETRY_BACKOFF_MS_CONFIG (significantly higher than MAX_BLOCK_MS) and
very large BATCH_SIZE_CONFIG. You could try decreasing BATCH_SIZE_CONFIG to
be smaller than MAX_REQUEST_SIZE_CONFIG in particular.
I don't see why this would cause a OOM though. If your Kafka Producer does
not have a lot of heap space, you should try increasing that.

Hope some of this helps!
Nico

[1] https://github.com/dpkp/kafka-python/issues/607
[2]
https://stackoverflow.com/questions/54780605/guidelines-to-handle-timeout-exception-for-kafka-producer


Re: Seeing Exception ClassNotFoundException: __wrapper while running in Kubernetes Cluster

2021-09-02 Thread Nicolaus Weidner
Hi Praneeth,

It does look like a failure constructing the serializer. Can you share the
serialization config you use for the Kafka producer? In particular, are you
using a custom serializer?
Do you use any custom classloading configuration?

Best regards,
Nico

On Wed, Sep 1, 2021 at 11:38 PM Praneeth Ramesh 
wrote:

> Hi All
>
> I am trying to run a flink scala application which reads from kafka apply
> some lookup transformations and then writes to kafka.
>
> I am using Flink Version 1.12.1
>
> I tested it in local and it works fine. But when I try to run it on
> cluster using native kubernetes integration I see weird errors like below.
>
> The cluster also looks fine, because I tried to run a wordcount
> application on the cluster and it worked fine.
>
> The exception is not clear and also the stacktrace shows the taskmanager
> stack trace and hence no idea where in the application the problem could
> be. Could this be a serialization issue? Is there a way to debug such
> issues and find the actual point in application code where there is a
> problem?
>
> ```org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not
> instantiate serializer.
> at
> org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:216)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createStreamOutput(OperatorChain.java:664)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainOutputs(OperatorChain.java:250)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:160)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:533)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> [flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> [flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at java.lang.Thread.run(Unknown Source) [?:?]
> Caused by: java.io.IOException: unexpected exception type
> at java.io.ObjectStreamClass.throwMiscException(Unknown Source)
> ~[?:?]
> at java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
> ~[?:?]
> at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
> at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
> ~[?:?]
> at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
> at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?]
> at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
> at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
> ~[?:?]
> at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
> at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
> ~[?:?]
> at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
> at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
> at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at
> org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> ... 8 more
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.ClassNotFoundException:
> __wrapper$1$7aa8fcbe22114421a688e120fcde1df7.__wrapper$1$7aa8fcbe22114421a688e120fcde1df7$
> at
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
> 

Re: How Do I Specify the Encryption Algorithm Suite of the Flink REST Service?

2021-07-07 Thread Nicolaus Weidner
Hi Wanghui,

quoting your reply here since it went only to me instead of the mailing
list as well:

Hi Nico:
>
>  Thank you for your reply.
>
>  I configured security.ssl.algorithms in flink-conf.yaml, but it
> seems to work only for SSL connections to internal services.
>
>  The red part in the figure does not take effect.
>
> Best regards,
>
> WangHui
>

You are right, in SSLUtils#createRestNettySSLContext, ciphers are not
considered (as opposed to SSLUtils#createInternalNettySSLContext) -
corresponding to your observation that it works for internal, but not
external communication. Adding Nico (another one) to the loop here, who may
know more about whether there is a reason for this or whether this is a bug.

Best regards,
Nico

On Tue, Jul 6, 2021 at 5:28 PM Nicolaus Weidner <
nicolaus.weid...@data-artisans.com> wrote:

> Hi Wanghui,
>
> if I understand correctly, you are looking for the config option
> security.ssl.algorithms [1]?
>
> Best regards,
> Nico
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/security/security-ssl/#cipher-suites
>
> On Tue, Jul 6, 2021 at 3:46 AM Wanghui (HiCampus) 
> wrote:
>
>> Hi all:
>>
>> How Do I Specify the Encryption Algorithm Suite of the REST Service After
>> the SSL Option Is Enabled for Flink?
>>
>>
>>
>> *王辉  (WangHui)*
>>
>> Smart Campus Solution Dept. [Cloud & Computing BG]
>>
>> Huawei Technologies Co., Ltd.
>>
>> Tel : +86 15940836304
>>
>> Email : *wanghu...@huawei.com *
>>
>>
>> This e-mail and its attachments contain confidential information from
>> HUAWEI, which is intended only for the person or entity whose address is
>> listed above. Any use of the information contained herein in any way
>> (including, but not limited to, total or partial disclosure, reproduction,
>> or dissemination) by persons other than the intended recipient(s) is
>> prohibited. If you receive this e-mail in error, please notify the sender
>> by phone or email immediately and delete it!
>>
>>
>>
>


Re: Issue while using parallelism.default in flink-conf.yaml file

2021-07-07 Thread Nicolaus Weidner
Hi Mahima,

looks like you found the relevant parts of the code already: In
JarHandlerUtils.JarHandlerContext#fromRequest, the parallelism value is
extracted from the request body or query parameter (the latter is
deprecated, though). If none is found, it defaults to 1 and overwrites the
configured value of 3 in applyToConfiguration.
From what I could gather, this is the case for legacy reasons (backwards
compatibility). I am afraid when executing a job by running a jar that was
previously uploaded via the jar upload, default parallelism from the config
will not be considered.
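So to actually run with parallelism 3, it would have to be passed explicitly
in the run request, roughly like this (jar id and host are placeholders):

curl -X POST "http://<jobmanager>:8081/jars/<jar-id>/run" \
  -H "Content-Type: application/json" \
  -d '{"parallelism": 3}'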

Best regards,
Nico

On Wed, Jul 7, 2021 at 8:18 AM Mahima Agarwal 
wrote:

> Hi Team,
>
> Please find the query below.
>
> Use Case: Using parallelism.default property mentioned in flink-conf.yaml
> file to enable system-level parallelism in flink configuration.
>
> Issue: Even after setting the parallelism.default to 3, on config start
> the configuration starts with parallelism as 1.
>
> On debugging the code we found that the value of parallelism.default in
> Configuration object instantiated inside handleRequest() method of
> JarRunHandler class(Line Number - 90) is initially set to 3 but it is
> changed to 1 in applyToConfiguration method of
> JarHandlerUtils.JarHandlerContext class(Line Number - 132) which is called
> from handleRequest method of JarRunHandler(Line Number - 95).
>
> Flink Version - 1.12.1
> Job Code -
>
> public class FlinkJob
> {
>
> public static void main(String[] args) throws Exception
> {
>
> String TOPIC_IN = args[0];
> String TOPIC_OUT = args[1];
> String BOOTSTRAP_SERVER = args[2];
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> Properties props = new Properties();
> props.put("bootstrap.servers", BOOTSTRAP_SERVER);
> props.put("group.id","kc1");
>
> KafkaDeserializationSchema deserializationSchema = new
> MMDeserializer<>("org.gamezone.GameZoneInput");
>
> FlinkKafkaConsumer kafkaConsumer = new
> FlinkKafkaConsumer<>(TOPIC_IN, deserializationSchema, props);
> kafkaConsumer.setStartFromLatest();
>
> Properties prodProps = new Properties();
> prodProps.put("bootstrap.servers", BOOTSTRAP_SERVER);
>
> FlinkKafkaProducer kafkaProducer =
> new FlinkKafkaProducer<>(TOPIC_OUT,
> ((value, timestamp) -> new
> ProducerRecord<>(TOPIC_OUT, "myKey".getBytes(),
> value.toString().getBytes())),
> prodProps,
> Semantic.AT_LEAST_ONCE);
>
> DataStream stream = env.addSource(kafkaConsumer);
> KeySelector keys = create();
> KeyedStream playerId = stream.keyBy(keys);
>
> playerId.process(new KeyedAggregateFunction(),
> TypeInformation.of(new TypeHint()
> {})).addSink(kafkaProducer);
>
> env.execute();
> }
>
> public static KeySelector create()
> {
> return record ->
> {
> final Serializable key = ((GameZoneInput)record).getPlayerId();
> return key;
> };
> }
>
> Any leads would be appreciated.
>
> Thanks
> Mahima
>


Re:

2021-07-07 Thread Nicolaus Weidner
Hi Maciek,

is there a typo in the input data? Timestamp 2021-05-01 04:42:57 appears
twice, but timestamp 2021-05-01T15:28:34 (from the log lines) is not there
at all. I find it hard to correlate the logs with the input...

Best regards,
Nico

On Wed, Jul 7, 2021 at 11:16 AM Arvid Heise  wrote:

> Hi Maciek,
>
> could you bypass the MATCH_RECOGNIZE (=comment out) and check if the
> records appear in a shortcutted output?
>
> I suspect that they may be filtered out before (for example because of
> number conversion issues with 0E-18)
>
> On Tue, Jul 6, 2021 at 3:26 PM Maciek Bryński  wrote:
>
>> Hi,
>> I have a very strange bug when using MATCH_RECOGNIZE.
>>
>> I'm using some joins and unions to create event stream. Sample event
>> stream (for one user) looks like this:
>>
>> uuid                                  cif         event_type   v        balance    ts
>> 621456e9-389b-409b-aaca-bca99eeb43b3  0004091386  trx          4294.38  74.524950  2021-05-01 04:42:57
>> 7b2bc022-b069-41ca-8bbf-e93e3f0e85a7  0004091386  application  0E-18    74.524950  2021-05-01 10:29:10
>> 942cd3ce-fb3d-43d3-a69a-aaeeec5ee90e  0004091386  application  0E-18    74.524950  2021-05-01 10:39:02
>> 433ac9bc-d395-457n-986c-19e30e375f2e  0004091386  trx          4294.38  74.524950  2021-05-01 04:42:57
>>
>> Then I'm using following MATCH_RECOGNIZE definition (trace function will
>> be explained later)
>>
>> CREATE VIEW scenario_1 AS (
>> SELECT * FROM events
>> MATCH_RECOGNIZE(
>> PARTITION BY cif
>> ORDER BY ts
>> MEASURES
>> TRX.v as trx_amount,
>> TRX.ts as trx_ts,
>> APP_1.ts as app_1_ts,
>> APP_2.ts as app_2_ts,
>> APP_2.balance as app_2_balance
>> ONE ROW PER MATCH
>> PATTERN (TRX ANY_EVENT*? APP_1 NOT_LOAN*? APP_2) WITHIN INTERVAL
>> '10' DAY
>> DEFINE
>> TRX AS trace(TRX.event_type = 'trx' AND TRX.v > 1000,
>>   'TRX', TRX.uuid, TRX.cif, TRX.event_type, TRX.ts),
>> ANY_EVENT AS trace(true,
>>   'ANY_EVENT', TRX.uuid, ANY_EVENT.cif,
>> ANY_EVENT.event_type, ANY_EVENT.ts),
>> APP_1 AS trace(APP_1.event_type = 'application' AND APP_1.ts <
>> TRX.ts + INTERVAL '3' DAY,
>>   'APP_1', TRX.uuid, APP_1.cif, APP_1.event_type,
>> APP_1.ts),
>> APP_2 AS trace(APP_2.event_type = 'application' AND APP_2.ts >
>> APP_1.ts
>>AND APP_2.ts < APP_1.ts + INTERVAL '7' DAY AND
>> APP_2.balance < 100,
>>   'APP_2', TRX.uuid, APP_2.cif, APP_2.event_type,
>> APP_2.ts),
>> NOT_LOAN AS trace(NOT_LOAN.event_type <> 'loan',
>>   'NOT_LOAN', TRX.uuid, NOT_LOAN.cif,
>> NOT_LOAN.event_type, NOT_LOAN.ts)
>> ))
>>
>>
>> This scenario could be matched by sample events because:
>> - TRX is matched by event with ts 2021-05-01 04:42:57
>> - APP_1 by ts 2021-05-01 10:29:10
>> - APP_2 by ts 2021-05-01 10:39:02
>> Unfortunately I'm not getting any data. And it's not watermarks fault.
>>
>> Trace function has following code and gives me some logs:
>>
>> public class TraceUDF extends ScalarFunction {
>>
>> public Boolean eval(Boolean condition, @DataTypeHint(inputGroup =
>> InputGroup.ANY) Object ... message) {
>> log.info((condition ? "Condition true: " : "Condition false: ")
>> + Arrays.stream(message).map(Object::toString).collect(Collectors.joining("
>> ")));
>> return condition;
>> }
>> }
>>
>> And log from this trace function is following.
>>
>> 2021-07-06 13:09:43,762 INFO TraceUDF [] -
>> Condition true: TRX 621456e9-389b-409b-aaca-bca99eeb43b3 0004091386 trx
>> 2021-05-01T04:42:57
>> 2021-07-06 13:12:28,914 INFO  TraceUDF [] -
>> Condition true: ANY_EVENT 621456e9-389b-409b-aaca-bca99eeb43b3 0004091386
>> trx 2021-05-01T15:28:34
>> 2021-07-06 13:12:28,915 INFO  TraceUDF [] -
>> Condition false: APP_1 621456e9-389b-409b-aaca-bca99eeb43b3 0004091386 trx
>> 2021-05-01T15:28:34
>> 2021-07-06 13:12:28,915 INFO  TraceUDF [] -
>> Condition false: TRX 433ac9bc-d395-457n-986c-19e30e375f2e 0004091386 trx
>> 2021-05-01T15:28:34
>>
>> As you can see 2 events are missing.
>> What can I do ?
>> I failed to create a minimal example of this bug. Any other ideas?
>>
>


Re: WELCOME to user@flink.apache.org

2021-07-06 Thread Nicolaus Weidner
Hi Wanghui,

unfortunately, this is not supported to my knowledge. See also this similar
question on Stackoverflow:
https://stackoverflow.com/questions/60950594/flink-encryption-parameters-in-flink-conf-yaml

Best regards,
Nico

On Mon, Jul 5, 2021 at 3:45 PM Wanghui (HiCampus) 
wrote:

> Hello, I find that security.ssl.rest.enabled: true is configured in flink,
> and the Java keystore password is stored in plaintext in the configuration
> file. Can the keystore password be encrypted for storage?
>
>
>


Re: How Do I Specify the Encryption Algorithm Suite of the Flink REST Service?

2021-07-06 Thread Nicolaus Weidner
Hi Wanghui,

if I understand correctly, you are looking for the config option
security.ssl.algorithms [1]?
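
For example, in flink-conf.yaml (the cipher suite names below are only
illustrative - use the ones your JVM actually supports):

  security.ssl.rest.enabled: true
  security.ssl.algorithms: TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384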

Best regards,
Nico

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/security/security-ssl/#cipher-suites

On Tue, Jul 6, 2021 at 3:46 AM Wanghui (HiCampus) 
wrote:

> Hi all:
>
> How Do I Specify the Encryption Algorithm Suite of the REST Service After
> the SSL Option Is Enabled for Flink?
>
>
>
> *王辉  (WangHui)*
>
> Smart Campus Solution Dept. [Cloud & Computing BG]
>
> Huawei Technologies Co., Ltd.
>
> Tel : +86 15940836304
>
> Email : *wanghu...@huawei.com *
>
>
>
>
>


Re: Allow setting job name when using StatementSet

2021-06-07 Thread Nicolaus Weidner
Hi Yuval,

I am not familiar with the Table API, but in the fragment you posted, the
generated job name is only used as a default if the configuration option
pipeline.name is not set. Can't you just set that option to the name you want to
have?
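
For example (untested sketch, assuming a TableEnvironment named tEnv):

  // set the job name used when the StatementSet is executed
  tEnv.getConfig().getConfiguration().setString("pipeline.name", "my-statement-set-job");
  // (or use the PipelineOptions.NAME constant from org.apache.flink.configuration)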

Best wishes,
Nico

On Mon, Jun 7, 2021 at 10:09 AM Yuval Itzchakov  wrote:

> Hi,
>
> Currently when using StatementSet, the name of the job is autogenerated by
> the runtime:
>
> [image: image.png]
>
> Is there any reason why there shouldn't be an overload that allows the
> user to explicitly specify the name of the running job?
> --
> Best Regards,
> Yuval Itzchakov.
>


Re: Stream processing into single sink to multiple DB Schemas

2021-06-07 Thread Nicolaus Weidner
Hi Tamir,

I assume you want to use the Jdbc connector?
You can use three filters on your input stream to separate it into three
separate streams, then add a sink to each of those (see e.g. [1]). Then you
can have a different SQL statement for each of the three sinks. If you
specify the driver name in JdbcConnectionOptions, that driver will be used
to obtain a DB connection (see [2]). So if you use a pooling driver (e.g.
[3]), connections should automatically be taken from a shared pool.
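
A rough, untested sketch of what I mean (class, column and schema names are
made up, not taken from your setup):

  DataStream<Employee> employees = ...;  // your single source stream

  DataStream<Employee> groupA = employees.filter(e -> e.getSalary() < 20_000);
  DataStream<Employee> groupB = employees.filter(e -> e.getSalary() >= 20_000 && e.getSalary() <= 50_000);
  DataStream<Employee> groupC = employees.filter(e -> e.getSalary() > 50_000);

  JdbcConnectionOptions connOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
          .withUrl("jdbc:postgresql://db:5432/payroll")
          .withDriverName("org.postgresql.Driver")  // or a pooling driver, see [3]
          .build();

  // one sink per target schema, all sharing the same connection options
  groupA.addSink(JdbcSink.sink(
          "INSERT INTO schema_a.employees (id, salary) VALUES (?, ?)",
          (stmt, e) -> { stmt.setLong(1, e.getId()); stmt.setDouble(2, e.getSalary()); },
          connOptions));
  // ... same pattern for groupB -> schema_b and groupC -> schema_c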

Does that help?

Best wishes,
Nico

[1]
https://stackoverflow.com/questions/53588554/apache-flink-using-filter-or-split-to-split-a-stream
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/jdbc/#jdbc-connection-parameters
[3] https://commons.apache.org/proper/commons-dbcp/

On Mon, Jun 7, 2021 at 8:23 AM Tamir Sagi 
wrote:

> Hey Community
>
> Assuming there are 3 groups,
> A, B, C
>
> Each group represents a set of data about employees and salaries.
> Group A ( 0-20K $)
> Group B (20K$ - 50K$)
> Group C ( > 50K$)
>
> Is it possible to process stream data from single source containing
> information about employees and salaries and split the data into different
> DB schemas on the same DB? (Single Sink - *Single Connection*)
>
> I Encountered Side output and dynamic tables
>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/side_output/
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/concepts/dynamic_tables/
>
> I'm not sure it's the right way.
>
> If there is a better way , enlighten me
>
> Thank you,
>
> Tamir.
>
>
>
>
>


Re: Is it possible to customize avro schema name when using SQL

2021-06-07 Thread Nicolaus Weidner
Hi Tao,

This is currently not possible using the Table API, though this will likely
change in a future version. For now, you would have to do that using the
DataStream API [1] and then switch to the Table API.
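
A rough sketch of the DataStream side (record/field names and the registry
URL are placeholders, and I have not tested this exact snippet):

  // build the Avro schema with an explicit record name instead of "record"
  Schema schema = SchemaBuilder.record("MyEventName")
          .namespace("com.example.events")
          .fields()
          .requiredString("userId")
          .requiredLong("eventTime")
          .endRecord();

  // serialize GenericRecords with that schema, e.g. via the Confluent registry format
  ConfluentRegistryAvroSerializationSchema<GenericRecord> serializer =
          ConfluentRegistryAvroSerializationSchema.forGeneric(
                  "my-topic-value", schema, "http://schema-registry:8081");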

Best wishes,
Nico

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/

On Sun, Jun 6, 2021 at 9:55 AM tao xiao  wrote:

> Hi team,
>
> I want to use avro-confluent to encode the data using SQL, but the schema
> registered by the encoder hard-codes the schema name to 'record'. Is it
> possible to dictate the name?
>
> --
> Regards,
> Tao
>


Re: How to get throughput and processing latency when testing Flink with hibench?

2021-05-17 Thread Nicolaus Weidner
Hi,

Flink allows you to enable latency tracking [1] and exposes several metrics
that might be what you are looking for [2, look for e.g. "numRecordsIn" or
"numBytesIn"]. You can query these metrics using the REST API [3] or by
registering a metrics reporter [4] that exposes them. As noted in the other
email, I cannot speak about what hibench does and why it cannot show any
throughput information for Flink. Even Flink 1.0.3 seems to expose some
metrics (just going by the docs [5] here, I was not involved back then).
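
For example, latency tracking can be switched on in flink-conf.yaml, and
metrics can be queried via REST (host, job id and vertex id below are
placeholders):

  # flink-conf.yaml
  metrics.latency.interval: 1000   # emit latency markers every 1000 ms

  # operator-level throughput via the REST API
  curl "http://<jobmanager>:8081/jobs/<job-id>/vertices/<vertex-id>/metrics?get=numRecordsInPerSecond"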

Best wishes,
Nico

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/metrics/#latency-tracking
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/metrics/#io
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/rest_api/
[4]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/metric_reporters/
[5]
https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/monitoring_rest_api.html

On Mon, May 17, 2021 at 2:49 PM penguin.  wrote:

> When using hibench to test Flink, the jobs submitted are built-in
> applications of hibench, that is, the code logic of programs like wordcount
> cannot be changed.
>
> How can I get the throughput and processing delay of Flink?
>
> In addition, in the /report/hibench.report file of hibench, we can't get
> the throughput information of Flink. The contents of
> /report/hibench.report are as follows:
> Type            Date        Time      Input_data_size  Duration(s)  Throughput(bytes/s)  Throughput/node
> FlinkFixWindow 2021-05-17 16:17:58 0 88.599 0 0
> FlinkFixWindow 2021-05-17 16:33:12 0 872.982 0 0
> FlinkFixWindow 2021-05-17 16:53:29 0 301.524 0 0
> FlinkFixWindow 2021-05-17 17:09:22 0 880.098 0 0
>
>
>
>


Re: Testing Flink with identity program in hibench

2021-05-17 Thread Nicolaus Weidner
Hi,

I am not familiar with hibench. Does the Flink UI show the configured
parallelism of 20 for the job, and are there indeed 20 partitions on the
Kafka topic you consume?
Which Flink version are you running? The repo
https://github.com/Intel-bigdata/HiBench mentions Flink 1.0.3, which is *very*
outdated. The current release is 1.13.0, so results may not be accurate.
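
To check the partition count, something along these lines should work (broker
address and topic name are placeholders):

  kafka-topics.sh --describe --bootstrap-server <broker>:9092 --topic <identity-topic>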

Best wishes,
Nico

On Mon, May 17, 2021 at 10:28 AM penguin.  wrote:

> Thanks for reading this email.
>
> According to the introduction, the identity program in hibench reads data
> from Kafka and then writes it back to Kafka.
>
>
> When using the identity program in hibench to test the Flink, set the
> parallelism to 20 in the flink.conf file in the conf directory of hibench.
>
>
> After the task is submitted, only one subtask is found on the UI
> interface, that is, only one slot in the cluster is assigned a subtask. How
> can I have multiple subtasks when using identity to test Flink?
>
> (it seems that every time the picture is uploaded, it can't be displayed,
> so no screenshot is provided.)
>
>
>
>


Re: Poor use case? (late arrival + IoT + windowing)

2021-05-17 Thread Nicolaus Weidner
Hi,

On Sat, May 15, 2021 at 5:07 PM  wrote:

> First I was told that my application need only perform keyed aggregation
> of streaming IoT data on a sliding window. Flink seemed the obvious choice.
>
> Then I was told that the window size must be configurable, taking on one
> of 5 possible values, anywhere from 5-60 minutes. Oh and configuration
> changes should take effect immediately. No biggie - I just opted to perform
> aggregation against all 5 possible window durations and let the
> post-processor worry about which outputs were of interest.
>
> Now the latest requirement (and the interesting part): if the IoT devices
> lose connectivity, they will buffer many days worth of data until
> connectivity is restored at which point all of that buffered data will be
> transmitted to my application. I believe this implies that event time (as
> determined by each individual device) must now be taken into consideration
> but...
>
>
> Question 1: is Flink really the right choice for this application now?
> Assuming the memory requirements for allowing such late data wouldn't be a
> deal-breaker, is Flink even capable of tracking event time on a per
> device/key basis?
>

Tracking and using event time for a windowed aggregation is certainly
possible. You can check out [1] for an introduction and [2] for some
further information on assigning timestamps to events. Of course, the
events need to contain some timestamp of when they were produced in the
first place, which I assume to be the case.

Next, lateness: You will need to define a "watermark strategy", i.e. a
strategy for deciding when it is safe to close a certain window (see [2]).
For example, you could decide that events will probably arrive at most 30
minutes out of order, so 30 minutes after seeing an event with timestamp x,
windows with that ending timestamp can be closed. Note that key-based
watermarks are not currently supported; they are global.
In addition, you can configure "allowed lateness" for windows [3], meaning
that windows will fire again with updated results if events arrive after
the "end watermark" of the window has passed and it fired once.


> Question 2: Assuming a solution with Flink is suitable, what constructs
> would I need to leverage? Custom windows maybe? Custom triggers and
> evictors?
>

For your use case, you would probably need to allow lateness of one or even
several weeks. This is not necessarily a problem, but it will depend on the
type of aggregation you perform - whether all events need to be kept in
state or just some aggregate values. There are some tips on state size on
the bottom of the page in [3]. If you use RocksDB as state backend, state
will be kept on disk, so memory limitations shouldn't be an issue.
A custom trigger is not strictly required, but helpful: The default
EventTimeTrigger would fire for each element of a late batch, whereas you
probably want to fire only after no further event was received for some
time span.

An alternative would be to route late events to a side output (see also
[3]) and process them separately. This may be preferable if late batches
are more of a rare case, as it won't interfere with the main streaming
logic.
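
To make this a bit more concrete, here is a rough, untested sketch (the event
type, field names and concrete durations are assumptions on my side):

  OutputTag<SensorReading> lateTag = new OutputTag<SensorReading>("late-readings") {};

  SingleOutputStreamOperator<AggregateResult> results = readings
          .assignTimestampsAndWatermarks(
                  WatermarkStrategy
                          .<SensorReading>forBoundedOutOfOrderness(Duration.ofMinutes(30))
                          .withTimestampAssigner((event, ts) -> event.getEventTimeMillis()))
          .keyBy(SensorReading::getDeviceId)
          .window(SlidingEventTimeWindows.of(Time.minutes(60), Time.minutes(5)))
          .allowedLateness(Time.days(7))           // keep window state around for late firings
          .sideOutputLateData(lateTag)             // or divert very late data instead
          .aggregate(new MyAggregateFunction());   // incremental aggregation keeps state small

  DataStream<SensorReading> lateReadings = results.getSideOutput(lateTag);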

Hope this helped at least a bit!
Best wishes,
Nico

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/concepts/time/#event-time-and-watermarks
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/event-time/generating_watermarks/
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/windows/#allowed-lateness
[4]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/


Re: Log rollover for logs.

2021-04-27 Thread Nicolaus Weidner
Hi John,

On Tue, Apr 27, 2021 at 9:47 AM John Smith  wrote:

> Hi, I'm running flink as a systemd service with...
>
> [Service]
> Type=forking
> WorkingDirectory=/opt/flink
> User=flink
> Group=flink
> ExecStart=/opt/flink/bin/taskmanager.sh start
> ExecStop=/opt/flink/bin/taskmanager.sh stop
> TimeoutSec=30
> Restart=on-failure
>
> My log4j.properties file is at /opt/flink/conf
>
> Is it as simple as just setting the log rollover in there? Cause sometimes
> with certain services like zookeeper some env variable overrides etc... So
> just wondering if it's that straight forward with flink.
>

yes, you can use the log4j RollingFileAppender [1] for this purpose. You
can also find more information regarding logging in Flink in the docs [2].
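
If you are on a Flink version that ships log4j 2 (1.11+), a rolling setup in
log4j.properties could look roughly like this (file size and count are just
examples):

  appender.rolling.name = RollingFileAppender
  appender.rolling.type = RollingFile
  appender.rolling.append = true
  appender.rolling.fileName = ${sys:log.file}
  appender.rolling.filePattern = ${sys:log.file}.%i
  appender.rolling.layout.type = PatternLayout
  appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
  appender.rolling.policies.type = Policies
  appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
  appender.rolling.policies.size.size = 100MB
  appender.rolling.strategy.type = DefaultRolloverStrategy
  appender.rolling.strategy.max = 10

  rootLogger.level = INFO
  rootLogger.appenderRef.rolling.ref = RollingFileAppender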

Best regards,
Nico

[1]
https://logging.apache.org/log4j/2.x/manual/appenders.html#RollingFileAppender
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/advanced/logging.html