Re: Best practice for creating/restoring savepoint in standalone k8 setup

2022-07-05 Thread jonas eyob
Thanks Weihua and Gyula,

@Weihua
> If you restart flink cluster by delete/create deployment directly, it
will be automatically restored from the latest checkpoint[1], so maybe just
enabling the checkpoint is enough.
Not sure I follow; we might have changes to the job that require us to
restore from a savepoint, in cases where restoring from a checkpoint wouldn't be
possible due to significant changes to the JobGraph.

> But if you want to use savepoint, you need to check whether the latest
savepoint is successful (check whether have _metadata in savepoint dir is
useful in most scenarios, but in some cases the _metadata may not be
completed).

Yes, that is basically what our savepoint restore script does: it checks S3
to see if we have any savepoints generated and passes the latest one to the
"--fromSavepoint" argument.

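For reference, the check is roughly along these lines (a simplified sketch rather
than our exact script; it assumes the aws CLI is available, uses placeholder bucket
and job-class names, and treats a savepoint as usable only if its _metadata object
exists):

#!/usr/bin/env bash
set -euo pipefail
# Placeholder location for completed savepoints.
SAVEPOINT_PREFIX="s3://my-flink-bucket/savepoints"

# Newest completed savepoint, identified by the upload time of its _metadata object.
LATEST_METADATA=$(aws s3 ls "${SAVEPOINT_PREFIX}/" --recursive \
  | grep '_metadata$' | sort | tail -n 1 | awk '{print $4}')

RESTORE_ARG=""
if [ -n "${LATEST_METADATA}" ]; then
  RESTORE_ARG="--fromSavepoint s3://my-flink-bucket/$(dirname "${LATEST_METADATA}")"
fi

# Hand over to the standalone job entrypoint with the optional restore argument.
exec /docker-entrypoint.sh standalone-job ${RESTORE_ARG} --job-classname com.example.MyJob
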
@Gyula

>Did you check the https://github.com/apache/flink-kubernetes-operator
<https://github.com/apache/flink-kubernetes-operator> by any chance?
Interesting, no, I had missed this! I will have a look, but it would also be
interesting to see how this was solved before the introduction of the
Flink operator.

Den tis 5 juli 2022 kl 16:37 skrev Gyula Fóra :

> Hi!
>
> Did you check the https://github.com/apache/flink-kubernetes-operator
> <https://github.com/apache/flink-kubernetes-operator> by any chance?
>
> It provides many of the application lifecycle features that you are
> probably after straight out-of-the-box. It has both manual and periodic
> savepoint triggering also included in the latest upcoming version :)
>
> Cheers,
> Gyula
>
> On Tue, Jul 5, 2022 at 5:34 PM Weihua Hu  wrote:
>
>> Hi, jonas
>>
>> If you restart flink cluster by delete/create deployment directly, it
>> will be automatically restored from the latest checkpoint[1], so maybe just
>> enabling the checkpoint is enough.
>> But if you want to use savepoint, you need to check whether the latest
>> savepoint is successful (check whether have _metadata in savepoint dir is
>> useful in most scenarios, but in some cases the _metadata may not be
>> completed).
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/
>>
>> Best,
>> Weihua
>>
>>
>> On Tue, Jul 5, 2022 at 10:54 PM jonas eyob  wrote:
>>
>>> Hi!
>>>
>>> We are running a Standalone job on Kubernetes using application
>>> deployment mode, with HA enabled.
>>>
>>> We have attempted to automate how we create and restore savepoints by
>>> running a script for generating a savepoint (using k8 preStop hook) and
>>> another one for restoring from a savepoint (located in a S3 bucket).
>>>
>>> Restoring from a savepoint is typically not a problem once we have a
>>> savepoint generated and accessible in our s3 bucket. The problem is
>>> generating the savepoint which hasn't been very reliable thus far. Logs are
>>> not particularly helpful either so we wanted to rethink how we go about
>>> taking savepoints.
>>>
>>> Are there any best practices for doing this in a CI/CD manner given our
>>> setup?
>>>
>>> --
>>>
>>>

-- 
*Med Vänliga Hälsningar*
*Jonas Eyob*


Best practice for creating/restoring savepoint in standalone k8 setup

2022-07-05 Thread jonas eyob
Hi!

We are running a Standalone job on Kubernetes using application deployment
mode, with HA enabled.

We have attempted to automate how we create and restore savepoints by
running a script that generates a savepoint (using a k8s preStop hook) and
another that restores from a savepoint (located in an S3 bucket).
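
(For reference, the preStop side is conceptually something like the sketch below,
not our exact manifest; it assumes the JobManager REST API is reachable on
localhost:8081 inside the pod, that jq is available in the image, and uses a
placeholder bucket name.)

lifecycle:
  preStop:
    exec:
      command:
        - /bin/sh
        - -c
        - |
          # Look up the (single) running job and ask the JobManager to trigger a savepoint.
          JOB_ID=$(curl -s http://localhost:8081/jobs/overview | jq -r '.jobs[0].jid')
          curl -s -X POST "http://localhost:8081/jobs/${JOB_ID}/savepoints" \
            -H 'Content-Type: application/json' \
            -d '{"target-directory": "s3://my-flink-bucket/savepoints", "cancel-job": false}'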

Restoring from a savepoint is typically not a problem once we have a
savepoint generated and accessible in our S3 bucket. The problem is
generating the savepoint, which hasn't been very reliable thus far. The logs are
not particularly helpful either, so we wanted to rethink how we go about
taking savepoints.

Are there any best practices for doing this in a CI/CD manner given our
setup?

--


Re: Log4j2 configuration

2022-02-15 Thread jonas eyob
1. Ok, thanks!
2. We are using application mode. No changes to the distribution other than
updating the log4j-console.properties file.

content of /lib/:

* flink-csv-1.14.3.jar
* flink-json-1.14.3.jar
* flink-table_2.12-1.14.3.jar
* log4j-api-2.17.1.jar
* log4j-slf4j-impl-2.17.1.jar
* flink-dist_2.12-1.14.3.jar
* flink-shaded-zookeeper-3.4.14.jar
* log4j-1.2-api-2.17.1.jar
* log4j-core-2.17.1.jar

Den tis 15 feb. 2022 kl 16:30 skrev Chesnay Schepler :

> 1) You either need to modify the log4j-console.properties file, or
> explicitly set the log4j.configurationFile property to point to your .xml
> file.
> 2)
> Have you made modifications to the distribution (e.g., removing other
> logging jars from the lib directory)?
> Are you using application mode, or session clusters?
>
> On 15/02/2022 16:41, jonas eyob wrote:
>
> Hey,
>
> We are deploying our Flink Cluster on a standalone Kubernetes with the
> longrunning job written in scala.
>
> We recently upgraded our Flink cluster from 1.12 to 1.14.3 - after which
> we started seeing a few problems related to logging which I have been
> struggling to fix for the past days).
> Related is also an attempt to add, we are also attempting to add a Sentry
> integration for our error logs.
>
> PROBLEM 1 - Error logs not being sent to Sentry.
> We are bundling our code and dependencies into a FAT jar, which includes a
> log4j2.xml specifying the Sentry Appender. But if I understand the
> documentation
> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/advanced/logging/#configuring-log4j-2>
> correctly our log4j2.xml won't be picked up by Flink as it already defines
> a set of default logging configurations files (e.g. log4j and logback).
>
> Q: How does Flink resolve logging configurations to use?
>
> I can see the following JVM override params provided when running in our
> dockerized version locally.
>
> -Dlog.file=/opt/flink/log/flink--taskexecutor-0-thoros-taskmanager-6
> b9785d4df-c28n4.log
> 2022-02-15 10:01:59,826 INFO org.apache.flink.runtime.taskexecutor.
> TaskManagerRunner [] - -Dlog4j.configuration=
> file:/opt/flink/conf/log4j-console.properties
> 2022-02-15 10:01:59,827 INFO org.apache.flink.runtime.taskexecutor.
> TaskManagerRunner [] - -Dlog4j.configurationFile=
> file:/opt/flink/conf/log4j-console.properties
> 2022-02-15 10:01:59,830 INFO org.apache.flink.runtime.taskexecutor.
> TaskManagerRunner [] - -Dlogback.configurationFile=
> file:/opt/flink/conf/logback-console.xml
>
> Content of the log4j2.xml (path: src/main/resources):
>
>  packages="org.apache.logging.log4j.core,io.sentry.log4j2"> 
> pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
>  minimumEventLevel="ERROR"/>
> 
> 
>
>
> For our kubernetes deployment we have followed the reference example here
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/#common-cluster-resource-definitions
> .
> My assumption is that I would need to also provide the Sentry-related
> configuration to the "log4-console.properties" for it to be picked up by
> the Taskmanager and JobManager?
>
> PROBLEM 2:
> ERROR StatusLogger Log4j2 could not find a logging implementation.
> Please add log4j-core to the classpath. Using SimpleLogger to log to the
> console
>
> I am not sure what's going on here. Following dependencies are bundled
> with the FAT jar
>
> "com.typesafe.scala-logging" %% "scala-logging" % 
> scalaLoggingVersion,"org.slf4j" % "slf4j-api" % 
> "1.7.33","org.apache.logging.log4j" % "log4j-slf4j-impl" % 
> "2.17.0","org.apache.logging.log4j" % "log4j-core" % 
> "2.17.0","org.apache.logging.log4j" %% "log4j-api-scala" % "12.0","io.sentry" 
> % "sentry-log4j2" % "5.6.0",
>
> Confused about what is going on here, possible this might not be Flink
> related matter but I am not sure..any tips on how to best debug this would
> be much appreciated.
> --
> *Thanks,*
> *Jonas*
>
>
>

-- 
*Med Vänliga Hälsningar*
*Jonas Eyob*


Log4j2 configuration

2022-02-15 Thread jonas eyob
Hey,

We are deploying our Flink Cluster on a standalone Kubernetes with the
longrunning job written in scala.

We recently upgraded our Flink cluster from 1.12 to 1.14.3, after which we
started seeing a few problems related to logging that I have been
struggling to fix for the past few days.
Related to this, we are also attempting to add a Sentry
integration for our error logs.

PROBLEM 1 - Error logs not being sent to Sentry.
We are bundling our code and dependencies into a FAT jar, which includes a
log4j2.xml specifying the Sentry Appender. But if I understand the
documentation
<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/advanced/logging/#configuring-log4j-2>
correctly, our log4j2.xml won't be picked up by Flink, as it already defines
a set of default logging configuration files (e.g. log4j and logback).

Q: How does Flink resolve logging configurations to use?

I can see the following JVM override params provided when running in our
dockerized version locally.

-Dlog.file=/opt/flink/log/flink--taskexecutor-0-thoros-taskmanager-6b9785d4df-c28n4.log
2022-02-15 10:01:59,826 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
2022-02-15 10:01:59,827 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties
2022-02-15 10:01:59,830 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml

Content of the log4j2.xml (path: src/main/resources):

<Configuration packages="org.apache.logging.log4j.core,io.sentry.log4j2">
  <Appenders>
    <Console name="Console" target="SYSTEM_OUT">
      <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
    </Console>
    <Sentry name="Sentry" minimumEventLevel="ERROR"/>
  </Appenders>
  <Loggers>
    <Root level="INFO">
      <AppenderRef ref="Console"/>
      <AppenderRef ref="Sentry"/>
    </Root>
  </Loggers>
</Configuration>

For our kubernetes deployment we have followed the reference example here
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/#common-cluster-resource-definitions
.
My assumption is that I would also need to provide the Sentry-related
configuration in "log4j-console.properties" for it to be picked up by
the TaskManager and JobManager?
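
For reference, what I have in mind is roughly the following addition to
log4j-console.properties (untested; the DSN is a placeholder and could equally be
supplied through the SENTRY_DSN environment variable instead of the dsn property):

packages = io.sentry.log4j2

appender.sentry.type = Sentry
appender.sentry.name = Sentry
appender.sentry.minimumEventLevel = ERROR
appender.sentry.dsn = https://examplePublicKey@o0.ingest.sentry.io/0

rootLogger.appenderRef.sentry.ref = Sentry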


PROBLEM 2:
ERROR StatusLogger Log4j2 could not find a logging implementation.
Please add log4j-core to the classpath. Using SimpleLogger to log to the
console

I am not sure what's going on here. Following dependencies are bundled with
the FAT jar

"com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingVersion,
"org.slf4j" % "slf4j-api" % "1.7.33",
"org.apache.logging.log4j" % "log4j-slf4j-impl" % "2.17.0",
"org.apache.logging.log4j" % "log4j-core" % "2.17.0",
"org.apache.logging.log4j" %% "log4j-api-scala" % "12.0",
"io.sentry" % "sentry-log4j2" % "5.6.0",

I'm confused about what is going on here; possibly this is not a Flink-related
matter, but I am not sure. Any tips on how to best debug this would
be much appreciated.
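
One thing I will try in the meantime is simply confirming that the log4j-core
classes actually end up in the assembled jar (the jar path below is just a
placeholder for whatever sbt-assembly produces for our build):

jar tf target/scala-2.12/my-job-assembly.jar | grep 'org/apache/logging/log4j/core/' | head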
-- 
*Thanks,*
*Jonas*


Re: Cannot consume from Kinesalite using FlinkKinesisConsumer

2021-12-04 Thread jonas eyob
Hey Mika,

We were using kinesalite 1.11.5.

Yeah, after bumping it to 3.3.3 the first issue, which was related to not
being able to list shards, disappeared; instead I'm seeing the same issue
you are mentioning:

"The timestampInMillis parameter cannot be greater than the
currentTimestampInMillis"

Found a discussion on the same topic:
https://github.com/awslabs/amazon-kinesis-connector-flink/issues/13
It appears to be an issue with the AWS SDK, likely due to setting AWS_CBOR
to disabled (which is required when running locally).

Specifically, when using AT_TIMESTAMP (which LATEST is translated to
https://github.com/apache/flink/blob/9bbadb9b105b233b7565af120020ebd8dce69a4f/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java#L310-L331)
it appears the Flink job is requesting the ShardIterator with a timestamp
in ms rather than seconds.

This raises the problem mentioned in kinesalite:
https://github.com/mhart/kinesalite/blob/4019d70a135226f33f1cdec4091f4391e631d2c9/actions/getShardIterator.js#L79-L89

Perhaps using TRIM_HORIZON locally will be the workaround for now; a fix
doesn't appear to be in sight.
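
If it helps anyone else, the workaround on our side would just be the initial
position property; a minimal sketch (assuming the same consumerConfig as in my
original mail quoted below):

import java.util.Properties
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants

val consumerConfig = new Properties()
// Start from the beginning of the local stream instead of LATEST; this avoids the
// AT_TIMESTAMP shard-iterator path that kinesalite rejects.
consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON")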


Den fre 3 dec. 2021 kl 12:47 skrev Mika Naylor :

> Hey Jonas,
>
> May I ask what version of Kinesalite you're targeting? With 3.3.3 and
> STREAM_INITIAL_POSITION = "LATEST", I received a "The timestampInMillis
> parameter cannot be greater than the currentTimestampInMillis" which may
> be a misconfiguration on my setup, but with STREAM_INITIAL_POSITION =
> "TRIM_HORIZON" I was able to consume events from the stream.
>
> This was with 1.14.0 of the Kinesis Flink connector.
>
> Kind regards,
> Mika
>
>
> On 02.12.2021 23:05, jonas eyob wrote:
> >Hi all, I have a really simple pipeline to consume events from a local
> >kinesis (kinesalite) and print them out to stdout. But struggling to make
> >sense of why it's failing almost immediately
> >
> >The pipeline code:
> >
> >/* Added this to verify it wasn't a problem with AWS CBOR which needs
> >to be disabled */
>
> >System.setProperty(com.amazonaws.SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY,
> >"true")
>
> >System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY,
> >"true")
>
> >System.setProperty("org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking",
> >"true")
> >
> >val env = StreamExecutionEnvironment.getExecutionEnvironment
> >env.setParallelism(1)
> >
> >val consumerConfig = new Properties()
> >
> >consumerConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1")
> >consumerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER,
> "AUTO")
> >consumerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID,
> >"FAKE_ACCESS_KEY")
> >consumerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
> >"FAKE_SECRET_ACCESS_KEY")
>
> >consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
> >"LATEST")
> >consumerConfig.setProperty(AWSConfigConstants.AWS_ENDPOINT,
> >"http://localhost:4567")
> >
> >env
> >  .addSource(
> >new FlinkKinesisConsumer[String](
> >  "user-profile-events-local",
> >  new SimpleStringSchema,
> >  consumerConfig
> >)
> >  )
> >  .print()
> >
> >env.execute("echo stream")
> >
> >When running this I am getting this:
> >
> >Error I get from running this locally:
> >
> >22:27:23.372 [flink-akka.actor.default-dispatcher-5] INFO
> > org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom
> >Source -> Sink: Print to Std. Out (1/1) (7e920c6918655278fbd09e7658847264)
> >switched from INITIALIZING to RUNNING.
> >Dec 02, 2021 10:27:23 PM
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader
> >loadProfiles
> >WARNING: Your profile name includes a 'profile ' prefix. This is
> considered
> >part of the profile name in the Java SDK, so you will need to include this
> >prefix in your profile name when you reference this profile from your Java
> >code.
> >Dec 02, 2021 10:27:23 PM
>
> >org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader
> >loadProfiles
> >WARNING: Your profile name includes a 'profile ' prefix. This is
> considered
> >part of the profile name in the Java SDK, so you will need to include this
> >prefix in your profile name when you reference this profile from your Java
> &

Cannot consume from Kinesalite using FlinkKinesisConsumer

2021-12-02 Thread jonas eyob
~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1035)
~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:1021)
~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:928)
~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at
org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:940)
~[flink-runtime-1.14.0.jar:1.14.0]
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
~[flink-runtime-1.14.0.jar:1.14.0]
at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:940)
~[flink-runtime-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
~[flink-runtime-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
~[flink-runtime-1.14.0.jar:1.14.0]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]

-- 
*Med Vänliga Hälsningar*
*Jonas Eyob*


Re: Checkpoints aborted - Job is not in state RUNNING but FINISHED

2021-11-26 Thread jonas eyob
Hi Yun, thanks for the quick reply!

Great to hear that a fix has been put in place as of Flink 1.14.

Since we are using Beam on top of Flink, we are currently limited
to the Flink 1.13 runner, so I would expect the fix not to be available to
us yet.

But to clarify the underlying problem for me: is this caused by having task
parallelism > 1, but only one of the tasks being in a RUNNING state (the other
in a FINISHED state)?
Would there be a problem if, say, we have two tasks consuming events from a
Kinesis source but the stream has only 1 shard?

Den fre 26 nov. 2021 kl 03:14 skrev Yun Gao :

> Hi Jonas,
>
> Previously Flink indeed does not support checkpoints after some tasks
> finished.
> In 1.14 we implement a first version for this feature (namely
> https://issues.apache.org/jira/browse/FLINK-2491),
> and it could be enabled by set
>
> execution.checkpointing.checkpoints-after-tasks-finish.enabled: true
>
> We will also try to enable the flag by default in 1.15.
>
>
> Best,
>
> Yun
>
>
>
>
> --
> Sender:jonas eyob
> Date:2021/11/26 01:53:17
> Recipient:user
> Theme:Checkpoints aborted - Job is not in state RUNNING but FINISHED
>
> Hi all,
>
> I have been struggling with this issue for a couple of days now.
> Checkpointing appears to fail as the Task Source ( kinesis stream in this
> case) appears to be in a FINISHED state.
>
> Excerpt from Jobmanager logs:
>
> 2021-11-25 12:52:00,479 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
> Source Events/Read(KinesisSource) -> Flat Map -> Source
> Events/MapElements/Map/ParMultiDo(Anonymous) -> Parse
> Events/ParMultiDo(ParseEvent) -> Key Events/ParMultiDo(KeyEvent) ->
> Window/Window.Assign.out -> ToBinaryKeyedWorkItem (1/2)
> (eb31cbc4e319588ba79a26d26abcd2f3) switched from DEPLOYING to RUNNING.
> 2021-11-25 12:52:00,494 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
> Source Events/Read(KinesisSource) -> Flat Map -> Source
> Events/MapElements/Map/ParMultiDo(Anonymous) -> Parse
> Events/ParMultiDo(ParseEvent) -> Key Events/ParMultiDo(KeyEvent) ->
> Window/Window.Assign.out -> ToBinaryKeyedWorkItem (2/2)
> (1eae72b5680529fbd3b4becadb803910) switched from DEPLOYING to RUNNING.
> 2021-11-25 12:52:00,569 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
> GroupByKey -> ToGBKResult -> Fetch User Profile/ParMultiDo(GetStoredState)
> -> Reduce state/ParMultiDo(ReduceState) -> Store
> state/ParMultiDo(StoreState) (1/2) (1a77c7ed026ac4e4a59ab66876053102)
> switched from DEPLOYING to RUNNING.
> 2021-11-25 12:52:00,582 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
> GroupByKey -> ToGBKResult -> Fetch User Profile/ParMultiDo(GetStoredState)
> -> Reduce state/ParMultiDo(ReduceState) -> Store
> state/ParMultiDo(StoreState) (2/2) (31588d4dad22821d7226ec65687d0edb)
> switched from DEPLOYING to RUNNING.
> 2021-11-25 12:52:00,881 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
> Source Events/Read(KinesisSource) -> Flat Map -> Source
> Events/MapElements/Map/ParMultiDo(Anonymous) -> Parse
> Events/ParMultiDo(ParseEvent) -> Key Events/ParMultiDo(KeyEvent) ->
> Window/Window.Assign.out -> ToBinaryKeyedWorkItem (2/2)
> (1eae72b5680529fbd3b4becadb803910) switched from RUNNING to FINISHED.
> 2021-11-25 12:52:06,528 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> Checkpoint triggering task Source: Source Events/Read(KinesisSource) ->
> Flat Map -> Source Events/MapElements/Map/ParMultiDo(Anonymous) -> Parse
> Events/ParMultiDo(ParseEvent) -> Key Events/ParMultiDo(KeyEvent) ->
> Window/Window.Assign.out -> ToBinaryKeyedWorkItem (2/2) of job
>  is not in state RUNNING but FINISHED
> instead. Aborting checkpoint.
>
> For context, here is an excerpt from the flink-conf.yaml file:
>
> flink-conf.yaml: |+
> # TaskManager configurations
> taskmanager.numberOfTaskSlots: 2
> taskmanager.rpc.port: 6122
> taskmanager.memory.process.size: 1728m
> # JobManager configurations
> jobmanager.rpc.address: {{ $fullName }}-jobmanager
> jobmanager.rpc.port: 6123
> jobmanager.memory.process.size: 1600m
>
> blob.server.port: 6124
> queryable-state.proxy.ports: 6125
> parallelism.default: 1 # default paralleism when not defined elsewhere
> kubernetes.namespace: {{ $fullName }} # The namespace that will be used
> for running the jobmanager and taskmanager pods.
> scheduler-mode: reactive
>
> # High-availability configurations
> high-availability:
> org.apach

Checkpoints aborted - Job is not in state RUNNING but FINISHED

2021-11-25 Thread jonas eyob
Hi all,

I have been struggling with this issue for a couple of days now.
Checkpointing appears to fail because the source task (a Kinesis stream in this
case) is in a FINISHED state.

Excerpt from Jobmanager logs:

2021-11-25 12:52:00,479 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
Source Events/Read(KinesisSource) -> Flat Map -> Source
Events/MapElements/Map/ParMultiDo(Anonymous) -> Parse
Events/ParMultiDo(ParseEvent) -> Key Events/ParMultiDo(KeyEvent) ->
Window/Window.Assign.out -> ToBinaryKeyedWorkItem (1/2)
(eb31cbc4e319588ba79a26d26abcd2f3) switched from DEPLOYING to RUNNING.
2021-11-25 12:52:00,494 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
Source Events/Read(KinesisSource) -> Flat Map -> Source
Events/MapElements/Map/ParMultiDo(Anonymous) -> Parse
Events/ParMultiDo(ParseEvent) -> Key Events/ParMultiDo(KeyEvent) ->
Window/Window.Assign.out -> ToBinaryKeyedWorkItem (2/2)
(1eae72b5680529fbd3b4becadb803910) switched from DEPLOYING to RUNNING.
2021-11-25 12:52:00,569 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
GroupByKey -> ToGBKResult -> Fetch User Profile/ParMultiDo(GetStoredState)
-> Reduce state/ParMultiDo(ReduceState) -> Store
state/ParMultiDo(StoreState) (1/2) (1a77c7ed026ac4e4a59ab66876053102)
switched from DEPLOYING to RUNNING.
2021-11-25 12:52:00,582 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
GroupByKey -> ToGBKResult -> Fetch User Profile/ParMultiDo(GetStoredState)
-> Reduce state/ParMultiDo(ReduceState) -> Store
state/ParMultiDo(StoreState) (2/2) (31588d4dad22821d7226ec65687d0edb)
switched from DEPLOYING to RUNNING.
2021-11-25 12:52:00,881 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
Source Events/Read(KinesisSource) -> Flat Map -> Source
Events/MapElements/Map/ParMultiDo(Anonymous) -> Parse
Events/ParMultiDo(ParseEvent) -> Key Events/ParMultiDo(KeyEvent) ->
Window/Window.Assign.out -> ToBinaryKeyedWorkItem (2/2)
(1eae72b5680529fbd3b4becadb803910) switched from RUNNING to FINISHED.
2021-11-25 12:52:06,528 INFO
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
Checkpoint triggering task Source: Source Events/Read(KinesisSource) ->
Flat Map -> Source Events/MapElements/Map/ParMultiDo(Anonymous) -> Parse
Events/ParMultiDo(ParseEvent) -> Key Events/ParMultiDo(KeyEvent) ->
Window/Window.Assign.out -> ToBinaryKeyedWorkItem (2/2) of job
 is not in state RUNNING but FINISHED
instead. Aborting checkpoint.

For context, here is an excerpt from the flink-conf.yaml file:

flink-conf.yaml: |+
# TaskManager configurations
taskmanager.numberOfTaskSlots: 2
taskmanager.rpc.port: 6122
taskmanager.memory.process.size: 1728m
# JobManager configurations
jobmanager.rpc.address: {{ $fullName }}-jobmanager
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 1600m

blob.server.port: 6124
queryable-state.proxy.ports: 6125
parallelism.default: 1 # default parallelism when not defined elsewhere
kubernetes.namespace: {{ $fullName }} # The namespace that will be used for
running the jobmanager and taskmanager pods.
scheduler-mode: reactive

# High-availability configurations
high-availability:
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3://company-flink-{{ .Values.environment
}}/recovery
hive.s3.use-instance-credentials: true
kubernetes.cluster-id: {{ $fullName }}

# Checkpoint and State backend
state.backend: rocksdb
state.checkpoint-storage: filesystem # jobmanager or filesystem
state.backend.incremental: true # only supported by rocksdb
state.checkpoints.dir: s3://company-flink-{{ .Values.environment
}}/checkpoints
execution.checkpointing.interval: 20 min
execution.checkpointing.min-pause: 10 min # minimum time between
checkpoints to reduce overhead
state.checkpoints.num-retained: 1 # Maximum number of completed checkpoints
to retain

# Fault tolerance
restart-strategy: fixed-delay
restart-strategy.fixed-delay.delay: 10 s
restart-strategy.fixed-delay.attempts: 3 # try n times before job is
considered failed

From what I can see the job is still running, and the checkpointing keeps
failing.
After finding this (https://issues.apache.org/jira/browse/FLINK-2491) I
updated the default parallelism from 2 -> 1, since our current Kinesis stream
consists of 1 shard. But the problem persists.

Any ideas?

Jonas


High availability - leader election not working?

2021-09-01 Thread jonas eyob
31 14:59:18,486 WARN  akka.remote.ReliableDeliverySupervisor
[] - Association with remote system [akka.tcp://
flink@100.117.0.4:6122] has failed, address is now gated for [50] ms.
Reason: [Disassociated]
2021-08-31 14:59:21,411 WARN  akka.remote.transport.netty.NettyTransport
[] - Remote connection to [null] failed with
java.net.ConnectException: Connection refused: /100.117.0.4:6122
2021-08-31 14:59:21,450 WARN  akka.remote.ReliableDeliverySupervisor
[] - Association with remote system [akka.tcp://
flink@100.117.0.4:6122] has failed, address is now gated for [50] ms.
Reason: [Association failed with [akka.tcp://flink@100.117.0.4:6122]]
Caused by: [java.net.ConnectException: Connection refused: /100.117.0.4:6122
]
2021-08-31 14:59:31,447 WARN  akka.remote.transport.netty.NettyTransport
[] - Remote connection to [null] failed with
java.net.ConnectException: Connection refused: /100.117.0.4:6122
2021-08-31 14:59:31,453 WARN  akka.remote.ReliableDeliverySupervisor
[] - Association with remote system [akka.tcp://
flink@100.117.0.4:6122] has failed, address is now gated for [50] ms.
Reason: [Association failed with [akka.tcp://flink@100.117.0.4:6122]]
Caused by: [java.net.ConnectException: Connection refused: /100.117.0.4:6122
]
2021-08-31 14:59:34,914 INFO
 org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Registering TaskManager with ResourceID 100.117.0.4:6122-9f2331 (akka.tcp://
flink@100.117.0.4:6122/user/rpc/taskmanager_0) at ResourceManager
2021-08-31 15:00:01,405 INFO
 org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
The heartbeat of TaskManager with id 100.117.0.4:6122-69262a timed out.
2021-08-31 15:00:01,406 INFO
 org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Closing TaskExecutor connection 100.117.0.4:6122-69262a because: The
heartbeat of TaskManager with id 100.117.0.4:6122-69262a  timed out.
2021-08-31 15:00:02,793 INFO
 org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Request slot with profile ResourceProfile{UNKNOWN} for job
 with allocation id
ec3e0def8314dea150bb281818828b55.
2021-08-31 15:00:02,796 INFO
 org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Request slot with profile ResourceProfile{UNKNOWN} for job
 with allocation id
2b8d9ba4a7c7895dbb0dba4f16006441.

Excerpt of logs after (what I think was) killing the leader, when a new
JobManager pod ("JobManager 2") is created to replace the killed one:

2021-08-31 15:00:02,778 INFO
 org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] -
Recovering checkpoints from
KubernetesStateHandleStore{configMapName='thoros--jobmanager-leader'}.
2021-08-31 15:00:02,784 INFO
 org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] -
Found 0 checkpoints in
KubernetesStateHandleStore{configMapName='thoros--jobmanager-leader'}.
2021-08-31 15:00:02,784 INFO
 org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] -
All 0 checkpoints found are already downloaded.
2021-08-31 15:00:02,784 INFO
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - No
checkpoint found during restore.


-- 
*Med Vänliga Hälsningar*
*Jonas Eyob*


Re: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden)

2021-08-26 Thread jonas eyob
@Svend - that seems to have done the trick; adding the bucket itself as a
resource got Flink to write to the configured S3 bucket.

@Gil - we manage our Kubernetes cluster on AWS with kops, but we do assign
the IAM roles through the deployment annotations. Presto seems to be able to
use the s3:// scheme in our case.

Thanks both!

Den tors 26 aug. 2021 kl 17:59 skrev Gil De Grove :

> Hi Jonas,
>
>
>
> Just wondering, are you trying to deploy via iam service account
> annotations in a AWS eks cluster?
>
> We noticed that when using presto, the iam service account was using en
> ec2 metadata API inside AWS. However, when using eks service account, the
> API used is the webtoken auth.
>
> Not sure if the solution we find is the appropriate one, but switching to
> s3a instead of presto, and forcing the aws defaultProviderChain did the
> trick.
>
> Maybe you could try that.
>
> Regards,
> Gil
>
> On Thu, Aug 26, 2021, 18:45 Svend  wrote:
>
>> Hi Jonas,
>>
>> Just a thought, could you try this policy? If I recall correctly, I think
>> you need ListBucket on the bucket itself, whereas the other can have a path
>> prefix like the "/*" you added
>>
>> "
>> {
>> "Version": "2012-10-17",
>> "Statement": [
>> {
>> "Action": [
>> "s3:ListBucket",
>> "s3:Get*",
>> "s3:Put*",
>> "s3:Delete*"
>> ],
>> "Resource": [
>> "arn:aws:s3:::-flink-dev",
>> "arn:aws:s3:::-flink-dev/*"
>> ],
>> "Effect": "Allow"
>> }
>> ]
>> }
>> "
>>
>> Svend
>>
>>
>> On Thu, 26 Aug 2021, at 6:19 PM, jonas eyob wrote:
>>
>> Hey Matthias,
>>
>> Yes, I have followed the documentation on the link you provided - and
>> decided to go for the recommended approach of using IAM roles.
>> The hive.s3.use-instance-credentials configuration parameter I got from
>> [1] (first bullet) since I am using the flink-s3-fs-presto plugin - which
>> says:
>>
>> ..flink-s3-fs-presto, registered under the scheme *s3://* and *s3p://*,
>> is based on code from the Presto project <https://prestodb.io/>. You can
>> configure it using the same configuration keys as the Presto file system
>> <https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration>,
>> by adding the configurations to your flink-conf.yaml. The Presto S3
>> implementation is the recommended file system for checkpointing to S3
>>
>> Its possible I am misunderstanding it?
>>
>> Best,
>> Jonas
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
>>
>> Den tors 26 aug. 2021 kl 16:32 skrev Matthias Pohl <
>> matth...@ververica.com>:
>>
>> Hi Jonas,
>> have you included the s3 credentials in the Flink config file like it's
>> described in [1]? I'm not sure about this hive.s3.use-instance-credentials
>> being a valid configuration parameter.
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials
>>
>> On Thu, Aug 26, 2021 at 3:43 PM jonas eyob  wrote:
>>
>> Hey,
>>
>> I am setting up HA on a standalone Kubernetes Flink application job
>> cluster.
>> Flink (1.12.5) is used and I am using S3 as the storage backend
>>
>> * The JobManager shortly fails after starts with the following errors
>> (apologies in advance for the length), and I can't understand what's going
>> on.
>> * First I thought it may be due to missing Delete privileges of the IAM
>> role and updated that, but the problem persists.
>> * The S3 bucket configured s3:///recovery is empty.
>>
>> configmap.yaml
>> flink-conf.yaml: |+
>> jobmanager.rpc.address: {{ $fullName }}-jobmanager
>> jobmanager.rpc.port: 6123
>> jobmanager.memory.process.size: 1600m
>> taskmanager.numberOfTaskSlots: 2
>> taskmanager.rpc.port: 6122
>> taskmanager.memory.process.size: 1728m
>> blob.server.port: 6124
>> queryable-state.proxy.ports: 6125
>> parallelism.default: 2
>> scheduler-mode: reactive
>> execution.checkpointing.interval: 10s
>> restart-strategy: fixed-delay
>> restart-strate

Re: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden)

2021-08-26 Thread jonas eyob
Hey Matthias,

Yes, I have followed the documentation on the link you provided - and
decided to go for the recommended approach of using IAM roles.
The hive.s3.use-instance-credentials configuration parameter I got from [1]
(first bullet) since I am using the flink-s3-fs-presto plugin - which says:

..flink-s3-fs-presto, registered under the scheme *s3://* and *s3p://*, is
based on code from the Presto project <https://prestodb.io/>. You can
configure it using the same configuration keys as the Presto file system
<https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration>,
by adding the configurations to your flink-conf.yaml. The Presto S3
implementation is the recommended file system for checkpointing to S3

It's possible I am misunderstanding it?

Best,
Jonas

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins

Den tors 26 aug. 2021 kl 16:32 skrev Matthias Pohl :

> Hi Jonas,
> have you included the s3 credentials in the Flink config file like it's
> described in [1]? I'm not sure about this hive.s3.use-instance-credentials
> being a valid configuration parameter.
>
> Best,
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials
>
> On Thu, Aug 26, 2021 at 3:43 PM jonas eyob  wrote:
>
>> Hey,
>>
>> I am setting up HA on a standalone Kubernetes Flink application job
>> cluster.
>> Flink (1.12.5) is used and I am using S3 as the storage backend
>>
>> * The JobManager shortly fails after starts with the following errors
>> (apologies in advance for the length), and I can't understand what's going
>> on.
>> * First I thought it may be due to missing Delete privileges of the IAM
>> role and updated that, but the problem persists.
>> * The S3 bucket configured s3:///recovery is empty.
>>
>> configmap.yaml
>> flink-conf.yaml: |+
>> jobmanager.rpc.address: {{ $fullName }}-jobmanager
>> jobmanager.rpc.port: 6123
>> jobmanager.memory.process.size: 1600m
>> taskmanager.numberOfTaskSlots: 2
>> taskmanager.rpc.port: 6122
>> taskmanager.memory.process.size: 1728m
>> blob.server.port: 6124
>> queryable-state.proxy.ports: 6125
>> parallelism.default: 2
>> scheduler-mode: reactive
>> execution.checkpointing.interval: 10s
>> restart-strategy: fixed-delay
>> restart-strategy.fixed-delay.attempts: 10
>> high-availability:
>> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>> kubernetes.cluster-id: {{ $fullName }}
>> high-availability.storageDir: s3://-flink-{{ .Values.environment
>> }}/recovery
>> hive.s3.use-instance-credentials: true
>> kubernetes.namespace: {{ $fullName }} # The namespace that will be used
>> for running the jobmanager and taskmanager pods
>>
>> role.yaml
>> kind: Role
>> apiVersion: rbac.authorization.k8s.io/v1
>> metadata:
>> name: {{ $fullName }}
>> namespace: {{ $fullName }}
>> labels:
>> app: {{ $appName }}
>> chart: {{ template "thoros.chart" . }}
>> release: {{ .Release.Name }}
>> heritage: {{ .Release.Service }}
>>
>> rules:
>> - apiGroups: [""]
>> resources: ["configmaps"]
>> verbs: ["create", "edit", "delete", "watch", "get", "list", "update"]
>>
>> aws IAM policy
>> {
>> "Version": "2012-10-17",
>> "Statement": [
>> {
>> "Action": [
>> "s3:ListBucket",
>> "s3:Get*",
>> "s3:Put*",
>> "s3:Delete*"
>> ],
>> "Resource": [
>> "arn:aws:s3:::-flink-dev/*"
>> ],
>> "Effect": "Allow"
>> }
>> ]
>> }
>>
>> *Error-log:*
>> 2021-08-26 13:08:43,439 INFO  org.apache.beam.runners.flink.FlinkRunner
>>  [] - Executing pipeline using FlinkRunner.
>> 2021-08-26 13:08:43,444 WARN  org.apache.beam.runners.flink.FlinkRunner
>>  [] - For maximum performance you should set the
>> 'fasterCopy' option. See more at
>> https://issues.apache.org/jira/browse/BEAM-11146
>> 2021-08-26 13:08:43,451 INFO  org.apache.beam.runners.flink.FlinkRunner
>>  [] - Translating pipeline to Flink program.
>> 2021-08-26 13:08:43,456 INFO
>>  org.apache.beam.runners.fl

Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden)

2021-08-26 Thread jonas eyob
e(AbstractActor.scala:225)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
Caused by: java.lang.Exception: Could not open output stream for state
backend
at
org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper.store(FileSystemStateStorageHelper.java:72)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:131)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.putJobGraph(DefaultJobGraphStore.java:212)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:392)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJob$29(Dispatcher.java:971)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
... 27 more
Caused by:
com.facebook.presto.hive.s3.PrestoS3FileSystem$UnrecoverableS3OperationException:
com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service:
Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID:
V0BWCA4RDVE0EVK8; S3 Extended Request ID:
yVIcc5k0SoQcKcQ+7+CMHw9vZPmwgbTJmto05Eaixu5RzKRmKPfNJ8M254UN5qrqXoiyycx897o=;
Proxy: null), S3 Extended Request ID:
yVIcc5k0SoQcKcQ+7+CMHw9vZPmwgbTJmto05Eaixu5RzKRmKPfNJ8M254UN5qrqXoiyycx897o=
(Path:
s3://-flink-dev/recovery/default/submittedJobGraphe95ce29174c6)
at
com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$2(PrestoS3FileSystem.java:573)
~[?:?]
at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
at
com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:560)
~[?:?]
at
com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:311)
~[?:?]
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1734) ~[?:?]
at
com.facebook.presto.hive.s3.PrestoS3FileSystem.create(PrestoS3FileSystem.java:356)
~[?:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169) ~[?:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149) ~[?:?]
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038) ~[?:?]
at
org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:154)
~[?:?]
at
org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:37)
~[?:?]
at
org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:170)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper.store(FileSystemStateStorageHelper.java:64)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:131)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.putJobGraph(DefaultJobGraphStore.java:212)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:392)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJob$29(Dispatcher.java:971)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
at
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93)
~[flink-dist_2.12-1.12.5.jar:1.12.5]
... 27 more
-- 
*Med Vänliga Hälsningar*
*Jonas Eyob*


Re: NullPointerException when using KubernetesHaServicesFactory

2021-08-26 Thread jonas eyob
So when spinning it up on minikube and ssh-ing into one of the JobManager
pods, the commands you mentioned show the following:

flink@local-thoros-jobmanager-6l9lz:~$ id
uid=(flink) gid=(flink) groups=(flink)

flink@local-thoros-jobmanager-6l9lz:~$  ls -la
$FLINK_HOME/plugins/s3-fs-presto/
ls: cannot access '/opt/flink/plugins/s3-fs-presto/': No such file or
directory

It appears this had to do with minikube using a cached version of the docker
image, so the new changes (i.e. adding the plugin) were never reflected.
After pulling down the latest image the error stopped :)

Best,
Jonas



Den ons 25 aug. 2021 kl 21:17 skrev Thms Hmm :

> Can you check what is the output of those commands
>
> $ id
> $ ls -la $FLINK_HOME/plugins/s3-fs-presto/
>
>
> jonas eyob  schrieb am Mi. 25. Aug. 2021 um 16:17:
>
>> The exception is showing up both in TM and JM
>>
>> This however seemed only to appear when running on my local kubernetes
>> setup.
>> > I'd also recommend setting "kubernetes.namespace" option, unless you're
>> using "default" namespace.
>>
>> Yes, good point - I now see why that was needed.
>>
>>
>> Den ons 25 aug. 2021 kl 11:37 skrev David Morávek :
>>
>>> Hi Jonas,
>>>
>>> Where does the exception pop-up? In job driver, TM, JM? You need to make
>>> sure that the plugin folder is setup for all of them, because they all may
>>> need to access s3 at some point.
>>>
>>> Best,
>>> D.
>>>
>>> On Wed, Aug 25, 2021 at 11:54 AM jonas eyob 
>>> wrote:
>>>
>>>> Hey Thms,
>>>>
>>>> tried the s3p:// option as well - same issue.
>>>>
>>>> > Also check if your user that executes the process is able to read the
>>>> jars.
>>>> Not exactly sure how to do that? The user "flink" in the docker image
>>>> is able to read the contents as far I understand. But maybe that's not how
>>>> I would check it?
>>>>
>>>> Den ons 25 aug. 2021 kl 10:12 skrev Thms Hmm :
>>>>
>>>>> Hey Jonas,
>>>>> you could also try to use the ´s3p://´ scheme to directly specify that
>>>>> presto should be used. Also check if your user that executes the process 
>>>>> is
>>>>> able to read the jars.
>>>>>
>>>>> Am Mi., 25. Aug. 2021 um 10:01 Uhr schrieb jonas eyob <
>>>>> jonas.e...@gmail.com>:
>>>>>
>>>>>> Thanks David for the quick response!
>>>>>>
>>>>>> *face palm* - Thanks a lot, that seems to have addressed the
>>>>>> NullPointerException issue.
>>>>>> May I also suggest that this [1] page be updated, since it says the
>>>>>> key is "high-availability.cluster-id"
>>>>>>
>>>>>> This led me to another issue however: 
>>>>>> "org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
>>>>>> Could not find a file system implementation for scheme 's3'"
>>>>>>
>>>>>> The section [2] describe how I can either use environment variables
>>>>>> e.g. ENABLE_BUILT_IN_PLUGINS or bake that in to the image by copying the
>>>>>> provided plugins in opt/ under /plugins
>>>>>>
>>>>>> Dockerfile (snippet)
>>>>>> # Configure flink provided plugin for S3 access
>>>>>> RUN mkdir -p $FLINK_HOME/plugins/s3-fs-presto
>>>>>> RUN cp $FLINK_HOME/opt/flink-s3-fs-presto-*.jar
>>>>>> $FLINK_HOME/plugins/s3-fs-presto/
>>>>>>
>>>>>> When bashing into the image:
>>>>>>
>>>>>> flink@dd86717a92a0:~/plugins/s3-fs-presto$ ls
>>>>>> flink-s3-fs-presto-1.12.5.jar
>>>>>>
>>>>>> Any idea?
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#high-availability
>>>>>> [2]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
>>>>>>
>>>>>>
>>>>>> Den ons 25 aug. 2021 kl 08:00 skrev David Morávek :
>>>>>>
>>>>>>> Hi Jonas,
>>>>>>>
>>>>>>> this exception is raised because "kubernetes.cluster-id" [1] is not

Re: NullPointerException when using KubernetesHaServicesFactory

2021-08-25 Thread jonas eyob
The exception is showing up both in TM and JM

This however seemed only to appear when running on my local kubernetes
setup.
> I'd also recommend setting "kubernetes.namespace" option, unless you're
using "default" namespace.

Yes, good point - I now see why that was needed.


Den ons 25 aug. 2021 kl 11:37 skrev David Morávek :

> Hi Jonas,
>
> Where does the exception pop-up? In job driver, TM, JM? You need to make
> sure that the plugin folder is setup for all of them, because they all may
> need to access s3 at some point.
>
> Best,
> D.
>
> On Wed, Aug 25, 2021 at 11:54 AM jonas eyob  wrote:
>
>> Hey Thms,
>>
>> tried the s3p:// option as well - same issue.
>>
>> > Also check if your user that executes the process is able to read the
>> jars.
>> Not exactly sure how to do that? The user "flink" in the docker image is
>> able to read the contents as far I understand. But maybe that's not how I
>> would check it?
>>
>> Den ons 25 aug. 2021 kl 10:12 skrev Thms Hmm :
>>
>>> Hey Jonas,
>>> you could also try to use the ´s3p://´ scheme to directly specify that
>>> presto should be used. Also check if your user that executes the process is
>>> able to read the jars.
>>>
>>> Am Mi., 25. Aug. 2021 um 10:01 Uhr schrieb jonas eyob <
>>> jonas.e...@gmail.com>:
>>>
>>>> Thanks David for the quick response!
>>>>
>>>> *face palm* - Thanks a lot, that seems to have addressed the
>>>> NullPointerException issue.
>>>> May I also suggest that this [1] page be updated, since it says the key
>>>> is "high-availability.cluster-id"
>>>>
>>>> This led me to another issue however: 
>>>> "org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
>>>> Could not find a file system implementation for scheme 's3'"
>>>>
>>>> The section [2] describe how I can either use environment variables
>>>> e.g. ENABLE_BUILT_IN_PLUGINS or bake that in to the image by copying the
>>>> provided plugins in opt/ under /plugins
>>>>
>>>> Dockerfile (snippet)
>>>> # Configure flink provided plugin for S3 access
>>>> RUN mkdir -p $FLINK_HOME/plugins/s3-fs-presto
>>>> RUN cp $FLINK_HOME/opt/flink-s3-fs-presto-*.jar
>>>> $FLINK_HOME/plugins/s3-fs-presto/
>>>>
>>>> When bashing into the image:
>>>>
>>>> flink@dd86717a92a0:~/plugins/s3-fs-presto$ ls
>>>> flink-s3-fs-presto-1.12.5.jar
>>>>
>>>> Any idea?
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#high-availability
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
>>>>
>>>>
>>>> Den ons 25 aug. 2021 kl 08:00 skrev David Morávek :
>>>>
>>>>> Hi Jonas,
>>>>>
>>>>> this exception is raised because "kubernetes.cluster-id" [1] is not
>>>>> set. I'd also recommend setting "kubernetes.namespace" option, unless
>>>>> you're using "default" namespace.
>>>>>
>>>>> I've filled FLINK-23961 [2] so we provide more descriptive warning for
>>>>> this issue next time ;)
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/#example-configuration
>>>>> [2] https://issues.apache.org/jira/browse/FLINK-23961
>>>>>
>>>>> Best,
>>>>> D.
>>>>>
>>>>> On Wed, Aug 25, 2021 at 8:48 AM jonas eyob 
>>>>> wrote:
>>>>>
>>>>>> Hey, I've been struggling with this problem now for some days -
>>>>>> driving me crazy.
>>>>>>
>>>>>> I have a standalone kubernetes Flink (1.12.5) using an application
>>>>>> cluster mode approach.
>>>>>>
>>>>>> *The problem*
>>>>>> I am getting a NullPointerException when specifying the FQN of the
>>>>>> Kubernetes HA Service Factory class
>>>>>> i.e.
>>>>>> *org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory*
>>>>>>
>>>>>> What other configurations besides the ones specified (h

Re: NullPointerException when using KubernetesHaServicesFactory

2021-08-25 Thread jonas eyob
Hey Thms,

tried the s3p:// option as well - same issue.

> Also check if your user that executes the process is able to read the
jars.
Not exactly sure how to do that? The user "flink" in the docker image is
able to read the contents as far as I understand. But maybe that's not how I
would check it?

Den ons 25 aug. 2021 kl 10:12 skrev Thms Hmm :

> Hey Jonas,
> you could also try to use the ´s3p://´ scheme to directly specify that
> presto should be used. Also check if your user that executes the process is
> able to read the jars.
>
> Am Mi., 25. Aug. 2021 um 10:01 Uhr schrieb jonas eyob <
> jonas.e...@gmail.com>:
>
>> Thanks David for the quick response!
>>
>> *face palm* - Thanks a lot, that seems to have addressed the
>> NullPointerException issue.
>> May I also suggest that this [1] page be updated, since it says the key
>> is "high-availability.cluster-id"
>>
>> This led me to another issue however: 
>> "org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
>> Could not find a file system implementation for scheme 's3'"
>>
>> The section [2] describe how I can either use environment variables e.g.
>> ENABLE_BUILT_IN_PLUGINS or bake that in to the image by copying the
>> provided plugins in opt/ under /plugins
>>
>> Dockerfile (snippet)
>> # Configure flink provided plugin for S3 access
>> RUN mkdir -p $FLINK_HOME/plugins/s3-fs-presto
>> RUN cp $FLINK_HOME/opt/flink-s3-fs-presto-*.jar
>> $FLINK_HOME/plugins/s3-fs-presto/
>>
>> When bashing into the image:
>>
>> flink@dd86717a92a0:~/plugins/s3-fs-presto$ ls
>> flink-s3-fs-presto-1.12.5.jar
>>
>> Any idea?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#high-availability
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
>>
>>
>> Den ons 25 aug. 2021 kl 08:00 skrev David Morávek :
>>
>>> Hi Jonas,
>>>
>>> this exception is raised because "kubernetes.cluster-id" [1] is not set.
>>> I'd also recommend setting "kubernetes.namespace" option, unless you're
>>> using "default" namespace.
>>>
>>> I've filled FLINK-23961 [2] so we provide more descriptive warning for
>>> this issue next time ;)
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/#example-configuration
>>> [2] https://issues.apache.org/jira/browse/FLINK-23961
>>>
>>> Best,
>>> D.
>>>
>>> On Wed, Aug 25, 2021 at 8:48 AM jonas eyob  wrote:
>>>
>>>> Hey, I've been struggling with this problem now for some days - driving
>>>> me crazy.
>>>>
>>>> I have a standalone kubernetes Flink (1.12.5) using an application
>>>> cluster mode approach.
>>>>
>>>> *The problem*
>>>> I am getting a NullPointerException when specifying the FQN of the
>>>> Kubernetes HA Service Factory class
>>>> i.e.
>>>> *org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory*
>>>>
>>>> What other configurations besides the ones specified (here
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/#common-cluster-resource-definitions>)
>>>> may I be missing?
>>>>
>>>> Details:
>>>> * we are using a custom image using the flink: 1.12
>>>> <https://hub.docker.com/layers/flink/library/flink/1.12/images/sha256-4b4290888e30d27a28517bac3b1678674cd4b17aa7b8329969d1d12fcdf68f02?context=explore>
>>>> as base image
>>>>
>>>> flink-conf.yaml -- thought this may be useful?
>>>> flink-conf.yaml: |+
>>>> jobmanager.rpc.address: {{ $fullName }}-jobmanager
>>>> jobmanager.rpc.port: 6123
>>>> jobmanager.memory.process.size: 1600m
>>>> taskmanager.numberOfTaskSlots: 2
>>>> taskmanager.rpc.port: 6122
>>>> taskmanager.memory.process.size: 1728m
>>>> blob.server.port: 6124
>>>> queryable-state.proxy.ports: 6125
>>>> parallelism.default: 2
>>>> scheduler-mode: reactive
>>>> execution.checkpointing.interval: 10s
>>>> restart-strategy: fixed-delay
>>>> restart-strategy.fixed-delay.attempts: 10
>>>> high-availability:
>>>> org.apache.flink.kube

Re: NullPointerException when using KubernetesHaServicesFactory

2021-08-25 Thread jonas eyob
Thanks David for the quick response!

*face palm* - Thanks a lot, that seems to have addressed the
NullPointerException issue.
May I also suggest that this [1] page be updated, since it says the key is "
high-availability.cluster-id"
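
For anyone else hitting this, the relevant flink-conf.yaml keys end up looking
roughly like this (cluster id and namespace values are placeholders):

high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
# note the key is kubernetes.cluster-id, not high-availability.cluster-id
kubernetes.cluster-id: my-cluster
# recommended unless the "default" namespace is used
kubernetes.namespace: my-namespace
high-availability.storageDir: s3://redacted-flink-dev/recovery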

This led me to another issue however:
"org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Could not find a file system implementation for scheme 's3'"

The section [2] describes how I can either use environment variables, e.g.
ENABLE_BUILT_IN_PLUGINS, or bake that into the image by copying the
provided plugins from opt/ into /plugins.
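
(For completeness, the environment-variable route would be something like the
following in the pod spec, with the jar name matching the Flink version in the image:)

env:
  - name: ENABLE_BUILT_IN_PLUGINS
    value: flink-s3-fs-presto-1.12.5.jar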

Dockerfile (snippet)
# Configure flink provided plugin for S3 access
RUN mkdir -p $FLINK_HOME/plugins/s3-fs-presto
RUN cp $FLINK_HOME/opt/flink-s3-fs-presto-*.jar
$FLINK_HOME/plugins/s3-fs-presto/

When bashing into the image:

flink@dd86717a92a0:~/plugins/s3-fs-presto$ ls
flink-s3-fs-presto-1.12.5.jar

Any idea?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#high-availability
[2]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins


Den ons 25 aug. 2021 kl 08:00 skrev David Morávek :

> Hi Jonas,
>
> this exception is raised because "kubernetes.cluster-id" [1] is not set.
> I'd also recommend setting "kubernetes.namespace" option, unless you're
> using "default" namespace.
>
> I've filled FLINK-23961 [2] so we provide more descriptive warning for
> this issue next time ;)
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/#example-configuration
> [2] https://issues.apache.org/jira/browse/FLINK-23961
>
> Best,
> D.
>
> On Wed, Aug 25, 2021 at 8:48 AM jonas eyob  wrote:
>
>> Hey, I've been struggling with this problem now for some days - driving
>> me crazy.
>>
>> I have a standalone kubernetes Flink (1.12.5) using an application
>> cluster mode approach.
>>
>> *The problem*
>> I am getting a NullPointerException when specifying the FQN of the
>> Kubernetes HA Service Factory class
>> i.e.
>> *org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory*
>>
>> What other configurations besides the ones specified (here
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/#common-cluster-resource-definitions>)
>> may I be missing?
>>
>> Details:
>> * we are using a custom image using the flink: 1.12
>> <https://hub.docker.com/layers/flink/library/flink/1.12/images/sha256-4b4290888e30d27a28517bac3b1678674cd4b17aa7b8329969d1d12fcdf68f02?context=explore>
>> as base image
>>
>> flink-conf.yaml -- thought this may be useful?
>> flink-conf.yaml: |+
>> jobmanager.rpc.address: {{ $fullName }}-jobmanager
>> jobmanager.rpc.port: 6123
>> jobmanager.memory.process.size: 1600m
>> taskmanager.numberOfTaskSlots: 2
>> taskmanager.rpc.port: 6122
>> taskmanager.memory.process.size: 1728m
>> blob.server.port: 6124
>> queryable-state.proxy.ports: 6125
>> parallelism.default: 2
>> scheduler-mode: reactive
>> execution.checkpointing.interval: 10s
>> restart-strategy: fixed-delay
>> restart-strategy.fixed-delay.attempts: 10
>> high-availability:
>> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>> high-availability.cluster-id: /{{ $fullName }}
>> high-availability.storageDir: s3://redacted-flink-dev/recovery
>>
>> *Snippet of Job Manager pod log*
>> 2021-08-25 06:14:20,652 INFO
>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting
>> StandaloneApplicationClusterEntryPoint down with application status FAILED.
>> Diagnostics org.apache.flink.util.FlinkException: Could not create the ha
>> services from the instantiated HighAvailabilityServicesFactory
>> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
>> at
>> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:268)
>> at
>> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:124)
>> at
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:338)
>> at
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:296)
>> at
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:224)
>> at
>> org.apache.flink.runtime.entrypoint.ClusterEnt

NullPointerException when using KubernetesHaServicesFactory

2021-08-25 Thread jonas eyob
Hey, I've been struggling with this problem now for some days - driving me
crazy.

I have a standalone Kubernetes Flink (1.12.5) cluster using the application
cluster mode approach.

*The problem*
I am getting a NullPointerException when specifying the FQN of the
Kubernetes HA Service Factory class
i.e.
*org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory*

What other configurations besides the ones specified (here
<https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/#common-cluster-resource-definitions>)
may I be missing?

Details:
* we are using a custom image with flink:1.12
<https://hub.docker.com/layers/flink/library/flink/1.12/images/sha256-4b4290888e30d27a28517bac3b1678674cd4b17aa7b8329969d1d12fcdf68f02?context=explore>
as the base image

flink-conf.yaml -- thought this might be useful:

flink-conf.yaml: |+
  jobmanager.rpc.address: {{ $fullName }}-jobmanager
  jobmanager.rpc.port: 6123
  jobmanager.memory.process.size: 1600m
  taskmanager.numberOfTaskSlots: 2
  taskmanager.rpc.port: 6122
  taskmanager.memory.process.size: 1728m
  blob.server.port: 6124
  queryable-state.proxy.ports: 6125
  parallelism.default: 2
  scheduler-mode: reactive
  execution.checkpointing.interval: 10s
  restart-strategy: fixed-delay
  restart-strategy.fixed-delay.attempts: 10
  high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
  high-availability.cluster-id: /{{ $fullName }}
  high-availability.storageDir: s3://redacted-flink-dev/recovery

*Snippet of Job Manager pod log*
2021-08-25 06:14:20,652 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting
StandaloneApplicationClusterEntryPoint down with application status FAILED.
Diagnostics org.apache.flink.util.FlinkException: Could not create the ha
services from the instantiated HighAvailabilityServicesFactory
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
at
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:268)
at
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:124)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:338)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:296)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:224)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:178)
at
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:175)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:585)
at
org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.main(StandaloneApplicationClusterEntryPoint.java:85)
Caused by: java.lang.NullPointerException
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59)
at
org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.(Fabric8FlinkKubeClient.java:85)
at
org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory.fromConfiguration(FlinkKubeClientFactory.java:106)
at
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.createHAServices(KubernetesHaServicesFactory.java:37)
at
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:265)
... 9 more
.

--
Many thanks,
Jonas


Re: Connect more than two streams

2017-07-25 Thread Jonas Gröger
Hello Govindarajan,

one way to merge multiple streams is to union them. You can do that with
the union operator described at [1]. Is that what you are looking for?


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html
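
A small sketch of what that could look like (not from the original question; the
streams, types and names below are made up for illustration):

    import org.apache.flink.streaming.api.scala._

    object UnionSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // three stand-in streams of the same type
        val streamA: DataStream[(String, Long)] = env.fromElements(("deviceA", 1L))
        val streamB: DataStream[(String, Long)] = env.fromElements(("deviceB", 2L))
        val streamC: DataStream[(String, Long)] = env.fromElements(("deviceC", 3L))

        // union accepts any number of streams of the same type,
        // so it is not limited to two inputs like connect
        streamA.union(streamB, streamC)
          .keyBy(_._1)   // key by device id to keep per-device state downstream
          .print()

        env.execute("union-sketch")
      }
    }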

-- Jonas

Am Mo, 24. Jul 2017, um 23:18, schrieb Govindarajan Srinivasaraghavan
[via Apache Flink User Mailing List archive.]:
> 
> 
> Hi,
> 
> 
> I have two streams reading from kafka, one for data and other for
> control.
> The data stream is split by type and there are around six types. Each
> type
> has its own processing logic and finally everything has to be merged to
> get
> the collective state per device. I was thinking I could connect multiple
> streams, process and maintain state but connect only supports two
> streams.
> Is there some way to achieve my desired functionality?
> 
> 
> By the way after split and some processing all of them are keyed streams.
> 
> 
> 
> 





Re: Anomaly Detection with Flink-ML

2017-07-07 Thread Jonas Gröger
Hello Jeremy,

it looks like what you are looking for is map (1 in, 1 out) / flatMap (1 in,
0-n out) for preprocessing on a single-element basis, as well as windows for
looking at related MetricDefinition elements and calculating some result.

I suggest you look into Windows
(https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html)
and basic transformations
(https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#datastream-transformations).
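
A rough sketch of that shape, with a made-up Metric case class standing in for
MetricDefinition and a placeholder aggregation where the anomaly scoring would
go:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time

    case class Metric(id: String, value: Double)

    object WindowSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val raw: DataStream[String] = env.socketTextStream("localhost", 9000)

        // flatMap: 1 element in, 0..n elements out -- malformed lines are dropped
        val metrics: DataStream[Metric] = raw.flatMap { line =>
          line.split(',') match {
            case Array(id, v) => scala.util.Try(Metric(id, v.toDouble)).toOption.toSeq
            case _            => Seq.empty[Metric]
          }
        }

        // window: look at related elements per metric id over 5 minutes
        metrics
          .keyBy(_.id)
          .timeWindow(Time.minutes(5))
          .maxBy("value")   // placeholder aggregation; the real anomaly logic goes here
          .print()

        env.execute("window-sketch")
      }
    }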

Regards,
Jonas





Re: Related datastream

2017-06-21 Thread Jonas Gröger
Hi nragon,

apparently I didn't read the P.S. since I assumed it's not important. Silly
me.

So you are trying to join streams A and B into stream C, with streams A and B
being keyed. Alright. And how often do matching elements (matched by primary
key) from A and B arrive at your to-be-implemented operator?

This question can't be universally answered without having some more
constraints on the streams A and B. To me this sounds more like a batch job,
because it needs to have the whole stream A or B in memory in order to join
every element.





Re: Related datastream

2017-06-21 Thread Jonas Gröger
Hey nragon!

Do the two streams A and B have some sort of id or key or how do you plan on
joining them?
Do you just want to join A and B with elements a and b as they arrive (one
in state and join with the next arriving one from the other stream)?

From what you are asking, this should be no problem, but we need a little bit
more clarification here.

-- Jonas





Re: How to sessionize stream with Apache Flink?

2017-06-18 Thread Jonas
Hey Milad,

since you cannot look into the future to see which element comes next, you have
to "lag" one element behind. This requires building an operator that creates
2-tuples (current-1, current) from incoming elements, so basically a
single value state that emits the last and the current element as a tuple.

In a trigger, the element is then of the 2-tuple type and you can see
changes "beforehand". The last element of the 1s is then (1, 2).

Hope this helps.





Re: Start streaming tuples depending on another streams rate

2017-02-12 Thread Jonas
For 2: You can also NOT read the Source (i.e. Kafka) while doing that. This
way you don't have to buffer.





Re: Start streaming tuples depending on another streams rate

2017-02-10 Thread Jonas
Tzu-Li (Gordon) Tai wrote
> Stream A has a rate of 100k tuples/s. After processing the whole Kafka
> queue, the rate drops to 10 tuples/s.

Absolutely correct.
Tzu-Li (Gordon) Tai wrote
> So what you are looking for is that flatMap2 for stream B only doing work
> after the job reaches the latest record in stream A?

Very much so.
Tzu-Li (Gordon) Tai wrote
> You could perhaps insert a special marker event into stream A every time
> you start running this job.

I tried using stream punctuations but it is hard to know which one is the
"last" punctuation, since after some time I might have multiple in there.
Imagine stream A has in total about 100M messages. We insert a punctuation
as message number 100.000.001. Works.
Next week we need to start the job again. Stream A now has 110M messages and
2 punctuation marks: one at 100.000.001 and one at 110.000.001.
I cannot decide which one is the latest while processing the stream.

Tzu-Li (Gordon) Tai wrote
> Then, once flatMap2 is invoked with the special event, you can toggle
> logic in flatMap2 to actually start doing stuff.

This has the issue that while stream A is being processed, I lose tuples
from stream B because it is not "stopped". I think my use case is currently
not really doable in Flink.

-- Jonas




Re: Remove old <1.0 documentation from google

2017-02-09 Thread Jonas
Doesn't Google offer wildcard removal?





Re: Remove old <1.0 documentation from google

2017-02-09 Thread Jonas
Maybe add "This documentation is outdated. Please switch to a newer version
by clicking here ".





Re: How about Discourse (https://www.discourse.org/) for this mailing list

2017-02-09 Thread Jonas
I might want to add that although these two are available, the content of the
submissions is still often unreadable and not properly formatted. At least
for me this is annoying to read. Additionally, we have Stack Overflow, which
has a nice UI for editing but is not really good for discussions.





Start streaming tuples depending on another streams rate

2017-02-09 Thread Jonas
Hi!

I have a job that uses a RichCoFlatMapFunction of two streams: A and B. In
MyOp, the A stream tuples are combined to form a state using a
ValueStateDescriptor. Stream A is usually started from the beginning of a
Kafka topic. Stream A has a rate of 100k tuples/s. After processing the
whole Kafka queue, the rate drops to 10 tuples/s. A big drop.

What I now want is that while tuples from A are being processed in flatMap1,
the stream B in flatMap2 should wait until the rate of the A stream has
dropped, and only then should flatMap2 be called. Ideally, this behaviour
would be captured in a separate operator, like a RateBasedStreamValve or
something like that :)

To solve this, my idea is to add a counter/timer in the RichCoFlatMapFunction
that counts how many tuples have been processed from A. If the rate drops
below a threshold (here maybe 15 tuples/s), flatMap2, which processes tuples
from B, empties the buffer. However, this would make my RichCoFlatMapFunction
much bigger and would not allow for operator reuse in other scenarios.

I'm of course happy to answer if something is unclear.

-- Jonas
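
A rough sketch of the counter/buffer idea described above (not a tested
implementation: types and the threshold are made up, and a simple "time since
the last A element" check stands in for a real rate measurement):

    import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
    import org.apache.flink.util.Collector
    import scala.collection.JavaConverters._

    class RateGatedCoFlatMap extends RichCoFlatMapFunction[String, String, String] {
      private var bufferedB: ListState[String] = _
      @transient private var lastSeenA: Long = 0L

      override def open(parameters: Configuration): Unit = {
        bufferedB = getRuntimeContext.getListState(
          new ListStateDescriptor[String]("buffered-b", classOf[String]))
      }

      // stream A: build up the state and remember when we last saw an A tuple
      override def flatMap1(a: String, out: Collector[String]): Unit = {
        lastSeenA = System.currentTimeMillis()
        // ... combine A tuples into the ValueState here ...
      }

      // stream B: buffer while A is still flowing, flush once A has gone quiet
      override def flatMap2(b: String, out: Collector[String]): Unit = {
        val aHasDroppedOff = System.currentTimeMillis() - lastSeenA > 1000L
        if (aHasDroppedOff) {
          bufferedB.get().asScala.foreach(out.collect)
          bufferedB.clear()
          out.collect(b)
        } else {
          bufferedB.add(b)
        }
      }
    }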




Remove old <1.0 documentation from google

2017-02-09 Thread Jonas
Hi!

It's really annoying that if you search for something Flink-related, you often
get old documentation from Google. Example: Google "flink quickstart scala" and
you get
https://ci.apache.org/projects/flink/flink-docs-release-0.8/scala_api_quickstart.html






Re: How about Discourse (https://www.discourse.org/) for this mailing list

2017-02-06 Thread Jonas
Instead of Nabble I will use PonyMail now :) Thanks. Didn't know it existed.





Re: Many streaming jobs vs one

2017-02-05 Thread Jonas
I recommend multiple jobs. You can still share most of the code by creating
Java / Scala packages. This makes it easier to update jobs.





Re: Improving Flink Performance

2017-02-05 Thread Jonas
Using a profiler I found out that most of the time (80%) was spent in a
domain-specific data structure. After reimplementing it with a more
efficient one, the performance problems are gone.





Re: Connection refused error when writing to socket?

2017-01-31 Thread Jonas
Can you try opening a socket with netcat on localhost?

nc -lk 9000

and see if this works? For me this works.

-- Jonas





Re: Calling external services/databases from DataStream API

2017-01-30 Thread Jonas
I have a similar use case where I (for the purposes of this discussion) have a
GeoIP database that is not fully available from the start but will
eventually be "full". The GeoIP tuples are coming in one after another.
After ~4M tuples the GeoIP database is complete.

I also need to do the same query.





Re: Regarding Flink as a web service

2017-01-29 Thread Jonas
You could write your data back to Kafka using the FlinkKafkaProducer and then
use websockets to read from Kafka using NodeJS or similar.
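
A rough sketch of the Kafka-sink part, assuming the 0.9 connector that was
current at the time (broker and topic names are made up):

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema

    object KafkaSinkSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val results: DataStream[String] = env.fromElements("result-1", "result-2")

        // write results back to Kafka; a websocket bridge (NodeJS etc.) can read the topic
        results.addSink(new FlinkKafkaProducer09[String](
          "localhost:9092",   // broker list
          "results-topic",    // target topic
          new SimpleStringSchema()))

        env.execute("kafka-sink-sketch")
      }
    }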





Re: setParallelism() for addSource() in streaming

2017-01-28 Thread Jonas
env.setParallelism(5).addSource(???) will set the default parallelism for
this Job to 5 and then add the source.
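
A small sketch of the difference (illustrative only):

    import org.apache.flink.streaming.api.scala._

    object ParallelismSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(5)                    // default parallelism for the whole job

        env.socketTextStream("localhost", 9000)  // stand-in source
          .setParallelism(1)                     // overrides the default for this source only
          .print()

        env.execute("parallelism-sketch")
      }
    }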





Re: Improving Flink Performance

2017-01-26 Thread Jonas
JProfiler





Re: Improving Flink Performance

2017-01-25 Thread Jonas
Images:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n11305/Tv6KnR6.png
and
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n11305/Tv6KnR6.png





Re: Improving Flink Performance

2017-01-25 Thread Jonas
I ran a profiler on my job and it seems that most of the time, it's waiting :O
See here:

Also, the following code snippet executes unexpectedly slowly, as you can see
in this call graph:

*Any ideas?*




Re: Improving Flink Performance

2017-01-25 Thread Jonas
I tried it and it gave a little extra performance (~10%), but nothing outstanding.





Re: Improving Flink Performance

2017-01-24 Thread Jonas
The performance hit due to decoding the JSON is expected, and there is not a
lot I can do about that (except for changing the encoding). Alright.

When joining the above stream with another stream I get another performance
hit of ~80%, so that in the end I have only 1k msgs/s remaining. Do you know
how to improve that? Might setting the buffer size / timeout be worth
exploring?
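
For the buffer timeout part, a tiny sketch of the knob I mean (100 ms is an
arbitrary illustrative value):

    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // flush partially filled network buffers at most every 100 ms;
    // larger values trade latency for throughput
    env.setBufferTimeout(100)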






Improving Flink Performance

2017-01-24 Thread Jonas
Hello! I'm reposting this since the other thread had some formatting issues
apparently. I hope this time it works.

I'm having performance problems with a Flink job. If there is anything
valuable missing, please ask and I will try to answer ASAP. My job looks
like this:

First, I read data from Kafka. This is very fast at 100k msgs/s. The data is
decoded and a type is added (we have multiple message types per Kafka topic).
Then we select the TYPE_A messages and create a Scala entity out of it (a case
class). Afterwards, in the MapEntityToMultipleEntities step, the Scala
entities are split into multiple entities. Finally a watermark is added.

As you can see, the data is not keyed in any way yet. *Is there a way to make
this faster?*

Measurements were taken with ... and ...

I'm running this on an Intel i5-3470, 16G RAM, Ubuntu 16.04.1 LTS on Flink
1.1.4




Re: Improving Flink performance

2017-01-24 Thread Jonas
I don't even have images in there :O Will delete this thread and create a new
one.





Re: Improving Flink performance

2017-01-23 Thread Jonas
I received it well-formatted. Could it be that the issue is your mail reader?





Improving Flink performance

2017-01-23 Thread Jonas
Hello!

I'm having performance problems with a Flink job. If there is anything
valuable missing, please ask and I will try to answer ASAP. My job looks
like this:



First, I read data from Kafka. This is very fast at 100k msgs/s. The data is
decoded, a type is added (we have multiple message types per Kafka topic).
Then we select the TYPE_A messages and create a Scala entity out of it (a case
class). Afterwards, in the MapEntityToMultipleEntities step, the Scala entities
are split into multiple entities. Finally a watermark is added.

As you can see the data is not keyed in any way yet. *Is there a way to make
this faster?*


/Measurements were taken with

and /





Re: keyBy called twice. Second time, INetAddress and Array[Byte] are empty

2017-01-20 Thread Jonas
Hey Jamie,

It turns out you were right :) I wrote my own implementation of IPAddress
and then it worked.





Re: Kafka Fetch Failed / DisconnectException

2017-01-18 Thread Jonas
Hello Fabian,

that IS the error message. The job continues to run without restarting.
There is not really more to see from the logs.

-- Jonas





Kafka Fetch Failed / DisconnectException

2017-01-18 Thread Jonas
Hi!

According to the output, I'm having some problems with the KafkaConsumer09.
It reports the following on stdout:



Is that something I should worry about?

-- Jonas





Re: How to read from a Kafka topic from the beginning

2017-01-16 Thread Jonas
You also need to have a new group.id for this to work.
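
For reference, a sketch of the consumer setup I mean, assuming the 0.9 consumer
(broker, topic and group names are made up):

    import java.util.Properties
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema

    object FromBeginningSketch {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.setProperty("bootstrap.servers", "localhost:9092")
        props.setProperty("group.id", "my-fresh-group")     // a group with no committed offsets
        props.setProperty("auto.offset.reset", "earliest")  // only applies when no offsets exist yet

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.addSource(new FlinkKafkaConsumer09[String]("my-topic", new SimpleStringSchema(), props))
          .print()

        env.execute("from-beginning-sketch")
      }
    }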





Re: keyBy called twice. Second time, INetAddress and Array[Byte] are empty

2017-01-09 Thread Jonas
So I created a minimal working example where this behaviour can still be
seen. It is 15 LOC and can be downloaded here:
https://github.com/JonasGroeger/flink-inetaddress-zeroed

To run it, use sbt:

If you don't want to do the above, fear not; here is the code:

For some reason, java.net.InetAddress objects get zeroed. Why is that?





keyBy called twice. Second time, INetAddress and Array[Byte] are empty

2017-01-07 Thread Jonas
Hi!

I have two streams that I connect and call keyBy after.


I put some debugging code in the bKeySelector. Turns out, it gets called
twice from different areas. Stacktrace here:
https://gist.github.com/JonasGroeger/8ce218ee1c19f0639fa990f43b5f9e2b

It contains 1 package which gets keyed twice for some reason from

The case class MyPackage has mostly Ints and Strings. The IP addresses
however are done with  references:

Now for some reason these are on the second call of the keying function
which looks like this:

Also all Arrays have another reference set. Before and after:
)

How come?


