Re: Kubernetes operator listing jobs TimeoutException

2023-06-07 Thread Evgeniy Lyutikov
Hi, thanks for the reply.
These errors occur on jobs that have already been successfully deployed and are 
running.

When such an error occurs, the operator starts to consider the job to be in the 
DEPLOYING or DEPLOYED_NOT_READY status, while all this time the job is actually 
in the RUNNING state and no actions are performed on it.

It seems that this problem appeared after updating the FlinkDeployment resource 
to upgrade the version of the running job.


2023-06-08 06:31:02,741 o.a.f.k.o.o.JobStatusObserver  [WARN 
][job-name/job-name] Exception while listing jobs
2023-06-08 06:31:02,741 o.a.f.k.o.o.d.ApplicationObserver [INFO 
][job-name/job-name] Observing JobManager deployment. Previous status: READY
2023-06-08 06:31:03,758 o.a.f.k.o.o.d.ApplicationObserver [INFO 
][job-name/job-name] JobManager is being deployed
2023-06-08 06:31:03,824 o.a.f.k.o.l.AuditUtils [INFO 
][job-name/job-name] >>> Status | Info| STABLE  | The resource 
deployment is considered to be stable and won’t be rolled back
2023-06-08 06:31:03,825 o.a.f.k.o.a.JobAutoScalerImpl  [INFO 
][job-name/job-name] Job autoscaler is disabled
2023-06-08 06:31:03,825 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO 
][job-name/job-name] Resource fully reconciled, nothing to do...
2023-06-08 06:31:03,825 o.a.f.k.o.c.FlinkDeploymentController [INFO 
][job-name/job-name] End of reconciliation
2023-06-08 06:31:13,828 o.a.f.k.o.c.FlinkDeploymentController [INFO 
][job-name/job-name] Starting reconciliation
2023-06-08 06:31:13,829 o.a.f.k.o.s.FlinkResourceContextFactory [INFO 
][job-name/job-name] Getting service for job-name
2023-06-08 06:31:13,829 o.a.f.k.o.o.d.ApplicationObserver [INFO 
][job-name/job-name] Observing JobManager deployment. Previous status: DEPLOYING
2023-06-08 06:31:14,849 o.a.f.k.o.o.d.ApplicationObserver [INFO 
][job-name/job-name] JobManager is being deployed
2023-06-08 06:31:14,850 o.a.f.k.o.a.JobAutoScalerImpl  [INFO 
][job-name/job-name] Job autoscaler is disabled
2023-06-08 06:31:14,850 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO 
][job-name/job-name] Resource fully reconciled, nothing to do...
2023-06-08 06:31:14,850 o.a.f.k.o.c.FlinkDeploymentController [INFO 
][job-name/job-name] End of reconciliation
2023-06-08 06:31:24,853 o.a.f.k.o.c.FlinkDeploymentController [INFO 
][job-name/job-name] Starting reconciliation
2023-06-08 06:31:24,854 o.a.f.k.o.s.FlinkResourceContextFactory [INFO 
][job-name/job-name] Getting service for job-name
2023-06-08 06:31:24,854 o.a.f.k.o.o.d.ApplicationObserver [INFO 
][job-name/job-name] Observing JobManager deployment. Previous status: DEPLOYING
2023-06-08 06:31:24,858 o.a.f.k.o.o.d.ApplicationObserver [INFO 
][job-name/job-name] JobManager deployment port is ready, waiting for the Flink 
REST API...
2023-06-08 06:31:24,926 o.a.f.k.o.l.AuditUtils [INFO 
][job-name/job-name] >>> Status | Info| STABLE  | The resource 
deployment is considered to be stable and won’t be rolled back
2023-06-08 06:31:24,927 o.a.f.k.o.a.JobAutoScalerImpl  [INFO 
][job-name/job-name] Job autoscaler is disabled
2023-06-08 06:31:24,927 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO 
][job-name/job-name] Resource fully reconciled, nothing to do...
2023-06-08 06:31:24,927 o.a.f.k.o.c.FlinkDeploymentController [INFO 
][job-name/job-name] End of reconciliation
2023-06-08 06:31:34,930 o.a.f.k.o.c.FlinkDeploymentController [INFO 
][job-name/job-name] Starting reconciliation
2023-06-08 06:31:34,931 o.a.f.k.o.s.FlinkResourceContextFactory [INFO 
][job-name/job-name] Getting service for job-name
2023-06-08 06:31:34,931 o.a.f.k.o.o.d.ApplicationObserver [INFO 
][job-name/job-name] Observing JobManager deployment. Previous status: 
DEPLOYED_NOT_READY
2023-06-08 06:31:34,931 o.a.f.k.o.o.d.ApplicationObserver [INFO 
][job-name/job-name] JobManager deployment is ready
2023-06-08 06:31:34,931 o.a.f.k.o.o.JobStatusObserver  [INFO 
][job-name/job-name] Observing job status
2023-06-08 06:31:34,936 o.a.f.k.o.o.JobStatusObserver  [INFO 
][job-name/job-name] Job status changed from RECONCILING to RUNNING
2023-06-08 06:31:34,960 o.a.f.k.o.l.AuditUtils [INFO 
][job-name/job-name] >>> Event  | Info| JOBSTATUSCHANGED | Job status 
changed from RECONCILING to RUNNING
2023-06-08 06:31:35,031 o.a.f.k.o.l.AuditUtils [INFO 
][job-name/job-name] >>> Status | Info| STABLE  | The resource 
deployment is considered to be stable and won’t be rolled back
2023-06-08 06:31:35,032 o.a.f.k.o.a.JobAutoScalerImpl  [INFO 
][job-name/job-name] Job autoscaler is disabled
2023-06-08 06:31:35,032 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO 
][job-name/job-name] Resource fully reconciled, nothing to do...
2023-06-08 06:31:35,032 o.a.f.k.o.c.FlinkDeploymentController [INFO 
][job-name/job-name] End of reconciliation
2023-06-08 06:32:35,035 o.a.f.k.o.c.FlinkDeploymentController [INFO 
][job-name/job-name] Starting reconciliation
2023-06-08 06:32:35,035 o.a.f.k.o.s.FlinkResourceConte

RE: Parquet decoding exception - Flink 1.16.x

2023-06-07 Thread Kamal Mittal via user
Hello,

Could you please share your view on how to create custom metrics for "file 
system sources", e.g. a count of corrupt records?

I am using the Flink file source API as in the mail below and decoding 
Parquet-formatted data. I am able to count corrupt records, but how do I report 
this count to Flink?

Rgds,
Kamal
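
The pattern being asked about here — count decode failures without failing the 
pipeline — can be sketched independently of Flink. The sketch below is plain 
Python with illustrative names, not Flink API; in an actual Flink job the plain 
integer would instead be a `Counter` registered on the source reader's metric 
group.

```python
# A minimal, framework-free sketch of the "catch and count" pattern the
# thread discusses: wrap the decode step, count failures, and pass good
# records through. In a real Flink job the counter would be an
# org.apache.flink.metrics.Counter registered on the reader's metric
# group rather than a plain integer. All names here are illustrative.

class CountingDecoder:
    def __init__(self, decode):
        self.decode = decode          # the real record decoder
        self.corrupt_records = 0      # stand-in for a Flink Counter

    def read(self, raw_records):
        for raw in raw_records:
            try:
                yield self.decode(raw)
            except ValueError:
                # A corrupt record: count it and skip it instead of
                # failing the whole job.
                self.corrupt_records += 1

decoder = CountingDecoder(int)  # int() raises ValueError on bad input
good = list(decoder.read(["1", "oops", "3", "bad"]))
print(good, decoder.corrupt_records)  # → [1, 3] 2
```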

From: Kamal Mittal via user 
Sent: 07 June 2023 05:48 PM
To: Martijn Visser 
Cc: Kamal Mittal via user 
Subject: RE: Parquet decoding exception - Flink 1.16.x

Hello,

The metrics link given in the mail below doesn't give any way to create metrics 
for a source function, right?

I am using the Flink API below to read/decode Parquet data. My question is: 
where can an exception such as a "decoding exception" from an internal Parquet 
API like "AvroParquetReader" be caught, so that metrics for corrupt records can 
be created?

FileSource.FileSourceBuilder source =
    FileSource.forRecordStreamFormat(streamformat, path); // streamformat is of type AvroParquetRecordFormat

Please suggest.

Rgds,
Kamal

From: Martijn Visser <martijnvis...@apache.org>
Sent: 07 June 2023 03:39 PM
To: Kamal Mittal <kamal.mit...@ericsson.com>
Cc: Kamal Mittal via user <user@flink.apache.org>
Subject: Re: Raise alarm for corrupt records

Hi Kamal,

Documentation on the metrics can be found at 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/

Best regards,

Martijn

On Wed, Jun 7, 2023 at 10:13 AM Kamal Mittal via user 
<user@flink.apache.org> wrote:
Hello,

Thanks for quick reply.

I am using a Parquet encoder/decoder, and during decoding, if any corrupt 
record comes, I need to raise an alarm and maintain metrics visible in the 
Flink metrics GUI.

So, can custom metrics be created in Flink? Please give a reference to any such 
documentation.

Rgds,
Kamal

From: Martijn Visser <martijnvis...@apache.org>
Sent: 07 June 2023 12:31 PM
To: Kamal Mittal <kamal.mit...@ericsson.com>
Cc: user@flink.apache.org
Subject: Re: Raise alarm for corrupt records

Hi Kamal,

No, but it should be straightforward to create metrics or events for these 
types of situations and integrate them with your own alerting solution.

Best regards,

Martijn

On Wed, Jun 7, 2023 at 8:25 AM Kamal Mittal via user 
<user@flink.apache.org> wrote:
Hello Community,

Is there any way Flink provides out of the box to raise an alarm for corrupt 
records (e.g. due to decoding failures) in the middle of a running data 
pipeline and send this alarm outside of the task manager process?

Rgds,
Kamal
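
Martijn's suggestion above — create your own metrics or events and integrate 
them with an external alerting solution — can be sketched as a simple threshold 
alerter. The hook and names below are purely illustrative stand-ins for a real 
alerting endpoint, not Flink API.

```python
# Keep a corrupt-record counter and push an event to an external
# alerting hook once a threshold is crossed. In production the "hook"
# would be your alerting system (the counter itself would normally be
# scraped by a metrics reporter); everything here is illustrative.

alerts = []

def alert_hook(message):
    # Stand-in for e.g. an HTTP call to a pager/alertmanager endpoint.
    alerts.append(message)

class CorruptRecordAlerter:
    def __init__(self, threshold, hook):
        self.count = 0
        self.threshold = threshold
        self.hook = hook
        self.fired = False

    def record_corrupt(self):
        self.count += 1
        if not self.fired and self.count >= self.threshold:
            self.fired = True  # fire once, not on every further record
            self.hook(f"corrupt records reached {self.count}")

alerter = CorruptRecordAlerter(threshold=3, hook=alert_hook)
for _ in range(5):
    alerter.record_corrupt()
print(alerts)  # → ['corrupt records reached 3']
```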


Re: Kubernetes operator listing jobs TimeoutException

2023-06-07 Thread Shammon FY
Hi Evgeniy,

From the following exception message:

at
org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:123)
at
org.apache.flink.runtime.rest.RestClient.submitRequest(RestClient.java:469)
at
org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:392)
at
org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:306)
at
org.apache.flink.client.program.rest.RestClusterClient.lambda$null$37(RestClusterClient.java:931)
at
java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)

it seems that the client's attempt to submit a job to the Flink cluster through
the REST API failed. You may need to provide more information, such as the
Kubernetes config for the job, so the community can better analyze the problem.


Best,
Shammon FY

On Wed, Jun 7, 2023 at 11:35 PM Evgeniy Lyutikov 
wrote:

> Hello.
> We use Kubernetes operator 1.4.0, operator serves about 50 jobs, but
> sometimes there are errors in the logs that are reflected in the metrics
> (FlinkDeployment.JmDeploymentStatus.READY.Count). What is the reason for
> such errors?
>
>
> 2023-06-07 15:28:27,601 o.a.f.k.o.c.FlinkDeploymentController [INFO
> ][job-name/job-name] Starting reconciliation
> 2023-06-07 15:28:27,602 o.a.f.k.o.s.FlinkResourceContextFactory [INFO
> ][job-name/job-name] Getting service for job-name
> 2023-06-07 15:28:27,602 o.a.f.k.o.o.JobStatusObserver  [INFO
> ][job-name/job-name] Observing job status
> 2023-06-07 15:28:39,623 o.a.f.s.n.i.n.c.AbstractChannel [WARN ]
> Force-closing a channel whose registration task was not accepted by an
> event loop: [id: 0xd494f516]
> java.util.concurrent.RejectedExecutionException: event executor terminated
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:923)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:350)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:343)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:825)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:815)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.register(AbstractChannel.java:483)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:87)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:81)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.MultithreadEventLoopGroup.register(MultithreadEventLoopGroup.java:86)
> at
> org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:323)
> at
> org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:155)
> at
> org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:139)
> at
> org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:123)
> at
> org.apache.flink.runtime.rest.RestClient.submitRequest(RestClient.java:469)
> at
> org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:392)
> at
> org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:306)
> at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$null$37(RestClusterClient.java:931)
> at
> java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
> at
> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:649)
> at
> java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:829)
> 2023-06-07 15:28:39,624 o.a.f.s.n.i.n.u.c.D.rejectedExecution [ERROR]
> Failed to submit a listener notification task. Event loop shut down?
> java.util.concurrent.RejectedExecutionException: event executor terminated
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:923)
> at
> 

Re: [ANNOUNCE] Apache flink-connector-pulsar v3.0.1 released

2023-06-07 Thread Neng Lu
Thank you very much for coordinating this!

I think we also need to release 4.0.1 to fix the pulsar-client OAuth2 issue.

On Wed, Jun 7, 2023 at 1:48 AM Leonard Xu  wrote:

> The Apache Flink community is very happy to announce the release of Apache
> flink-connector-pulsar v3.0.1.
> This release is compatible with Flink 1.16.x series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352640
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Regards,
> Leonard



-- 
Best Regards,
Neng


Kubernetes operator listing jobs TimeoutException

2023-06-07 Thread Evgeniy Lyutikov
Hello.
We use Kubernetes operator 1.4.0, operator serves about 50 jobs, but sometimes 
there are errors in the logs that are reflected in the metrics 
(FlinkDeployment.JmDeploymentStatus.READY.Count). What is the reason for such 
errors?


2023-06-07 15:28:27,601 o.a.f.k.o.c.FlinkDeploymentController [INFO 
][job-name/job-name] Starting reconciliation
2023-06-07 15:28:27,602 o.a.f.k.o.s.FlinkResourceContextFactory [INFO 
][job-name/job-name] Getting service for job-name
2023-06-07 15:28:27,602 o.a.f.k.o.o.JobStatusObserver  [INFO 
][job-name/job-name] Observing job status
2023-06-07 15:28:39,623 o.a.f.s.n.i.n.c.AbstractChannel [WARN ] Force-closing a 
channel whose registration task was not accepted by an event loop: [id: 
0xd494f516]
java.util.concurrent.RejectedExecutionException: event executor terminated
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:923)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:350)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:343)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:825)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:815)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.register(AbstractChannel.java:483)
at 
org.apache.flink.shaded.netty4.io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:87)
at 
org.apache.flink.shaded.netty4.io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:81)
at 
org.apache.flink.shaded.netty4.io.netty.channel.MultithreadEventLoopGroup.register(MultithreadEventLoopGroup.java:86)
at 
org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:323)
at 
org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:155)
at 
org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:139)
at 
org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:123)
at 
org.apache.flink.runtime.rest.RestClient.submitRequest(RestClient.java:469)
at 
org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:392)
at 
org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:306)
at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$null$37(RestClusterClient.java:931)
at 
java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at 
java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
at 
java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:649)
at 
java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
2023-06-07 15:28:39,624 o.a.f.s.n.i.n.u.c.D.rejectedExecution [ERROR] Failed to 
submit a listener notification task. Event loop shut down?
java.util.concurrent.RejectedExecutionException: event executor terminated
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:923)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:350)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:343)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:825)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:815)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:841)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:499)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:184)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95)
 

PyFlink Error JAR files

2023-06-07 Thread Kadiyala, Ruthvik via user
Hi,

Please find below the code I have been using to consume a Kafka Stream that is 
hosted on confluent. It returns an error regarding the jar files. Please find 
the error below the code snippet. Let me know what I am doing wrong. I am 
running this on Docker with Flink version 1.17.1.

Code:

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.datastream.formats.json import JsonRowDeserializationSchema
import glob
import os
import sys
import logging

# Set up the execution environment
env = StreamExecutionEnvironment.get_execution_environment()

logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

# the sql connector for kafka is used here as it's a fat jar and could avoid 
dependency issues
env.add_jars("file://flink-sql-connector-kafka-1.17.1.jar")
env.add_classpaths("file://flink-sql-connector-kafka-1.17.1.jar")
env.add_jars("file://flink-connector-kafka_2.11-1.9.2-javadoc.jar")
env.add_classpaths("file://flink-connector-kafka_2.11-1.9.2-javadoc.jar")

# Set up the Confluent Cloud Kafka configuration
kafka_config = {
'bootstrap.servers': 'bootstrap-server',
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'PLAIN',
'sasl.jaas.config': 
'org.apache.kafka.common.security.plain.PlainLoginModule required 
username="API_KEY" password="API_SECRET";'
}

topic = 'TOPIC_NAME'

deserialization_schema = JsonRowDeserializationSchema.Builder() \
.type_info(Types.ROW([Types.INT(), Types.STRING()])) \
.build()

# Set up the Kafka consumer properties
consumer_props = {
'bootstrap.servers': kafka_config['bootstrap.servers'],
'security.protocol': kafka_config['security.protocol'],
'sasl.mechanism': kafka_config['sasl.mechanism'],
'sasl.jaas.config': kafka_config['sasl.jaas.config'],
'group.id': 'python-group-1'
}

# Create a Kafka consumer
kafka_consumer = FlinkKafkaConsumer(
topics = topic,  # Kafka topic
deserialization_schema = deserialization_schema,
properties = consumer_props,  # Consumer properties
)
kafka_consumer.set_start_from_earliest()
# Add the Kafka consumer as a source to the execution environment
stream = env.add_source(kafka_consumer)

# Define your data processing logic here
# For example, you can print the stream to the console
stream.print()

# Execute the job
env.execute()

Error:

Traceback (most recent call last):
  File "/home/pyflink/test.py", line 45, in <module>
kafka_consumer = FlinkKafkaConsumer(
  File 
"/usr/local/lib/python3.10/dist-packages/pyflink/datastream/connectors/kafka.py",
 line 203, in __init__
j_flink_kafka_consumer = _get_kafka_consumer(topics, properties, 
deserialization_schema,
  File 
"/usr/local/lib/python3.10/dist-packages/pyflink/datastream/connectors/kafka.py",
 line 161, in _get_kafka_consumer
j_flink_kafka_consumer = j_consumer_clz(topics,
  File "/usr/local/lib/python3.10/dist-packages/pyflink/util/exceptions.py", 
line 185, in wrapped_call
raise TypeError(
TypeError: Could not found the Java class 
'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'. The Java 
dependencies could be specified via command line argument '--jarfile' or the 
config option 'pipeline.jars'
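
One likely cause — an assumption, since the full setup is not shown — is the 
form of the jar URLs: `add_jars` expects an absolute `file:///` URL, but 
`file://flink-sql-connector-kafka-1.17.1.jar` puts the jar name into the URL's 
host part and leaves an empty path, so no jar is actually loaded and the Kafka 
consumer class is never found. A quick check with the standard library shows 
the difference (the `/opt/flink/lib/` location below is hypothetical):

```python
from urllib.parse import urlparse

# With only two slashes the jar name lands in the "netloc" (host) part
# of the URL and the path is empty -- nothing gets loaded.
bad = urlparse("file://flink-sql-connector-kafka-1.17.1.jar")
print(bad.netloc, repr(bad.path))   # → flink-sql-connector-kafka-1.17.1.jar ''

# Three slashes plus an absolute path is what add_jars() needs.
good = urlparse("file:///opt/flink/lib/flink-sql-connector-kafka-1.17.1.jar")
print(repr(good.netloc), good.path)  # → '' /opt/flink/lib/flink-sql-connector-kafka-1.17.1.jar
```

Separately, `flink-connector-kafka_2.11-1.9.2-javadoc.jar` is a javadoc 
archive and contains no classes, so it cannot provide the consumer either.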



Cheers & Regards,
Ruthvik Kadiyala





RE: Parquet decoding exception - Flink 1.16.x

2023-06-07 Thread Kamal Mittal via user
Hello,

The metrics link given in the mail below doesn't give any way to create metrics 
for a source function, right?

I am using the Flink API below to read/decode Parquet data. My question is: 
where can an exception such as a "decoding exception" from an internal Parquet 
API like "AvroParquetReader" be caught, so that metrics for corrupt records can 
be created?

FileSource.FileSourceBuilder source =
    FileSource.forRecordStreamFormat(streamformat, path); // streamformat is of type AvroParquetRecordFormat

Please suggest.

Rgds,
Kamal

From: Martijn Visser 
Sent: 07 June 2023 03:39 PM
To: Kamal Mittal 
Cc: Kamal Mittal via user 
Subject: Re: Raise alarm for corrupt records

Hi Kamal,

Documentation on the metrics can be found at 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/

Best regards,

Martijn

On Wed, Jun 7, 2023 at 10:13 AM Kamal Mittal via user 
<user@flink.apache.org> wrote:
Hello,

Thanks for quick reply.

I am using a Parquet encoder/decoder, and during decoding, if any corrupt 
record comes, I need to raise an alarm and maintain metrics visible in the 
Flink metrics GUI.

So, can custom metrics be created in Flink? Please give a reference to any such 
documentation.

Rgds,
Kamal

From: Martijn Visser <martijnvis...@apache.org>
Sent: 07 June 2023 12:31 PM
To: Kamal Mittal <kamal.mit...@ericsson.com>
Cc: user@flink.apache.org
Subject: Re: Raise alarm for corrupt records

Hi Kamal,

No, but it should be straightforward to create metrics or events for these 
types of situations and integrate them with your own alerting solution.

Best regards,

Martijn

On Wed, Jun 7, 2023 at 8:25 AM Kamal Mittal via user 
<user@flink.apache.org> wrote:
Hello Community,

Is there any way Flink provides out of the box to raise an alarm for corrupt 
records (e.g. due to decoding failures) in the middle of a running data 
pipeline and send this alarm outside of the task manager process?

Rgds,
Kamal


Re: Custom Counter on Flink File Source

2023-06-07 Thread Hang Ruan
Hi, Kirti.

You can find this information in the 1.18 release wiki page [1].

Its timeline is as follows.
Feature Freeze: July 11, 2023, end of business CEST
Release: End of September 2023

Best,
Hang

[1]
https://cwiki.apache.org/confluence/display/FLINK/1.18+Release#id-1.18Release-Summary

Kirti Dhar Upadhyay K wrote on Wed, Jun 7, 2023 at 15:49:

> Thanks Hang.
>
> Any expected date for Flink 1.18.0 release?
>
>
>
> Regards,
>
> Kirti Dhar
>
>
>
> *From:* Hang Ruan 
> *Sent:* 07 June 2023 07:34
> *To:* Kirti Dhar Upadhyay K 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Custom Counter on Flink File Source
>
>
>
> Hi, Kirti Dhar Upadhyay K.
>
>
>
> I check the FLIP-274[1]. This issue will be released in the 1.18.0. It is
> not contained in any release now.
>
>
>
> Best,
>
> Hang
>
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-274%3A+Introduce+metric+group+for+OperatorCoordinator
>
>
>
> Kirti Dhar Upadhyay K wrote on Wed, Jun 7, 2023 at 02:51:
>
> Hi Hang,
>
>
>
> Thanks for reply.
>
> I tried using SplitEnumeratorContext passed in
> AbstractFileSource#createEnumerator but resulted as NullPointerException.
>
> As SplitEnumeratorContext provides its implementation as
> SourceCoordinatorContext having metricGroup() as below-
>
>
>
>
>
> @Override
> public SplitEnumeratorMetricGroup metricGroup() {
>     return null;
> }
>
>
>
> Am I doing any mistake?
>
>
>
> Regards,
>
> Kirti Dhar
>
>
>
> *From:* Hang Ruan 
> *Sent:* 06 June 2023 08:12
> *To:* Kirti Dhar Upadhyay K 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Custom Counter on Flink File Source
>
>
>
> Hi, Kirti Dhar Upadhyay K.
>
>
>
> We could get the metric group from the context, like `SourceReaderContext`
> and `SplitEnumeratorContext`. These contexts could be found when creating
> readers and enumerators. See `AbstractFileSource#createReader` and
> `AbstractFileSource#createEnumerator`.
>
>
>
> Best,
>
> Hang
>
>
>
> Kirti Dhar Upadhyay K via user wrote on Mon, Jun 5, 2023 at 22:57:
>
> Hi Community,
>
>
>
> I am trying to add a new counter for number of files collected on Flink
> File Source.
>
> Referring the doc
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/ I
> understand how to add a new counter on any operator.
>
>
>
> this.counter = getRuntimeContext().getMetricGroup().counter("myCounter");
>
>
>
> But not able to get this RuntimeContext on FileSource.
>
> Can someone give some clue on this?
>
>
>
> Regards,
>
> Kirti Dhar
>
>

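
The NullPointerException discussed in this thread comes from `metricGroup()` 
returning null before the FLIP-274 work lands in 1.18.0. Until then, a 
defensive pattern is to fall back to a no-op counter when no metric group is 
available. The sketch below is framework-free Python with illustrative names, 
not Flink API:

```python
# Defensive metric registration: before Flink 1.18 the enumerator's
# metricGroup() can return null, so code that registers metrics there
# needs a fallback. A no-op counter keeps the calling code
# unconditional. A plain dict stands in for a real metric group.

class NoOpCounter:
    def inc(self):
        pass  # swallow updates when no metric group exists

class Counter:
    def __init__(self):
        self.value = 0
    def inc(self):
        self.value += 1

def register_counter(metric_group, name):
    # metric_group is None pre-1.18 (metricGroup() returned null);
    # fall back to a no-op so callers never hit a NullPointerException.
    if metric_group is None:
        return NoOpCounter()
    return metric_group.setdefault(name, Counter())

files_counter = register_counter(None, "numFilesCollected")
files_counter.inc()  # safe: a no-op instead of an NPE

group = {}
real_counter = register_counter(group, "numFilesCollected")
real_counter.inc()
print(real_counter.value)  # → 1
```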

Re: Raise alarm for corrupt records

2023-06-07 Thread Martijn Visser
Hi Kamal,

Documentation on the metrics can be found at
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/

Best regards,

Martijn

On Wed, Jun 7, 2023 at 10:13 AM Kamal Mittal via user 
wrote:

> Hello,
>
>
>
> Thanks for quick reply.
>
>
>
> I am using parquet encoder/decoder and during decoding if any corrupt
> record comes then need to raise alarm and maintain metrices visible over
> Flink Metrices GUI.
>
>
>
> So any custom metrices can be created in Flink? Please give some reference
> of any such documentation.
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Martijn Visser 
> *Sent:* 07 June 2023 12:31 PM
> *To:* Kamal Mittal 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Raise alarm for corrupt records
>
>
>
> Hi Kamal,
>
>
>
> No, but it should be straightforward to create metrics or events for these
> types of situations and integrate them with your own alerting solution.
>
>
>
> Best regards,
>
>
>
> Martijn
>
>
>
> On Wed, Jun 7, 2023 at 8:25 AM Kamal Mittal via user <
> user@flink.apache.org> wrote:
>
> Hello Community,
>
>
>
> Is there any way Flink provides out of box to raise alarm for corrupt
> records (e.g. due to decoding failure) in between of running data pipeline
> and send this alarm to outside of task manager process?
>
>
>
> Rgds,
>
> Kamal
>
>


[ANNOUNCE] Apache flink-connector-pulsar v3.0.1 released

2023-06-07 Thread Leonard Xu
The Apache Flink community is very happy to announce the release of Apache 
flink-connector-pulsar v3.0.1. 
This release is compatible with Flink 1.16.x series.

Apache Flink® is an open-source stream processing framework for distributed, 
high-performing, always-available, and accurate data streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352640

We would like to thank all contributors of the Apache Flink community who made 
this release possible!

Regards,
Leonard

RE: Raise alarm for corrupt records

2023-06-07 Thread Kamal Mittal via user
Hello,

Thanks for quick reply.

I am using a Parquet encoder/decoder, and during decoding, if any corrupt 
record comes, I need to raise an alarm and maintain metrics visible in the 
Flink metrics GUI.

So, can custom metrics be created in Flink? Please give a reference to any such 
documentation.

Rgds,
Kamal

From: Martijn Visser 
Sent: 07 June 2023 12:31 PM
To: Kamal Mittal 
Cc: user@flink.apache.org
Subject: Re: Raise alarm for corrupt records

Hi Kamal,

No, but it should be straightforward to create metrics or events for these 
types of situations and integrate them with your own alerting solution.

Best regards,

Martijn

On Wed, Jun 7, 2023 at 8:25 AM Kamal Mittal via user 
<user@flink.apache.org> wrote:
Hello Community,

Is there any way Flink provides out of the box to raise an alarm for corrupt 
records (e.g. due to decoding failures) in the middle of a running data 
pipeline and send this alarm outside of the task manager process?

Rgds,
Kamal


RE: Custom Counter on Flink File Source

2023-06-07 Thread Kirti Dhar Upadhyay K via user
Thanks Hang.
Any expected date for Flink 1.18.0 release?

Regards,
Kirti Dhar

From: Hang Ruan 
Sent: 07 June 2023 07:34
To: Kirti Dhar Upadhyay K 
Cc: user@flink.apache.org
Subject: Re: Custom Counter on Flink File Source

Hi, Kirti Dhar Upadhyay K.

I checked FLIP-274 [1]. This issue will be released in 1.18.0; it is not 
contained in any release yet.

Best,
Hang

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-274%3A+Introduce+metric+group+for+OperatorCoordinator

Kirti Dhar Upadhyay K <kirti.k.dhar.upadh...@ericsson.com> 
wrote on Wed, Jun 7, 2023 at 02:51:
Hi Hang,

Thanks for the reply.
I tried using the SplitEnumeratorContext passed in 
AbstractFileSource#createEnumerator, but it resulted in a NullPointerException.
SplitEnumeratorContext provides its implementation as 
SourceCoordinatorContext, which has metricGroup() as below:


@Override
public SplitEnumeratorMetricGroup metricGroup() {
return null;
}

Am I making a mistake somewhere?

Regards,
Kirti Dhar

From: Hang Ruan <ruanhang1...@gmail.com>
Sent: 06 June 2023 08:12
To: Kirti Dhar Upadhyay K <kirti.k.dhar.upadh...@ericsson.com>
Cc: user@flink.apache.org
Subject: Re: Custom Counter on Flink File Source

Hi, Kirti Dhar Upadhyay K.

We could get the metric group from the context, like `SourceReaderContext` and 
`SplitEnumeratorContext`. These contexts could be found when creating readers 
and enumerators. See `AbstractFileSource#createReader` and 
`AbstractFileSource#createEnumerator`.

Best,
Hang

Kirti Dhar Upadhyay K via user <user@flink.apache.org> 
wrote on Mon, Jun 5, 2023 at 22:57:
Hi Community,

I am trying to add a new counter for the number of files collected on the 
Flink File Source.
Referring to the doc 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/ I 
understand how to add a new counter on any operator.

this.counter = getRuntimeContext().getMetricGroup().counter("myCounter");

But I am not able to get this RuntimeContext on FileSource.
Can someone give me a clue on this?

Regards,
Kirti Dhar


Re: Raise alarm for corrupt records

2023-06-07 Thread Martijn Visser
Hi Kamal,

No, but it should be straightforward to create metrics or events for these
types of situations and integrate them with your own alerting solution.

Best regards,

Martijn

On Wed, Jun 7, 2023 at 8:25 AM Kamal Mittal via user 
wrote:

> Hello Community,
>
>
>
> Is there any way Flink provides out of box to raise alarm for corrupt
> records (e.g. due to decoding failure) in between of running data pipeline
> and send this alarm to outside of task manager process?
>
>
>
> Rgds,
>
> Kamal
>