Re: Flink Kafka offset commit issues

2023-09-28 Thread elakiya udhayanan
Hi Feng,

Thanks for your response.

1. We have configured checkpointing to upload to an S3 location, and we can
see metadata files being created there. However, we are unsure whether the
job actually restores from that checkpoint in case of failure. Is there a
way to test this? Also, does this apply to upgrades or enhancements to the
job, and how can we commit the offsets in such cases?
2. For the savepointing, we are currently exploring it.
3. I would like to know whether Flink provides any properties for
committing the Kafka offsets.
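
For context, the upsert-kafka connector always starts reading from the
earliest offset by design, because it has to materialize the complete
changelog of the topic; resuming from committed group offsets is a feature
of the plain kafka connector. A minimal sketch under that assumption (the
group id is an illustrative name):

-- Sketch only: a plain 'kafka' source table that resumes from offsets
-- committed under a consumer group. With checkpointing enabled, Flink
-- commits offsets back to Kafka when a checkpoint completes.
CREATE TABLE EmployeeSource (
  id STRING,
  employee ROW<id STRING, name STRING>
) WITH (
  'connector' = 'kafka',
  'topic' = 'employee',
  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',
  'properties.group.id' = 'employee-reader',   -- illustrative group name
  'scan.startup.mode' = 'group-offsets',
  -- start from the earliest offset while the group has no committed offsets yet
  'properties.auto.offset.reset' = 'earliest',
  'key.format' = 'raw',
  'key.fields' = 'id',
  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'
);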

Thanks,
Elakiya

On Thu, Sep 28, 2023 at 3:10 PM Feng Jin  wrote:

> Hi Elakiya
>
> 1. Can you confirm whether the checkpoint for the job has been triggered
> normally?
>
> 2. Also, if you stop the job, you need to use "STOP WITH SAVEPOINT" and
> specify the path to the savepoint when starting the Flink job for recovery.
> This is necessary to continue consuming from the historical offsets
> correctly.
>
>
> Best,
> Feng
>
>
> On Thu, Sep 28, 2023 at 4:41 PM elakiya udhayanan 
> wrote:
>
>> Hi team,
>>
>> I have a Kafka topic named employee which uses a Confluent Avro schema
>> and emits payloads like the one below:
>> {
>>   "id": "emp_123456",
>>   "employee": {
>>     "id": "123456",
>>     "name": "sampleName"
>>   }
>> }
>> I am consuming the events from this topic with the upsert-kafka connector,
>> using the Flink SQL DDL statement below. The problem is that the connector
>> is not committing the offsets: every time I submit the job, it reads the
>> Kafka events from the beginning. Please let me know whether we can commit
>> the offsets for the Kafka events already read.
>>
>> DDL Statement:
>> String statement = "CREATE TABLE Employee (\r\n" +
>> "  id STRING,\r\n" +
>> "  employee ROW<id STRING, name STRING>,\r\n" +
>> "  PRIMARY KEY (id) NOT ENFORCED\r\n" +
>> ") WITH (\r\n" +
>> "  'connector' = 'upsert-kafka',\r\n" +
>> "  'topic' = 'employee',\r\n" +
>> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
>> "  'key.format' = 'raw',\r\n" +
>> "  'value.format' = 'avro-confluent',\r\n" +
>> "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'\r\n" +
>> ")";
>> Any help is appreciated. TIA.
>>
>> Thanks,
>> Elakiya
>>
>


Cannot find meta data file '_metadata' in directory

2023-09-28 Thread rui chen
When we use Flink 1.13.2, we get the following error:
FileNotFoundException: Cannot find meta data file '_metadata' in directory
'hdfs://xx/f408dbe327f9e5053e76d7b5323d6e81/chk-173'.


Re: Flink Kafka offset commit issues

2023-09-28 Thread Feng Jin
Hi Elakiya

1. Can you confirm whether the checkpoint for the job has been triggered
normally?

2. Also, if you stop the job, you need to use "STOP WITH SAVEPOINT" and
specify the path to the savepoint when starting the Flink job for recovery.
This is necessary to continue consuming from the historical offsets
correctly.
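
A minimal sketch of that sequence with the Flink CLI (bucket, savepoint
path, and job id are placeholders):

# Stop the job and take a savepoint in one step:
./bin/flink stop --savepointPath s3://my-bucket/savepoints <jobId>

# Resume from the savepoint path printed by the stop command:
./bin/flink run -s s3://my-bucket/savepoints/savepoint-<id> <jobJar>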


Best,
Feng


On Thu, Sep 28, 2023 at 4:41 PM elakiya udhayanan 
wrote:

> Hi team,
>
> I have a Kafka topic named employee which uses a Confluent Avro schema and
> emits payloads like the one below:
> {
>   "id": "emp_123456",
>   "employee": {
>     "id": "123456",
>     "name": "sampleName"
>   }
> }
> I am consuming the events from this topic with the upsert-kafka connector,
> using the Flink SQL DDL statement below. The problem is that the connector
> is not committing the offsets: every time I submit the job, it reads the
> Kafka events from the beginning. Please let me know whether we can commit
> the offsets for the Kafka events already read.
>
> DDL Statement:
> String statement = "CREATE TABLE Employee (\r\n" +
> "  id STRING,\r\n" +
> "  employee ROW<id STRING, name STRING>,\r\n" +
> "  PRIMARY KEY (id) NOT ENFORCED\r\n" +
> ") WITH (\r\n" +
> "  'connector' = 'upsert-kafka',\r\n" +
> "  'topic' = 'employee',\r\n" +
> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
> "  'key.format' = 'raw',\r\n" +
> "  'value.format' = 'avro-confluent',\r\n" +
> "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'\r\n" +
> ")";
> Any help is appreciated. TIA.
>
> Thanks,
> Elakiya
>


Re: Continuous errors with Azure ABFSS

2023-09-28 Thread Alexis Sarda-Espinosa
Hi Surendra,

there are no exceptions in the logs, nor anything salient at
INFO/WARN/ERROR levels. The checkpoints are definitely completing; we even
set the config

execution.checkpointing.tolerable-failed-checkpoints: 1

Regards,
Alexis.

On Thu, Sep 28, 2023 at 09:32, Surendra Singh Lilhore <surendralilh...@gmail.com> wrote:

> Hi Alexis,
>
> Could you please check the TaskManager log for any exceptions?
>
> Thanks
> Surendra
>
>
> On Thu, Sep 28, 2023 at 7:06 AM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> We are using ABFSS for RocksDB's backend as well as the storage dir
>> required for Kubernetes HA. In the Azure Portal's monitoring insights I see
>> that every single operation contains failing transactions for the
>> GetPathStatus API. Unfortunately I don't see any additional details, but I
>> know the storage account is only used by Flink. Checkpointing isn't
>> failing, but I wonder if this could be an issue in the long term?
>>
>> Regards,
>> Alexis.
>>
>>


Re: Continuous errors with Azure ABFSS

2023-09-28 Thread Surendra Singh Lilhore
Hi Alexis,

Could you please check the TaskManager log for any exceptions?

Thanks
Surendra


On Thu, Sep 28, 2023 at 7:06 AM Alexis Sarda-Espinosa <
sarda.espin...@gmail.com> wrote:

> Hello,
>
> We are using ABFSS for RocksDB's backend as well as the storage dir
> required for Kubernetes HA. In the Azure Portal's monitoring insights I see
> that every single operation contains failing transactions for the
> GetPathStatus API. Unfortunately I don't see any additional details, but I
> know the storage account is only used by Flink. Checkpointing isn't
> failing, but I wonder if this could be an issue in the long term?
>
> Regards,
> Alexis.
>
>


Flink Kafka offset commit issues

2023-09-28 Thread elakiya udhayanan
Hi team,

I have a Kafka topic named employee which uses a Confluent Avro schema and
emits payloads like the one below:
{
  "id": "emp_123456",
  "employee": {
    "id": "123456",
    "name": "sampleName"
  }
}
I am consuming the events from this topic with the upsert-kafka connector,
using the Flink SQL DDL statement below. The problem is that the connector
is not committing the offsets: every time I submit the job, it reads the
Kafka events from the beginning. Please let me know whether we can commit
the offsets for the Kafka events already read.

DDL Statement:
String statement = "CREATE TABLE Employee (\r\n" +
"  id STRING,\r\n" +
"  employee ROW<id STRING, name STRING>,\r\n" +
"  PRIMARY KEY (id) NOT ENFORCED\r\n" +
") WITH (\r\n" +
"  'connector' = 'upsert-kafka',\r\n" +
"  'topic' = 'employee',\r\n" +
"  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
"  'key.format' = 'raw',\r\n" +
"  'value.format' = 'avro-confluent',\r\n" +
"  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'\r\n" +
")";
Any help is appreciated. TIA.

Thanks,
Elakiya


Re: Custom Prometheus metrics disappeared in 1.16.2 => 1.17.1 upgrade

2023-09-28 Thread Javier Vegas
Thanks! I saw the first change but missed the third one, which most
probably explains my problem: the metrics I was sending with the
twitter/finagle StatsReceiver ended up in the singleton default registry
and were exposed by Flink together with all the other Flink metrics, but
now that Flink uses its own registry I have no idea where my custom
metrics end up.
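
For anyone hitting the same issue, one possible workaround is to register
custom metrics through Flink's own metric system rather than the Prometheus
default registry, so they travel through whatever reporter is configured.
A minimal sketch (names are illustrative, not the twitter/finagle
integration itself):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

// Sketch: a custom counter registered with Flink's MetricGroup, so it is
// exported by whichever reporter is configured (e.g. the Prometheus reporter).
public class CountingMapper extends RichMapFunction<String, String> {
    private transient Counter eventCounter;

    @Override
    public void open(Configuration parameters) {
        // Registered under the operator's scope in the reporter output.
        eventCounter = getRuntimeContext().getMetricGroup().counter("myCustomEvents");
    }

    @Override
    public String map(String value) {
        eventCounter.inc();
        return value;
    }
}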


On Wed, Sep 27, 2023 at 4:56, Kenan Kılıçtepe () wrote:
>
> Have you checked the metric changes in 1.17?
>
> From release notes 1.17:
> https://nightlies.apache.org/flink/flink-docs-master/release-notes/flink-1.17/
>
> Metric Reporters
> Only support reporter factories for instantiation (FLINK-24235)
> Configuring reporters by their class is no longer supported. Reporter
> implementations must provide a MetricReporterFactory, and all configurations
> must be migrated to such a factory.
>
> UseLogicalIdentifier makes datadog consider metric as custom (FLINK-30383)
> The Datadog reporter now adds a “flink.” prefix to metric identifiers if
> “useLogicalIdentifier” is enabled. This is required for these metrics to be
> recognized as Flink metrics, not custom ones.
>
> Use separate Prometheus CollectorRegistries (FLINK-30020)
> The PrometheusReporters now use a separate CollectorRegistry for each
> reporter instance instead of the singleton default registry. This generally
> shouldn’t impact setups, but it may break code that indirectly interacts with
> the reporter via the singleton instance (e.g., a test trying to assert what
> metrics are reported).
>
>
>
> On Wed, Sep 27, 2023 at 11:11 AM Javier Vegas  wrote:
>>
>> I implemented some custom Prometheus metrics that were working on
>> 1.16.2, with my configuration
>>
>> metrics.reporter.prom.factory.class:
>> org.apache.flink.metrics.prometheus.PrometheusReporterFactory
>> metrics.reporter.prom.port: 
>>
>> I could see both Flink metrics and my custom metrics on port  of
>> my task managers
>>
>> After upgrading to 1.17.1, using the same configuration, I can see
>> only the Flink metrics on port  of the task managers;
>> the custom metrics are getting lost somewhere.
>>
>> The release notes for 1.17 mention
>> https://issues.apache.org/jira/browse/FLINK-24235,
>> which removes instantiating reporters by name and forces the use of a
>> factory, which I was already doing in 1.16.2. Do I need to do anything
>> extra after those changes so my metrics are aggregated with the Flink
>> ones?
>>
>> I am also seeing this error message on application startup (which I
>> was already seeing in 1.16.2): "Multiple implementations of the same
>> reporter were found in 'lib' and/or 'plugins' directories for
>> org.apache.flink.metrics.prometheus.PrometheusReporterFactory. It is
>> recommended to remove redundant reporter JARs to resolve used
>> versions' ambiguity." Could that also explain the missing metrics?
>>
>> Thanks,
>>
>> Javier Vegas


Re: Continuous errors with Azure ABFSS

2023-09-28 Thread Alexis Sarda-Espinosa
Hi Ram,

Thanks for that. We configure a path with ABFSS scheme in the following
settings:

- state.checkpoints.dir
- state.savepoints.dir
- high-availability.storageDir

We use RocksDB with incremental checkpointing every minute.
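
In flink-conf.yaml terms, that corresponds roughly to the following (a
sketch; container and account names are placeholders):

state.backend: rocksdb
state.backend.incremental: true
execution.checkpointing.interval: 60s
state.checkpoints.dir: abfss://container@account.dfs.core.windows.net/checkpoints
state.savepoints.dir: abfss://container@account.dfs.core.windows.net/savepoints
high-availability.storageDir: abfss://container@account.dfs.core.windows.net/ha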

I found the metrics from Azure in the storage account under Monitoring,
Insights, Failures, scrolling down. I'll attach a screenshot here, although
I'm not sure that works well with the distribution list.

Regards,
Alexis.

On Thu, Sep 28, 2023 at 07:28, ramkrishna vasudevan <ramvasu.fl...@gmail.com> wrote:

> Can you help with more info here?
> Is the RocksDB backend itself in ABFS instead of local, or is the
> checkpoint in ABFS while RocksDB's local dir is on local storage?
>
> Is GetPathStatus called by your monitoring pages? We run Flink on ABFS, so
> we would like to see if we can help you out.
>
> Regards
> Ram
>
> On Thu, Sep 28, 2023 at 2:06 AM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> We are using ABFSS for RocksDB's backend as well as the storage dir
>> required for Kubernetes HA. In the Azure Portal's monitoring insights I see
>> that every single operation contains failing transactions for the
>> GetPathStatus API. Unfortunately I don't see any additional details, but I
>> know the storage account is only used by Flink. Checkpointing isn't
>> failing, but I wonder if this could be an issue in the long term?
>>
>> Regards,
>> Alexis.
>>
>>


Re: After using the jemalloc memory allocator for a period of time, checkpoint timeout occurs and tasks are stuck

2023-09-28 Thread Feng Jin
hi Rui,

We are using OpenJDK: `openjdk version "1.8.0_265"`.


Best,
Feng

On Thu, Sep 28, 2023 at 2:15 PM rui chen  wrote:

> hi Feng
>
> Are you using OpenJDK or the Oracle JDK?
>
> Best,
> rui
>
> On Wed, Sep 27, 2023 at 20:22, rui chen wrote:
>
>> hi Feng,
>>
>> Thanks for your reply. We are on JDK 8u192, which may be the problem; I
>> found a JDK issue: https://bugs.openjdk.org/browse/JDK-8215355.
>>
>> Best,
>> rui
>>
>> On Wed, Sep 27, 2023 at 20:09, Feng Jin wrote:
>>
>>> hi Rui,
>>>
>>> Which version of JDK are you using?
>>>
>>> This issue could potentially be a bug in the JDK version.
>>>
>>> If you are using JDK 8, you can try using OpenJDK 8u265 as a possible
>>> solution.
>>>
>>>
>>> Best,
>>> Feng
>>>
>>>
>>> On Wed, Sep 27, 2023 at 8:08 PM rui chen  wrote:
>>>


 On Wed, Sep 27, 2023 at 19:32, rui chen wrote:

> hi Feng,
>
> Thank you for your reply. We observed the GC behavior, and there is no
> change before and after the replacement. Several of our production tasks
> using jemalloc have become stuck; after removing jemalloc, no stuck
> situations have been found.
>
> Best,
> rui
>
> On Wed, Sep 27, 2023 at 19:19, Feng Jin wrote:
>
>>
>> hi rui,
>>
>> In general, checkpoint timeouts are typically associated with the job's
>> processing performance. When using jemalloc, performance degradation is
>> generally not observed.
>>
>> It is advisable to analyze whether the job's garbage collection (GC)
>> has become more frequent.
>>
>>
>> Best,
>> Feng
>>
>>
>> On Mon, Sep 25, 2023 at 1:21 PM rui chen 
>> wrote:
>>
>>> After using the jemalloc memory allocator for a period of time,
>>> checkpoint timeout occurs and tasks are stuck. Who has encountered this?
>>> Flink version: 1.13.2, jemalloc version: 5.3.0
>>>
>>
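
For anyone trying to reproduce or rule this out: jemalloc is usually
injected via LD_PRELOAD (the official Flink Docker images also enable it by
default and expose a DISABLE_JEMALLOC=true switch in their entrypoint), so
the allocator can be swapped for comparison. A minimal sketch with a
placeholder library path:

# Run once with jemalloc preloaded, once without, and compare behavior:
export LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2
./bin/taskmanager.sh start

unset LD_PRELOAD        # back to glibc malloc
./bin/taskmanager.sh start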


Re: After using the jemalloc memory allocator for a period of time, checkpoint timeout occurs and tasks are stuck

2023-09-28 Thread rui chen
hi Feng

Are you using OpenJDK or the Oracle JDK?

Best,
rui

On Wed, Sep 27, 2023 at 20:22, rui chen wrote:

> hi Feng,
>
> Thanks for your reply. We are on JDK 8u192, which may be the problem; I
> found a JDK issue: https://bugs.openjdk.org/browse/JDK-8215355.
>
> Best,
> rui
>
> On Wed, Sep 27, 2023 at 20:09, Feng Jin wrote:
>
>> hi Rui,
>>
>> Which version of JDK are you using?
>>
>> This issue could potentially be a bug in the JDK version.
>>
>> If you are using JDK 8, you can try using OpenJDK 8u265 as a possible
>> solution.
>>
>>
>> Best,
>> Feng
>>
>>
>> On Wed, Sep 27, 2023 at 8:08 PM rui chen  wrote:
>>
>>>
>>>
>>> On Wed, Sep 27, 2023 at 19:32, rui chen wrote:
>>>
 hi Feng,

 Thank you for your reply. We observed the GC behavior, and there is no
 change before and after the replacement. Several of our production tasks
 using jemalloc have become stuck; after removing jemalloc, no stuck
 situations have been found.

 Best,
 rui

 On Wed, Sep 27, 2023 at 19:19, Feng Jin wrote:

>
> hi rui,
>
> In general, checkpoint timeouts are typically associated with the
> job's processing performance. When using jemalloc, performance degradation
> is generally not observed.
>
> It is advisable to analyze whether the job's garbage collection (GC)
> has become more frequent.
>
>
> Best,
> Feng
>
>
> On Mon, Sep 25, 2023 at 1:21 PM rui chen  wrote:
>
>> After using the jemalloc memory allocator for a period of time,
>> checkpoint timeout occurs and tasks are stuck. Who has encountered this?
>> Flink version: 1.13.2, jemalloc version: 5.3.0
>>
>
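
On the GC suggestion above: GC logs can be enabled through Flink's
env.java.opts setting to compare runs with and without jemalloc; a minimal
sketch for JDK 8 (the log path is a placeholder):

env.java.opts: "-XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/tmp/flink-gc.log"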