Re: Difference between Session Mode and Session Job (Flink Operator)

2022-07-07 Thread bat man
Awesome, thanks!

On Thu, Jul 7, 2022 at 1:21 PM Gyula Fóra  wrote:

> Hi!
>
> The Flink Kubernetes Operator on a high level supports 3 types of
> resources:
>
>1. Session Deployment : Empty Flink Session cluster
>2. Application Deployment: Flink Application cluster (single job /
>cluster)
>3. Session Job: Flink Job deployed to an existing Session Deployment.
>
> So in other words, the Session deployment only creates the Flink cluster.
> The Session job can be deployed to an existing session deployment and it
> represents an actual Flink job.
>
> I hope that helps :)
> Gyula
>
> On Thu, Jul 7, 2022 at 7:42 AM bat man  wrote:
>
>> Hi,
>>
>> I want to understand the difference between session mode and the new
>> deployment mode - Flink Session Job which I believe is newly introduced as
>> part of the Flink Operator(1.15) release.
>> What's the benefit of using this mode as opposed to session mode as both
>> run sessions to which flink jobs can be submitted.
>>
>> Cheers.
>> H.
>>
>


Difference between Session Mode and Session Job (Flink Operator)

2022-07-06 Thread bat man
Hi,

I want to understand the difference between session mode and the new
deployment mode - Flink Session Job - which I believe was newly introduced as
part of the Flink Operator (1.15) release.
What's the benefit of using this mode as opposed to session mode, as both
run sessions to which Flink jobs can be submitted?

Cheers.
H.


Re: context.timestamp null in keyedprocess function

2022-06-15 Thread bat man
Has anyone experienced this or has any clue?

On Tue, Jun 14, 2022 at 6:21 PM bat man  wrote:

> Hi,
>
> We are using Flink 1.12.1 on AWS EMR. The job reads the event stream and
> enrich stream from another topic.
> We extend AssignerWithPeriodicWatermarks to assign watermarks and extract
> timestamp from the event and handle idle source partitions.
> AutoWatermarkInterval set to 5000L.
>  The timestamp extractor looks like below -
>
> @Override
> public long extractTimestamp(Raw event, long previousElementTimestamp) {
>     lastRecordProcessingTime = System.currentTimeMillis();
>     long eventTime = Double.valueOf(event.getTimestamp().toString()).longValue();
>     long timestamp = Instant.ofEpochMilli(eventTime * 1_000).toEpochMilli();
>     if (timestamp > currentMaxTimestamp) {
>         currentMaxTimestamp = timestamp;
>     }
>     return timestamp;
> }
>
> Second step the rules are joined to events, this is done in keyedprocess
> function.
> What we have observed is that at times when the job starts consuming from
> the beginning of the event source stream, the timestamp accessed in
> the keyedprocess fn using context.timestamp comes as null and the code is
> throwing NPE.
> This happens only for some records intermittently and the same event when
> we try to process in another environment it processes fine, that means the
> event is getting parsed fine.
>
> What could be the issue, anyone has any idea, because as far as timestamp
> goes it could only be null if the timestamp extractor sends null.
>
> Thanks.
>


context.timestamp null in keyedprocess function

2022-06-14 Thread bat man
Hi,

We are using Flink 1.12.1 on AWS EMR. The job reads the event stream and
enrich stream from another topic.
We extend AssignerWithPeriodicWatermarks to assign watermarks and extract
timestamp from the event and handle idle source partitions.
AutoWatermarkInterval set to 5000L.
 The timestamp extractor looks like below -

@Override
public long extractTimestamp(Raw event, long previousElementTimestamp) {
    lastRecordProcessingTime = System.currentTimeMillis();
    long eventTime = Double.valueOf(event.getTimestamp().toString()).longValue();
    long timestamp = Instant.ofEpochMilli(eventTime * 1_000).toEpochMilli();
    if (timestamp > currentMaxTimestamp) {
        currentMaxTimestamp = timestamp;
    }
    return timestamp;
}

Second step the rules are joined to events, this is done in keyedprocess
function.
What we have observed is that at times when the job starts consuming from
the beginning of the event source stream, the timestamp accessed in
the keyedprocess fn using context.timestamp comes as null and the code is
throwing NPE.
This happens only for some records intermittently and the same event when
we try to process in another environment it processes fine, that means the
event is getting parsed fine.

What could be the issue, anyone has any idea, because as far as timestamp
goes it could only be null if the timestamp extractor sends null.

Thanks.
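
A note on the NPE itself: in a KeyedProcessFunction, Context#timestamp()
returns a boxed Long which is null whenever the record reaching the operator
has no event-time timestamp assigned. A minimal defensive sketch - the Raw
type is the one from the job above, everything else is illustrative:

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class JoinRulesFunction extends KeyedProcessFunction<String, Raw, Raw> {

    @Override
    public void processElement(Raw event, Context ctx, Collector<Raw> out) throws Exception {
        // ctx.timestamp() is a boxed Long; guard against null instead of hitting an NPE.
        Long eventTimestamp = ctx.timestamp();
        if (eventTimestamp == null) {
            // No timestamp was assigned upstream; fall back (or side-output the record for analysis).
            eventTimestamp = ctx.timerService().currentProcessingTime();
        }
        // ... use eventTimestamp in the join logic instead of assuming it is non-null ...
        out.collect(event);
    }
}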


Re: Flink on Native Kubernetes S3 checkpointing error

2021-11-22 Thread bat man
Hi Matthias,

Looks like the service account token volume projection was not working fine
with the EKS version I was running. Upgraded the version and with the same
configs now the s3 checkpointing is working fine.
So, in short, on AWS use EKS v1.20+ for IAM Pod Identity Webhook.

Thanks,
Hemant

On Mon, Nov 22, 2021 at 7:26 PM Matthias Pohl 
wrote:

> Hi bat man,
> this feature seems to be tied to a certain AWS SDK version [1] which you
> already considered. But I checked the version used in Flink 1.13.1 for the
> s3 filesystem. It seems like the version that's used (1.11.788) is good
> enough to provide this feature (which was added in 1.11.704):
> ```
> $ git checkout release-1.13.1
> $ cd flink-filesystems/flink-s3-fs-base; mvn dependency:tree | grep
> com.amazonaws:aws-java-sdk-s3
> [INFO] +- com.amazonaws:aws-java-sdk-s3:jar:1.11.788:compile
> ```
>
> Matthias
>
> [1]
> https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts-minimum-sdk.html
>
> On Mon, Nov 22, 2021 at 8:04 AM bat man  wrote:
>
>> Hi,
>>
>> I am using flink 1.13.1 to use checkpointing(RocksDB) on s3 with native
>> kubernetes.
>> Passing in this parameter to job -
>>
>>
>> *-Dfs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider*
>> I am getting this error in job-manager logs -
>>
>> *Caused by: com.amazonaws.AmazonClientException: No AWS Credentials
>> provided by WebIdentityTokenCredentialsProvider :
>> com.amazonaws.SdkClientException: Unable to locate specified web identity
>> token file: /var/run/secrets/eks.amazonaws.com/serviceaccount/token
>> at
>> org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:139)
>> ~[?:?]*
>>
>> Describing the pod shows that that volume is mounted to the jobmanager
>> pod.
>> Is there anything specific that needs to be done as on the same EKS
>> cluster for testing I ran a sample pod with aws cli image and it's able to
>> do *ls* on the s3 buckets.
>> Is this related to aws sdk used in Flink 1.13.1, shall I try with recent
>> flink versions.
>>
>> Any help would be appreciated.
>>
>> Thanks.
>>
>


Flink on Native Kubernetes S3 checkpointing error

2021-11-21 Thread bat man
Hi,

I am using Flink 1.13.1 with checkpointing (RocksDB) on s3 with native
kubernetes.
I am passing this parameter to the job -

*-Dfs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider*
I am getting this error in job-manager logs -

*Caused by: com.amazonaws.AmazonClientException: No AWS Credentials
provided by WebIdentityTokenCredentialsProvider :
com.amazonaws.SdkClientException: Unable to locate specified web identity
token file: /var/run/secrets/eks.amazonaws.com/serviceaccount/token
 at
org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:139)
~[?:?]*

Describing the pod shows that the volume is mounted to the jobmanager pod.
Is there anything specific that needs to be done? On the same EKS cluster,
for testing, I ran a sample pod with the AWS CLI image and it's able to do *ls* on
the s3 buckets.
Is this related to the AWS SDK used in Flink 1.13.1 - shall I try with more recent
Flink versions?

Any help would be appreciated.

Thanks.


Re: Flink S3 Presto Checkpointing Permission Forbidden

2021-11-21 Thread bat man
Hi Dennis,

Were you able to use checkpointing on s3 with native kubernetes? I am using
flink 1.13.1 and did try your solution of passing the
WebIdentityTokenCredentialsProvider.

*-Dfs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider*
I am getting this error in job-manager logs - *Caused by:
com.amazonaws.SdkClientException: Unable to locate specified web identity
token file: /var/run/secrets/eks.amazonaws.com/serviceaccount/token
*
Describing the pod shows that that volume is mounted to the pod.
Is there anything specific that needs to be done as on the same EKS cluster
for testing I ran a sample pod with aws cli image and it's able to do *ls*
on the same s3 bucket.

Thanks,
Hemant

On Mon, Oct 11, 2021 at 1:56 PM Denis Nutiu  wrote:

> Hi Rommel,
>
>
>
> Thanks for getting back to me and for your time.
>
> I switched to the Hadoop plugin and used the following authentication
> method that worked:
> *fs.s3a.aws.credentials.provider:
> "com.amazonaws.auth.WebIdentityTokenCredentialsProvider"*
>
>
> Turns out that I was using the wrong credentials provider. Reading
> AWSCredentialProvider[1] and seeing that I have the
> AWS_WEB_IDENTITY_TOKEN_FILE variable in the container allowed me to find
> the correct one.
>
>
> [1]
> https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html
>
>
> Best,
>
> Denis
>
>
>
>
>
> *From:* Rommel Holmes 
> *Sent:* Saturday, October 9, 2021 02:09
> *To:* Denis Nutiu 
> *Cc:* user 
> *Subject:* Re: Flink S3 Presto Checkpointing Permission Forbidden
>
>
>
> You already have s3 request ID, you can easily reach out to AWS tech
> support to know what account was used to write to S3. I guess that account
> probably doesn't have permission to do the following:
>
>
>
> "s3:GetObject",
> "s3:PutObject",
> "s3:DeleteObject",
> "s3:ListBucket"
>
> Then grant the account with the permission in k8s. Then you should be good
> to go.
>
>
>
>
>
>
>
>
>
> On Fri, Oct 8, 2021 at 6:06 AM Denis Nutiu  wrote:
>
> Hello,
>
>
>
> I'm trying to deploy my Flink cluster inside of an AWS EKS using Flink
> Native. I want to use S3 as a filesystem for checkpointing, and giving the
> following options related to flink-s3-fs-presto:
>
>
>
> "-Dhive.s3.endpoint": "https://s3.eu-central-1.amazonaws.com"
> "-Dhive.s3.iam-role": "arn:aws:iam::xxx:role/s3-flink"
> "-Dhive.s3.use-instance-credentials": "true"
> "-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS":
> "flink-s3-fs-presto-1.13.2.jar"
> "-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS":
> "flink-s3-fs-presto-1.13.2.jar"
> "-Dstate.backend": "rocksdb"
> "-Dstate.backend.incremental": "true"
> "-Dstate.checkpoints.dir": "s3://bucket/checkpoints/"
> "-Dstate.savepoints.dir": "s3://bucket/savepoints/"
>
>
>
> But my job fails with:
>
>
>
> 2021-10-08 11:38:49,771 WARN
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Could
> not properly dispose the private states in the pending checkpoint 45 of job
> 75bdd6fb6e689961ef4e096684e867bc.
> com.facebook.presto.hive.s3.PrestoS3FileSystem$UnrecoverableS3OperationException:
> com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service:
> Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID:
> JEZ3X8YPDZ2TF4T9; S3 Extended Request ID:
> u2RBcDpifTnzO4hIOGqgTOKDY+nw6iSeSepd4eYThITCPCpVddIUGMU7jY5DpJBg1LkPuYXiH9c=;
> Proxy: null), S3 Extended Request ID:
> u2RBcDpifTnzO4hIOGqgTOKDY+nw6iSeSepd4eYThITCPCpVddIUGMU7jY5DpJBg1LkPuYXiH9c=
> (Path: s3://bucket/checkpoints/75bdd6fb6e689961ef4e096684e867bc/chk-45)
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$2(PrestoS3FileSystem.java:573)
> ~[?:?]
> at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:560)
> ~[?:?]
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:311)
> ~[?:?]
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem.directory(PrestoS3FileSystem.java:450)
> ~[?:?]
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem.delete(PrestoS3FileSystem.java:427)
> ~[?:?]
> at
> org.apache.flink.fs.s3presto.common.HadoopFileSystem.delete(HadoopFileSystem.java:160)
> ~[?:?]
> at
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.delete(PluginFileSystemFactory.java:155)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.disposeOnFailure(FsCheckpointStorageLocation.java:117)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> at
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.discard(PendingCheckpoint.java:588)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> at
> org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanCheckpoint$0(CheckpointsCleaner.java:60)
> 

Re: Flink Native Kubernetes - Configuration kubernetes.flink.log.dir not working

2021-09-18 Thread bat man
I was able to verify that -Dkubernetes.flink.log.dir="/var/log/containers"
works fine with Flink 1.13.1.
Initially there were some issues with the deployment; once fixed, it worked
fine.

Cheers,
Hemant

On Sat, Sep 18, 2021 at 1:58 PM Yang Wang  wrote:

> I think it might be a bug that "kubernetes.flink.log.dir" could not take
> effect. I have created a ticket[1].
>
> Could you please try with "-Denv.log.dir=/var/log/containers"?
>
> [1]. https://issues.apache.org/jira/browse/FLINK-24334
>
> Best,
> Yang
>
> Guowei Ma wrote on Tue, Sep 14, 2021 at 4:48 PM:
>
>> Hi
>>
>> Maybe you could try the `kubectl describe pod -n ${namespace}
>> ${podname}`  to see what happened atm.
>>
>> Best,
>> Guowei
>>
>>
>> On Tue, Sep 14, 2021 at 2:58 PM bat man  wrote:
>>
>>> Hello Guowei,
>>>
>>> The pods terminate almost within a second so am unable to pull any logs.
>>> Is there any way I can pull the logs?
>>>
>>> Thanks,
>>> Hemant
>>>
>>> On Tue, Sep 14, 2021 at 12:22 PM Guowei Ma  wrote:
>>>
>>>> Hi,
>>>>
>>>> Could you share some logs when the job fails?
>>>>
>>>> Best,
>>>> Guowei
>>>>
>>>>
>>>> On Mon, Sep 13, 2021 at 10:59 PM bat man  wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am running a POC to evaluate Flink on Native Kubernetes. I tried
>>>>> changing the default log location by using the configuration -
>>>>> kubernetes.flink.log.dir
>>>>> However, the job in application mode fails after bringing up the task
>>>>> manager. This is the command I use -
>>>>>
>>>>> ./bin/flink run-application --target kubernetes-application
>>>>> -Dkubernetes.cluster-id=flink-k8s-poc-app
>>>>> -Dkubernetes.container.image=
>>>>> -Dkubernetes.flink.log.dir="/var/log/containers"
>>>>> local:///opt/flink/usrlib/uber.jar
>>>>>
>>>>> Is there something else which needs to be done to write logs to a
>>>>> different location like creating the folders in the custom image.
>>>>>
>>>>> Thanks.
>>>>>
>>>>


Re: Flink Native Kubernetes - Configuration kubernetes.flink.log.dir not working

2021-09-14 Thread bat man
Hello Guowei,

The pods terminate almost within a second so am unable to pull any logs. Is
there any way I can pull the logs?

Thanks,
Hemant

On Tue, Sep 14, 2021 at 12:22 PM Guowei Ma  wrote:

> Hi,
>
> Could you share some logs when the job fails?
>
> Best,
> Guowei
>
>
> On Mon, Sep 13, 2021 at 10:59 PM bat man  wrote:
>
>> Hi,
>>
>> I am running a POC to evaluate Flink on Native Kubernetes. I tried
>> changing the default log location by using the configuration -
>> kubernetes.flink.log.dir
>> However, the job in application mode fails after bringing up the task
>> manager. This is the command I use -
>>
>> ./bin/flink run-application --target kubernetes-application
>> -Dkubernetes.cluster-id=flink-k8s-poc-app
>> -Dkubernetes.container.image=
>> -Dkubernetes.flink.log.dir="/var/log/containers"
>> local:///opt/flink/usrlib/uber.jar
>>
>> Is there something else which needs to be done to write logs to a
>> different location like creating the folders in the custom image.
>>
>> Thanks.
>>
>


Flink Native Kubernetes - Configuration kubernetes.flink.log.dir not working

2021-09-13 Thread bat man
Hi,

I am running a POC to evaluate Flink on Native Kubernetes. I tried changing
the default log location by using the configuration -
kubernetes.flink.log.dir
However, the job in application mode fails after bringing up the task
manager. This is the command I use -

./bin/flink run-application --target kubernetes-application
-Dkubernetes.cluster-id=flink-k8s-poc-app
-Dkubernetes.container.image=
-Dkubernetes.flink.log.dir="/var/log/containers"
local:///opt/flink/usrlib/uber.jar

Is there something else which needs to be done to write logs to a
different location like creating the folders in the custom image.

Thanks.


KafkaFetcher [] - Committing offsets to Kafka failed.

2021-08-26 Thread bat man
Hi,

I am using Flink 1.12.1 to consume data from kafka in a streaming job, using
flink-connector-kafka_2.12:1.12.1. The Kafka broker version is 2.2.1.
In the logs I see warnings like this -

2021-08-26 13:36:49,903 WARN
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher [] -
Committing offsets to Kafka failed. This does not compromise Flink's
checkpoints.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
completed since the group has already rebalanced and assigned the
partitions to another member.
This means that the time between subsequent calls to poll() was longer than
the configured max.poll.interval.ms, which typically implies that the poll
loop is spending too much time message processing.
You can address this either by increasing max.poll.interval.ms or by
reducing the maximum size of batches returned in poll() with
max.poll.records.
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:840)
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:790)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:910)

at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:890)

at
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)

at
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)

at
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)

at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)

at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)

at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)

at
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1256)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1200)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1135)

at
org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:258)


I understand that this might not cause an issue as checkpointing is not
impacted; however, metrics monitoring might be, as I am using Burrow to monitor
group offsets. I have already tried changing the below properties in the kafka
consumer config -

kafkaProps.setProperty("max.poll.interval.ms","90");
kafkaProps.setProperty("max.poll.records","200");
kafkaProps.setProperty("heartbeat.interval.ms","1000");
kafkaProps.setProperty("request.timeout.ms","4");
kafkaProps.setProperty("session.timeout.ms","1");
But the warnings are still present in the logs.

In addition, I see this error just before this warning -
ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] -
[Consumer clientId=consumer-3, groupId=xxx] Offset commit failed on
partition xxx-1 at offset 33651:
The coordinator is not aware of this member.

The code uses watermarkstrategy to extract timestamp and emit watermark.

Any clue is much appreciated.

Thanks,
Hemant
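
For reference, max.poll.interval.ms, max.poll.records, heartbeat.interval.ms,
request.timeout.ms and session.timeout.ms are consumer settings, so they only
take effect if they end up in the Properties handed to the FlinkKafkaConsumer.
A minimal sketch of that wiring inside the job setup (imports omitted) - the
broker address, topic name and the concrete values are placeholders, not
recommendations:

Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "broker:9092");   // placeholder
kafkaProps.setProperty("group.id", "my-group");               // placeholder
kafkaProps.setProperty("max.poll.interval.ms", "900000");     // placeholder value
kafkaProps.setProperty("max.poll.records", "200");

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), kafkaProps);
// With checkpointing enabled, offsets are committed back to Kafka when a
// checkpoint completes; the warning above only affects what external tools
// such as Burrow see, not Flink's own checkpointed offsets.
consumer.setCommitOffsetsOnCheckpoints(true);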


Re: High DirectByteBuffer Usage

2021-07-15 Thread bat man
I am not using the Kafka SSL port.

On Thu, Jul 15, 2021 at 9:48 PM Alexey Trenikhun  wrote:

> Just in case, make sure that you are not using Kafka SSL port without
> setting security protocol, see [1]
>
> [1] https://issues.apache.org/jira/plugins/servlet/mobile#issue/KAFKA-4090
> ------
> *From:* bat man 
> *Sent:* Wednesday, July 14, 2021 10:55:54 AM
> *To:* Timo Walther 
> *Cc:* user 
> *Subject:* Re: High DirectByteBuffer Usage
>
> Hi Timo,
>
> I am looking at these options.
> However, I had a couple of questions -
> 1. The off-heap usage grows overtime. My job does not do any off-heap
> operations so I don't think there is a leak there. Even after GC it keeps
> adding a few MBs after hours of running.
> 2. Secondly, I am seeing as the incoming record volume increases the
> off-heap usage grows. What's the reason for this?
>
> I am using 1.9. Is there any known bug which is causing this issue?
>
> Thanks,
> Hemant
>
> On Wed, Jul 14, 2021 at 7:30 PM Timo Walther  wrote:
>
> Hi Hemant,
>
> did you checkout the dedicated page for memory configuration and
> troubleshooting:
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_trouble/#outofmemoryerror-direct-buffer-memory
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_trouble/#container-memory-exceeded
>
> It is likely that the high number of output streams could cause your
> issues.
>
> Regards,
> Timo
>
>
>
>
> On 14.07.21 08:46, bat man wrote:
> > Hi,
> > I have a job which reads different streams from 5 kafka topics. It
> > filters data and then data is streamed to different operators for
> > processing. This step involves data shuffling.
> >
> > Also, once data is enriched in 4 joins(KeyedCoProcessFunction)
> > operators. After joining the data is written to different kafka topics.
> > There are a total of 16 different output streams which are written to 4
> > topics.
> >
> > I have been facing some issues with yarn killing containers. I took the
> > heap dump and ran it through JXray [1]. Heap usage is not high. One
> > thing which stands out is off-heap usage which is very high. My guess is
> > this is what is killing the containers as the data inflow increases.
> >
> > Screenshot 2021-07-14 at 11.52.41 AM.png
> >
> >
> >  From the stack above is this usage high because of many output streams
> > being written to kafka topics. As the stack shows RecordWriter holding
> > off this DirectByteBuffer. I have assigned Network Memory as 1GB, and
> > --MaxDirectMemorySize also shows ~1GB for task managers.
> >
> >  From here[2] I found that setting -Djdk.nio.maxCachedBufferSize=262144
> > limits the temp buffer cache. Will it help in this case?
> > jvm version used is - JVM: OpenJDK 64-Bit Server VM - Red Hat, Inc. -
> > 1.8/25.282-b08
> >
> > [1] - https://jxray.com <https://jxray.com>
> > [2] -
> >
> https://dzone.com/articles/troubleshooting-problems-with-native-off-heap-memo
> > <
> https://dzone.com/articles/troubleshooting-problems-with-native-off-heap-memo
> >
> >
> > Thanks,
> > Hemant
>
>
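
If anyone wants to try the jdk.nio.maxCachedBufferSize flag mentioned above,
it can be passed to the task manager JVMs through flink-conf.yaml; the value
below is simply the one from the linked article, not a recommendation:

env.java.opts.taskmanager: -Djdk.nio.maxCachedBufferSize=262144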


Re: High DirectByteBuffer Usage

2021-07-14 Thread bat man
Hi Timo,

I am looking at these options.
However, I had a couple of questions -
1. The off-heap usage grows over time. My job does not do any off-heap
operations, so I don't think there is a leak there. Even after GC it keeps
adding a few MBs after hours of running.
2. Secondly, I am seeing that as the incoming record volume increases, the
off-heap usage grows. What's the reason for this?

I am using 1.9. Is there any known bug which is causing this issue?

Thanks,
Hemant

On Wed, Jul 14, 2021 at 7:30 PM Timo Walther  wrote:

> Hi Hemant,
>
> did you checkout the dedicated page for memory configuration and
> troubleshooting:
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_trouble/#outofmemoryerror-direct-buffer-memory
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_trouble/#container-memory-exceeded
>
> It is likely that the high number of output streams could cause your
> issues.
>
> Regards,
> Timo
>
>
>
>
> On 14.07.21 08:46, bat man wrote:
> > Hi,
> > I have a job which reads different streams from 5 kafka topics. It
> > filters data and then data is streamed to different operators for
> > processing. This step involves data shuffling.
> >
> > Also, once data is enriched in 4 joins(KeyedCoProcessFunction)
> > operators. After joining the data is written to different kafka topics.
> > There are a total of 16 different output streams which are written to 4
> > topics.
> >
> > I have been facing some issues with yarn killing containers. I took the
> > heap dump and ran it through JXray [1]. Heap usage is not high. One
> > thing which stands out is off-heap usage which is very high. My guess is
> > this is what is killing the containers as the data inflow increases.
> >
> > Screenshot 2021-07-14 at 11.52.41 AM.png
> >
> >
> >  From the stack above is this usage high because of many output streams
> > being written to kafka topics. As the stack shows RecordWriter holding
> > off this DirectByteBuffer. I have assigned Network Memory as 1GB, and
> > --MaxDirectMemorySize also shows ~1GB for task managers.
> >
> >  From here[2] I found that setting -Djdk.nio.maxCachedBufferSize=262144
> > limits the temp buffer cache. Will it help in this case?
> > jvm version used is - JVM: OpenJDK 64-Bit Server VM - Red Hat, Inc. -
> > 1.8/25.282-b08
> >
> > [1] - https://jxray.com <https://jxray.com>
> > [2] -
> >
> https://dzone.com/articles/troubleshooting-problems-with-native-off-heap-memo
> > <
> https://dzone.com/articles/troubleshooting-problems-with-native-off-heap-memo
> >
> >
> > Thanks,
> > Hemant
>
>


Re: Flink 1.13 Native Kubernetes - Custom Pod Templates

2021-07-13 Thread bat man
Hi Yang,

One more follow-up question on the custom pod templates for JobManager and
TaskManager - as you mention the pod template is for advanced features, is it
that in the custom template we just need to include the custom feature, like
a volume mount or sidecar? I wouldn't have to include any Flink-specific
resources, since the pod is decorated internally and will come up
with the combined resources - Flink + custom.

Let me know if my understanding is correct.

Thanks,
Hemant

On Mon, Jul 12, 2021 at 9:03 AM Yang Wang  wrote:

> Hi Hemant,
>
> Thanks for trying the native Kubernetes integration and share your
> feedback.
>
> Pod template is a supplementary feature for those advanced features(e.g.
> volume mounts, sidecar container, init container, etc.),
> which are not supported by Flink config options[1].
>
> By default, Flink is using an empty pod template to initialize the
> JobManager/TaskManager pod, then it is decorated internally.
> For example, set the pod resources(mem, cpu), the starting commands, as
> well as the image and etc.
>
> However, not all the fields could be configured via pod template. Like the
> pod resource you mentioned, it needs to be configured
> via Flink config options. I believe you could find the detailed
> information in the documentation[2].
>
> If you want to capture and store the metrics, the prometheus reporter
> should be right direction[3].
>
> [1].
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#kubernetes
> [2].
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#fields-overwritten-by-flink
> [3].
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/metric_reporters/#prometheus
>
> Best,
> Yang
>
>
> bat man wrote on Sun, Jul 11, 2021 at 5:27 PM:
>
>> Hi,
>>
>> I am running a POC to run flink 1.13 on Native Kubernetes. Per
>> documentation [1] this supports custom pod templates. Are there any
>> references for sample pod templates which can be used as a baseline to
>> build upon. Could not find any references, documentation[1] has one sample
>> for pod-template.yaml which is for flink-main-container.
>>
>> 1. I would like to specify the task manager/job manager memory
>> configurations from pod templates.
>> 2. Secondly, how can the metrics be captured, is it possible to use
>> service-monitor.
>>
>> [1] -
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/
>>
>> Thanks,
>> Hemant
>>
>
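
On the metrics question, the Prometheus reporter Yang links to [3] is enabled
purely through configuration. A minimal flink-conf.yaml sketch for Flink 1.13
(the port range is an arbitrary example); each JobManager/TaskManager then
exposes metrics on a port in that range, which a Prometheus scrape config or a
ServiceMonitor/PodMonitor can target:

metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9249-9259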


Re: Flink 1.13 Native Kubernetes - Custom Pod Templates

2021-07-13 Thread bat man
Thanks Yang for the information.

Are all the commands applicable with the Kubernetes integration, like taking
a savepoint and starting from a savepoint? I see commands here [1] for
savepoints and for YARN as well, but nothing specific to Kubernetes.

[1] -
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/savepoints/

Thanks,
Hemant

On Mon, Jul 12, 2021 at 9:03 AM Yang Wang  wrote:

> Hi Hemant,
>
> Thanks for trying the native Kubernetes integration and share your
> feedback.
>
> Pod template is a supplementary feature for those advanced features(e.g.
> volume mounts, sidecar container, init container, etc.),
> which are not supported by Flink config options[1].
>
> By default, Flink is using an empty pod template to initialize the
> JobManager/TaskManager pod, then it is decorated internally.
> For example, set the pod resources(mem, cpu), the starting commands, as
> well as the image and etc.
>
> However, not all the fields could be configured via pod template. Like the
> pod resource you mentioned, it needs to be configured
> via Flink config options. I believe you could find the detailed
> information in the documentation[2].
>
> If you want to capture and store the metrics, the prometheus reporter
> should be right direction[3].
>
> [1].
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#kubernetes
> [2].
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#fields-overwritten-by-flink
> [3].
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/metric_reporters/#prometheus
>
> Best,
> Yang
>
>
> bat man wrote on Sun, Jul 11, 2021 at 5:27 PM:
>
>> Hi,
>>
>> I am running a POC to run flink 1.13 on Native Kubernetes. Per
>> documentation [1] this supports custom pod templates. Are there any
>> references for sample pod templates which can be used as a baseline to
>> build upon. Could not find any references, documentation[1] has one sample
>> for pod-template.yaml which is for flink-main-container.
>>
>> 1. I would like to specify the task manager/job manager memory
>> configurations from pod templates.
>> 2. Secondly, how can the metrics be captured, is it possible to use
>> service-monitor.
>>
>> [1] -
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/
>>
>> Thanks,
>> Hemant
>>
>
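
On the savepoint question: the CLI actions (list, cancel, savepoint) are not
tied to YARN; against a native Kubernetes deployment they are pointed at the
cluster the same way the docs show for list/cancel, via the target and the
cluster-id. A sketch by analogy with those examples - the job id and savepoint
path are placeholders, so verify the exact form against the 1.13 CLI docs:

./bin/flink savepoint \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=flink-k8s-poc-app \
    <jobId> s3://bucket/savepoints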


Flink 1.13 Native Kubernetes - Custom Pod Templates

2021-07-11 Thread bat man
Hi,

I am running a POC to run Flink 1.13 on Native Kubernetes. Per the
documentation [1] this supports custom pod templates. Are there any
references for sample pod templates which can be used as a baseline to
build upon? I could not find any; the documentation [1] has one sample
pod-template.yaml, which is for the flink-main-container.

1. I would like to specify the task manager/job manager memory
configurations from pod templates.
2. Secondly, how can the metrics be captured, is it possible to use
service-monitor.

[1] -
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/

Thanks,
Hemant


Re: Memory Usage - Total Memory Usage on UI and Metric

2021-07-03 Thread bat man
Thanks Ken for the info.This is something which I have done when running
spark batch jobs. However, in this case I really want to understand if
there is anything wrong with the job itself. Is the flink kafka consumer or
some other piece needs more memory than I am allocating.

Hemant

On Fri, Jul 2, 2021 at 9:57 PM Ken Krugler 
wrote:

> When we run Flink jobs in EMR (typically batch, though) we disable the
> pmem (physical memory) and vmem (virtual memory) checks.
>
> This was initially done for much older versions of Flink (1.6???), where
> the memory model wasn’t so well documented or understood by us.
>
> But I think the pmem check might still have an issue, due to Flink’s use
> of off-heap.
>
> So something like:
>
> [
> {
> "classification": "yarn-site",
> "properties": {
> "yarn.nodemanager.pmem-check-enabled": "false",
> "yarn.nodemanager.vmem-check-enabled": "false"
> }
> }
> ]
>
>
> …might help.
>
> — Ken
>
>
> On Jul 2, 2021, at 8:36 AM, bat man  wrote:
>
> Hi,
>
> I am running a streaming job (Flink 1.9) on EMR on yarn. Flink web UI or
> metrics reported from prometheus shows total memory usage within specified
> task manager memory - 3GB.
>
> Metrics shows below numbers(in MB) -
> Heap - 577
> Non Heap - 241
> DirectMemoryUsed - 852
>
> Non-heap does rise gradually, starting around 210MB and reaching 241 when
> yarn kills the container. Heap fluctuates between 1.x - .6GB,
> DirectMemoryUsed is constant at 852.
>
> Based on configurations these are the tm params from yarn logs -
> -Xms1957m -Xmx1957m -XX:MaxDirectMemorySize=1115m
>
> These are other params as configuration in flink-conf
> yarn-cutoff - 270MB
> Managed memory - 28MB
> Network memory - 819MB
>
> Above memory values are from around the same time the container is killed
> by yarn for -  is running beyond physical memory limits.
>
> Is there anything else which is not reported by flink in metrics or I have
> been misinterpreting as seen from above total memory consumed is below -
> 3GB.
>
> Same behavior is reported when I have run the job with 2GB, 2.7GB and now
> 3GB task mem. My job does have shuffles as data from one operator is sent
> to 4 other operators after filtering.
>
> One more thing is I am running this with 3 yarn containers(2 tasks in each
> container), total parallelism as 6. As soon as one container fails with
> this error, the job re-starts. However, within minutes other 2 containers
> also fail with the same error one by one.
>
> Thanks,
> Hemant
>
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
>
>
>
>


Memory Usage - Total Memory Usage on UI and Metric

2021-07-02 Thread bat man
Hi,

I am running a streaming job (Flink 1.9) on EMR on yarn. Flink web UI or
metrics reported from prometheus shows total memory usage within specified
task manager memory - 3GB.

Metrics shows below numbers(in MB) -
Heap - 577
Non Heap - 241
DirectMemoryUsed - 852

Non-heap does rise gradually, starting around 210MB and reaching 241 when
yarn kills the container. Heap fluctuates between 1.x - .6GB,
DirectMemoryUsed is constant at 852.

Based on configurations these are the tm params from yarn logs -
-Xms1957m -Xmx1957m -XX:MaxDirectMemorySize=1115m

These are other params as configuration in flink-conf
yarn-cutoff - 270MB
Managed memory - 28MB
Network memory - 819MB

Above memory values are from around the same time the container is killed
by yarn for -  is running beyond physical memory limits.

Is there anything else which is not reported by Flink in the metrics, or have I
been misinterpreting them? As seen from the above, total memory consumed is below
3GB.

Same behavior is reported when I have run the job with 2GB, 2.7GB and now
3GB task mem. My job does have shuffles as data from one operator is sent
to 4 other operators after filtering.

One more thing is I am running this with 3 yarn containers(2 tasks in each
container), total parallelism as 6. As soon as one container fails with
this error, the job re-starts. However, within minutes other 2 containers
also fail with the same error one by one.

Thanks,
Hemant


Re: Read kafka offsets from checkpoint - state processor

2021-05-07 Thread bat man
Has anyone tried this, or can anyone help with this?

Thanks.

On Thu, May 6, 2021 at 10:34 AM bat man  wrote:

> Hi Users,
>
> Is there a way that Flink 1.9 the checkpointed data can be read using the
> state processor api.
> Docs [1] says - When reading operator state, users specify the operator
> uid, the state name, and the type information.
>
> What is the type for the kafka operator, which needs to be specified while
> reading the state.
>
> [1] -
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/libs/state_processor_api/
>
> Thanks,
> Hemant
>


Read kafka offsets from checkpoint - state processor

2021-05-05 Thread bat man
Hi Users,

Is there a way that in Flink 1.9 the checkpointed data can be read using the
State Processor API?
The docs [1] say - When reading operator state, users specify the operator
uid, the state name, and the type information.

What is the type for the kafka operator, which needs to be specified while
reading the state.

[1] -
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/libs/state_processor_api/

Thanks,
Hemant
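
The Kafka source keeps its offsets in a union list state named
"topic-partition-offset-states", with elements of type
Tuple2<KafkaTopicPartition, Long>; the State Processor API can read that state
directly. A rough sketch for Flink 1.9 (DataSet-based API) - the savepoint
path and the source uid are placeholders, and the state name/element type are
internals of FlinkKafkaConsumerBase, so verify them against the exact Flink
version in use:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

public class ReadKafkaOffsets {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Path to the checkpoint/savepoint metadata directory (placeholder).
        ExistingSavepoint savepoint = Savepoint.load(
                env, "s3://bucket/checkpoints/<job-id>/chk-45", new MemoryStateBackend());

        // Element type of the consumer's internal offset state.
        TypeInformation<Tuple2<KafkaTopicPartition, Long>> offsetType =
                Types.TUPLE(TypeInformation.of(KafkaTopicPartition.class), Types.LONG);

        // "kafka-source" is whatever uid was set on the FlinkKafkaConsumer in the job.
        DataSet<Tuple2<KafkaTopicPartition, Long>> offsets =
                savepoint.readUnionState("kafka-source", "topic-partition-offset-states", offsetType);

        offsets.print();
    }
}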


Re: Get consumed Kafka offsets from Flink kafka source

2021-04-14 Thread bat man
Thanks Piotrek for the references.

Cheers.
Hemant

On Wed, Apr 14, 2021 at 7:18 PM Piotr Nowojski  wrote:

> Hi,
>
> Depending on how you configured your FlinkKafkaSource, you can make the
> source commit consumed offsets back to Kafka. So one way to examine
> them would be to check those offsets in Kafka (I don't know how, but I'm
> pretty sure there is a way to do it).
>
> Secondly, if you want to examine Flink's checkpoint state you can use
> State Processor API to do that [1]. As far as I know you could hook up your
> checkpointed data to Table API/SQL and use SQL to query/analyse the state.
>
> Best
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> On Wed, Apr 14, 2021 at 11:25 AM bat man wrote:
>
>> Hi All,
>>
>> Is there any way I can inspect/query the checkpointed data. Scenario is
>> like this -
>>
>> We have a high volume of data coming in the data stream pipeline for
>> which kafka is source, in case if fails bcoz of bad data I want to
>> analyse the data which caused the issue. It could be that some data source
>> starts sending bad data so I want to go in kafka to that particular offset
>> and do some analysis before I start the job with checkpointed data.
>>
>> Can anyone suggest how this can be achieved.
>>
>> Thanks,
>> Hemant
>>
>>
>>
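
For the "check those offsets in Kafka" part: once the source has a group.id
and offset committing is left enabled (when checkpointing is on, the consumer
commits offsets back to Kafka as checkpoints complete by default), the stock
Kafka tooling can show them - the broker address and group name below are
placeholders:

bin/kafka-consumer-groups.sh --bootstrap-server broker:9092 \
    --describe --group my-flink-group

This lists the last committed offset per partition for the group; with
checkpoint-based committing it reflects the last completed checkpoint rather
than the exact current read position.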


Get consumed Kafka offsets from Flink kafka source

2021-04-14 Thread bat man
Hi All,

Is there any way I can inspect/query the checkpointed data. Scenario is
like this -

We have a high volume of data coming into the data stream pipeline for which
Kafka is the source; in case it fails because of bad data, I want to analyse the
data which caused the issue. It could be that some data source starts
sending bad data, so I want to go to that particular offset in Kafka and do
some analysis before I start the job from the checkpointed data.

Can anyone suggest how this can be achieved.

Thanks,
Hemant


Re: flink cdc postgres connector 1.1 - heartbeat.interval.ms - WAL consumption

2021-04-08 Thread bat man
Thanks Till.

Hi Jark,

Any inputs, going through the code of 1.1 and 1.3 in the meantime.

Thanks,
Hemant

On Thu, Apr 8, 2021 at 3:52 PM Till Rohrmann  wrote:

> Hi Hemant,
>
> I am pulling in Jark who is most familiar with Flink's cdc connector. He
> might also be able to tell whether the fix can be backported.
>
> Cheers,
> Till
>
> On Thu, Apr 8, 2021 at 10:42 AM bat man  wrote:
>
>> Anyone who has faced similar issues with cdc with Postgres.
>>
>> I see the restart_lsn and confirmed_flush_lsn constant since the snapshot
>> replication records were streamed even though I have tried inserting
>> a record in the whitelisted table.
>>
>> select * from pg_replication_slots;
>>   slot_name  |  plugin  | slot_type | datoid | database | temporary |
>> active | active_pid | xmin | catalog_xmin | restart_lsn |
>> confirmed_flush_lsn
>>
>> -+--+---++--+---+++--+--+-+-
>>  stream_cdc3 | pgoutput | logical   |  16411 | test_cdc | f | t
>>|   1146 |  | 6872 | 62/34000828 | 62/34000860
>>
>> I have passed the  heartbeat.interval.ms = 1000 and could see
>> the heartbeat events streamed to flink however the transaction log disk
>> usage and oldest replication slot lag consistently increasing. From [1] I
>> have also tried this -
>>
>> For other decoder plug-ins, it is recommended to create a supplementary
>> table that is not monitored by Debezium.
>>
>> A separate process would then periodically update the table (either
>> inserting a new event or updating the same row all over). PostgreSQL then
>> will invoke Debezium which will confirm the latest LSN and allow the
>> database to reclaim the WAL space.
>>
>> [image: Screenshot 2021-04-08 at 2.07.18 PM.png]
>>
>> [image: Screenshot 2021-04-08 at 2.07.52 PM.png]
>>
>> [1] -
>> https://debezium.io/documentation/reference/1.0/connectors/postgresql.html#wal-disk-space
>>
>> Thanks.
>>
>> On Wed, Apr 7, 2021 at 12:51 PM bat man  wrote:
>>
>>> Hi there,
>>>
>>> I am using flink 1.11 and cdc connector 1.1 to stream changes from a
>>> postgres table. I see the WAL consumption is increasing gradually even
>>> though the writes to tables are very less.
>>>
>>> I am using AWS RDS, from [1] I understand that setting the parameter
>>> heartbeat.interval.ms solves this WAL consumption issue. However, I
>>> tried setting this with no success.
>>>
>>> I found a bug [2] which seems to be taking care of committing the lsn
>>> for the db to release the wal. however this seems to be only fixed in 1.3
>>> which is compatible with flink 1.12.1. Is there any way this can be fixed
>>> in 1.11.1. Since I am using EMR and the latest flink version available is
>>> 1.11.
>>>
>>>
>>> [1] -
>>> https://debezium.io/documentation/reference/connectors/postgresql.html
>>> [2] - https://github.com/ververica/flink-cdc-connectors/issues/97
>>>
>>> Thanks.
>>> Hemant
>>>
>>


Re: flink cdc postgres connector 1.1 - heartbeat.interval.ms - WAL consumption

2021-04-08 Thread bat man
Has anyone faced similar issues with CDC with Postgres?

I see the restart_lsn and confirmed_flush_lsn constant since the snapshot
replication records were streamed even though I have tried inserting
a record in the whitelisted table.

select * from pg_replication_slots;
 slot_name           | stream_cdc3
 plugin              | pgoutput
 slot_type           | logical
 datoid              | 16411
 database            | test_cdc
 temporary           | f
 active              | t
 active_pid          | 1146
 xmin                |
 catalog_xmin        | 6872
 restart_lsn         | 62/34000828
 confirmed_flush_lsn | 62/34000860

I have passed the  heartbeat.interval.ms = 1000 and could see the heartbeat
events streamed to flink however the transaction log disk usage and oldest
replication slot lag consistently increasing. From [1] I have also tried
this -

For other decoder plug-ins, it is recommended to create a supplementary
table that is not monitored by Debezium.

A separate process would then periodically update the table (either
inserting a new event or updating the same row all over). PostgreSQL then
will invoke Debezium which will confirm the latest LSN and allow the
database to reclaim the WAL space.

[image: Screenshot 2021-04-08 at 2.07.18 PM.png]

[image: Screenshot 2021-04-08 at 2.07.52 PM.png]

[1] -
https://debezium.io/documentation/reference/1.0/connectors/postgresql.html#wal-disk-space

Thanks.

On Wed, Apr 7, 2021 at 12:51 PM bat man  wrote:

> Hi there,
>
> I am using flink 1.11 and cdc connector 1.1 to stream changes from a
> postgres table. I see the WAL consumption is increasing gradually even
> though the writes to tables are very less.
>
> I am using AWS RDS, from [1] I understand that setting the parameter
> heartbeat.interval.ms solves this WAL consumption issue. However, I tried
> setting this with no success.
>
> I found a bug [2] which seems to be taking care of committing the lsn for
> the db to release the wal. however this seems to be only fixed in 1.3 which
> is compatible with flink 1.12.1. Is there any way this can be fixed in
> 1.11.1. Since I am using EMR and the latest flink version available is 1.11.
>
>
> [1] -
> https://debezium.io/documentation/reference/connectors/postgresql.html
> [2] - https://github.com/ververica/flink-cdc-connectors/issues/97
>
> Thanks.
> Hemant
>


flink cdc postgres connector 1.1 - heartbeat.interval.ms - WAL consumption

2021-04-07 Thread bat man
Hi there,

I am using flink 1.11 and cdc connector 1.1 to stream changes from a
postgres table. I see the WAL consumption is increasing gradually even
though the writes to the tables are very low.

I am using AWS RDS, from [1] I understand that setting the parameter
heartbeat.interval.ms solves this WAL consumption issue. However, I tried
setting this with no success.

I found a bug [2] which seems to be taking care of committing the lsn for
the db to release the wal. however this seems to be only fixed in 1.3 which
is compatible with flink 1.12.1. Is there any way this can be fixed in
1.11.1. Since I am using EMR and the latest flink version available is 1.11.


[1] - https://debezium.io/documentation/reference/connectors/postgresql.html
[2] - https://github.com/ververica/flink-cdc-connectors/issues/97

Thanks.
Hemant
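
For anyone wondering how the heartbeat setting gets into the connector: the
Debezium options are passed through the source builder's debeziumProperties().
A rough sketch against the 1.1.x connector - the connection details are
placeholders, and the builder method names are as I recall them for that
version, so double-check them against the flink-cdc-connectors release you
actually use:

Properties dbzProps = new Properties();
dbzProps.setProperty("heartbeat.interval.ms", "1000");

SourceFunction<String> source = PostgreSQLSource.<String>builder()
        .hostname("my-rds-endpoint")          // placeholder
        .port(5432)
        .database("test_cdc")
        .schemaList("public")
        .tableList("public.my_table")         // placeholder
        .username("user")
        .password("password")
        .debeziumProperties(dbzProps)
        .deserializer(new StringDebeziumDeserializationSchema())
        .build();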


io.netty.channel.epoll.EpollEventLoopGroup cannot be cast to io.netty.channel.MultithreadEventLoopGroup

2021-03-10 Thread bat man
Hi,

I am using this library - *jasync-postgresql* [1] - for async calls to
postgres in an asyncIO operator. I am running Flink 1.9 on EMR.
I am facing this error -

Caused by: java.lang.ClassCastException:
io.netty.channel.epoll.EpollEventLoopGroup cannot be cast to
io.netty.channel.MultithreadEventLoopGroup
at
com.github.jasync.sql.db.util.NettyUtils$DefaultEventLoopGroup$2.invoke(NettyUtils.kt:32)
at
com.github.jasync.sql.db.util.NettyUtils$DefaultEventLoopGroup$2.invoke(NettyUtils.kt:19)
at kotlin.SynchronizedLazyImpl.getValue(LazyJVM.kt:74)
at
com.github.jasync.sql.db.util.NettyUtils.getDefaultEventLoopGroup(NettyUtils.kt)
at com.github.jasync.sql.db.Configuration.<init>(Configuration.kt:55)
at com.github.jasync.sql.db.Configuration.<init>(Configuration.kt)
at
com.flink.poc.inventory.AsyncJdbcRequest$AsyncDatabaseRequest.open(AsyncJdbcRequest.java:50)
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at
org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.open(AsyncWaitOperator.java:167)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)

I tried setting taskmanager.network.netty.transport: epoll, still no
difference. Is this because of the netty version used by Flink and the
library? This library uses -
NETTY_VERSION=4.1.49.Final

[1] - https://github.com/jasync-sql/jasync-sql


Thanks,
Hemant


Re: java options to generate heap dump in EMR not working

2021-03-08 Thread bat man
Issue was with double quotes around the Java options. This worked -

env.java.opts: -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/tmp/dump.hprof

On Mon, 8 Mar 2021 at 12:02 PM, Yun Gao  wrote:

> Hi,
>
> I tried with the standalone session (sorry I do not have a yarn cluster in
> hand) and it seems that
> the flink cluster could startup normally. Could you check the log of
> NodeManager to see the detail
> reason that the container does not get launched? Also have you check if
> there are some spell error
> or some unexpected special white space character for the configuration ?
>
> For the case of configuring `env.java.opts`, it seems the JobManager also
> could not be launched with
> this configuration.
>
> Best,
> Yun
>
> ----------Original Mail --
> *Sender:*bat man 
> *Send Date:*Sat Mar 6 16:03:06 2021
> *Recipients:*user 
> *Subject:*java options to generate heap dump in EMR not working
>
> Hi,
>>
>> I am trying to generate a heap dump to debug a GC overhead OOM. For that
>> I added the below java options in flink-conf.yaml, however after adding
>> this the yarn is not able to launch the containers. The job logs show it
>> goes on requesting for containers from yarn and it gets them, again
>> releases it. then again the same cycle continues. If I remove the option
>> from flink-conf.yaml then the containers are launched and the job starts
>> processing.
>>
>>
>> *env.java.opts.taskmanager: "-XX:+HeapDumpOnOutOfMemoryError
>> -XX:HeapDumpPath=/tmp/dump.hprof"*
>>
>> If I try this then yarn client does not comes up -
>>
>>
>> *env.java.opts: "-XX:+HeapDumpOnOutOfMemoryError
>> -XX:HeapDumpPath=/tmp/dump.hprof"*
>>
>> Am I doing anything wrong here?
>>
>> PS: I am using EMR.
>>
>> Thanks,
>> Hemant
>>
>


Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

2021-03-08 Thread bat man
The Java options should not have the double quotes. That was the issue. I
was able to generate the heap dump. Based on the dump I have made some
changes in the code to fix this issue.

This worked -

env.java.opts: -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/tmp/dump.hprof

Thanks.

On Mon, 8 Mar 2021 at 7:48 AM, Xintong Song  wrote:

> Hi Hemant,
> I don't see any problem in your settings. Any exceptions suggesting why TM
> containers are not coming up?
>
> Thank you~
>
> Xintong Song
>
>
>
> On Sat, Mar 6, 2021 at 3:53 PM bat man  wrote:
>
>> Hi Xintong Song,
>> I tried using the java options to generate heap dump referring to docs[1]
>> in flink-conf.yaml, however after adding this the task manager containers
>> are not coming up. Note that I am using EMR. Am i doing anything wrong here?
>>
>> env.java.opts: "-XX:+HeapDumpOnOutOfMemoryError
>> -XX:HeapDumpPath=/tmp/dump.hprof"
>>
>> Thanks,
>> Hemant
>>
>>
>>
>>
>>
>> On Fri, Mar 5, 2021 at 3:05 PM Xintong Song 
>> wrote:
>>
>>> Hi Hemant,
>>>
>>> This exception generally suggests that JVM is running out of heap
>>> memory. Per the official documentation [1], the amount of live data barely
>>> fits into the Java heap having little free space for new allocations.
>>>
>>> You can try to increase the heap size following these guides [2].
>>>
>>> If a memory leak is suspected, to further understand where the memory is
>>> consumed, you may need to dump the heap on OOMs and looking for unexpected
>>> memory usages leveraging profiling tools.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>> [1]
>>> https://docs.oracle.com/javase/8/docs/technotes/guides/troubleshoot/memleaks002.html
>>>
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup.html
>>>
>>>
>>>
>>> On Fri, Mar 5, 2021 at 4:24 PM bat man  wrote:
>>>
>>>> Hi,
>>>>
>>>> Getting the below OOM but the job failed 4-5 times and recovered from
>>>> there.
>>>>
>>>> j
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> *ava.lang.Exception: java.lang.OutOfMemoryError: GC overhead limit
>>>> exceededat
>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
>>>>   at
>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
>>>>   at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>>>>   at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>>>>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>> at java.lang.Thread.run(Thread.java:748)Caused by:
>>>> java.lang.OutOfMemoryError: GC overhead limit exceeded*
>>>>
>>>> Is there any way I can debug this. since the job after a few re-starts
>>>> started running fine. what could be the reason behind this.
>>>>
>>>> Thanks,
>>>> Hemant
>>>>
>>>


java options to generate heap dump in EMR not working

2021-03-06 Thread bat man
Hi,

I am trying to generate a heap dump to debug a GC overhead OOM. For that I
added the below java options in flink-conf.yaml; however, after adding this,
yarn is not able to launch the containers. The job logs show it keeps
requesting containers from yarn, gets them, and then releases them again;
the same cycle continues. If I remove the option from
flink-conf.yaml then the containers are launched and the job starts
processing.


*env.java.opts.taskmanager: "-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/tmp/dump.hprof"*

If I try this then yarn client does not comes up -


*env.java.opts: "-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/tmp/dump.hprof"*

Am I doing anything wrong here?

PS: I am using EMR.

Thanks,
Hemant


Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

2021-03-05 Thread bat man
Hi Xintong Song,
I tried using the java options to generate a heap dump, referring to the docs [1],
in flink-conf.yaml; however, after adding this the task manager containers
are not coming up. Note that I am using EMR. Am I doing anything wrong here?

env.java.opts: "-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/tmp/dump.hprof"

Thanks,
Hemant





On Fri, Mar 5, 2021 at 3:05 PM Xintong Song  wrote:

> Hi Hemant,
>
> This exception generally suggests that JVM is running out of heap memory.
> Per the official documentation [1], the amount of live data barely fits
> into the Java heap having little free space for new allocations.
>
> You can try to increase the heap size following these guides [2].
>
> If a memory leak is suspected, to further understand where the memory is
> consumed, you may need to dump the heap on OOMs and looking for unexpected
> memory usages leveraging profiling tools.
>
> Thank you~
>
> Xintong Song
>
>
> [1]
> https://docs.oracle.com/javase/8/docs/technotes/guides/troubleshoot/memleaks002.html
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup.html
>
>
>
> On Fri, Mar 5, 2021 at 4:24 PM bat man  wrote:
>
>> Hi,
>>
>> Getting the below OOM but the job failed 4-5 times and recovered from
>> there.
>>
>> java.lang.Exception: java.lang.OutOfMemoryError: GC overhead limit exceeded
>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
>>
>> Is there any way I can debug this. since the job after a few re-starts
>> started running fine. what could be the reason behind this.
>>
>> Thanks,
>> Hemant
>>
>


java.lang.OutOfMemoryError: GC overhead limit exceeded

2021-03-05 Thread bat man
Hi,

Getting the below OOM but the job failed 4-5 times and recovered from there.

java.lang.Exception: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded

Is there any way I can debug this? Since the job started running fine after a
few restarts, what could be the reason behind this?

Thanks,
Hemant


Re: Watermark doesn't progress after job restore from savepoint

2021-03-04 Thread bat man
Thanks Piotr. Got it. I had to push the static rules to the Kafka topic again as
they had expired and been removed from the topic. After this the pipeline
resumed.
Regarding your suggestion to implement an operator that remembers the
watermark: is there any indicator that the job has been restored from a
savepoint which I can use to decide when to re-emit the watermark?
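
In case it is useful, here is a rough sketch of how I picture such an operator (untested, and the operator API usage is my assumption): it checkpoints the last watermark it has seen and, when context.isRestored() reports a restore, re-emits that watermark in open() so downstream watermarks are not stuck at MIN_VALUE.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class WatermarkRememberingOperator<T>
        extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T> {

    private transient ListState<Long> checkpointedWatermark;
    private long lastWatermark = Long.MIN_VALUE;

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        checkpointedWatermark = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("last-watermark", Types.LONG));
        if (context.isRestored()) {
            for (Long wm : checkpointedWatermark.get()) {
                lastWatermark = Math.max(lastWatermark, wm);
            }
        }
    }

    @Override
    public void open() throws Exception {
        super.open();
        if (lastWatermark > Long.MIN_VALUE) {
            // re-emit the restored watermark so downstream operators do not sit
            // at MIN_VALUE until fresh watermarks arrive from the sources
            output.emitWatermark(new Watermark(lastWatermark));
        }
    }

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        // pass elements through unchanged
        output.collect(element);
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // remember the highest watermark seen so far, then forward it as usual
        lastWatermark = Math.max(lastWatermark, mark.getTimestamp());
        super.processWatermark(mark);
    }

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        checkpointedWatermark.clear();
        checkpointedWatermark.add(lastWatermark);
    }
}

It could then be wired in with something like
events.transform("remember-watermark", events.getType(), new WatermarkRememberingOperator<>());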

On Thu, Mar 4, 2021 at 8:46 PM Piotr Nowojski  wrote:

> Hi Hemant,
>
> State of the latest seen watermarks is not persisted in the operators.
> Currently DataStream API assumes that after recovery watermarks are going
> to be re-emitted sooner or later. What probably happens is that one of your
> sources has emitted watermarks (maybe some very high one or even
> `MAX_WATERMARK`) before taking a savepoint, and then it stopped emitting
> them. As long as the job is not restarted, this watermark is kept in
> memory. However after recovery, all watermarks in the operators are set to
> MIN_WATERMARK (-9223372036854775808), and in your case, probably one of the
> inputs `KeyedCoProcessFunction` watermark is never updated after the
> recovery (for multiple input operators/functions combined watermark is min
> from all of the inputs).
>
> You would need to make sure in one way or another that the watermarks are
> being emitted after the recovery. As a last resort, you could probably
> implement an operator that remembers the last checkpointed watermark on
> its state, and re-emits it upon recovery.
>
> Best,
> Piotrek
>
> czw., 4 mar 2021 o 15:43 bat man  napisał(a):
>
>> Hi All,
>>
>> I have a job where my source is kafka. Stream1 is partition the data on
>> dynamic key, join the data with static rules(source kafka).I use
>> KeyedCoProcessFunction to join the Steam1 with Stream2(source kafka).
>> All works fine in a normal run.
>>
>> For changing the watermark generation interval I stop the job taking a
>> savepoint. When I restart the job with the savepoint the watermark is stuck
>> at - -9223372036854775808.
>> Because of this the process function doesn't emit any results.
>>
>> What could be the problem?
>>
>> Thanks,
>> Hemant
>>
>


Watermark doesn't progress after job restore from savepoint

2021-03-04 Thread bat man
Hi All,

I have a job where my source is Kafka. Stream1 partitions the data on a
dynamic key and joins the data with static rules (source: Kafka). I use a
KeyedCoProcessFunction to join Stream1 with Stream2 (source: Kafka). All
works fine in a normal run.

To change the watermark generation interval I stopped the job with a
savepoint. When I restart the job from the savepoint the watermark is stuck
at -9223372036854775808 (Long.MIN_VALUE).
Because of this the process function doesn't emit any results.

What could be the problem?

Thanks,
Hemant


Re: BroadcastState dropped when data deleted in Kafka

2021-03-03 Thread bat man
I created a new descriptor and rules stream, used them in the second process
function, and this works fine.

public static final MapStateDescriptor<Integer, Rule> rulesDescriptor =
        new MapStateDescriptor<>(
                "rules", BasicTypeInfo.INT_TYPE_INFO,
                TypeInformation.of(Rule.class));

public static final MapStateDescriptor<Integer, Rule> rulesDescriptor2 =
        new MapStateDescriptor<>(
                "rules", BasicTypeInfo.INT_TYPE_INFO,
                TypeInformation.of(Rule.class));


BroadcastStream<Rule> rulesStream =
rulesDataStream.broadcast(TransformDescriptors.Descriptors.rulesDescriptor);

BroadcastStream<Rule> rulesStream2 =
rulesDataStream.broadcast(TransformDescriptors.Descriptors.rulesDescriptor2);


SingleOutputStreamOperator>
keyedSingleOutputStream =
rawEventStream.
connect(rulesStream).
process(new
DynamicKeyFunction()).name("keyed").uid("keyed").setParallelism(5);

SingleOutputStreamOperator rtEventDataStream =
keyedSingleOutputStream.
keyBy((keyed) -> keyed.getKey()).
connect(rulesStream2).
process(new
DynamicTransformFunction()).name("rt").uid("rt").setParallelism(5);


On Fri, Feb 26, 2021 at 3:38 PM Arvid Heise  wrote:

> Hi,
>
> I have no idea what's going on. There is no mechanism in DataStream to
> react to deleted records.
>
> Can you reproduce it locally and debug through it?
>
>
>
> On Wed, Feb 24, 2021 at 5:21 PM bat man  wrote:
>
>> Hi Arvid,
>>
>> The Flink application was not re-started. I had checked on that.
>> By adding rules to the state of process function you mean the state which
>> is local to the keyedprocess function?
>> From [1] what is being done here -
>>
>> final MapState> state = getRuntimeContext().
>> getMapState(mapStateDesc);
>>
>> state.put(ruleName, stored);
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>>
>> Thanks.
>>
>>
>> On Wed, Feb 24, 2021 at 7:52 PM Arvid Heise  wrote:
>>
>>> Could you double-check if your Flink application was restarted between
>>> Kafka topic was cleared and the time you saw that the rules have been lost?
>>>
>>> I suspect that you deleted the Kafka topic and the Flink application
>>> then failed and restarted. Upon restart it read the empty rule topic.
>>>
>>> To solve it, you probably want to add the rules to the state of your
>>> process function [1]. If you have done that, I'm a bit lost.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>>>
>>> On Wed, Feb 24, 2021 at 7:30 AM bat man  wrote:
>>>
>>>> Hi,
>>>>
>>>> This is my code below -
>>>> As mentioned earlier the rulesStream us again used in later processing.
>>>> Below you can see the rulesStream is again connected with the result stream
>>>> of the first process stream. Do you think this is the reason rules
>>>> operators state getting overridden when the data in kafka is deleted?
>>>> My question is if the data is not present in kafka then no data is read
>>>> in stream how it is updating the existing state data.
>>>>
>>>> public static final MapStateDescriptor rulesDescriptor =
>>>> new MapStateDescriptor<>(
>>>> "rules", BasicTypeInfo.INT_TYPE_INFO, 
>>>> TypeInformation.of(Rule.class));
>>>>
>>>> KafkaSource = getKafkaSource(config.get(RAW_EVENT_TOPIC));
>>>> DataStream rawEventStream = 
>>>> validateData(getRawEventStream(rawEventKafkaSource,env));
>>>>
>>>>  rulesKafkaSource = getKafkaSource(config.get(RULES_TOPIC));
>>>>  DataStream rulesDataStream = getRulesStream(rulesKafkaSource,env);
>>>>
>>>>  deviceSource = getKafkaSource(config.get(DEVICE_EVENT_TOPIC));
>>>>  DataStream deviceDataStream = getDeviceStream(deviceSource,env);
>>>>
>>>>  BroadcastStream rulesStream = 
>>>> rulesDataStream.broadcast(rulesDescriptor);
>>>>
>>>>  SingleOutputStreamOperator> 
>>>> keyedSingleOutputStream =
>>>>  rawEventStream.
>>>>  connect(rulesStream).
>>>>  process(new 
>>>> DynamicKeyFunction()).name("keyed").uid("keyed").setParallelism(5);
>>>>
>>>>  SingleOutputStreamOperator rtEventDataStream =
>&g

Re: BroadcastState dropped when data deleted in Kafka

2021-02-24 Thread bat man
Hi Arvid,

The Flink application was not re-started. I had checked on that.
By adding rules to the state of the process function, do you mean the state
which is local to the keyed process function?
From [1], what is being done here -

final MapState> state = getRuntimeContext().getMapState(
mapStateDesc);

state.put(ruleName, stored);


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
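
While re-reading [1], this is how I understand the two kinds of state involved; a small illustrative sketch only (the Rule/Keyed types and the getRuleId()/getKey() accessors are stand-ins for the classes in my job, not the actual code):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class RuleApplyingFunction
        extends KeyedBroadcastProcessFunction<Integer, Keyed, Rule, Keyed> {

    // same descriptor instance that is passed to broadcast(...) on the rules stream
    public static final MapStateDescriptor<Integer, Rule> rulesDescriptor =
            new MapStateDescriptor<>(
                    "rules", BasicTypeInfo.INT_TYPE_INFO, TypeInformation.of(Rule.class));

    @Override
    public void processBroadcastElement(Rule rule, Context ctx, Collector<Keyed> out) throws Exception {
        // Broadcast (operator) state: replicated to every subtask and included in
        // checkpoints/savepoints, so the rules stay there even if the Kafka rules
        // topic is later emptied by retention.
        ctx.getBroadcastState(rulesDescriptor).put(rule.getRuleId(), rule);
    }

    @Override
    public void processElement(Keyed value, ReadOnlyContext ctx, Collector<Keyed> out) throws Exception {
        // Read-only view of the same broadcast state on the element side. In contrast,
        // getRuntimeContext().getMapState(...) from the docs snippet above is *keyed*
        // state, scoped to the current key of the element being processed.
        Rule rule = ctx.getBroadcastState(rulesDescriptor).get(value.getKey());
        if (rule != null) {
            out.collect(value);
        }
    }
}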

Thanks.


On Wed, Feb 24, 2021 at 7:52 PM Arvid Heise  wrote:

> Could you double-check if your Flink application was restarted between
> Kafka topic was cleared and the time you saw that the rules have been lost?
>
> I suspect that you deleted the Kafka topic and the Flink application then
> failed and restarted. Upon restart it read the empty rule topic.
>
> To solve it, you probably want to add the rules to the state of your
> process function [1]. If you have done that, I'm a bit lost.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>
> On Wed, Feb 24, 2021 at 7:30 AM bat man  wrote:
>
>> Hi,
>>
>> This is my code below -
>> As mentioned earlier the rulesStream us again used in later processing.
>> Below you can see the rulesStream is again connected with the result stream
>> of the first process stream. Do you think this is the reason rules
>> operators state getting overridden when the data in kafka is deleted?
>> My question is if the data is not present in kafka then no data is read
>> in stream how it is updating the existing state data.
>>
>> public static final MapStateDescriptor rulesDescriptor =
>> new MapStateDescriptor<>(
>> "rules", BasicTypeInfo.INT_TYPE_INFO, 
>> TypeInformation.of(Rule.class));
>>
>> KafkaSource = getKafkaSource(config.get(RAW_EVENT_TOPIC));
>> DataStream rawEventStream = 
>> validateData(getRawEventStream(rawEventKafkaSource,env));
>>
>>  rulesKafkaSource = getKafkaSource(config.get(RULES_TOPIC));
>>  DataStream rulesDataStream = getRulesStream(rulesKafkaSource,env);
>>
>>  deviceSource = getKafkaSource(config.get(DEVICE_EVENT_TOPIC));
>>  DataStream deviceDataStream = getDeviceStream(deviceSource,env);
>>
>>  BroadcastStream rulesStream = 
>> rulesDataStream.broadcast(rulesDescriptor);
>>
>>  SingleOutputStreamOperator> 
>> keyedSingleOutputStream =
>>  rawEventStream.
>>  connect(rulesStream).
>>  process(new 
>> DynamicKeyFunction()).name("keyed").uid("keyed").setParallelism(5);
>>
>>  SingleOutputStreamOperator rtEventDataStream =
>>  keyedSingleOutputStream.
>>  keyBy((keyed) -> keyed.getKey()).
>>  connect(rulesStream).
>>  process(new 
>> DynamicTransformFunction()).name("rt").uid("rt").setParallelism(5);
>>
>>
>> On Tue, Feb 23, 2021 at 10:32 PM Khachatryan Roman <
>> khachatryan.ro...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Deletion of messages in Kafka shouldn't affect Flink state in general.
>>> Probably, some operator in your pipeline is re-reading the topic
>>> and overwrites the state, dropping what was deleted by Kafka.
>>> Could you share the code?
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Tue, Feb 23, 2021 at 7:12 AM bat man  wrote:
>>>
>>>> Hi,
>>>>
>>>> I have 2 streams one event data and the other rules. I broadcast the
>>>> rules stream and then key the data stream on event type. The connected
>>>> stream is processed thereafter.
>>>> We faced an issue where the rules data in the topic got deleted because
>>>> of Kafka retention policy.
>>>> Post this the existing rules data also got dropped in the broadcast
>>>> state and the processing stopped.
>>>>
>>>> As per my understanding the rules which were present in broadcast state
>>>> should still exist even if the data was deleted in Kafka as the rules dats
>>>> was already processed and stored in state map.
>>>>
>>>> PS: I’m reusing the rules stream as broadcast later in processing as
>>>> well. Could this be an issue?
>>>>
>>>> Thanks,
>>>> Hemant
>>>>
>>>


Re: BroadcastState dropped when data deleted in Kafka

2021-02-23 Thread bat man
Hi,

This is my code below.
As mentioned earlier, the rulesStream is used again in later processing. Below
you can see the rulesStream is connected a second time, with the result stream
of the first process function. Do you think this is the reason the rules
operator's state is getting overridden when the data in Kafka is deleted?
My question is: if the data is not present in Kafka then no data is read from
the stream, so how is the existing state data being updated?

public static final MapStateDescriptor<Integer, Rule> rulesDescriptor =
        new MapStateDescriptor<>(
                "rules", BasicTypeInfo.INT_TYPE_INFO,
                TypeInformation.of(Rule.class));

KafkaSource = getKafkaSource(config.get(RAW_EVENT_TOPIC));
DataStream rawEventStream =
validateData(getRawEventStream(rawEventKafkaSource,env));

 rulesKafkaSource = getKafkaSource(config.get(RULES_TOPIC));
 DataStream rulesDataStream = getRulesStream(rulesKafkaSource,env);

 deviceSource = getKafkaSource(config.get(DEVICE_EVENT_TOPIC));
 DataStream deviceDataStream = getDeviceStream(deviceSource,env);

 BroadcastStream<Rule> rulesStream = rulesDataStream.broadcast(rulesDescriptor);

 SingleOutputStreamOperator>
keyedSingleOutputStream =
 rawEventStream.
 connect(rulesStream).
 process(new
DynamicKeyFunction()).name("keyed").uid("keyed").setParallelism(5);

 SingleOutputStreamOperator rtEventDataStream =
 keyedSingleOutputStream.
 keyBy((keyed) -> keyed.getKey()).
 connect(rulesStream).
 process(new
DynamicTransformFunction()).name("rt").uid("rt").setParallelism(5);


On Tue, Feb 23, 2021 at 10:32 PM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hi,
>
> Deletion of messages in Kafka shouldn't affect Flink state in general.
> Probably, some operator in your pipeline is re-reading the topic
> and overwrites the state, dropping what was deleted by Kafka.
> Could you share the code?
>
> Regards,
> Roman
>
>
> On Tue, Feb 23, 2021 at 7:12 AM bat man  wrote:
>
>> Hi,
>>
>> I have 2 streams one event data and the other rules. I broadcast the
>> rules stream and then key the data stream on event type. The connected
>> stream is processed thereafter.
>> We faced an issue where the rules data in the topic got deleted because
>> of Kafka retention policy.
>> Post this the existing rules data also got dropped in the broadcast state
>> and the processing stopped.
>>
>> As per my understanding the rules which were present in broadcast state
>> should still exist even if the data was deleted in Kafka as the rules dats
>> was already processed and stored in state map.
>>
>> PS: I’m reusing the rules stream as broadcast later in processing as
>> well. Could this be an issue?
>>
>> Thanks,
>> Hemant
>>
>


BroadcastState dropped when data deleted in Kafka

2021-02-22 Thread bat man
Hi,

I have 2 streams, one with event data and the other with rules. I broadcast the
rules stream and then key the data stream on event type. The connected stream is
processed thereafter.
We faced an issue where the rules data in the topic got deleted because of the
Kafka retention policy.
After this, the existing rules data also got dropped from the broadcast state
and the processing stopped.

As per my understanding, the rules which were present in broadcast state
should still exist even if the data was deleted in Kafka, as the rules data
was already processed and stored in the state map.

PS: I’m reusing the rules stream as broadcast later in processing as well.
Could this be an issue?

Thanks,
Hemant


Re: Tag flink metrics to job name

2021-02-19 Thread bat man
Is there a way I can look into, say for a specific job, what the CPU or memory
usage of its YARN containers is when multiple jobs are running on the same
cluster?
Also, the issue I am trying to resolve is that I'm seeing high memory usage for
one of the containers; I want to isolate the issue to one job and then
investigate further.

Thanks,
Hemant

On Fri, 19 Feb 2021 at 12:18 PM, Chesnay Schepler 
wrote:

> No, Job-/TaskManager metrics cannot be tagged with the job name.
> The reason is that this only makes sense for application clusters (opposed
> to session clusters), but we don't differentiate between the two when it
> comes to metrics.
>
> On 2/19/2021 3:59 AM, bat man wrote:
>
> I meant the Flink jobname. I’m using the below reporter -
>
>  metrics.reporter.prom.class: 
> org.apache.flink.metrics.prometheus.PrometheusReporter
>
> Is there any way to tag job names to the task and job manager metrics.
>
> Thanks,
> Hemant
>
> On Fri, 19 Feb 2021 at 12:40 AM, Chesnay Schepler 
> wrote:
>
>> When you mean "job_name", are you referring to the Prometheus concept of
>> jobs, of the one of Flink?
>>
>> Which of Flink prometheus reporters are you using?
>>
>> On 2/17/2021 7:37 PM, bat man wrote:
>> > Hello there,
>> >
>> > I am using prometheus to push metrics to prometheus and then use
>> > grafana for visualization. There are metrics like
>> >
>> - flink_taskmanager_Status_JVM_CPU_Load, 
>> flink_taskmanager_Status_JVM_CPU_Load, flink_taskmanager_Status_JVM_CPU_Time
>>
>> > etc which do not gives job_name. It is tied to an instance.
>> > When running multiple jobs in the same yarn cluster it is possible
>> > that different jobs have yarn containers on the same instance, in this
>> > case it is very difficult to find out which instance has high CPU
>> > load, Memory usage etc.
>> >
>> > Is there a way to tag job_name to these metrics so that the metrics
>> > could be visualized per job.
>> >
>> > Thanks,
>> > Hemant
>>
>>
>>
>


Re: Tag flink metrics to job name

2021-02-18 Thread bat man
I meant the Flink jobname. I’m using the below reporter -


metrics.reporter.prom.class:
org.apache.flink.metrics.prometheus.PrometheusReporter

Is there any way to tag job names to the task and job manager metrics.

Thanks,
Hemant

On Fri, 19 Feb 2021 at 12:40 AM, Chesnay Schepler 
wrote:

> When you mean "job_name", are you referring to the Prometheus concept of
> jobs, of the one of Flink?
>
> Which of Flink prometheus reporters are you using?
>
> On 2/17/2021 7:37 PM, bat man wrote:
> > Hello there,
> >
> > I am using prometheus to push metrics to prometheus and then use
> > grafana for visualization. There are metrics like
> >
> - flink_taskmanager_Status_JVM_CPU_Load, 
> flink_taskmanager_Status_JVM_CPU_Load, flink_taskmanager_Status_JVM_CPU_Time
>
> > etc which do not gives job_name. It is tied to an instance.
> > When running multiple jobs in the same yarn cluster it is possible
> > that different jobs have yarn containers on the same instance, in this
> > case it is very difficult to find out which instance has high CPU
> > load, Memory usage etc.
> >
> > Is there a way to tag job_name to these metrics so that the metrics
> > could be visualized per job.
> >
> > Thanks,
> > Hemant
>
>
>


Tag flink metrics to job name

2021-02-17 Thread bat man
Hello there,

I am using the Prometheus reporter to push metrics to Prometheus and then use
Grafana for visualization. There are metrics like
- flink_taskmanager_Status_JVM_CPU_Load,
flink_taskmanager_Status_JVM_CPU_Time
etc. which do not give the job_name; they are only tied to an instance.
When running multiple jobs in the same YARN cluster it is possible that
different jobs have YARN containers on the same instance; in this case it
is very difficult to find out which instance has high CPU load, memory
usage, etc.

Is there a way to tag job_name onto these metrics so that the metrics could
be visualized per job?

Thanks,
Hemant


Re: GC overhead limit exceeded when using Prometheus exporter

2021-02-16 Thread bat man
Hi Till,

I tried increasing the task manager memory to 4 GB, but unfortunately the EMR
nodes are going down; I am investigating that for now. I will share the results
in case this works out; if not, I will get the heap dump.

Thanks,
Hemant

On Tue, Feb 16, 2021 at 10:45 PM Till Rohrmann  wrote:

> Hi Hemant,
>
> Have you tried running a new Flink version? Can you create a heap dump
> when the process fails? This could help us digging into whether there is
> some memory leak.
>
> Cheers,
> Till
>
> On Tue, Feb 16, 2021 at 5:21 PM bat man  wrote:
>
>> Hi there,
>>
>> I am facing *java.lang.OutOfMemoryError: GC overhead limit exceeded *when
>> using prometheus exporter with* Flink 1.9 *on *AWS EMR *emr-5.28.1. I
>> have other jobs which run fine. tihs specific job fails with the below
>> error stack.
>>
>> Exception in thread "pool-3-thread-2" java.lang.OutOfMemoryError: GC
>> overhead limit exceeded
>> at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:133)
>> at java.io.OutputStreamWriter.write(OutputStreamWriter.java:220)
>> at java.io.Writer.write(Writer.java:157)
>> at
>> org.apache.flink.shaded.io.prometheus.client.exporter.common.TextFormat.write004(TextFormat.java:40)
>> at
>> org.apache.flink.shaded.io.prometheus.client.exporter.HTTPServer$HTTPMetricHandler.handle(HTTPServer.java:59)
>> at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
>> at sun.net.httpserver.AuthFilter.doFilter(AuthFilter.java:83)
>> at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:82)
>> at
>> sun.net.httpserver.ServerImpl$Exchange$LinkHandler.handle(ServerImpl.java:675)
>> at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
>> at sun.net.httpserver.ServerImpl$Exchange.run(ServerImpl.java:647)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>>
>> Thanks,
>> Hemant
>>
>


GC overhead limit exceeded when using Prometheus exporter

2021-02-16 Thread bat man
Hi there,

I am facing java.lang.OutOfMemoryError: GC overhead limit exceeded when
using the Prometheus exporter with Flink 1.9 on AWS EMR emr-5.28.1. I have
other jobs which run fine; this specific job fails with the below error
stack.

Exception in thread "pool-3-thread-2" java.lang.OutOfMemoryError: GC
overhead limit exceeded
at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:133)
at java.io.OutputStreamWriter.write(OutputStreamWriter.java:220)
at java.io.Writer.write(Writer.java:157)
at
org.apache.flink.shaded.io.prometheus.client.exporter.common.TextFormat.write004(TextFormat.java:40)
at
org.apache.flink.shaded.io.prometheus.client.exporter.HTTPServer$HTTPMetricHandler.handle(HTTPServer.java:59)
at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
at sun.net.httpserver.AuthFilter.doFilter(AuthFilter.java:83)
at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:82)
at
sun.net.httpserver.ServerImpl$Exchange$LinkHandler.handle(ServerImpl.java:675)
at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
at sun.net.httpserver.ServerImpl$Exchange.run(ServerImpl.java:647)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Thanks,
Hemant


Re: Flink app logs to Elastic Search

2021-01-14 Thread bat man
I was able to make it work with a fresh Elastic installation. Now
taskmanager and jobmanager logs are available in elastic.
Thanks for the pointers.

-Hemant.

On Wed, Jan 13, 2021 at 6:21 PM Aljoscha Krettek 
wrote:

> On 2021/01/11 01:29, bat man wrote:
> >Yes, no entries to the elastic search. No indices were created in elastic.
> >Jar is getting picked up which I can see from yarn logs. Pre-defined text
> >based logging is also available.
>
> Hmm, I can't imagine much that could go wrong. Maybe there is some
> interference from other configuration files. Could you try and make sure
> that you only have the configuration and logging system in the classpath
> that you want to use?
>
> Best,
> Aljoscha
>


Re: Main class logs in Yarn Mode

2021-01-12 Thread bat man
Thanks Yangze Guo.
Is there a way these can be redirected to the YARN logs?
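
Since the main() runs on the client side, one idea (an assumption on my part, I have not verified it on EMR): wire the same Elasticsearch appender into the client-side logging config, conf/log4j-cli.properties, which the bin/flink CLI uses, so that the driver logs at least end up in Elastic even if not in the YARN logs:

# conf/log4j-cli.properties (sketch; host/index values are placeholders)
# (the existing 'file' appender definition in that file stays as-is)
log4j.rootLogger=INFO, file, elastic
log4j.appender.elastic=com.letfy.log4j.appenders.ElasticSearchClientAppender
log4j.appender.elastic.elasticHost=http://<es-host>:9200
log4j.appender.elastic.elasticIndex=<index>
log4j.appender.elastic.elasticType=<type>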

On Tue, 12 Jan 2021 at 2:35 PM, Yangze Guo  wrote:

> The main function of your WordCountExample is executed in your local
> environment. So, the logs you are looking for ("Entering
> application.") are be located in your console output and the "log/"
> directory of your Flink distribution.
>
> Best,
> Yangze Guo
>
> On Tue, Jan 12, 2021 at 4:50 PM bat man  wrote:
> >
> > Hi,
> >
> > I am running a sample job as below -
> >
> > public class WordCountExample {
> > static Logger logger = LoggerFactory.getLogger(WordCountExample.class);
> >
> > public static void main(String[] args) throws Exception {
> > final ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> >
> > logger.info("Entering application.");
> >
> > DataSet text = env.fromElements(
> > "Who's there?",
> > "I think I hear them. Stand, ho! Who's there?");
> >
> > List elements = new ArrayList();
> > elements.add(0);
> >
> >
> > DataSet set = env.fromElements(new TestClass(elements));
> >
> > DataSet> wordCounts = text
> > .flatMap(new LineSplitter())
> > .withBroadcastSet(set, "set")
> > .groupBy(0)
> > .sum(1);
> >
> > wordCounts.print();
> >
> > logger.info("Processing done");
> >
> > //env.execute("wordcount job complete");
> >
> > }
> >
> > public static class LineSplitter implements FlatMapFunction Tuple2> {
> >
> > static Logger loggerLineSplitter =
> LoggerFactory.getLogger(LineSplitter.class);
> >
> > @Override
> > public void flatMap(String line, Collector> out)
> {
> > loggerLineSplitter.info("Logger in LineSplitter.flatMap");
> > for (String word : line.split(" ")) {
> > out.collect(new Tuple2(word, 1));
> > }
> > }
> > }
> >
> > public static class TestClass implements Serializable {
> > private static final long serialVersionUID = -2932037991574118651L;
> >
> > static Logger loggerTestClass =
> LoggerFactory.getLogger("WordCountExample.TestClass");
> >
> > List integerList;
> > public TestClass(List integerList){
> > this.integerList=integerList;
> > loggerTestClass.info("Logger in TestClass");
> > }
> >
> >
> > }
> > }
> >
> > When run in IDE I can see the logs from main class i.e. statements like
> below in console logs -
> >
> > 13:40:24.459 [main] INFO  com.flink.transform.WordCountExample -
> Entering application.
> > 13:40:24.486 [main] INFO  WordCountExample.TestClass - Logger in
> TestClass
> >
> >
> > When run on Yarn with command - flink run -m yarn-cluster  -c
> com.flink.transform.WordCountExample rt-1.0-jar-with-dependencies.jar
> >
> > I only see the flatmap logging statements like -
> > INFO  com.flink.transform.WordCountExample$LineSplitter - Logger in
> LineSplitter.flatMap
> > INFO  com.flink.transform.WordCountExample$LineSplitter - Logger in
> LineSplitter.flatMap
> >
> > I have checked the jobmanager and taskmanager logs from yarn in EMR.
> >
> > This is my log4j.properties from EMR cluster
> >
> > log4j.rootLogger=INFO,file,elastic
> >
> > # Config ES logging appender
> >
> log4j.appender.elastic=com.letfy.log4j.appenders.ElasticSearchClientAppender
> > log4j.appender.elastic.elasticHost=http://<>:9200
> > log4j.appender.elastic.hostName=<>
> > log4j.appender.elastic.applicationName=<>
> >
> > # more options (see github project for the full list)
> > log4j.appender.elastic.elasticIndex=<>
> > log4j.appender.elastic.elasticType=<>
> >
> > # Log all infos in the given file
> > log4j.appender.file=org.apache.log4j.FileAppender
> > log4j.appender.file.file=${log.file}
> > log4j.appender.file.append=false
> > log4j.appender.file.layout=org.apache.log4j.PatternLayout
> > log4j.appender.file.layout.ConversionPattern=%d{-MM-dd HH:mm:ss,SSS}
> %-5p %-60c %x - %m%n
> >
> > # suppress the irrelevant (wrong) warnings from the netty channel handler
> > log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,file
> >
> >
> > How can I access main driver logs when run on yarn as master.
> >
> > Thanks,
> > Hemant
> >
> >
> >
> >
>


Main class logs in Yarn Mode

2021-01-12 Thread bat man
Hi,

I am running a sample job as below -

public class WordCountExample {
    static Logger logger = LoggerFactory.getLogger(WordCountExample.class);

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        logger.info("Entering application.");

        DataSet<String> text = env.fromElements(
                "Who's there?",
                "I think I hear them. Stand, ho! Who's there?");

        List<Integer> elements = new ArrayList<Integer>();
        elements.add(0);

        DataSet<TestClass> set = env.fromElements(new TestClass(elements));

        DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())
                .withBroadcastSet(set, "set")
                .groupBy(0)
                .sum(1);

        wordCounts.print();

        logger.info("Processing done");

        //env.execute("wordcount job complete");
    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        static Logger loggerLineSplitter = LoggerFactory.getLogger(LineSplitter.class);

        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            loggerLineSplitter.info("Logger in LineSplitter.flatMap");
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

    public static class TestClass implements Serializable {
        private static final long serialVersionUID = -2932037991574118651L;

        static Logger loggerTestClass = LoggerFactory.getLogger("WordCountExample.TestClass");

        List<Integer> integerList;

        public TestClass(List<Integer> integerList) {
            this.integerList = integerList;
            loggerTestClass.info("Logger in TestClass");
        }
    }
}

When run in the IDE I can see the logs from the main class, i.e. statements
like the below in the console logs -

13:40:24.459 [main] INFO  com.flink.transform.WordCountExample - Entering
application.
13:40:24.486 [main] INFO  WordCountExample.TestClass - Logger in TestClass


When run on Yarn with command - flink run -m yarn-cluster  -c
com.flink.transform.WordCountExample rt-1.0-jar-with-dependencies.jar

I only see the flatmap logging statements like -
INFO  com.flink.transform.WordCountExample$LineSplitter - Logger in
LineSplitter.flatMap
INFO  com.flink.transform.WordCountExample$LineSplitter - Logger in
LineSplitter.flatMap

I have checked the jobmanager and taskmanager logs from yarn in EMR.

This is my log4j.properties from EMR cluster

log4j.rootLogger=INFO,file,elastic

# Config ES logging appender
log4j.appender.elastic=com.letfy.log4j.appenders.ElasticSearchClientAppender
log4j.appender.elastic.elasticHost=http://<>:9200
log4j.appender.elastic.hostName=<>
log4j.appender.elastic.applicationName=<>

# more options (see github project for the full list)
log4j.appender.elastic.elasticIndex=<>
log4j.appender.elastic.elasticType=<>

# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{-MM-dd HH:mm:ss,SSS}
%-5p %-60c %x - %m%n

# suppress the irrelevant (wrong) warnings from the netty channel handler
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,file


How can I access the main (driver) logs when the job is run with YARN as the master?

Thanks,
Hemant


Flink app logs to Elastic Search

2021-01-07 Thread bat man
Hi Team,

I have a requirement to push the flink app logs to Elastic Search for log
management. Can anyone guide if you are already doing this.
I have tried this -
https://cristian.io/post/flink-log4j/
I’m not getting any error for a sample job I tried.

I am using EMR to run Flink 1.9 and Elastic Search latest version running
on ec2 machine.

Thanks,
Hemant


Re: Pushing metrics to Influx from Flink 1.9 on AWS EMR(5.28)

2020-08-14 Thread bat man
Hello Arvid,

Thanks I’ll check my config and use the correct reporter and test it out.

Thanks,
Hemant

On Fri, 14 Aug 2020 at 6:57 PM, Arvid Heise  wrote:

> Hi Hemant,
>
> according to the influx section of the 1.9 metric documentation [1], you
> should use the reporter without a factory. The factory was added later.
>
> metrics.reporter.influxdb.class: 
> org.apache.flink.metrics.influxdb.InfluxdbReportermetrics.reporter.influxdb.host:
>  localhostmetrics.reporter.influxdb.port: 8086metrics.reporter.influxdb.db: 
> flinkmetrics.reporter.influxdb.username: 
> flink-metricsmetrics.reporter.influxdb.password: 
> qwertymetrics.reporter.influxdb.retentionPolicy: one_hour
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#influxdb-orgapacheflinkmetricsinfluxdbinfluxdbreporter
>
> On Thu, Aug 13, 2020 at 8:10 AM bat man  wrote:
>
>> Anyone who has made metrics integration to external systems for flink
>> running on AWS EMR, can you share if its a configuration issue or EMR
>> specific issue.
>>
>> Thanks,
>> Hemant
>>
>> On Wed, Aug 12, 2020 at 9:55 PM bat man  wrote:
>>
>>> An update in the yarn logs I could see the below -
>>>
>>> Classpath:
>>> *lib/flink-metrics-influxdb-1.9.0.jar:lib/flink-shaded-hadoop-2-uber-2.8.5-amzn-5-7.0.jar:lib/flink-table-blink_2.11-1.9.0.jar:lib/flink-table_2.11-1.9.0.jar:lib/log4j-1.2.17.jar:lib/slf4j-log4j12-1.7.15.jar:log4j.properties:plugins/influxdb/flink-metrics-influxdb-1.9.0.jar*
>>> *..*
>>> *..*
>>>
>>> This means the jar is getting loaded, in the logs I could also see -
>>> 2020-08-12 15:28:51,505 INFO
>>>  org.apache.flink.yarn.YarnTaskExecutorRunner  - Registered
>>> UNIX signal handlers for [TERM, HUP, I
>>> NT]
>>> 2020-08-12 15:28:51,508 INFO
>>>  org.apache.flink.yarn.YarnTaskExecutorRunner  - Current
>>> working Directory: /mnt/yarn/usercache/ha
>>>
>>> doop/appcache/application_1595767096609_0013/container_1595767096609_0013_01_04
>>>
>>> *2020-08-12 15:28:51,512 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: metrics.reporter.influxdb.interval, 60 SECONDS*
>>>
>>> *2020-08-12 15:28:51,512 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: env.yarn.conf.dir, /etc/hadoop/conf*
>>> 2020-08-12 15:28:51,513 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: metrics.reporter.
>>> influxdb.host, xx.xxx.xxx.xx
>>> 2020-08-12 15:28:51,513 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: high-availability
>>> .cluster-id, application_1595767096609_0013
>>> 2020-08-12 15:28:51,513 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: jobmanager.rpc.ad
>>> dress, ip-xx-x-xx-xxx.ec2.internal
>>> 2020-08-12 15:28:51,513 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: metrics.reporter.
>>> influxdb.password, **
>>>
>>> *2020-08-12 15:28:51,513 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: FLINK_PLUGINS_DIR, /usr/lib/flink/plugins*
>>> 2020-08-12 15:28:51,513 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: metrics.reporter.
>>> influxdb.db, xx
>>> 2020-08-12 15:28:51,520 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: metrics.reporter.
>>> influxdb.connectTimeout, 6
>>> 2020-08-12 15:28:51,520 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: env.hadoop.conf.d
>>> ir, /etc/hadoop/conf
>>> 2020-08-12 15:28:51,521 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: taskmanager.numbe
>>> rOfTaskSlots, 1
>>> 2020-08-12 15:28:51,521 INFO
>>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: web.port, 0
>>> 2020-08-12 15:28:51,521 INFO
>>>  org.apache.flink

Re: Pushing metrics to Influx from Flink 1.9 on AWS EMR(5.28)

2020-08-13 Thread bat man
Anyone who has integrated Flink metrics with external systems while running on
AWS EMR, can you share whether this is a configuration issue or an
EMR-specific issue?

Thanks,
Hemant

On Wed, Aug 12, 2020 at 9:55 PM bat man  wrote:

> An update in the yarn logs I could see the below -
>
> Classpath:
> *lib/flink-metrics-influxdb-1.9.0.jar:lib/flink-shaded-hadoop-2-uber-2.8.5-amzn-5-7.0.jar:lib/flink-table-blink_2.11-1.9.0.jar:lib/flink-table_2.11-1.9.0.jar:lib/log4j-1.2.17.jar:lib/slf4j-log4j12-1.7.15.jar:log4j.properties:plugins/influxdb/flink-metrics-influxdb-1.9.0.jar*
> *..*
> *..*
>
> This means the jar is getting loaded, in the logs I could also see -
> 2020-08-12 15:28:51,505 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
>  - Registered UNIX signal handlers for [TERM, HUP, I
> NT]
> 2020-08-12 15:28:51,508 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
>  - Current working Directory: /mnt/yarn/usercache/ha
>
> doop/appcache/application_1595767096609_0013/container_1595767096609_0013_01_04
>
> *2020-08-12 15:28:51,512 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: metrics.reporter.influxdb.interval, 60 SECONDS*
>
> *2020-08-12 15:28:51,512 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: env.yarn.conf.dir, /etc/hadoop/conf*
> 2020-08-12 15:28:51,513 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: metrics.reporter.
> influxdb.host, xx.xxx.xxx.xx
> 2020-08-12 15:28:51,513 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: high-availability
> .cluster-id, application_1595767096609_0013
> 2020-08-12 15:28:51,513 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.rpc.ad
> dress, ip-xx-x-xx-xxx.ec2.internal
> 2020-08-12 15:28:51,513 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: metrics.reporter.
> influxdb.password, **
>
> *2020-08-12 15:28:51,513 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: FLINK_PLUGINS_DIR, /usr/lib/flink/plugins*
> 2020-08-12 15:28:51,513 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: metrics.reporter.
> influxdb.db, xx
> 2020-08-12 15:28:51,520 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: metrics.reporter.
> influxdb.connectTimeout, 6
> 2020-08-12 15:28:51,520 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: env.hadoop.conf.d
> ir, /etc/hadoop/conf
> 2020-08-12 15:28:51,521 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: taskmanager.numbe
> rOfTaskSlots, 1
> 2020-08-12 15:28:51,521 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: web.port, 0
> 2020-08-12 15:28:51,521 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: metrics.reporter.influxdb.username, 
> 2020-08-12 15:28:51,521 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: taskmanager.memory.size, 264241152b
> 2020-08-12 15:28:51,521 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: web.tmpdir,
> /tmp/flink-web-5562f065-6020-4c38-8260-3aea434bf285
> 2020-08-12 15:28:51,521 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.rpc.port, 32777
> 2020-08-12 15:28:51,521 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: metrics.reporter.influxdb.port, 8086
> 2020-08-12 15:28:51,521 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: metrics.reporter.influxdb.retentionPolicy, one_hour
> 2020-08-12 15:28:51,522 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: internal.cluster.execution-mode, NORMAL
> 2020-08-12 15:28:51,522 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: metrics.reporter.influxdb.writeTimeout, 6
> 2020-08-12 15:28:51,522 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> co

Re: Pushing metrics to Influx from Flink 1.9 on AWS EMR(5.28)

2020-08-12 Thread bat man
-12 15:28:51,561 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
 - Current working/local Directory:
/mnt/yarn/usercache/hadoop/appcache/application_1595767096609_0013,/mnt1/yarn/usercache/hadoop/appcache/application_1595767096609_0013
2020-08-12 15:28:51,564 INFO
 org.apache.flink.runtime.clusterframework.BootstrapTools  - Setting
directories for temporary files to:
/mnt/yarn/usercache/hadoop/appcache/application_1595767096609_0013,/mnt1/yarn/usercache/hadoop/appcache/application_1595767096609_0013
2020-08-12 15:28:51,564 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
 - TM: remote keytab path obtained null
2020-08-12 15:28:51,564 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
 - TM: remote keytab principal obtained null
2020-08-12 15:28:51,566 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
 - YARN daemon is running as: hadoop Yarn client user
obtainer: hadoop
2020-08-12 15:28:51,675 INFO
 org.apache.flink.runtime.security.modules.HadoopModule- Hadoop
user set to hadoop (auth:xx)
2020-08-12 15:28:51,984 WARN  org.apache.flink.configuration.Configuration
 - Config uses deprecated configuration key 'web.port'
instead of proper key 'rest.port'
2020-08-12 15:28:51,987 INFO
 org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Using
configured hostname/address for TaskManager: ip-xx-x-xx-xxx.ec2.internal.
2020-08-12 15:28:51,996 INFO
 org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Trying to
start actor system at ip-xx-x-xx-xxx.ec2.internal:0
2020-08-12 15:28:52,823 INFO  akka.event.slf4j.Slf4jLogger
 - Slf4jLogger started
2020-08-12 15:28:52,854 INFO  akka.remote.Remoting
 - Starting remoting
2020-08-12 15:28:53,061 INFO  akka.remote.Remoting
 - Remoting started; listening on addresses
:[akka.tcp://flink@ip-xx-x-xx-xxx.ec2.internal:37937]
2020-08-12 15:28:53,563 INFO
 org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Actor
system started at akka.tcp://flink@iip-xx-x-xx-xxx.ec2.ec2.internal:37937
*2020-08-12 15:28:53,593 WARN
 org.apache.flink.runtime.metrics.ReporterSetup- The
reporter factory
(org.apache.flink.metrics.influxdb.InfluxdbReporterFactory) could not be
found for reporter influxdb. Available factories:*

*2020-08-12 15:28:53,597 INFO
 org.apache.flink.runtime.metrics.MetricRegistryImpl   - No metrics
reporter configured, no metrics will be exposed/reported.*2020-08-12
15:28:53,599 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils
  - Trying to start actor system at ip-xx-x-xx-xxx.ec2.ec2.internal:0

So at one place org.apache.flink.configuration.GlobalConfiguration loads the
reporter properties, but
then org.apache.flink.runtime.metrics.ReporterSetup complains that it cannot
find the reporter factory.

Can anyone guide me on what I am missing here?

Thanks,
Hemant

On Wed, Aug 12, 2020 at 9:15 PM bat man  wrote:

> Hello Experts,
>
> I am running Flink - 1.9.0 on AWS EMR(emr-5.28.1). I want to push
> metrics to Influxdb. I followed the documentation[1]. I added the
> configuration to /usr/lib/flink/conf/flink-conf.yaml and copied the jar to
> /usr/lib/flink//lib folder on master node. However, I also
> understand that the cluster might need a re-start as only with these steps
> when I run the job I don't see any measurement(table) created in my influx
> db. I am not able to find any documentation on how to restart the cluster
> on EMR.
> Anyone who has configured to push metrics to InfluxDB from AWS EMR could
> you share the steps please.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#influxdb-orgapacheflinkmetricsinfluxdbinfluxdbreporter
>
> Thanks,
> Hemant
>


Pushing metrics to Influx from Flink 1.9 on AWS EMR(5.28)

2020-08-12 Thread bat man
Hello Experts,

I am running Flink 1.9.0 on AWS EMR (emr-5.28.1) and want to push
metrics to InfluxDB. I followed the documentation [1], added the
configuration to /usr/lib/flink/conf/flink-conf.yaml, and copied the jar to
the /usr/lib/flink/lib folder on the master node. However, I suspect the
cluster might need a restart, because with only these steps I don't see any
measurement (table) created in my InfluxDB when I run the job. I am not
able to find any documentation on how to restart the cluster on EMR.
Anyone who has configured pushing metrics to InfluxDB from AWS EMR, could
you share the steps please?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#influxdb-orgapacheflinkmetricsinfluxdbinfluxdbreporter

Thanks,
Hemant


Re: Handle idle kafka source in Flink 1.9

2020-08-05 Thread bat man
Hello Arvid,

Thanks for the suggestion/reference and my apologies for the late reply.

With this I am able to process the data even when some topics do not have
regular data. Late data is handled via a side output, and there is a
process for it.
One challenge is handling the back-fill: when I run the job with old data,
because of the watermark (maxOutOfOrderness is set to 10 minutes) the older
data gets filtered out as late data. For handling this I am thinking of running
the side input with maxOutOfOrderness set back to the oldest data, while the
regular job keeps the normal setting.

Thanks,
Hemant

On Thu, Jul 30, 2020 at 2:41 PM Arvid Heise  wrote:

> Hi Hemant,
>
> sorry for the late reply.
>
> You can just create your own watermark assigner and either copy the
> assigner from Flink 1.11 or take the one that we use in our trainings [1].
>
> [1]
> https://github.com/ververica/flink-training-troubleshooting/blob/master/src/main/java/com/ververica/flinktraining/solutions/troubleshoot/TroubledStreamingJobSolution2.java#L129-L187
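
For anyone finding this thread later, here is roughly what such an assigner boils down to for Flink 1.9. This is my own sketch (untested), with a made-up Event type exposing getTimestamp() and placeholder thresholds:

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class BoundedOutOfOrdernessWithIdleness implements AssignerWithPeriodicWatermarks<Event> {

    private final long maxOutOfOrdernessMs; // e.g. 20_000
    private final long idleTimeoutMs;       // e.g. 60_000

    private long currentMaxTimestamp = Long.MIN_VALUE;
    private long lastEventProcessingTime = -1L;

    public BoundedOutOfOrdernessWithIdleness(long maxOutOfOrdernessMs, long idleTimeoutMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
    }

    @Override
    public long extractTimestamp(Event element, long previousElementTimestamp) {
        // remember when we last saw any event on this subtask
        lastEventProcessingTime = System.currentTimeMillis();
        currentMaxTimestamp = Math.max(currentMaxTimestamp, element.getTimestamp());
        return element.getTimestamp();
    }

    @Override
    public Watermark getCurrentWatermark() {
        long now = System.currentTimeMillis();
        if (lastEventProcessingTime > 0 && now - lastEventProcessingTime > idleTimeoutMs) {
            // The input looks idle: advance the watermark based on processing time
            // so this source does not hold back the downstream watermark.
            return new Watermark(now - maxOutOfOrdernessMs);
        }
        if (currentMaxTimestamp == Long.MIN_VALUE) {
            return new Watermark(Long.MIN_VALUE);
        }
        return new Watermark(currentMaxTimestamp - maxOutOfOrdernessMs);
    }
}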
>
> On Thu, Jul 23, 2020 at 8:48 PM bat man  wrote:
>
>> Thanks Niels for a great talk. You have covered two of my pain areas -
>> slim and broken streams. Since I am dealing with device data from on-prem
>> data centers. The first option of generating fabricated watermark events is
>> fine, however as mentioned in your talk how are you handling forwarding it
>> to the next stream(next kafka topic) after enrichment. Have you got any
>> solution for this?
>>
>> -Hemant
>>
>> On Thu, Jul 23, 2020 at 12:05 PM Niels Basjes  wrote:
>>
>>> Have a look at this presentation I gave a few weeks ago.
>>> https://youtu.be/bQmz7JOmE_4
>>>
>>> Niels Basjes
>>>
>>> On Wed, 22 Jul 2020, 08:51 bat man,  wrote:
>>>
>>>> Hi Team,
>>>>
>>>> Can someone share their experiences handling this.
>>>>
>>>> Thanks.
>>>>
>>>> On Tue, Jul 21, 2020 at 11:30 AM bat man  wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I have a pipeline which consumes data from a Kafka source. Since, the
>>>>> partitions are partitioned by device_id in case a group of devices is down
>>>>> some partitions will not get normal flow of data.
>>>>> I understand from documentation here[1] in flink 1.11 one can declare
>>>>> the source idle -
>>>>> WatermarkStrategy.>forBoundedOutOfOrderness(
>>>>> Duration.ofSeconds(20)).withIdleness(Duration.ofMinutes(1));
>>>>>
>>>>> How can I handle this in 1.9, since I am using aws emr and emr doesn't
>>>>> have any release with the latest flink version.
>>>>>
>>>>> One way I could think of is to trigger watermark generation every 10
>>>>> minutes or so using Periodic watermarks. However, this will not be full
>>>>> proof, are there any better way to handle this more dynamically.
>>>>>
>>>>> [1] -
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector
>>>>>
>>>>> Thanks,
>>>>> Hemant
>>>>>
>>>>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: Handle idle kafka source in Flink 1.9

2020-07-23 Thread bat man
Thanks Niels for a great talk. You have covered two of my pain areas - slim
and broken streams - since I am dealing with device data from on-prem data
centers. The first option of generating fabricated watermark events is
fine; however, as mentioned in your talk, how are you handling forwarding it
to the next stream (the next Kafka topic) after enrichment? Have you got any
solution for this?

-Hemant

On Thu, Jul 23, 2020 at 12:05 PM Niels Basjes  wrote:

> Have a look at this presentation I gave a few weeks ago.
> https://youtu.be/bQmz7JOmE_4
>
> Niels Basjes
>
> On Wed, 22 Jul 2020, 08:51 bat man,  wrote:
>
>> Hi Team,
>>
>> Can someone share their experiences handling this.
>>
>> Thanks.
>>
>> On Tue, Jul 21, 2020 at 11:30 AM bat man  wrote:
>>
>>> Hello,
>>>
>>> I have a pipeline which consumes data from a Kafka source. Since, the
>>> partitions are partitioned by device_id in case a group of devices is down
>>> some partitions will not get normal flow of data.
>>> I understand from documentation here[1] in flink 1.11 one can declare
>>> the source idle -
>>> WatermarkStrategy.>forBoundedOutOfOrderness(
>>> Duration.ofSeconds(20)).withIdleness(Duration.ofMinutes(1));
>>>
>>> How can I handle this in 1.9, since I am using aws emr and emr doesn't
>>> have any release with the latest flink version.
>>>
>>> One way I could think of is to trigger watermark generation every 10
>>> minutes or so using Periodic watermarks. However, this will not be full
>>> proof, are there any better way to handle this more dynamically.
>>>
>>> [1] -
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector
>>>
>>> Thanks,
>>> Hemant
>>>
>>>


Re: Handle idle kafka source in Flink 1.9

2020-07-22 Thread bat man
Hi Team,

Can someone share their experiences handling this.

Thanks.

On Tue, Jul 21, 2020 at 11:30 AM bat man  wrote:

> Hello,
>
> I have a pipeline which consumes data from a Kafka source. Since, the
> partitions are partitioned by device_id in case a group of devices is down
> some partitions will not get normal flow of data.
> I understand from documentation here[1] in flink 1.11 one can declare the
> source idle -
> WatermarkStrategy.>forBoundedOutOfOrderness(Duration.
> ofSeconds(20)).withIdleness(Duration.ofMinutes(1));
>
> How can I handle this in 1.9, since I am using aws emr and emr doesn't
> have any release with the latest flink version.
>
> One way I could think of is to trigger watermark generation every 10
> minutes or so using Periodic watermarks. However, this will not be full
> proof, are there any better way to handle this more dynamically.
>
> [1] -
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector
>
> Thanks,
> Hemant
>
>


Handle idle kafka source in Flink 1.9

2020-07-21 Thread bat man
Hello,

I have a pipeline which consumes data from a Kafka source. Since the
partitions are keyed by device_id, if a group of devices is down
some partitions will not get a normal flow of data.
I understand from the documentation here [1] that in Flink 1.11 one can declare the
source idle -
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)).withIdleness(Duration.ofMinutes(1));

How can I handle this in 1.9, since I am using AWS EMR and EMR doesn't have
any release with the latest Flink version?

One way I could think of is to trigger watermark generation every 10
minutes or so using periodic watermarks. However, this will not be foolproof;
is there a better way to handle this more dynamically?

[1] -
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector

Thanks,
Hemant