Re: [DISCUSS] Change the default restart-strategy to exponential-delay

2023-12-05 Thread Mason Chen
Hi Rui,

Sorry for the late reply. I was suggesting that perhaps we could do some
testing with Kubernetes wrt configuring values for the exponential restart
strategy. We've noticed that the default strategy in 1.17 caused a lot of
requests to the K8s API server for unstable deployments.

However, people in different Kubernetes setups will have different limits
so it would be challenging to provide a general benchmark. Another thing I
found helpful in the past is to look at Kubernetes itself: for example, its
default restart policy for pods uses exponential backoff, and we could draw
inspiration from what it has set as a general-purpose default config.

Best,
Mason

On Sun, Nov 19, 2023 at 9:43 PM Rui Fan <1996fan...@gmail.com> wrote:

> Hi David and Mason,
>
> Thanks for your feedback!
>
> To David:
>
> > Given that the new default feels more complex than the current behavior,
> if we decide to do this I think it will be important to include the
> rationale you've shared in the documentation.
>
> Sounds good to me. I will add the related doc if we
> update the default strategy.
>
> To Mason:
>
> > I suppose we could do some benchmarking on what works well for the
> resource providers that Flink relies on e.g. Kubernetes. Based on
> conferences and blogs,
> > it seems most people are relying on Kubernetes to deploy Flink and the
> restart strategy has a large dependency on how well Kubernetes can scale to
> requests to redeploy the job.
>
> Sorry, I didn't understand what type of benchmarking
> we should do, could you elaborate on it? Thanks a lot.
>
> Best,
> Rui
>
> On Sat, Nov 18, 2023 at 3:32 AM Mason Chen  wrote:
>
>> Hi Rui,
>>
>> I suppose we could do some benchmarking on what works well for the
>> resource providers that Flink relies on e.g. Kubernetes. Based on
>> conferences and blogs, it seems most people are relying on Kubernetes to
>> deploy Flink and the restart strategy has a large dependency on how well
>> Kubernetes can scale to requests to redeploy the job.
>>
>> Best,
>> Mason
>>
>> On Fri, Nov 17, 2023 at 10:07 AM David Anderson 
>> wrote:
>>
>>> Rui,
>>>
>>> I don't have any direct experience with this topic, but given the
>>> motivation you shared, the proposal makes sense to me. Given that the new
>>> default feels more complex than the current behavior, if we decide to do
>>> this I think it will be important to include the rationale you've shared in
>>> the documentation.
>>>
>>> David
>>>
>>> On Wed, Nov 15, 2023 at 10:17 PM Rui Fan <1996fan...@gmail.com> wrote:
>>>
>>>> Hi dear flink users and devs:
>>>>
>>>> FLIP-364[1] intends to make some improvements to restart-strategy
>>>> and discuss updating some of the default values of exponential-delay,
>>>> and whether exponential-delay can be used as the default
>>>> restart-strategy.
>>>> After discussing at dev mail list[2], we hope to collect more feedback
>>>> from Flink users.
>>>>
>>>> # Why does the default restart-strategy need to be updated?
>>>>
>>>> If checkpointing is enabled, the default value is fixed-delay with
>>>> Integer.MAX_VALUE restart attempts and '1 s' delay[3]. It means
>>>> the job will restart infinitely with high frequency when a job
>>>> continues to fail.
>>>>
>>>> When the Kafka cluster fails, a large number of flink jobs will be
>>>> restarted frequently. After the kafka cluster is recovered, a large
>>>> number of high-frequency restarts of flink jobs may cause the
>>>> kafka cluster to avalanche again.
>>>>
>>>> We are considering exponential-delay as the default strategy for
>>>> a couple of reasons:
>>>>
>>>> - The exponential-delay can reduce the restart frequency when
>>>>   a job continues to fail.
>>>> - It can restart a job quickly when a job fails occasionally.
>>>> - The restart-strategy.exponential-delay.jitter-factor can avoid
>>>>   restarting multiple jobs at the same time. It’s useful to prevent
>>>>   avalanches.
>>>>
>>>> # What are the current default values[4] of exponential-delay?
>>>>
>>>> restart-strategy.exponential-delay.initial-backoff : 1s
>>>> restart-strategy.exponential-delay.backoff-multiplier : 2.0
>>>> restart-strategy.exponential-delay.jitter-factor : 0.1
>>>> restart-strategy.exponential-delay.max-backoff : 5 min
>>>> restart-strategy.exponential-delay.reset-backoff-threshold : 1h

Re: [DISCUSS] Change the default restart-strategy to exponential-delay

2023-11-17 Thread Mason Chen
Hi Rui,

I suppose we could do some benchmarking on what works well for the resource
providers that Flink relies on e.g. Kubernetes. Based on conferences and
blogs, it seems most people are relying on Kubernetes to deploy Flink and
the restart strategy has a large dependency on how well Kubernetes can
scale to requests to redeploy the job.

Best,
Mason

On Fri, Nov 17, 2023 at 10:07 AM David Anderson 
wrote:

> Rui,
>
> I don't have any direct experience with this topic, but given the
> motivation you shared, the proposal makes sense to me. Given that the new
> default feels more complex than the current behavior, if we decide to do
> this I think it will be important to include the rationale you've shared in
> the documentation.
>
> David
>
> On Wed, Nov 15, 2023 at 10:17 PM Rui Fan <1996fan...@gmail.com> wrote:
>
>> Hi dear flink users and devs:
>>
>> FLIP-364[1] intends to make some improvements to restart-strategy
>> and discuss updating some of the default values of exponential-delay,
>> and whether exponential-delay can be used as the default restart-strategy.
>> After discussing at dev mail list[2], we hope to collect more feedback
>> from Flink users.
>>
>> # Why does the default restart-strategy need to be updated?
>>
>> If checkpointing is enabled, the default value is fixed-delay with
>> Integer.MAX_VALUE restart attempts and '1 s' delay[3]. It means
>> the job will restart infinitely with high frequency when a job
>> continues to fail.
>>
>> When the Kafka cluster fails, a large number of flink jobs will be
>> restarted frequently. After the kafka cluster is recovered, a large
>> number of high-frequency restarts of flink jobs may cause the
>> kafka cluster to avalanche again.
>>
>> We are considering exponential-delay as the default strategy for
>> a couple of reasons:
>>
>> - The exponential-delay can reduce the restart frequency when
>>   a job continues to fail.
>> - It can restart a job quickly when a job fails occasionally.
>> - The restart-strategy.exponential-delay.jitter-factor can avoid
>>   restarting multiple jobs at the same time. It’s useful to prevent
>>   avalanches.
>>
>> # What are the current default values[4] of exponential-delay?
>>
>> restart-strategy.exponential-delay.initial-backoff : 1s
>> restart-strategy.exponential-delay.backoff-multiplier : 2.0
>> restart-strategy.exponential-delay.jitter-factor : 0.1
>> restart-strategy.exponential-delay.max-backoff : 5 min
>> restart-strategy.exponential-delay.reset-backoff-threshold : 1h
>>
>> backoff-multiplier=2 means that the delay time of each restart
>> will be doubled. The delay times are:
>> 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s, 300s, 300s, etc.
>>
>> The delay time increases rapidly, which will affect the recovery
>> time for flink jobs.
>>
>> # Option improvements
>>
>> We think a backoff-multiplier between 1 and 2 is more sensible,
>> such as:
>>
>> restart-strategy.exponential-delay.backoff-multiplier : 1.2
>> restart-strategy.exponential-delay.max-backoff : 1 min
>>
>> After updating, the delay times are:
>>
>> 1s, 1.2s, 1.44s, 1.728s, 2.073s, 2.488s, 2.985s, 3.583s, 4.299s,
>> 5.159s, 6.191s, 7.430s, 8.916s, 10.699s, 12.839s, 15.407s, 18.488s,
>> 22.186s, 26.623s, 31.948s, 38.337s, etc
>>
>> They achieve the following goals:
>> - When restarts are infrequent in a short period of time, flink can
>>   quickly restart the job. (For example: the retry delay time when
>>   restarting 5 times is 2.073s)
>> - When restarting frequently in a short period of time, flink can
>>   slightly reduce the restart frequency to prevent avalanches.
>>   (For example: the retry delay time when retrying 10 times is 5.1 s,
>>   and the retry delay time when retrying 20 times is 38s, which is
>>   not very large.)
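>>
>> For reference, the delay sequences above follow roughly
>> delay(n) = min(initial-backoff * multiplier^n, max-backoff),
>> perturbed by the jitter-factor. A small, hedged sketch of the intuition
>> (not Flink's actual implementation; jitter is ignored here):
>>
>> double initial = 1.0, multiplier = 1.2, maxBackoff = 60.0;
>> for (int n = 0; n < 20; n++) {
>>     double delaySeconds = Math.min(initial * Math.pow(multiplier, n), maxBackoff);
>>     System.out.printf("attempt %d -> %.3f s%n", n + 1, delaySeconds);
>> }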
>>
>> As @Mingliang Liu mentioned on the dev mailing list: one-size-fits-all
>> default values do not exist. So our goal is for the default values
>> to be suitable for most jobs.
>>
>> Looking forward to your thoughts and feedback, thanks~
>>
>> [1] https://cwiki.apache.org/confluence/x/uJqzDw
>> [2] https://lists.apache.org/thread/5cgrft73kgkzkgjozf9zfk0w2oj7rjym
>> [3]
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/config/#restart-strategy-type
>> [4]
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/task_failure_recovery/#exponential-delay-restart-strategy
>>
>> Best,
>> Rui
>>
>


Re: Metric to capture decoding failure in flink sources

2023-10-10 Thread Mason Chen
Hi Prateek,

I agree, the reader should ideally expose the context to record metrics
about deserialization. One option is to defer deserialization to another
operator, say a RichMapFunction that has access to the RuntimeContext.
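
A minimal, hedged sketch of that option (class, metric, and type names are
illustrative): the source emits raw byte[] (e.g. via a pass-through
deserializer), and a rich function does the decoding so the RuntimeContext
metric group is available. A flatMap variant is used here so records that
fail to decode can simply be dropped.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.Collector;

public class DecodingFunction extends RichFlatMapFunction<byte[], String> {
    private transient Counter decodingFailures;

    @Override
    public void open(Configuration parameters) {
        // Registered through the RuntimeContext, so any configured reporter exposes it.
        decodingFailures = getRuntimeContext().getMetricGroup().counter("decodingFailures");
    }

    @Override
    public void flatMap(byte[] raw, Collector<String> out) {
        try {
            out.collect(decode(raw));
        } catch (IOException e) {
            decodingFailures.inc(); // count the failure and drop the record
        }
    }

    private String decode(byte[] raw) throws IOException {
        // Stand-in for the real deserialization logic.
        return new String(raw, StandardCharsets.UTF_8);
    }
}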

Best,
Mason

On Mon, Oct 9, 2023 at 12:42 PM Prateek Kohli 
wrote:

> Hi,
>
> I need to get the difference between records which are collected by the
> source and the records which are emitted.
> For eg - If deserialization fails while reading records from kafka, in
> that case I want to expose the difference between records collected from
> Kafka Broker and records emitted from Kafka operator after deserialization
> as a metric.
>
> But I think flink does not provide any such metrics.
>
> In Kafka Source I can have a workaround to get this metric:
>
> I can override the open method from KafkaRecordDeserializationSchema where
> a metric can be added to show decoding failures:
>
> @Override
> public void open(DeserializationSchema.InitializationContext context)
>         throws Exception {
>     context.getMetricGroup().gauge("decodingFailures", new Gauge<Integer>() {
>         @Override
>         public Integer getValue() {
>             return decodingFailures;
>         }
>     });
> }
>
> and at the time of deserialization I can increment that counter as below:
>
> @Override
> public void deserialize(ConsumerRecord<byte[], byte[]> record,
>         Collector<T> out)
> {
>     try
>     {
>         // deserialize
>     }
>     catch (IOException | MMException e)
>     {
>         logger.error(String.format(
>             "Error received while decoding, in partition [%d] for topic [%s] at offset [%d]: %s",
>             partition, topic, offset, e.toString()));
>
>         decodingFailures++;
>     }
> }
>
> *But there is no such way to implement this in FileSource, as
> SimpleStreamFormat/Reader does not provide access to Context in any way.*
>
> *Is there any way I can get this metric in both File & Kafka Collector or
> any generic way to get this agnostic to what collector is being used?*
>
> Regards,
> Prateek Kohli
>


Re: Custom Prometheus metrics disappeared in 1.16.2 => 1.17.1 upgrade

2023-10-03 Thread Mason Chen
Hi Javier,

Is there a particular reason why you aren't leveraging Flink's metric API? It
seems that functionality was internal to the PrometheusReporter
implementation, and your use case should've continued working if it had
depended on Flink's metric API.
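
For illustration, a hedged sketch of what depending on Flink's metric API
could look like (class and metric names are made up): the value is registered
through the operator's metric group, so whichever reporter is configured
(Prometheus included) exposes it alongside the built-in Flink metrics.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;

public class EnrichFunction extends RichMapFunction<String, String> {
    private long processed;

    @Override
    public void open(Configuration parameters) {
        // Exposed via Flink's metric system instead of a standalone Prometheus registry.
        getRuntimeContext().getMetricGroup().gauge("myAppProcessed", (Gauge<Long>) () -> processed);
    }

    @Override
    public String map(String value) {
        processed++;
        return value;
    }
}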

Best,
Mason

On Thu, Sep 28, 2023 at 2:51 AM Javier Vegas  wrote:

> Thanks! I saw the first change but missed the third one, which is the one
> that most probably explains my problem. Most probably the metrics
> I was sending with the twitter/finagle statsReceiver ended up in the
> singleton default registry and were exposed by Flink with all the
> other Flink metrics, but now that Flink uses its own registry I have
> no idea where my custom metrics end up.
>
>
> El mié, 27 sept 2023 a las 4:56, Kenan Kılıçtepe
> () escribió:
> >
> > Have you checked the metric changes in 1.17?
> >
> > From release notes 1.17:
> >
> https://nightlies.apache.org/flink/flink-docs-master/release-notes/flink-1.17/
> >
> > Metric Reporters #
> > Only support reporter factories for instantiation #
> > FLINK-24235 #
> > Configuring reporters by their class is no longer supported. Reporter
> implementations must provide a MetricReporterFactory, and all
> configurations must be migrated to such a factory.
> >
> > UseLogicalIdentifier makes datadog consider metric as custom #
> > FLINK-30383 #
> > The Datadog reporter now adds a “flink.” prefix to metric identifiers if
> “useLogicalIdentifier” is enabled. This is required for these metrics to be
> recognized as Flink metrics, not custom ones.
> >
> > Use separate Prometheus CollectorRegistries #
> > FLINK-30020 #
> > The PrometheusReporters now use a separate CollectorRegistry for each
> reporter instance instead of the singleton default registry. This generally
> shouldn’t impact setups, but it may break code that indirectly interacts
> with the reporter via the singleton instance (e.g., a test trying to assert
> what metrics are reported).
> >
> >
> >
> > On Wed, Sep 27, 2023 at 11:11 AM Javier Vegas  wrote:
> >>
> >> I implemented some custom Prometheus metrics that were working on
> >> 1.16.2, with my configuration
> >>
> >> metrics.reporter.prom.factory.class:
> >> org.apache.flink.metrics.prometheus.PrometheusReporterFactory
> >> metrics.reporter.prom.port: 
> >>
> >> I could see both Flink metrics and my custom metrics on port  of
> >> my task managers
> >>
> >> After upgrading to 1.17.1, using the same configuration, I can see
>> only the Flink metrics on port  of the task managers,
> >> the custom metrics are getting lost somewhere.
> >>
> >> The release notes for 1.17 mention
> >> https://issues.apache.org/jira/browse/FLINK-24235
> >> that removes instantiating reporters by name and forces using a
> >> factory, which I was already doing in 1.16.2. Do I need to do
> >> anything extra after those changes so my metrics are aggregated with
> >> the Flink ones?
> >>
> >> I am also seeing this error message on application startup (which I
> >> was already seeing in 1.16.2): "Multiple implementations of the same
> >> reporter were found in 'lib' and/or 'plugins' directories for
> >> org.apache.flink.metrics.prometheus.PrometheusReporterFactory. It is
> >> recommended to remove redundant reporter JARs to resolve used
> >> versions' ambiguity." Could that also explain the missing metrics?
> >>
> >> Thanks,
> >>
> >> Javier Vegas
>


Re: Task manager creation in Flink native Kubernetes (application mode)

2023-07-27 Thread Mason Chen
I'm also curious about this and how to make it better in the current native
Kubernetes integration model. Is there some way for Flink to discover and
surface the oom kill signal from Kubernetes?

Best,
Mason

On Tue, Jul 25, 2023 at 6:11 AM Alexis Sarda-Espinosa <
sarda.espin...@gmail.com> wrote:

> Hi everyone,
>
> From its inception (at least AFAIK), application mode for native
> Kubernetes has always created "unmanaged" pods for task managers. I would
> like to know if there are any specific benefits to this, or if on the other
> hand there are specific reasons not to use Kubernetes Deployments instead.
>
> In my case, I ask for a very specific reason. With the current approach,
> it is almost impossible to determine if a task manager crash was due to an
> OOM kill, given that there isn't any kind of history for the unmanaged pods.
>
> I could add that these TM pods also confuse Argo CD and their state is
> always "progressing". That's not so critical, but I don't know if anyone
> else finds that odd.
>
> I would be happy to know what others think.
>
> Regards,
> Alexis.
>


Re: Kafka coordinator not available

2023-07-20 Thread Mason Chen
Hi Lars,

You are likely seeing this Kafka client bug:
https://issues.apache.org/jira/browse/KAFKA-13840. The latest versions of
Flink have updated Kafka clients dependency to include this fix.

Best,
Mason

On Thu, Jul 20, 2023 at 9:21 AM Lars Skjærven  wrote:

> Hello,
> I experienced
> CoordinatorNotAvailableException in my flink jobs after our kafka supplier
> (aiven) did a maintenance update of the cluster. This update is performed
> by starting up new kafka nodes, copying over data, and switching over
> internally. The flink jobs run as expected, with the only issue that they
> are unsuccessful in committing group offsets.
>
> Restarting the job from checkpoint/savepoint resolves the issue, but I
> would rather not restart all jobs after every kafka maintenance update.
>
> Any ideas ?
>
> Kind regards,
> Lars Skjærven
>


Re: Multiple Kafka Source for a Data Pipeline

2023-07-07 Thread Mason Chen
Hi all,

This should be fixed. Are you setting a different `client.id.prefix` for
each KafkaSource? See this:
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/kafka/#additional-properties
and
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/kafka/#kafka-consumer-metrics
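
In case it helps, a hedged sketch of what that looks like (broker, topics, and
prefix values are illustrative); giving each source its own prefix keeps the
JMX client ids from colliding:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

KafkaSource<String> sourceA = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("topic-a")
        .setGroupId("pipeline")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setProperty("client.id.prefix", "pipeline-source-a")
        .build();

KafkaSource<String> sourceB = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("topic-b")
        .setGroupId("pipeline")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setProperty("client.id.prefix", "pipeline-source-b")
        .build();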

Best,
Mason

On Thu, Jul 6, 2023 at 9:43 AM Yogesh Rao  wrote:

> Hello Yaroslav,
>
> I am using flink 1.17.1, so I am assuming that this isn’t fixed.
>
> Regards,
> Yogesh
>
> On Thu, 6 Jul 2023 at 10:06 PM, Yaroslav Tkachenko 
> wrote:
>
>> Hi Yogesh,
>>
>> Multiple kafka sources are supported. This warning only indicates that
>> multiple consumers won't be able to register JMX metrics. There are several
>> bugs reported about this, but I believe it should be fixed for consumers in
>> the newer Flink versions (1.14+).
>>
>> On Wed, Jul 5, 2023 at 9:32 PM Yogesh Rao  wrote:
>>
>>> Hi,
>>>
>>> Wanted to know if multiple kafka sources are supported in a data
>>> pipeline within flink.
>>>
>>> I am looking at a scenario where data transformation and enrichment
>>> needs to be done when a message from both the sources is received based on
>>> a common identifier.
>>>
>>> I coded the logic and it looks to be working; however, I see a warning
>>> trace as below, which makes me believe perhaps it's not designed to be
>>> supported. I understand it's just JMX registration which has failed and does
>>> not have any effect on the actual execution of business logic.
>>>
>>> 10:22:54,876 WARN  org.apache.kafka.common.utils.AppInfoParser
>>>[] - Error registering AppInfo mbean
>>> javax.management.InstanceAlreadyExistsException:
>>> kafka.consumer:type=app-info,id=xyz-processing642706448-5
>>> at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436)
>>> ~[?:?]
>>> at
>>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855)
>>> ~[?:?]
>>> at
>>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955)
>>> ~[?:?]
>>> at
>>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890)
>>> ~[?:?]
>>> at
>>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320)
>>> ~[?:?]
>>> at
>>> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>>> ~[?:?]
>>> at
>>> org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
>>> ~[kafka-clients-3.2.3.jar:?]
>>> at
>>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:816)
>>> ~[kafka-clients-3.2.3.jar:?]
>>> at
>>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666)
>>> ~[kafka-clients-3.2.3.jar:?]
>>> at
>>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:647)
>>> ~[kafka-clients-3.2.3.jar:?]
>>> at
>>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:627)
>>> ~[kafka-clients-3.2.3.jar:?]
>>> at
>>> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.<init>(KafkaPartitionSplitReader.java:88)
>>> ~[flink-connector-kafka-1.17.1.jar:1.17.1]
>>> at
>>> org.apache.flink.connector.kafka.source.KafkaSource.lambda$createReader$1(KafkaSource.java:160)
>>> ~[flink-connector-kafka-1.17.1.jar:1.17.1]
>>> at
>>> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.createSplitFetcher(SplitFetcherManager.java:196)
>>> ~[flink-connector-base-1.17.1.jar:1.17.1]
>>> at
>>> org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.addSplits(SingleThreadFetcherManager.java:107)
>>> ~[flink-connector-base-1.17.1.jar:1.17.1]
>>>
>>> I looked at the implementation as well and found that not enough attributes
>>> were used to make the mbean id unique.
>>>
>>> Please let me know if this is a bug, I can raise a JIRA and perhaps even
>>> contribute towards its fix.
>>>
>>> Regards,
>>> -Yogesh
>>>
>>


Re: Difference between different values for starting offset

2023-07-03 Thread Mason Chen
Hi Oscar,

You are correct that the OffsetsInitializer is only effective when there
is no Flink state; in addition, if you have partition discovery on, this
initializer will be reused for the newly discovered partitions (i.e. splits).
Assuming the job is continuing from the offsets in Flink state, there is no
difference between the two strategies. This is because `auto.offset.reset`
maps to the `OffsetResetStrategy`, and
OffsetsInitializer.earliest uses `earliest` too.
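
To make that concrete, a hedged sketch of the two initializers side by side
(broker, topic, and group values are illustrative):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

// Starts from the earliest offset whenever there is no restored Flink state.
KafkaSource<String> a = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("events")
        .setGroupId("my-group")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setStartingOffsets(OffsetsInitializer.earliest())
        .build();

// Starts from the group's committed offset when there is no restored Flink
// state, falling back to earliest if no committed offset exists yet.
KafkaSource<String> b = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("events")
        .setGroupId("my-group")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .build();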

Best,
Mason

On Mon, Jul 3, 2023 at 6:56 AM Oscar Perez via user 
wrote:

> Hei,
>
> Looking at the flink documentation for kafkasource I see the following
> values for starting offset:
>
> OffsetInitializer.earliest
> OffsetInitializer.latest
> OffsetInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)
>
> From what I understand OffsetInitializer.earliest uses earliest offset the
> first time but later deployments will use the committed offset in the flink
> state to resume from there. If that is the case what is the difference
> between OffsetInitializer.earliest and
> commitedOffset(OffsetResetStrategy.EARLIEST) if both continue from the
> committed offset after redeployment?
>
> Thanks!
> Oscar
>


Re: Kafka Quotas & Consumer Group Client ID (Flink 1.15)

2023-05-27 Thread Mason Chen
Hi Hatem,

Before a PR, you would need to create a JIRA to track this issue and have a
committer assign that JIRA to you. Make sure to go through
https://flink.apache.org/how-to-contribute/overview/ as it will make
contributions smoother.

Best,
Mason

On Thu, May 25, 2023 at 10:30 AM Hatem Mostafa  wrote:

> Hello Mason,
>
> I created that PR <https://github.com/apache/flink/pull/22662/files> for
> a suggestion on how to address the issue so that it would enable us to set
> client id. Happy to do any modifications to get this merged for the future.
>
>
> On Thu, May 25, 2023 at 12:55 AM Mason Chen 
> wrote:
>
>> Hi Hatem,
>>
>> The reason for setting different client ids is due to Kafka client
>> metrics conflicts, and the issue is documented here:
>> https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kafka/#kafka-consumer-metrics.
>> I think that the warning log is benign if you are using Flink's metric
>> system for monitoring the Kafka connector and it would be nice to introduce
>> an option in the connector to configure the same `client.id` across all
>> tasks for the quota feature you mentioned.
>>
>> Best,
>> Mason
>>
>> On Wed, May 24, 2023 at 5:18 AM Hatem Mostafa  wrote:
>>
>>> Hello Martijn,
>>>
>>> Yes, checkpointing is enabled and the offsets are committed without a
>>> problem. I think I might have figured out the answer to my second question
>>> based on my understanding of this code
>>> <https://github.com/apache/flink/blob/0612a997ddcc791ee54f500fbf1299ce04987679/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java>,
>>> flink uses a low-level consumer that does not trigger consumer.subscribe,
>>> which makes the consumer group not appear as an active member in the
>>> kafka-consumer-groups tool. The consumer group functionality is fine though.
>>> However I am more interested in an answer for my first question. Kafka
>>> Quotas is one of the important features of using kafka and with flink
>>> setting a different client id for every consumer in the same consumer group
>>> makes it hard to set quotas for that consumer group. What is the reason
>>> behind setting different client ids?
>>>
>>> On Wed, May 24, 2023 at 1:13 PM Martijn Visser 
>>> wrote:
>>>
>>>> Hi Hatem,
>>>>
>>>> Could it be that you don't have checkpointing enabled? Flink only
>>>> commits its offset when a checkpoint has been completed successfully, as
>>>> explained on
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#consumer-offset-committing
>>>>
>>>> Best regards,
>>>>
>>>> Martijn
>>>>
>>>>
>>>> On Tue, May 23, 2023 at 6:43 PM Hatem Mostafa  wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I have two questions that are related to each other:
>>>>>
>>>>> *First question:*
>>>>>
>>>>> I have been trying to set `client.id` to set a kafka client quota
>>>>> <https://kafka.apache.org/documentation.html#design_quotas> for
>>>>> consumer_byte_rate since whenever our kafka job gets redeployed it reads a
>>>>> lot of data from our kafka cluster causing a denial of service for our
>>>>> kafka cluster. However `client.id` gets overridden by flink source
>>>>> here
>>>>> <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L87>.
>>>>> How would I enforce quotas for flink kafka source?
>>>>>
>>>>> *Second question:*
>>>>>
>>>>> Also something I didn't quite understand when describing our consumer
>>>>> group in kafka why I don't see the metadata for the consumer group
>>>>> information (consumer id, client id & host) and I get that the consumer
>>>>> group has no active members but it's actually active and consuming.
>>>>>
>>>>> *Example describing a flink consumer group*
>>>>>
>>>>>> ./kafka-consumer-groups.sh --bootstrap-server
>>>>>> kafka-server-address:9092   --describe --group flink-consumer-group
>>>>>> Consumer group 'flink-consumer-group' has no active members.
>>>>>>

Re: Kafka Quotas & Consumer Group Client ID (Flink 1.15)

2023-05-24 Thread Mason Chen
Hi Hatem,

The reason for setting different client ids is due to Kafka client
metrics conflicts, and the issue is documented here:
https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kafka/#kafka-consumer-metrics.
I think that the warning log is benign if you are using Flink's metric
system for monitoring the Kafka connector and it would be nice to introduce
an option in the connector to configure the same `client.id` across all
tasks for the quota feature you mentioned.

Best,
Mason

On Wed, May 24, 2023 at 5:18 AM Hatem Mostafa  wrote:

> Hello Martijn,
>
> Yes, checkpointing is enabled and the offsets are committed without a
> problem. I think I might have figured out the answer to my second question
> based on my understanding of this code
> ,
> flink uses a low-level consumer that does not trigger consumer.subscribe,
> which makes the consumer group not appear as an active member in the
> kafka-consumer-groups tool. The consumer group functionality is fine though.
> However I am more interested in an answer for my first question. Kafka
> Quotas is one of the important features of using kafka and with flink
> setting a different client id for every consumer in the same consumer group
> makes it hard to set quotas for that consumer group. What is the reason
> behind setting different client ids?
>
> On Wed, May 24, 2023 at 1:13 PM Martijn Visser 
> wrote:
>
>> Hi Hatem,
>>
>> Could it be that you don't have checkpointing enabled? Flink only commits
>> its offset when a checkpoint has been completed successfully, as explained
>> on
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#consumer-offset-committing
>>
>> Best regards,
>>
>> Martijn
>>
>>
>> On Tue, May 23, 2023 at 6:43 PM Hatem Mostafa  wrote:
>>
>>> Hello,
>>>
>>> I have two questions that are related to each other:
>>>
>>> *First question:*
>>>
>>> I have been trying to set `client.id` to set a kafka client quota
>>>  for
>>> consumer_byte_rate since whenever our kafka job gets redeployed it reads a
>>> lot of data from our kafka cluster causing a denial of service for our
>>> kafka cluster. However `client.id` gets overridden by flink source here
>>> .
>>> How would I enforce quotas for flink kafka source?
>>>
>>> *Second question:*
>>>
>>> Also something I didn't quite understand when describing our consumer
>>> group in kafka why I don't see the metadata for the consumer group
>>> information (consumer id, client id & host) and I get that the consumer
>>> group has no active members but it's actually active and consuming.
>>>
>>> *Example describing a flink consumer group*
>>>
 ./kafka-consumer-groups.sh --bootstrap-server kafka-server-address:9092 --describe --group flink-consumer-group
 Consumer group 'flink-consumer-group' has no active members.

 GROUP                 TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG     CONSUMER-ID  HOST  CLIENT-ID
 flink_consumer_group  topic_name  1          514588965       514689721       100756  -            -     -
>>>
>>>
>>>
>>> *Example describing a normal consumer group written using a confluent
>>> kafka python library.*
>>>
 ./kafka-consumer-groups.sh --bootstrap-server kafka-server-address:9092 --describe --group python_confluent_kafka_consumer

 GROUP                            TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                                            HOST  CLIENT-ID
 python_confluent_kafka_consumer  topic_name  1          17279532        17279908        376  python_confluent_kafka_consumer-345fa1d1-1f76-4e38-9aad-dcc120c5a52e  /     python_confluent_kafka_consumer_client_id
>>>
>>>
>>>
>>> I am using flink version 1.15.
>>>
>>> Thanks,
>>> Hatem
>>>
>>>
>>>
>>>


Re: Question about Flink metrics

2023-05-05 Thread Mason Chen
Hi Neha,

For the jobs you care about, you can attach additional labels using
`scope-variables-additional` [1]. The example located in the same page
showcases how you can configure KV pairs in its map configuration. Be sure
to replace the reporter name with the name of your prometheus reporter!

[1]
https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/metric_reporters/#scope-variables-additional
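
For example, a hedged flink-conf.yaml sketch (the reporter name "prom" and the
label key/values are placeholders for your own setup; the exact option key is
the one described in the linked docs section):

metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.scope.variables.additional: team:my-team,tier:critical

The additional variables should then show up as labels on the metrics reported
for that job's cluster, so your PromQL queries can filter on them.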

Best,
Mason

On Thu, May 4, 2023 at 11:35 PM neha goyal  wrote:

> Hello,
> I have a question about the Prometheus metrics. I am able to fetch the
> metrics from the following expression.
> sum(flink_jobmanager_job_numRestarts{job_name="$job_name"}) by (job_name)
> Now I am interested in only a few jobs and I want to give them a label.
> How to achieve this? How to give an additional label to Flink Prometheus
> metrics so that I can fetch the metrics only for those jobs having that
> label? This tag needs to be set at the job level. A few jobs will have that
> tag and others won't.
>
>
>


Re: Waiting for a signal on one stream to start processing on another

2023-03-08 Thread Mason Chen
Hi Yuval,

It seems you are trying to perform bootstrapping on a Flink job by doing
the bounded read first. A good pattern to follow is to use HybridSource [1]
and the docs have some examples with File and Kafka sources. The point of
switching can be coordinated by the source so that you can dynamically
infer where to start from stream B. Please take a look at the link and let
me know if you have additional questions.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/
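
As a rough sketch (topic names and the switch timestamp are illustrative),
stream A is read as a bounded Kafka source up to the chosen point, and the
hybrid source then switches over to stream B:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

long switchTimestampMs = 1678000000000L; // the "certain point" on stream A

KafkaSource<String> streamA = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("stream-a")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setBounded(OffsetsInitializer.timestamp(switchTimestampMs)) // stop reading here
        .build();

KafkaSource<String> streamB = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("stream-b")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setStartingOffsets(OffsetsInitializer.earliest())
        .build();

HybridSource<String> source =
        HybridSource.builder(streamA).addSource(streamB).build();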

Best,
Mason

On Wed, Mar 8, 2023 at 11:12 AM Yuval Itzchakov  wrote:

> Hi,
>
> I have a use-case where I have two streams, call them A and B. I need to
> consume stream A up to a certain point, and only then start processing on
> stream B.
>
> What could be a way to go about this?
>


Re: Various Flink Deployment States

2023-03-02 Thread Mason Chen
Also, I noticed that
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.4/docs/custom-resource/reference/#jobstatus
reflects the job status provided Flink. Can we use the `JobStatus` enum
from Flink instead of just a String (along the same idea to make using
these statuses a little clearer)?

On Thu, Mar 2, 2023 at 3:10 PM Mason Chen  wrote:

> Hi all,
>
> There are quite a few states or statuses for a Flink deployment e.g.
> deployment status, job status, job manager status etc. I understand these
> are useful to debug an error with deployment since there are multiple
> points of failure. However, I want to understand how a user can verify that
> a deployment upgrade has been successful.
>
> This requires two checks:
> 1. Deployment spec was consumed by the operator successfully (can check
> via current and last generation id).
> 2. Deployment running successfully (can check that job state is RUNNING)
>
> However, I found a resource lifecycle state [1] and I'm not sure how that
> fits in this flow. Is the intention that this is the singular state that
> allows the user to check if a deployment upgrade was successful (the
> documentation doesn't make it clear, but the source code does suggest
> this)? If so, how does this state satisfy point 1 above?
>
> [1]
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.4/docs/custom-resource/reference/#lifecycleState
>
> Best,
> Mason
>


Various Flink Deployment States

2023-03-02 Thread Mason Chen
Hi all,

There are quite a few states or statuses for a Flink deployment e.g.
deployment status, job status, job manager status etc. I understand these
are useful to debug an error with deployment since there are multiple
points of failure. However, I want to understand how a user can verify that
a deployment upgrade has been successful.

This requires two checks:
1. Deployment spec was consumed by the operator successfully (can check via
current and last generation id).
2. Deployment running successfully (can check that job state is RUNNING)

However, I found a resource lifecycle state [1] and I'm not sure how that
fits in this flow. Is the intention that this is the singular state that
allows the user to check if a deployment upgrade was successful (the
documentation doesn't make it clear, but the source code does suggest
this)? If so, how does this state satisfy point 1 above?

[1]
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.4/docs/custom-resource/reference/#lifecycleState

Best,
Mason


Re: Fast and slow stream sources for Interval Join

2023-02-27 Thread Mason Chen
Hi all,

It's true that the problem can be handled by caching records in state.
However, there is an alternative using `watermark alignment` with Flink
1.15+ [1] which does the desired synchronization that you described while
reducing the state size compared to the former approach.

To use this with two topics of different speeds, you would need to define
two Kafka sources, each corresponding to a topic. This limitation is
documented in [1]. This limitation is resolved in Flink 1.17 by split level
(partition level in the case of Kafka) watermark alignment, so one Kafka
source reading various topics can align on the partitions of the different
topics.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment-_beta_
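
As a rough illustration (group name, drift budget, and the source variables are
placeholders; fastTopicSource and slowTopicSource would be the two KafkaSource
instances), both sources share a watermark alignment group:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

WatermarkStrategy<String> aligned =
        WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withWatermarkAlignment("join-group", Duration.ofSeconds(30), Duration.ofSeconds(1));

DataStream<String> fastStream = env.fromSource(fastTopicSource, aligned, "fast-topic");
DataStream<String> slowStream = env.fromSource(slowTopicSource, aligned, "slow-topic");
// The faster source is paused once its watermark drifts more than 30s ahead of
// the group's minimum, which bounds how much data must be buffered for the join.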

Best,
Mason

On Mon, Feb 27, 2023 at 8:11 AM Alexis Sarda-Espinosa <
sarda.espin...@gmail.com> wrote:

> Hello,
>
> I had this question myself and I've seen it a few times, the answer is
> always the same, there's currently no official way to handle it without
> state.
>
> Regards,
> Alexis.
>
> On Mon, 27 Feb 2023, 14:09 Remigiusz Janeczek,  wrote:
>
>> Hi,
>>
>> How to handle a case where one of the Kafka topics used for interval join
>> is slower than the other? (Or a case where one topic lags behind)
>> Is there a way to stop consuming from the fast topic and wait for the
>> slow one to catch up? I want to avoid running out of memory (or keeping a
>> very large state) and I don't want to discard any data from the fast topic
>> until a watermark from the slow topic allows that.
>>
>> Best Regards
>>
>


Flink K8s operator pod section of CRD

2023-02-23 Thread Mason Chen
Hi all,

Why does the FlinkDeployment CRD refer to the Pod class instead of the
PodTemplate class from the fabric8 library? As far as I can tell, the only
difference is that the Pod class exposes the PodStatus, which doesn't seem
mutable. Thanks in advance!

Best,
Mason


Re: Kafka Sink Kafka Producer metrics?

2023-02-06 Thread Mason Chen
Hi Andrew,

I misread the docs: `register.producer.metrics` is mentioned here, but it
is not on by default.
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#kafka-connector-metrics

Best,
Mason

On Mon, Feb 6, 2023 at 6:19 PM Mason Chen  wrote:

> Hi Andrew,
>
> Unfortunately, the functionality is undocumented, but you can set the
> property `register.producer.metrics` to true in your Kafka client
> properties map. This is a JIRA to document the feature:
> https://issues.apache.org/jira/browse/FLINK-30932
>
> Best,
> Mason
>
> On Mon, Feb 6, 2023 at 11:49 AM Andrew Otto  wrote:
>
>> Hi!
>>
>> Kafka Source will emit KafkaConsumer metrics
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#kafka-consumer-metrics>
>> .
>>
>> It looks like Kafka Sink
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#monitoring-1>
>> does not emit KafkaProducer metrics
>> <https://kafka.apache.org/documentation/#producer_monitoring>.  Is this
>> correct?  If so, why not?
>>
>> Thanks,
>> -Andrew Otto
>>  Wikimedia Foundation
>>
>


Re: Kafka Sink Kafka Producer metrics?

2023-02-06 Thread Mason Chen
Hi Andrew,

Unfortunately, the functionality is undocumented, but you can set the
property `register.producer.metrics` to true in your Kafka client
properties map. This is a JIRA to document the feature:
https://issues.apache.org/jira/browse/FLINK-30932
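
A hedged sketch of where the property goes (the sink settings other than the
flag are illustrative):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker:9092")
        .setRecordSerializer(
                KafkaRecordSerializationSchema.builder()
                        .setTopic("output-topic")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
        // Forwarded with the producer properties; enables registration of the
        // KafkaProducer metrics in Flink's metric system.
        .setProperty("register.producer.metrics", "true")
        .build();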

Best,
Mason

On Mon, Feb 6, 2023 at 11:49 AM Andrew Otto  wrote:

> Hi!
>
> Kafka Source will emit KafkaConsumer metrics
> 
> .
>
> It looks like Kafka Sink
> 
> does not emit KafkaProducer metrics
> .  Is this
> correct?  If so, why not?
>
> Thanks,
> -Andrew Otto
>  Wikimedia Foundation
>


Re: Flink Kubernetes Operator podTemplate and 'app' pod label bug?

2023-01-19 Thread Mason Chen
@Andrew I was also confused by this earlier. FYI, this is the line where it
is referenced:
https://github.com/apache/flink-kubernetes-operator/blame/7d5bf9536bdfbf86de5803766b28e503cd32ee04/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/utils/StandaloneKubernetesUtils.java#L43
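
For reference, a minimal podTemplate sketch following the advice further down
in this thread (the label key and values are illustrative): use a qualified
label key instead of the reserved "app" label.

 podTemplate:
   apiVersion: v1
   kind: Pod
   metadata:
     labels:
       app.kubernetes.io/name: flink-app
       release: flink-example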

On Thu, Jan 19, 2023 at 1:59 PM Őrhidi Mátyás 
wrote:

> On a side note, we should probably use a qualified label name instead of
> the pretty common app here. WDYT Gyula?
>
> On Thu, Jan 19, 2023 at 1:48 PM Gyula Fóra  wrote:
>
>> Hi!
>>
>> The app label itself is used by Flink internally for a different purpose
>> so it’s overridden. This is completely expected.
>>
>> I think it would be better to use some other label :)
>>
>> Cheers,
>> Gyula
>>
>> On Thu, 19 Jan 2023 at 19:02, Andrew Otto  wrote:
>>
>>> Hello!
>>>
>>> I'm seeing an unexpected label value assignment happening, and I'm not
>>> sure how it's happening.  It is possible it is in my own helm charts and
>>> templates somewhere, but I'm not seeing it, so I'm beginning to think this
>>> is happening in the FlinkDeployment CRD in the operator code somewhere.
>>>
>>> I'm using FlinkDeployment podTemplate
>>> 
>>> to add an 'app' label:
>>>
>>>  podTemplate:
>>> apiVersion: v1
>>> kind: Pod
>>> metadata:
>>>   labels:
>>> app: flink-app
>>> release: flink-example
>>> ...
>>>
>>> I also have this app label set in the FlinkDeployment labels:
>>>
>>> kind: FlinkDeployment
>>> metadata:
>>>   name: flink-app-flink-example
>>>   labels:
>>> app: flink-app
>>> chart: flink-app-0.1.1
>>> release: flink-example
>>>
>>> Since I've set app: flink-app in the podTemplate, I would expect all
>>> pods to get this label.  The FlinkDeployment resource has this label
>>> value as expected.  However, I see that in the pods, as well as the
>>> Deployment that are created by FlinkDeployment:
>>>
>>> *$ kubectl -n flink-app0 describe deployments flink-app-flink-example*
>>> ...
>>> Name:   flink-app-flink-example
>>> Namespace:  flink-app0
>>> CreationTimestamp:  Thu, 19 Jan 2023 12:42:05 -0500
>>> Labels: app=flink-app-flink-example
>>> component=jobmanager
>>> ...
>>>
>>> Pod Template:
>>>   Labels:   app=flink-app-flink-example
>>> component=jobmanager
>>> release=flink-example
>>> ...
>>>
>>>
>>> *$ kubectl -n flink-app0 describe pod
>>> flink-app-flink-example-d974cb595-788ch*
>>> ...
>>> Labels:   app=flink-app-flink-example
>>>   component=jobmanager
>>>   pod-template-hash=d974cb595
>>>   release=flink-example
>>> ...
>>>
>>>
>>> I'd expect the app label to be 'flink-app' for at least the Deployment
>>> PodTemplate and the Pod, if not the Deployment itself too.
>>>
>>> Something is overriding the app label in podTemplate, and I don't think
>>> it's my chart or installation.  I looked in flink-kubernetes-operator code
>>> and I didn't find where this was happening either.  I am not setting e.g.
>>> kubernetes.jobmanager.labels
>>> 
>>> .
>>>
>>> Is this expected?
>>>
>>> Thank you!
>>>
>>> -Andrew Otto
>>>  Wikimedia Foundation
>>>
>>>
>>>
>>>
>>>
>>>


Re: [Security] - Critical OpenSSL Vulnerability

2022-11-02 Thread Mason Chen
I saw the update from them as well, so +1, the downgrade in severity makes
it less urgent for us now and I’m fine with waiting for the next regular
Flink docker release. We would need to wait for the upstream image provider
to patch it as well. Thanks!

Best,
Mason

On Tue, Nov 1, 2022 at 9:18 AM Martijn Visser 
wrote:

> Hi all,
>
> Looking at the blog with details
> https://www.openssl.org/blog/blog/2022/11/01/email-address-overflows it's
> shown that vulnerability has been downgraded to High. I don't think that
> warrants an emergency re-release of the images.
>
> Best regards,
>
> Martijn
>
> Op di 1 nov. 2022 om 15:06 schreef Chesnay Schepler 
>
>> We just push new images with the same tags.
>>
>> On 01/11/2022 14:35, Matthias Pohl wrote:
>>
>> The Docker image for Flink 1.12.7 uses an older base image which comes
>> with openssl 1.1.1k. There was a previous post in the OpenSSL mailing list
>> reporting a low vulnerability being fixed with 3.0.6 and 1.1.1r (both
>> versions being explicitly mentioned) [1]. Therefore, I understand the post
>> in a way that only 3.0.x would be affected and, as a consequence, Docker
>> images 1.13 and below would be fine.
>>
>> I verified Mason's finding that only 1.14+ Docker images are affected. No
>> entire release is necessary as far as I understand. Theoretically, we would
>> only have to push newer Docker images to the registry. I'm not sure what
>> the right approach is when it comes to versioning. I'm curious about
>> Chesnay's opinion on that one (CC'd).
>>
>> [1]
>> https://mta.openssl.org/pipermail/openssl-announce/2022-October/000233.html
>>
>> On Tue, Nov 1, 2022 at 7:06 AM Prasanna kumar <
>> prasannakumarram...@gmail.com> wrote:
>>
>>> Could we also get an emergency patch for the 1.12 version as well? Upgrading
>>> flink to a newer version on production in a short time would be
>>> high in effort and long in duration.
>>>
>>> Thanks,
>>> Prasanna
>>>
>>> On Tue, Nov 1, 2022 at 11:30 AM Prasanna kumar <
>>> prasannakumarram...@gmail.com> wrote:
>>>
>>>> Is flink version 1.12 also affected?
>>>>
>>>> Thanks,
>>>> Prasanna.
>>>>
>>>> On Tue, Nov 1, 2022 at 10:40 AM Mason Chen 
>>>> wrote:
>>>>
>>>>> Hi Tamir and Martjin,
>>>>>
>>>>> We have also noticed this internally. So far, we have found that the
>>>>> *latest* Flink Java 11/Scala 2.12 docker images *1.14, 1.15, and 1.16*
>>>>> are affected, which all have the *openssl 3.0.2 *dependency. It would
>>>>> be good to discuss an emergency release when this patch comes out
>>>>> tomorrow, as it is the highest priority level from their severity rating.
>>>>>
>>>>> Best,
>>>>> Mason
>>>>>
>>>>> On Mon, Oct 31, 2022 at 1:10 PM Martijn Visser <
>>>>> martijnvis...@apache.org> wrote:
>>>>>
>>>>>> Hi Tamir,
>>>>>>
>>>>>> That depends on a) if Flink is vulnerable and b) if yes, how
>>>>>> vulnerable that would be.
>>>>>>
>>>>>> Best regards,
>>>>>>
>>>>>> Martijn
>>>>>>
>>>>>> Op ma 31 okt. 2022 om 19:22 schreef Tamir Sagi <
>>>>>> tamir.s...@niceactimize.com>
>>>>>>
>>>>>>> Hey all,
>>>>>>>
>>>>>>> Following that link
>>>>>>> https://mta.openssl.org/pipermail/openssl-announce/2022-October/000238.html
>>>>>>>
>>>>>>>
>>>>>>> due to critical vulnerability , there will be an important release
>>>>>>> of OpenSSl v3.0.7 tomorrow November 1st.
>>>>>>>
>>>>>>> Is there any plan to update Flink with the newest version?
>>>>>>>
>>>>>>> Thanks.
>>>>>>> Tamir
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>> --
>>>>>> Martijn
>>>>>> https://twitter.com/MartijnVisser82
>>>>>> https://github.com/MartijnVisser
>>>>>>
>>>>>
>>


Re: [Security] - Critical OpenSSL Vulnerability

2022-10-31 Thread Mason Chen
Hi Tamir and Martjin,

We have also noticed this internally. So far, we have found that the
*latest* Flink Java 11/Scala 2.12 docker images *1.14, 1.15, and 1.16* are
affected, which all have the *openssl 3.0.2 *dependency. It would be good
to discuss an emergency release when this patch comes out tomorrow, as it
is the highest priority level from their severity rating.

Best,
Mason

On Mon, Oct 31, 2022 at 1:10 PM Martijn Visser 
wrote:

> Hi Tamir,
>
> That depends on a) if Flink is vulnerable and b) if yes, how vulnerable
> that would be.
>
> Best regards,
>
> Martijn
>
> Op ma 31 okt. 2022 om 19:22 schreef Tamir Sagi <
> tamir.s...@niceactimize.com>
>
>> Hey all,
>>
>> Following that link
>>
>> https://mta.openssl.org/pipermail/openssl-announce/2022-October/000238.html
>>
>> due to critical vulnerability , there will be an important release of
>> OpenSSl v3.0.7 tomorrow November 1st.
>>
>> Is there any plan to update Flink with the newest version?
>>
>> Thanks.
>> Tamir
>>
>>
>>
> --
> Martijn
> https://twitter.com/MartijnVisser82
> https://github.com/MartijnVisser
>


Re: Job uptime metric in Flink Operator managed cluster

2022-10-13 Thread Mason Chen
Hi all,

I think what Meghajit is trying to understand is how to measure the uptime
of a submitted Flink job. Prior to the K8s operator, perhaps the job
manager was torn down with the job shutdown so the uptime value would stop;
therefore, the uptime value also measures how long the job was running.
This is not the behavior with the k8s operator as you have described, so a
different metric must be used.

With 1.15, there are some new metrics [1]. Does <jobStatus>Time, where
<jobStatus> is "running", work for you?

[1]
https://nightlies.apache.org/flink/flink-docs-stable/docs/ops/metrics/#availability

Best,
Mason

On Wed, Oct 12, 2022 at 11:36 PM Gyula Fóra  wrote:

> Sorry, what I said applies to Flink 1.15+ and the savepoint upgrade mode
> (not stateless).
>
> In any case if there is no job manager there are no metrics... So not sure
> how to answer your question.
>
> Gyula
>
> On Thu, Oct 13, 2022 at 8:24 AM Meghajit Mazumdar <
> meghajit.mazum...@gojek.com> wrote:
>
>> Hi Gyula,
>>
>> Thanks for the prompt response.
>>
>> > The Flink operator currently does not delete the jobmanager pod when a
>> deployment is suspended.
>> Are you sure this is true ? I have re-tried this many times, but each
>> time the pods get deleted, along with the deployment resources.
>>
>> Additionally, the flink-operator logs also denote that the resources are
>> being deleted ( highlighted in red) after I change the state in the
>> FlinkDeployment yaml from running --> suspended
>> ( note: my FlinkDeployment name is *my-sample-dagger-v7 *)
>>
>> 2022-10-13 06:11:47,392 o.a.f.k.o.c.FlinkDeploymentController [INFO
>> ][flink-operator/my-sample-dagger-v7] End of reconciliation
>> 2022-10-13 06:11:49,879 o.a.f.k.o.c.FlinkDeploymentController [INFO
>> ][flink-operator/parquet-savepoint-test] Starting reconciliation
>> 2022-10-13 06:11:49,880 o.a.f.k.o.o.JobStatusObserver  [INFO
>> ][flink-operator/parquet-savepoint-test] Observing job status
>> 2022-10-13 06:11:52,710 o.a.f.k.o.c.FlinkDeploymentController [INFO
>> ][flink-operator/my-sample-dagger-v7] Starting reconciliation
>> 2022-10-13 06:11:52,712 o.a.f.k.o.o.JobStatusObserver  [INFO
>> ][flink-operator/my-sample-dagger-v7] Observing job status
>> 2022-10-13 06:11:52,721 o.a.f.k.o.o.JobStatusObserver  [INFO
>> ][flink-operator/my-sample-dagger-v7] Job status (RUNNING) unchanged
>> 2022-10-13 06:11:52,723 o.a.f.k.o.c.FlinkConfigManager [INFO
>> ][flink-operator/my-sample-dagger-v7] Generating new config
>> 2022-10-13 06:11:52,725 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler
>> [INFO ][flink-operator/my-sample-dagger-v7] Detected spec change, starting
>> reconciliation.
>>
>>
>> 2022-10-13 06:11:52,788 o.a.f.k.o.r.d.AbstractJobReconciler [INFO
>> ][flink-operator/my-sample-dagger-v7] Stateless job, ready for upgrade
>> 2022-10-13 06:11:52,798 o.a.f.k.o.s.FlinkService   [INFO
>> ][flink-operator/my-sample-dagger-v7] Job is running, cancelling job.
>> 2022-10-13 06:11:52,815 o.a.f.k.o.s.FlinkService   [INFO
>> ][flink-operator/my-sample-dagger-v7] Job successfully cancelled.
>> 2022-10-13 06:11:52,815 o.a.f.k.o.u.FlinkUtils [INFO
>> ][flink-operator/my-sample-dagger-v7] Deleting JobManager deployment and HA
>> metadata.
>> 2022-10-13 06:11:56,863 o.a.f.k.o.u.FlinkUtils [INFO
>> ][flink-operator/my-sample-dagger-v7] Cluster shutdown completed.
>> 2022-10-13 06:11:56,903 o.a.f.k.o.u.FlinkUtils [INFO
>> ][flink-operator/my-sample-dagger-v7] Cluster shutdown completed.
>> 2022-10-13 06:11:56,904 o.a.f.k.o.c.FlinkDeploymentController [INFO
>> ][flink-operator/my-sample-dagger-v7] End of reconciliation
>> 2022-10-13 06:11:56,928 o.a.f.k.o.c.FlinkDeploymentController [INFO
>> ][flink-operator/my-sample-dagger-v7] Starting reconciliation
>> 2022-10-13 06:11:56,930 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler
>> [INFO ][flink-operator/my-sample-dagger-v7] Resource fully reconciled,
>> nothing to do...
>>
>> Also, my original doubt was around the uptime metric itself. What is the
>> correct metric to use for monitoring the status (running or suspended) of
>> a job which is being managed by the Flink Operator?
>> The *jobmanager_job_uptime_value* metric seems to be giving the wrong status
>> as mentioned in the earlier mail.
>>
>> Regards,
>> Meghajit
>>
>>
>> On Wed, Oct 12, 2022 at 7:32 PM Gyula Fóra  wrote:
>>
>>> Hello!
>>> The Flink operator currently does not delete the jobmanager pod when a
>>> deployment is suspended.
>>> This way the rest api stay available but no other resources are consumed
>>> (taskmanagers are deleted)
>>>
>>> When you delete the FlinkDeployment resource completely, then the
>>> jobmanager deployment is also deleted.
>>>
>>> In theory we could improve the logic to eventually delete the jobmanager
>>> for suspended resources but we currently use this is a way to guarantee
>>> more resiliency for the operator flow.
>>>
>>> Cheers,
>>> Gyula
>>>
>>> On Wed, Oct 12, 2022 at 3:56 PM Meghajit Mazumdar <
>>> meghajit.mazum...@gojek.com> wrote:
>>>
 Hello,

Re: KafkaSource, consumer assignment, and Kafka ‘stretch’ clusters for multi-DC streaming apps

2022-10-13 Thread Mason Chen
Hi Andrew and Martijn,

Thanks for looping me in, this is an interesting discussion! I'm trying to
solve a higher level problem about Kafka topic routing/assignment with
FLIP-246. The main idea is that there can exist an external service that
can provide the coordination between Kafka and Flink to dynamically change
clusters and topics, but we don't get into the partition granularity.

I'm going to try to answer some of your questions, let me know if I have
missed something:

1.

> I don't think running a single Flink application cross DC would work well;
> there's too much inter-node traffic happening, and the Flink tasks don't
> have any DC awareness.


I agree. Running a single Flink application cross DC tends to increase
costs significantly with public cloud providers in which you accrue cross
DC costs. You can follow [1] to allow the KafkaSource to fully utilize
Kafka rack awareness.

2.

> But, this got me wondering...would it be possible to run a streaming app
> in an active/active mode, where in normal operation, half of the work was
> being done in each DC, and in failover, all of the work would automatically
> failover to the online DC.



> But would it be possible to run two separate streaming applications in
> each DC, but in the *same Kafka consumer group*? I believe that, if the
> streaming app was using Kafka's usual consumer assignment and rebalancing
> protocol, it would.  Kafka would just see clients connecting from either DC
> in the same consumer group, and assign each consumer an equal number of
> partitions to consume, resulting in equal partition balancing in DCs.  If
> we shut down one of the streaming apps, Kafka would automatically rebalance
> the consumers in the consumer group, assigning all of the work to the
> remaining streaming app in the other DC.


Your investigation is correct: the KafkaSource doesn't support Kafka's
usual consumer assignment and rebalancing protocol. Technically, you can
already do this with the PartitionSetSubscriber [2] and split up the available
partitions between different Flink jobs. A challenge here is that adding
partitions would be difficult, since you can no longer use KafkaSource's
dynamic partition discovery, and changes would require a Flink job restart.
I think it would be overkill to manage partitions like this.
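
For illustration, a hedged sketch of that static assignment (topic name and
partition numbers are made up); note that pinning an explicit set like this
means newly added partitions will not be picked up automatically:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.kafka.common.TopicPartition;

Set<TopicPartition> dcAPartitions = new HashSet<>(Arrays.asList(
        new TopicPartition("events", 0),
        new TopicPartition("events", 2),
        new TopicPartition("events", 4)));

KafkaSource<String> dcASource = KafkaSource.<String>builder()
        .setBootstrapServers("stretch-cluster:9092")
        .setGroupId("enrichment-app")
        .setPartitions(dcAPartitions)
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();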

Another challenge you will face is balancing the traffic across DCs: you can
compute metrics (throughput of a Flink job's assigned Kafka partitions) to
determine which DC to deploy the Flink job in to keep the cluster balanced.

3.

> I got excited about this possibility, only to learn that Flink's
> KafkaSource does not use Kafka for consumer assignment.  I think I
> understand why it does this: the Source API can do a lot more than Kafka,
> so having some kind of state management (offsets) and task assignment
> (Kafka consumer balance protocol) outside of the usual Flink Source would
> be pretty weird.  Implementing offset and task assignment inside of the
> KafkaSource allows it to work like any other Source implementation.
>

This is totally correct. The source API can also pause and resume Kafka
consumers for watermark alignment, but you shouldn't face this issue since
your applications don't join.

All that being said, it is possible to adapt this in the FLIP-246 design.
Another benefit of FLIP-246 is to do this coordination without full Flink
job restart--the design could potentially be extended to detect changes in
DC assignment (if we need to do this dynamically) and reconcile the Flink
jobs without restart, but again this is at the cluster/topic granularity.

This is the FLIP doc [4] and discussion thread [5] if you want to leave
feedback. I hope this helps and please let me know if you have any other
questions!

Best,
Mason

[1] https://issues.apache.org/jira/browse/FLINK-29398
[2]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java
[3]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#dynamic-partition-discovery
[4]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source
[5] https://lists.apache.org/thread/vz7nw5qzvmxwnpktnofc9p13s1dzqm6z


On Wed, Oct 5, 2022 at 8:42 AM Martijn Visser 
wrote:

> Hi Andrew,
>
> While definitely no expert on this topic, my first thought was if this
> idea could be solved with the idea that was proposed in FLIP-246
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source
>
> I'm also looping in Mason Chen who was the initiator of that FLIP :)
>
> Best regards,
>
> Martijn
>
> On Wed, Oct 5, 2022 at 10:00 AM Andrew Otto  wrote:
>
>> (Ah, note that I am considering very simple streaming apps here, e.g.
>> event enrichment apps.  No wi

Re: Switching kafka brokers

2022-10-06 Thread Mason Chen
Hi Lars,

That sounds like a painful process. Since the offsets are inconsistent, I
would suggest resetting the Kafka source state: change the `uid`, set
the source to start from earliest if you haven't already, make the
bootstrap server change, and restart your job with allowNonRestoredState
enabled. This effectively retains all Flink state except the Kafka source's.
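
A minimal sketch of the source side of that, assuming `env` is your
StreamExecutionEnvironment (the uid value, topic, and broker address are
placeholders):

```
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

KafkaSource<String> source =
        KafkaSource.<String>builder()
                .setBootstrapServers("new-kafka-cluster:9092")      // the new cluster
                .setTopics("my-topic")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())  // old offsets are unusable
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

// A new uid means the old source state is not mapped onto this operator.
env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
        .uid("kafka-source-v2");
```

Then resubmit from the savepoint with something like
`flink run -s <savepointPath> --allowNonRestoredState ...` so the orphaned
source state is ignored.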

Here is a similar question, where Martijn gave the same answer, but about
resetting Kafka topics:
https://lists.apache.org/thread/xcfjm23xk7xy9nh887pvsxbw9z649p3q.

If you are interested, I talked about this exact problem at Flink Forward
and how we are trying to solve it with FLIP-246
https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source.
The discussion thread is linked in the FLIP if you want to give feedback on it.

Best,
Mason

On Thu, Oct 6, 2022 at 9:40 AM Lars Skjærven  wrote:

> Hello,
>
> What is the recommended approach for migrating flink jobs to a new kafka
> server? I was naively hoping to use Kafka Mirror Maker to sync the old
> server with the new server, and simply continue from savepoint with updated
> URL's. Unfortunately, the kafka offsets are not identical for log compacted
> topics when using mirror maker. Any tips ?
>
> L
>


Re: Flink KafkaSource still referencing deleted topic

2022-10-04 Thread Mason Chen
Hi Martjin,

I notice that this question comes up quite often. Would this be a good
addition to the KafkaSource documentation? I'd be happy to contribute to
the documentation.

Best,
Mason

On Tue, Oct 4, 2022 at 11:23 AM Martijn Visser 
wrote:

> Hi Robert,
>
> Based on
> https://stackoverflow.com/questions/72870074/apache-flink-restoring-state-from-checkpoint-with-changes-kafka-topic
> I think you'll need to change the UID for your KafkaSource and restart your
> job with allowNonRestoredState enabled.
>
> Best regards,
>
> Martijn
>
> On Tue, Oct 4, 2022 at 12:40 PM Robert Cullen 
> wrote:
>
>> We've changed the KafkaSource to ingest from a new topic but the old name
>> is still being referenced:
>>
>> 2022-10-04 07:03:41org.apache.flink.util.FlinkException: Global failure 
>> triggered by OperatorCoordinator for 'Source: Grokfailures' (operator 
>> feca28aff5a3958840bee985ee7de4d3). at 
>> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:553)
>>   at 
>> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:223)
>>at 
>> org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:285)
>>   at 
>> org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:298)
>>  at 
>> org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
>>at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>   at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>   at java.lang.Thread.run(Thread.java:748)Caused by: 
>> org.apache.flink.util.FlinkRuntimeException: Failed to handle partition 
>> splits change due to at 
>> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.handlePartitionSplitChanges(KafkaSourceEnumerator.java:239)
>>  at 
>> org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:86)
>>  at 
>> org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
>>... 3 moreCaused by: java.lang.RuntimeException: Failed to get topic 
>> metadata.  at 
>> org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:59)
>>at 
>> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.discoverAndInitializePartitionSplit(KafkaSourceEnumerator.java:212)
>>  at 
>> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.lambda$start$0(KafkaSourceEnumerator.java:158)
>>   at 
>> org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:83)
>>  at 
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)  
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>> at 
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>... 3 moreCaused by: java.util.concurrent.ExecutionException: 
>> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
>> does not host this topic-partition.  at 
>> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>>   at 
>> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>> at 
>> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>> at 
>> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>>   at 
>> org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:57)
>>... 10 moreCaused by: 
>> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
>> does not host this topic-partition.
>>
>>
>> --
>> Robert Cullen
>> 240-475-4490
>>
>


Re: Hybrid Source stop processing files after processing 128 SourceFactories

2022-07-27 Thread Mason Chen
Hi Michael,

That's a great catch! I filed another ticket since this is a separate
issue: https://issues.apache.org/jira/browse/FLINK-28722.
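
For anyone else reading the thread, this is the standard Integer boxing
pitfall, which also lines up with things breaking at exactly index 128. The
snippet below is only an illustration of the language behavior, not the Flink
code itself:

```
// Autoboxed Integers are cached for values in -128..127, so reference
// comparison (==) happens to work for small source indexes and then breaks.
Integer a = 127, b = 127;
System.out.println(a == b);        // true  (both point at the cached object)

Integer c = 128, d = 128;
System.out.println(c == d);        // false (two distinct objects)
System.out.println(c.equals(d));   // true  (value comparison - the actual fix)
```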

Best,
Mason

On Tue, Jul 26, 2022 at 10:34 PM Benenson, Michael <
mikhail_benen...@intuit.com> wrote:

> Hi, Mason
>
>
>
> Think, the problem is related to
> https://github.com/apache/flink/blob/release-1.14.3-rc1/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java#L358
>
>
>
> if (readerSourceIndex.get(e.getKey()) == (Integer) sourceIndex) {
>
>
>
> Integer is Java object, so comparison should use ‘equals()’, not ‘==’
>
> It is Java, not Scala 
>
>
>
> Could you please fix this issue in your fix for 
> *https://issues.apache.org/jira/browse/FLINK-27479 
> <https://issues.apache.org/jira/browse/FLINK-27479>?*
>
>
>
> And thanks for the original fix.
>
>
>
> *From: *Mason Chen 
> *Date: *Tuesday, July 26, 2022 at 9:57 PM
> *To: *Benenson, Michael 
> *Cc: *user@flink.apache.org , Deshpande, Omkar <
> omkar_deshpa...@intuit.com>, Rosensweig, JD ,
> Sana, Harish 
> *Subject: *Re: Hybrid Source stop processing files after processing 128
> SourceFactories
>
> This email is from an external sender.
>
>
>
> Hi Michael,
>
>
>
> I'm glad the CPU fix works for you!
>
>
>
> Regarding the behavior, HybridSource should only consume from Kafka after
> it finishes the bounded read of the files. At that time, files will not be
> read anymore. In addition, there is no limitation where there can only be
> 128 source factories (the upper limit should be integer max).
>
>
>
> Can you give more details on how the HybridSource is configured? Are all
> sources unbounded? When you say it stopped processing files, does this mean
> it stops reading from Kafka too? How do you know the program is stalling?
> Is the metric numRecordsInPerSecond from the source operator 0?
>
>
>
> Best,
>
> Mason
>
>
>
> On Mon, Jul 25, 2022 at 7:52 PM Benenson, Michael <
> mikhail_benen...@intuit.com> wrote:
>
> Hi, folks
>
>
>
> I have tried fix FLINK-27479
> <https://issues.apache.org/jira/browse/FLINK-27479> for Hybrid Source
> from https://github.com/apache/flink/pull/20215  in Flink 14.3
>
>
>
> It works fine, but Flink stops processing files after processing 128
> SourceFactories. I have run this program a few times, starting without
> savepoint, and each time the program hangs up, after processing 128
> SourceFactories. Program does not crash or terminate, but stop processing
> files.
>
>
>
> My program is like the Hybrid source example: reading multiple files, and
> then reading from Kafka
>
>
>
> In my case program reads a few hundred directories from s3, that contains
> snappy files, so for each directory it creates separate
> HybridSource.SourceFactory, and the last one is the SourceFactory for
> reading from Kafka.
>
>
>
> Any idea what could be wrong? Is it a known restriction that there
> should be no more than 128 Source Factories?
>
> I have the program running now, so I could collect any additional info to
> clarify the cause of the problem.
>
>
>
> Here are the last few lines from JobManager before program stop processing
> files.
>
>
>
> 2022/07/26 01:02:35.248 INFO  o.a.f.c.f.s.i.StaticFileSplitEnumerator - No
> more splits available for subtask 0
>
> 2022/07/26 01:02:36.249 INFO  c.i.strmprocess.hybrid.ReadS3Hybrid1 -
> Reading input data from path
> s3://idl-kafka-connect-ued-raw-uw2-data-lake-e2e/data/topics/sbseg-qbo-clickstream/d_20220715-0800
> for 2022-07-15T08:00:00Z
>
> 2022/07/26 01:02:36.618 INFO  o.a.f.c.b.s.h.HybridSourceSplitEnumerator -
> Starting enumerator for sourceIndex=128
>
> 2022/07/26 01:02:36.619 INFO  o.a.f.r.s.c.SourceCoordinator - Source
> Source: hybrid-source received split request from parallel task 1
>
> 2022/07/26 01:02:36.619 INFO  o.a.f.r.s.c.SourceCoordinator - Source
> Source: hybrid-source received split request from parallel task 2
>
> 2022/07/26 01:02:36.619 INFO  o.a.f.r.s.c.SourceCoordinator - Source
> Source: hybrid-source received split request from parallel task 1
>
>
>
>
>
>


Re: Hybrid Source stop processing files after processing 128 SourceFactories

2022-07-26 Thread Mason Chen
Hi Michael,

I'm glad the CPU fix works for you!

Regarding the behavior, HybridSource should only consume from Kafka after
it finishes the bounded read of the files. At that time, files will not be
read anymore. In addition, there is no limit of only 128 source factories
(the upper limit should be Integer.MAX_VALUE).

Can you give more details on how the HybridSource is configured? Are all
sources unbounded? When you say it stopped processing files, does this mean
it stops reading from Kafka too? How do you know the program is stalling?
Is the metric numRecordsInPerSecond from the source operator 0?

Best,
Mason

On Mon, Jul 25, 2022 at 7:52 PM Benenson, Michael <
mikhail_benen...@intuit.com> wrote:

> Hi, folks
>
>
>
> I have tried fix FLINK-27479
>  for Hybrid Source
> from https://github.com/apache/flink/pull/20215  in Flink 14.3
>
>
>
> It works fine, but Flink stops processing files after processing 128
> SourceFactories. I have run this program a few times, starting without
> savepoint, and each time the program hangs up, after processing 128
> SourceFactories. Program does not crash or terminate, but stops processing
> files.
>
>
>
> My program is like the Hybrid source example: reading multiple files, and
> then reading from Kafka
>
>
>
> In my case program reads a few hundred directories from s3, that contains
> snappy files, so for each directory it creates separate
> HybridSource.SourceFactory, and the last one is the SourceFactory for
> reading from Kafka.
>
>
>
> Any idea what could be wrong? Is it a known restriction that there
> should be no more than 128 Source Factories?
>
> I have the program running now, so I could collect any additional info to
> clarify the cause of the problem.
>
>
>
> Here are the last few lines from JobManager before program stop processing
> files.
>
>
>
> 2022/07/26 01:02:35.248 INFO  o.a.f.c.f.s.i.StaticFileSplitEnumerator - No
> more splits available for subtask 0
>
> 2022/07/26 01:02:36.249 INFO  c.i.strmprocess.hybrid.ReadS3Hybrid1 -
> Reading input data from path
> s3://idl-kafka-connect-ued-raw-uw2-data-lake-e2e/data/topics/sbseg-qbo-clickstream/d_20220715-0800
> for 2022-07-15T08:00:00Z
>
> 2022/07/26 01:02:36.618 INFO  o.a.f.c.b.s.h.HybridSourceSplitEnumerator -
> Starting enumerator for sourceIndex=128
>
> 2022/07/26 01:02:36.619 INFO  o.a.f.r.s.c.SourceCoordinator - Source
> Source: hybrid-source received split request from parallel task 1
>
> 2022/07/26 01:02:36.619 INFO  o.a.f.r.s.c.SourceCoordinator - Source
> Source: hybrid-source received split request from parallel task 2
>
> 2022/07/26 01:02:36.619 INFO  o.a.f.r.s.c.SourceCoordinator - Source
> Source: hybrid-source received split request from parallel task 1
>
>
>
>
>


Flink application mode, multiple jobs

2022-07-14 Thread Mason Chen
Hi all,

Is there any limitation on the number of jobs you can deploy together
within the same Flink application? We are noticing some exceptions related
to task slots at job startup. It typically recovers after 10-20 minutes.

What are some of the recommended configurations that we can tune to
alleviate these issues? I've copied some of the exception messages below.
The most frequent exception is TaskSubmissionException and we have more
than enough task slots for the job, so the last exception seems to be a red
herring.

org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException: No
task slot allocated for job ID 233b239f6f7e410fa84dcb3ed9bb958f and
allocation ID 0d4cd6b91fac49aa9b8d401c58b34f46.

org.apache.flink.util.FlinkException: TaskExecutor akka.tcp://flink@172.18.
112.121:40703/user/rpc/taskmanager_0 has no more allocated slots for job 233
b239f6f7e410fa84dcb3ed9bb958f.



Best,
Mason


Re: [DISCUSS] Contribution of Multi Cluster Kafka Source

2022-07-14 Thread Mason Chen
Hi all,

Circling back on this--I have created a first draft document in confluence:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source
.

Looking forward to hearing all your feedback in this email thread!

Best,
Mason

On Thu, Jun 30, 2022 at 6:57 AM Thomas Weise  wrote:

> Hi Mason,
>
> I added mason6345 to the Flink confluence space, you should be able to
> add a FLIP now.
>
> Looking forward to the contribution!
>
> Thomas
>
> On Thu, Jun 30, 2022 at 9:25 AM Martijn Visser 
> wrote:
> >
> > Hi Mason,
> >
> > I'm sure there's a PMC (*hint*) out there who can grant you access to
> > create a FLIP. Looking forward to it, this sounds like an improvement
> that
> > users are looking forward to.
> >
> > Best regards,
> >
> > Martijn
> >
> > Op di 28 jun. 2022 om 09:21 schreef Mason Chen :
> >
> > > Hi all,
> > >
> > > Thanks for the feedback! I'm adding the users, who responded in the
> user
> > > mailing list, to this thread.
> > >
> > > @Qingsheng - Yes, I would prefer to reuse the existing Kafka connector
> > > module. It makes a lot of sense since the dependencies are the same
> and the
> > > implementation can also extend and improve some of the test utilities
> you
> > > have been working on for the FLIP 27 Kafka Source. I will enumerate the
> > > migration steps in the FLIP template.
> > >
> > > @Ryan - I don't have a public branch available yet, but I would
> appreciate
> > > your review on the FLIP design! When the FLIP design is approved by
> devs
> > > and the community, I can start to commit our implementation to a fork.
> > >
> > > @Andrew - Yup, one of the requirements of the connector is to read
> > > multiple clusters within a single source, so it should be able to work
> well
> > > with your use case.
> > >
> > > @Devs - what do I need to get started on the FLIP design? I see the
> FLIP
> > > template and I have an account (mason6345), but I don't have access to
> > > create a page.
> > >
> > > Best,
> > > Mason
> > >
> > >
> > >
> > >
> > > On Sun, Jun 26, 2022 at 8:08 PM Qingsheng Ren 
> wrote:
> > >
> > >> Hi Mason,
> > >>
> > >> It sounds like an exciting enhancement to the Kafka source and will
> > >> benefit a lot of users I believe.
> > >>
> > >> Would you prefer to reuse the existing flink-connector-kafka module or
> > >> create a new one for the new multi-cluster feature? Personally I
> prefer the
> > >> former one because users won’t need to introduce another dependency
> module
> > >> to their projects in order to use the feature.
> > >>
> > >> Thanks for the effort on this and looking forward to your FLIP!
> > >>
> > >> Best,
> > >> Qingsheng
> > >>
> > >> > On Jun 24, 2022, at 09:43, Mason Chen 
> wrote:
> > >> >
> > >> > Hi community,
> > >> >
> > >> > We have been working on a Multi Cluster Kafka Source and are
> looking to
> > >> > contribute it upstream. I've given a talk about the features and
> design
> > >> at
> > >> > a Flink meetup: https://youtu.be/H1SYOuLcUTI.
> > >> >
> > >> > The main features it provides are:
> > >> > 1. Reading multiple Kafka clusters within a single source.
> > >> > 2. Adjusting the clusters and topics the source consumes from
> > >> dynamically,
> > >> > without Flink job restart.
> > >> >
> > >> > Some of the challenging use cases that these features solve are:
> > >> > 1. Transparent Kafka cluster migration without Flink job restart.
> > >> > 2. Transparent Kafka topic migration without Flink job restart.
> > >> > 3. Direct integration with Hybrid Source.
> > >> >
> > >> > In addition, this is designed with wrapping and managing the
> existing
> > >> > KafkaSource components to enable these features, so it can continue
> to
> > >> > benefit from KafkaSource improvements and bug fixes. It can be
> > >> considered
> > >> > as a form of a composite source.
> > >> >
> > >> > I think the contribution of this source could benefit a lot of
> users who
> > >> > have asked in the mailing list about Flink handling Kafka
> migrations and
> > >> > removing topics in the past. I would love to hear and address your
> > >> thoughts
> > >> > and feedback, and if possible drive a FLIP!
> > >> >
> > >> > Best,
> > >> > Mason
> > >>
> > >>
>


Re: Configure a kafka source dynamically (???)

2022-07-08 Thread Mason Chen
Hi Salva,

I was the contributor on the ticket and have updated the PR. Sorry for the
delay! Meanwhile, you can use reflection to set the KafkaSubscriber if you
need an immediate solution.

With respect to your control message idea, what is the motivation for using a
push-based mechanism vs a poll-based mechanism? If it is about the latency of
picking up changes, you could also reduce the discovery interval to a low
value. In the source framework, there isn't a notion of an entrypoint that
could be exposed to external systems for pushing control messages.
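
For the poll-based route, a rough sketch of a pattern subscription with an
aggressive discovery interval (the pattern, interval, and broker address are
just examples):

```
import java.util.regex.Pattern;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

KafkaSource<String> source =
        KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setGroupId("my-group")
                .setTopicPattern(Pattern.compile("input-.*"))
                // re-check the broker for new matching topics/partitions every 30s
                .setProperty("partition.discovery.interval.ms", "30000")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
```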

Best,
Mason

On Thu, Jul 7, 2022 at 8:45 PM Salva Alcántara 
wrote:

> When using the kafka connector, you need to set the topics in advance (by
> giving a list of them or a regex pattern for the topic names). Imagine a
> situation where the topics are not known in advance, of course you could
> use an all-pass regex pattern to match all the topics in the broker but
> what I want to know is whether it's possible to read from new topics on
> demand.
>
> E.g., initially the source starts without any topics to read from so
> nothing is read until it gets a control msg (which could be pushed to a
> control topic, for example) specifying the set of topics to subscribe to. I
> guess this could be somehow implemented using custom subscribers once this
> issue is merged/closed:
>
> https://issues.apache.org/jira/browse/FLINK-24660
>
> but would it be possible to achieve this objective without having to
> periodically pull the broker, e.g., in a more reactive (push) way? I guess
> if the kafka source (or any other source for what it's worth) were to have
> a control signal like that then it would be more of an operator than a
> source, really...
>
> Salva
>
> PS: Does anyone know the current state of FLINK-24660? The changes seem to
> have been ready to merge for a while.
>


Re: influxdb metrics reporter - 4k series per job restart

2022-07-01 Thread Mason Chen
Hi all,

If you can wait for Flink 1.16, there is a new feature to filter metrics
(includes/excludes filter). Additionally, you can already take advantage of
dropping unnecessary labels with `scope.variables.excludes` in the current
release. Link to 1.16 metric features:
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/metric_reporters/#reporter
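
As a rough sketch of what that could look like in flink-conf.yaml (the reporter
name, excluded label, and filter pattern are placeholders; please double-check
the exact filter syntax in the docs above):

```
metrics.reporter.influxdb.factory.class: org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
# drop a high-cardinality label from the series key (already available today)
metrics.reporter.influxdb.scope.variables.excludes: task_attempt_id
# stop reporting metrics you never look at (Flink 1.16+)
metrics.reporter.influxdb.filter.excludes: "*:numBytes*"
```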

Best,
Mason

On Fri, Jul 1, 2022 at 3:55 AM Martijn Visser 
wrote:

> Have you considered setting the value for some of the series to a fixed
> value? For example, if you're not interested in the value for ,
> you could consider setting that to a fixed value "task_id" [1] ?
>
> Best regards,
>
> Martijn
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#system-scope
>
> Op do 30 jun. 2022 om 15:52 schreef Weihua Hu :
>
>> Hi, Filip
>>
>> You can modify the InfluxdbReporter code to rewrite the
>> notifyOfAddedMetric method and filter the required metrics for reporting.
>>
>> Best,
>> Weihua
>>
>>
>> On Thu, Jun 30, 2022 at 8:46 PM Filip Karnicki 
>> wrote:
>>
>>> Hi All
>>>
>>> We're using the influx reporter (flink 1.14.3), which seems to create a
>>> series per:
>>> -[task|job]manager
>>> - host
>>> - job_id
>>> - job_name
>>> - subtask_index
>>> - task_attempt_id
>>> - task_attempt_num
>>> - task_id
>>> - tm_id
>>>
>>> which amounts to about 4k of series each time our job restarts itself
>>>
>>> We are currently experiencing problems with checkpoint duration timeouts
>>> (> 60s) (unrelated) and every 60 secs our job restarts and creates further
>>> 4k series in influxdb.
>>>
>>> Needless to say, the team managing influxdb is not too happy with the
>>> amount of series we create.
>>>
>>> Is there anything I can do to either reduce the number of series, or
>>> reduce the number of types of metrics in order to produce fewer series? (we
>>> don't view all the available metrics in grafana, so we don't necessarily
>>> have to send all of them)
>>>
>>> The db caps at 1M series, and with our current problems with
>>> checkpointing we go through that many in a matter of hours
>>>
>>> Many thanks
>>> Fil
>>>
>>>


Re: [DISCUSS] Contribution of Multi Cluster Kafka Source

2022-06-28 Thread Mason Chen
Hi all,

Thanks for the feedback! I'm adding the users, who responded in the user
mailing list, to this thread.

@Qingsheng - Yes, I would prefer to reuse the existing Kafka connector
module. It makes a lot of sense since the dependencies are the same and the
implementation can also extend and improve some of the test utilities you
have been working on for the FLIP 27 Kafka Source. I will enumerate the
migration steps in the FLIP template.

@Ryan - I don't have a public branch available yet, but I would appreciate
your review on the FLIP design! When the FLIP design is approved by devs
and the community, I can start to commit our implementation to a fork.

@Andrew - Yup, one of the requirements of the connector is to read multiple
clusters within a single source, so it should be able to work well with
your use case.

@Devs - what do I need to get started on the FLIP design? I see the FLIP
template and I have an account (mason6345), but I don't have access to
create a page.

Best,
Mason




On Sun, Jun 26, 2022 at 8:08 PM Qingsheng Ren  wrote:

> Hi Mason,
>
> It sounds like an exciting enhancement to the Kafka source and will
> benefit a lot of users I believe.
>
> Would you prefer to reuse the existing flink-connector-kafka module or
> create a new one for the new multi-cluster feature? Personally I prefer the
> former one because users won’t need to introduce another dependency module
> to their projects in order to use the feature.
>
> Thanks for the effort on this and looking forward to your FLIP!
>
> Best,
> Qingsheng
>
> > On Jun 24, 2022, at 09:43, Mason Chen  wrote:
> >
> > Hi community,
> >
> > We have been working on a Multi Cluster Kafka Source and are looking to
> > contribute it upstream. I've given a talk about the features and design
> at
> > a Flink meetup: https://youtu.be/H1SYOuLcUTI.
> >
> > The main features it provides are:
> > 1. Reading multiple Kafka clusters within a single source.
> > 2. Adjusting the clusters and topics the source consumes from
> dynamically,
> > without Flink job restart.
> >
> > Some of the challenging use cases that these features solve are:
> > 1. Transparent Kafka cluster migration without Flink job restart.
> > 2. Transparent Kafka topic migration without Flink job restart.
> > 3. Direct integration with Hybrid Source.
> >
> > In addition, this is designed with wrapping and managing the existing
> > KafkaSource components to enable these features, so it can continue to
> > benefit from KafkaSource improvements and bug fixes. It can be considered
> > as a form of a composite source.
> >
> > I think the contribution of this source could benefit a lot of users who
> > have asked in the mailing list about Flink handling Kafka migrations and
> > removing topics in the past. I would love to hear and address your
> thoughts
> > and feedback, and if possible drive a FLIP!
> >
> > Best,
> > Mason
>
>


[DISCUSS] Contribution of Multi Cluster Kafka Source

2022-06-23 Thread Mason Chen
Hi community,

We have been working on a Multi Cluster Kafka Source and are looking to
contribute it upstream. I've given a talk about the features and design at
a Flink meetup: https://youtu.be/H1SYOuLcUTI.

The main features it provides are:
1. Reading multiple Kafka clusters within a single source.
2. Adjusting the clusters and topics the source consumes from dynamically,
without Flink job restart.

Some of the challenging use cases that these features solve are:
1. Transparent Kafka cluster migration without Flink job restart.
2. Transparent Kafka topic migration without Flink job restart.
3. Direct integration with Hybrid Source.

In addition, this is designed with wrapping and managing the existing
KafkaSource components to enable these features, so it can continue to
benefit from KafkaSource improvements and bug fixes. It can be considered
as a form of a composite source.

I think the contribution of this source could benefit a lot of users who
have asked in the mailing list about Flink handling Kafka migrations and
removing topics in the past. I would love to hear and address your thoughts
and feedback, and if possible drive a FLIP!

Best,
Mason


Re: Prometheus metrics does not work in 1.15.0 taskmanager

2022-05-05 Thread Mason Chen
Nice work Peter! Looking forward to the fix.

@ChangZhuo Kafka metrics are emitted from the source, and the process
function would be a different operator. For the DataStream API, you can set
`KafkaSourceOptions.REGISTER_KAFKA_CONSUMER_METRICS.key()` to `false` in
your consumer properties.
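
For example, a sketch with the FLIP-27 builder (the other builder settings are
just placeholders):

```
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;

KafkaSource<String> source =
        KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("my-topic")
                // don't register the internal KafkaConsumer's metrics with Flink
                .setProperty(KafkaSourceOptions.REGISTER_KAFKA_CONSUMER_METRICS.key(), "false")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
```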

Best,
Mason

On Wed, May 4, 2022 at 10:43 PM ChangZhuo Chen (陳昌倬) 
wrote:

> On Wed, May 04, 2022 at 01:53:01PM +0200, Chesnay Schepler wrote:
> > Disabling the kafka metrics _should_ work.
>
> Is there anyway to disable Kafka metrics when using low level process
> function?
>
>
> --
> ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> http://czchen.info/
> Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B
>


Re: Prometheus metrics does not work in 1.15.0 taskmanager

2022-05-03 Thread Mason Chen
Hi ChangZhou,

The warning log indicates that the metric was previously defined and so the
runtime is handling the "duplicate" metric by ignoring it. This is
typically a benign message unless you rely on this metric. Is it possible
that you are using the same task name for different tasks? It would be
defined by the `.name(...)` API in your job graph instantiation.

Can you clarify what it means that your endpoint isn't working--some
metrics missing, endpoint is timing out, etc.? Also, can you confirm from
logs that the PrometheusReporter was created properly?

Best,
Mason

On Mon, May 2, 2022 at 7:25 PM ChangZhuo Chen (陳昌倬) 
wrote:

> Hi,
>
> We found that taskmanager Prometheus endpoint does not work after
> upgrading from 1.14.3 to 1.15.0. Jobmanager Prometheus endpoint is okay
> in 1.15.0, so we think the problem is not in image we used. Any idea how
> to fix this problem?
>
>
> Also, we found the following log in taskmanager, but not jobmanager. Not
> sure if they are related to this issue.
>
> 2022-05-03 01:48:32,839 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric
> with the name 'numBytesInLocal'. Metric will not be
> reported.[10.210.47.134, taskmanager, , , ,
> 8, Shuffle, Netty, Input]
> 2022-05-03 01:48:32,839 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric
> with the name 'numBytesInLocalPerSecond'. Metric will not be
> reported.[10.210.47.134, taskmanager, , , ,
> 8, Shuffle, Netty, Input]
> ...
>
>
> --
> ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> http://czchen.info/
> Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B
>


Re: Discuss making KafkaSubscriber Public

2022-04-20 Thread Mason Chen
Hi all,

Just following up on this thread. The subject header may have been
misleading--I was proposing to make KafkaSubscriber @PublicEvolving and
expose a setter to pass a custom implementation. It seems logical since the
KafkaSource is also @PublicEvolving and this lets the user know that the
interface may be subject to change if requirements change in the future.
Anyone have any opinions? Thanks!
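
To make the proposal more concrete, here is a rough sketch of the kind of
custom subscriber users could plug in once the interface is public. The prefix
filter just stands in for whatever external lookup a user needs, and the
signature shown is the current internal one, which may still change:

```
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.TopicPartition;

/** Subscribes to every partition of every topic that starts with a given prefix. */
public class PrefixSubscriber implements KafkaSubscriber {

    private final String prefix;

    public PrefixSubscriber(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient) {
        try {
            Set<String> matching = adminClient.listTopics().names().get().stream()
                    .filter(name -> name.startsWith(prefix))
                    .collect(Collectors.toSet());
            return adminClient.describeTopics(matching).all().get().values().stream()
                    .flatMap(desc -> desc.partitions().stream()
                            .map(info -> new TopicPartition(desc.name(), info.partition())))
                    .collect(Collectors.toSet());
        } catch (Exception e) {
            throw new RuntimeException("Failed to fetch topic metadata", e);
        }
    }
}
```

The setter would then look something like
`KafkaSource.builder().setKafkaSubscriber(new PrefixSubscriber("events-"))`;
that method name is only a placeholder, since the setter is exactly what this
thread would decide on.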

Best,
Mason

On Wed, Apr 13, 2022 at 10:07 AM Mason Chen  wrote:

> Hi Chesnay,
>
> Typically, users want to plug in a KafkaSubscriber that depends on an
> external system [1][2]. We could also provide a higher level interface that
> doesn’t depend on the Kafka Admin Client, but I think it would be more
> flexible to be able to re-use the one created by the enumerator if needed.
> If we don't want to expose the Kafka Admin Client and if users want to
> apply some complex filter, then we can also provide a pluggable interface
> used in a similar implementation to that of the subscriber used for topic
> pattern and allow users to filter topics after the Kafka API response.
>
> [1] https://www.mail-archive.com/user@flink.apache.org/msg44340.html
> [2] https://www.mail-archive.com/dev@flink.apache.org/msg52007.html
>
> Best,
> Mason
>
>
> On Wed, Apr 13, 2022 at 6:32 AM Chesnay Schepler 
> wrote:
>
>> Could you expand a bit on possible alternative implementations that
>> require this interface to become public, opposed to providing more
>> built-in ways to subscribe?
>>
>> On 13/04/2022 11:26, Qingsheng Ren wrote:
>> > Thanks for the proposal Mason! I think exposing `KafkaSubscriber` as
>> public API is helpful for users to implement more complex subscription
>> logics.
>> >
>> > +1 (non-binding)
>> >
>> > Cheers,
>> >
>> > Qingsheng
>> >
>> >> On Apr 12, 2022, at 11:46, Mason Chen  wrote:
>> >>
>> >> Hi Flink Devs,
>> >>
>> >> I was looking to contribute to
>> https://issues.apache.org/jira/browse/FLINK-24660, which is a ticket to
>> track changing the KafkaSubscriber from Internal to PublicEvolving.
>> >>
>> >> In the PR, it seems a few of us have agreement on making the
>> subscriber pluggable in the KafkaSource, but I'd like to raise the question
>> nevertheless. Furthermore, there is also interest from various Flink
>> mailing threads and on the Jira ticket itself for the ticket, so I think
>> the change would be beneficial to the users. There is already some feedback
>> to make the contract of handling removed splits by the KafkaSource and
>> subscriber clearer in the docs.
>> >>
>> >> I have yet to address all the PR feedback, but does anyone have any
>> concerns before I proceed further?
>> >>
>> >> Best,
>> >> Mason
>>
>>
>>


Re: Discuss making KafkaSubscriber Public

2022-04-13 Thread Mason Chen
Hi Chesnay,

Typically, users want to plug in a KafkaSubscriber that depends on an
external system [1][2]. We could also provide a higher level interface that
doesn’t depend on the Kafka Admin Client, but I think it would be more
flexible to be able to re-use the one created by the enumerator if needed.
If we don't want to expose the Kafka Admin Client and users only want to
apply some complex filter, then we could instead provide a pluggable interface,
implemented similarly to the subscriber used for the topic pattern, that allows
users to filter topics after the Kafka API response.

[1] https://www.mail-archive.com/user@flink.apache.org/msg44340.html
[2] https://www.mail-archive.com/dev@flink.apache.org/msg52007.html

Best,
Mason


On Wed, Apr 13, 2022 at 6:32 AM Chesnay Schepler  wrote:

> Could you expand a bit on possible alternative implementations that
> require this interface to become public, opposed to providing more
> built-in ways to subscribe?
>
> On 13/04/2022 11:26, Qingsheng Ren wrote:
> > Thanks for the proposal Mason! I think exposing `KafkaSubscriber` as
> public API is helpful for users to implement more complex subscription
> logics.
> >
> > +1 (non-binding)
> >
> > Cheers,
> >
> > Qingsheng
> >
> >> On Apr 12, 2022, at 11:46, Mason Chen  wrote:
> >>
> >> Hi Flink Devs,
> >>
> >> I was looking to contribute to
> https://issues.apache.org/jira/browse/FLINK-24660, which is a ticket to
> track changing the KafkaSubscriber from Internal to PublicEvolving.
> >>
> >> In the PR, it seems a few of us have agreement on making the subscriber
> pluggable in the KafkaSource, but I'd like to raise the question
> nevertheless. Furthermore, there is also interest from various Flink
> mailing threads and on the Jira ticket itself for the ticket, so I think
> the change would be beneficial to the users. There is already some feedback
> to make the contract of handling removed splits by the KafkaSource and
> subscriber clearer in the docs.
> >>
> >> I have yet to address all the PR feedback, but does anyone have any
> concerns before I proceed further?
> >>
> >> Best,
> >> Mason
>
>
>


Discuss making KafkaSubscriber Public

2022-04-11 Thread Mason Chen
Hi Flink Devs,

I was looking to contribute to
https://issues.apache.org/jira/browse/FLINK-24660, which is a ticket to
track changing the KafkaSubscriber from Internal to PublicEvolving.

In the PR, it seems a few of us have agreement on making the subscriber
pluggable in the KafkaSource, but I'd like to raise the question
nevertheless. Furthermore, there is also interest in this change from various
Flink mailing list threads and on the Jira ticket itself, so I think
the change would be beneficial to users. There is already some feedback
to make the contract of handling removed splits by the KafkaSource and
subscriber clearer in the docs.

I have yet to address all the PR feedback, but does anyone have any
concerns before I proceed further?

Best,
Mason


Re: KafkaPartitionSplitReader handleSplitsChanges

2022-03-02 Thread Mason Chen
Or is the motivation that resolving the committed/latest offsets is an
infrequent event (and only for bounded read) so the optimization is not
worth it?

On Wed, Mar 2, 2022 at 2:16 PM Mason Chen  wrote:

> Hi all,
>
> I noticed in the javadocs that SplitReaders should not have a blocking
> handleSplitsChanges implementation:
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java#L55
>
> However, for KafkaPartitionSplitReader, the implementation is blocking due
> to potential IO to retrieve committed offsets. I just wanted to double
> check: is this javadoc accurate or should the KafkaPartitionSplitReader
> implementation be optimized?
>
> Best,
> Mason
>
>


KafkaPartitionSplitReader handleSplitsChanges

2022-03-02 Thread Mason Chen
Hi all,

I noticed in the javadocs that SplitReaders should not have a blocking
handleSplitsChanges implementation:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java#L55

However, for KafkaPartitionSplitReader, the implementation is blocking due
to potential IO to retrieve committed offsets. I just wanted to double
check: is this javadoc accurate or should the KafkaPartitionSplitReader
implementation be optimized?

Best,
Mason


Re: MetricRegistryTestUtils java class (flink-runtime/metrics) not found in source code version 1.14.3

2022-02-25 Thread Mason Chen
Hi Prasanna,

Why do you need histograms vs summaries? I'm curious about the change and
want to see if it applies to my usage of the PrometheusReporter.

Best,
Mason

On Mon, Jan 31, 2022 at 11:51 PM Martijn Visser 
wrote:

> Hi Prasanna,
>
> Just a quick note that the Github links are all pointing to release
> candidate 1 for 1.14.3. The released version is in
> https://github.com/apache/flink/blob/release-1.14.3/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTestUtils.java
>
> Best regards,
>
> Martijn
>
> On Tue, 1 Feb 2022 at 07:35, Prasanna kumar 
> wrote:
>
>> NVM was able to find it in a different place
>>
>>
>> https://github.com/apache/flink/blob/release-1.14.3-rc1/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTestUtils.java
>>
>> On Tue, Feb 1, 2022 at 11:58 AM Prasanna kumar <
>> prasannakumarram...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Team, We are writing our own prometheus reporter to make sure that we
>>> are capturing data in histograms rather than summaries.
>>>
>>> We were able to do it successfully in version 1.12.7.
>>>
>>> But while upgrading to version 1.14.3 , we find
>>> that MetricRegistryTestUtils is not available in the src code given by
>>> flink in github.
>>>
>>> PrometheusReporterTest.java throws error that this file is unavailable
>>>
>>> [image: Screen Shot 2022-02-01 at 11.50.09 AM.png]
>>>
>>> [image: Screen Shot 2022-02-01 at 11.53.17 AM.png]
>>>
>>> The below code is in 1.12.7 but method
>>> defaultMetricRegistryConfiguration  is deprecated in the latest version.
>>>
>>> [image: Screen Shot 2022-02-01 at 11.51.46 AM.png]
>>>
>>> Looking at the Github link
>>> https://github.com/apache/flink/tree/release-1.14.3-rc1/flink-runtime/src/main/java/org/apache/flink/runtime/metrics
>>> also shows that the MetricRegistryTestUtils is not available. It's not
>>> available in
>>> https://github.com/apache/flink/tree/master/flink-runtime/src/main/java/org/apache/flink/runtime/metrics
>>> master branch as well.
>>>
>>> [image: Screen Shot 2022-02-01 at 11.55.19 AM.png]
>>>
>>> Could the community please add the Class file in GITHUB.
>>>
>>> Thanks,
>>> Prasanna.
>>>
>>


Prom Pushgateway Reporter HTTPS support

2022-01-18 Thread Mason Chen
Hi all,

There is some interest from our users in using the Prometheus push gateway
reporter with an HTTPS endpoint. So, I've filed
https://issues.apache.org/jira/browse/FLINK-25697; I figured it would be
acceptable since the InfluxDB reporter supports something similar. Could someone
assign me this ticket--I'd like to help contribute this back to OSS!

Best,
Mason

Re: unaligned checkpoint for job with large start delay

2022-01-11 Thread Mason Chen
Hi Piotrek,

No worries—I hope you had a good break.

> Counting how many windows have been registered/fired and plotting that over 
> time.
It's straightforward to count windows that are fired (the trigger exposes the
runtime context and we can collect the information in that code path).
However, it's not so clear how to count the windows that have been registered,
since the window assigner does not expose the runtime context—is this even the
right place to count? It's not necessarily the case that an assignment results
in a new window being registered. Am I missing anything else relevant from the
user-facing interface perspective?
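
For reference, this is roughly the kind of delegating trigger I have in mind
for the fired-window counter (names are made up; the merge methods are
forwarded so it still works with session windows):

```
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/** Wraps another trigger and counts every FIRE / FIRE_AND_PURGE decision. */
public class FireCountingTrigger<T> extends Trigger<T, TimeWindow> {

    private final Trigger<T, TimeWindow> delegate;
    private transient Counter fires;

    public FireCountingTrigger(Trigger<T, TimeWindow> delegate) {
        this.delegate = delegate;
    }

    private TriggerResult counted(TriggerResult result, TriggerContext ctx) {
        if (fires == null) {
            fires = ctx.getMetricGroup().counter("windowFires");
        }
        if (result.isFire()) {
            fires.inc();
        }
        return result;
    }

    @Override
    public TriggerResult onElement(T element, long ts, TimeWindow window, TriggerContext ctx) throws Exception {
        return counted(delegate.onElement(element, ts, window, ctx), ctx);
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return counted(delegate.onProcessingTime(time, window, ctx), ctx);
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return counted(delegate.onEventTime(time, window, ctx), ctx);
    }

    @Override
    public boolean canMerge() {
        return delegate.canMerge();
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
        delegate.onMerge(window, ctx);
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        delegate.clear(window, ctx);
    }
}
```

It would be applied with something like
`.trigger(new FireCountingTrigger<>(EventTimeTrigger.create()))` on the
windowed stream.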

>  Unfortunately at the moment I don't know how to implement such a metric 
> without affecting performance on the critical path, so I don't see this 
> happening soon :(
Perhaps it can be an opt-in feature? I do see it being really useful, since
most users aren't very familiar with windows and these metrics can help
easily identify the common problem of too many windows firing.

The additional metrics certainly help in diagnosing some of the symptoms of the 
root problem.

Best,
Mason

> On Jan 10, 2022, at 1:00 AM, Piotr Nowojski  wrote:
> 
> Hi Mason,
> 
> Sorry for a late reply, but I was OoO.
> 
> I think you could confirm it with more custom metrics. Counting how many 
> windows have been registered/fired and plotting that over time.
> 
> I think it would be more helpful in this case to check how long a task has 
> been blocked being "busy" processing for example timers. FLINK-25414 shows 
> only blocked on being hard/soft backpressure. Unfortunately at the moment I 
> don't know how to implement such a metric without affecting performance on 
> the critical path, so I don't see this happening soon :(
> 
> Best,
> Piotrek
> 
> wt., 4 sty 2022 o 18:02 Mason Chen  <mailto:mason.c...@apple.com>> napisał(a):
> Hi Piotrek,
> 
>> In other words, something (presumably a watermark) has fired more than 151 
>> 200 windows at once, which is taking ~1h 10minutes to process and during 
>> this time the checkpoint can not make any progress. Is this number of 
>> triggered windows plausible in your scenario?
> 
> It seems plausible—there are potentially many keys (and many windows). Is 
> there a way to confirm with metrics? We can add a window fire counter to the 
> window operator that only gets incremented at the end of windows evaluation, 
> in order to see the huge jumps in window fires. I can see this benefiting other
> users who troubleshoot the problem of a large number of windows firing.
> 
> Best,
> Mason
> 
>> On Dec 29, 2021, at 2:56 AM, Piotr Nowojski > <mailto:pnowoj...@apache.org>> wrote:
>> 
>> Hi Mason,
>> 
>> > and it has to finish processing this output before checkpoint can begin—is 
>> > this right?
>> 
>> Yes. Checkpoint will be only executed once all triggered windows will be 
>> fully processed. 
>> 
>> But from what you have posted it looks like all of that delay is coming from 
>> hundreds of thousands of windows firing all at the same time. Between 20:30 
>> and ~21:40 there must have been a bit more than 36 triggers/s * 60s/min * 
>> 70min = 151 200triggers fired at once (or in a very short interval). In 
>> other words, something (presumably a watermark) has fired more than 151 200 
>> windows at once, which is taking ~1h 10minutes to process and during this 
>> time the checkpoint can not make any progress. Is this number of triggered 
>> windows plausible in your scenario?
>> 
>> Best,
>> Piotrek
>> 
>> 
>> czw., 23 gru 2021 o 12:12 Mason Chen > <mailto:mason.c...@apple.com>> napisał(a):
>> Hi Piotr,
>> 
>> Thanks for the thorough response and the PR—will review later.
>> 
>> Clarifications:
>> 1. The flat map you refer to produces at most 1 record.
>> 2. The session window operator’s window process function emits at least 1 
>> record. 
>> 3. The 25 ms sleep is at the beginning of the window process function.
>> 
>> Your explanation about how records being bigger than the buffer size can 
>> cause blockage makes sense to me. However, my average record size is around 
>> 770 bytes coming out of the source and 960 bytes coming out of the window. 
>> Also, we don’t override the default `taskmanager.memory.segment-size`. My 
>> Flink job memory config is as follows:
>> 
>> ```
>> taskmanager.memory.jvm-metaspace.size: 512 mb
>> taskmanager.memory.jvm-overhead.max: 2Gb
>> taskmanager.memory.jvm-overhead.min: 512Mb
>> taskmanager.memory.managed.fraction: '0.4'
>>  

Re: unaligned checkpoint for job with large start delay

2022-01-04 Thread Mason Chen
Hi Piotrek,

> In other words, something (presumably a watermark) has fired more than 151 
> 200 windows at once, which is taking ~1h 10minutes to process and during this 
> time the checkpoint can not make any progress. Is this number of triggered 
> windows plausible in your scenario?

It seems plausible—there are potentially many keys (and many windows). Is there 
a way to confirm with metrics? We can add a window fire counter to the window 
operator that only gets incremented at the end of windows evaluation, in order 
to see the huge jumps in window fires. I can see this benefiting other users who 
troubleshoot the problem of a large number of windows firing.

Best,
Mason

> On Dec 29, 2021, at 2:56 AM, Piotr Nowojski  wrote:
> 
> Hi Mason,
> 
> > and it has to finish processing this output before checkpoint can begin—is 
> > this right?
> 
> Yes. Checkpoint will be only executed once all triggered windows will be 
> fully processed. 
> 
> But from what you have posted it looks like all of that delay is coming from 
> hundreds of thousands of windows firing all at the same time. Between 20:30 
> and ~21:40 there must have been a bit more than 36 triggers/s * 60s/min * 
> 70min = 151 200triggers fired at once (or in a very short interval). In other 
> words, something (presumably a watermark) has fired more than 151 200 windows 
> at once, which is taking ~1h 10minutes to process and during this time the 
> checkpoint can not make any progress. Is this number of triggered windows 
> plausible in your scenario?
> 
> Best,
> Piotrek
> 
> 
> czw., 23 gru 2021 o 12:12 Mason Chen  <mailto:mason.c...@apple.com>> napisał(a):
> Hi Piotr,
> 
> Thanks for the thorough response and the PR—will review later.
> 
> Clarifications:
> 1. The flat map you refer to produces at most 1 record.
> 2. The session window operator’s window process function emits at least 1 
> record. 
> 3. The 25 ms sleep is at the beginning of the window process function.
> 
> Your explanation about how records being bigger than the buffer size can 
> cause blockage makes sense to me. However, my average record size is around 
> 770 bytes coming out of the source and 960 bytes coming out of the window. 
> Also, we don’t override the default `taskmanager.memory.segment-size`. My 
> Flink job memory config is as follows:
> 
> ```
> taskmanager.memory.jvm-metaspace.size: 512 mb
> taskmanager.memory.jvm-overhead.max: 2Gb
> taskmanager.memory.jvm-overhead.min: 512Mb
> taskmanager.memory.managed.fraction: '0.4'
> taskmanager.memory.network.fraction: '0.2'
> taskmanager.memory.network.max: 2Gb
> taskmanager.memory.network.min: 200Mb
> taskmanager.memory.process.size: 16Gb
> taskmanager.numberOfTaskSlots: '4'
> ```
> 
>>  Are you sure your job is making any progress? Are records being processed? 
>> Hasn't your job simply deadlocked on something?
> 
> To distinguish task blockage vs graceful backpressure, I have checked the 
> operator throughput metrics and have confirmed that during window task buffer 
> blockage, the window operator DOES emit records. Tasks look like they aren’t 
> doing anything but the window is emitting records.
> 
> 
> 
> 
> Furthermore, I created a custom trigger to wrap a metric counter for FIRED 
> counts to get an estimate of how many windows are fired at the same time. I 
> ran a separate job with the same configs—the results look as follows:
> 
> 
> On average, when the buffers are blocked, there are 36 FIREs per second. 
> Since each of these fires invokes the window process function, 25 ms * 36 = 
> 900 ms means we sleep almost a second cumulatively, per second—which is 
> pretty severe. Combined with the fact that the window process function can 
> emit many records, the task takes even longer to checkpoint since the 
> flatmap/kafka sink is chained with the window operator—and it has to finish 
> processing this output before checkpoint can begin—is this right? In 
> addition, when the window fires per second reduces, checkpoint is able to 
> continue and succeed.
> 
> So, I think that the surge of window firing combined with the sleep is the 
> source of the issue, which makes sense. I’m not sure how to confirm whether 
> or not the points about buffer sizes being insufficient for the window output 
> is also interplaying with this issue.
> 
> Best,
> Mason
> 
> 
>> On Dec 22, 2021, at 6:17 AM, Piotr Nowojski > <mailto:pnowoj...@apache.org>> wrote:
>> 
>> Hi Mason,
>> 
>> One more question. Are you sure your job is making any progress? Are records 
>> being processed? Hasn't your job simply deadlocked on some

Re: unaligned checkpoint for job with large start delay

2021-12-17 Thread Mason Chen
Hi Piotr,

Thanks for the link to the JIRA ticket. We actually don't see much state size 
overhead between aligned and unaligned checkpoints, so we will go with your 
recommendation of using unaligned checkpoints with a 0s alignment timeout.

For context, we are testing unaligned checkpoints with our application with 
these tasks: [kafka source, map, filter] -> keyby -> [session window] -> 
[various kafka sinks]. The first task has parallelism 40 and the rest of the 
tasks have parallelism 240. This is the FLIP 27 Kafka source.

We added an artificial sleep (25 ms per invocation of the process function) to 
the session window task to simulate backpressure; however, we still see 
checkpoints failing because task acknowledgement doesn't complete within our 
checkpoint timeout (30 minutes).

I am able to correlate the checkpoint failures with the window task's input 
buffers and the source task's output buffers being at 100% usage. When they are 
not full (input can drop to as low as 60% usage and output to as low as 55%), 
the checkpoints succeed within less than 2 ms. In all cases, it is the session 
window task or the source task failing to fully acknowledge the barriers within 
the timeout. I do see the source task acknowledgement taking long in some of the 
failures (e.g. 20 minutes, 30 minutes, 50 minutes, 1 hour, 2 hours) while the 
source is idle and not busy at that time.

All other input buffers have low usage (mostly 0). For output buffers, usage is 
around 50% for the window task--everything else is near 0% all the time except 
the source mentioned before (which makes sense since the rest are just sinks).

We are also running a parallel Flink job with the same configuration, except 
with unaligned checkpoints disabled. Here we observe the same behavior, except 
now some of the checkpoints are failing due to the source task not 
acknowledging everything within the timeout—however, most failures are still 
due to session window acknowledgement.

All the data seems to point to an issue with the source? Now, I don't know how 
to explain this behavior, since unaligned checkpoint barriers should overtake 
records in the buffers (once seen at the input buffer, they are forwarded 
immediately downstream to the output buffer).

Just to confirm, this is our checkpoint configuration:
```
Option                               Value
Checkpointing Mode                   Exactly Once
Checkpoint Storage                   FileSystemCheckpointStorage
State Backend                        EmbeddedRocksDBStateBackend
Interval                             5m 0s
Timeout                              30m 0s
Minimum Pause Between Checkpoints    2m 0s
Maximum Concurrent Checkpoints       1
Unaligned Checkpoints                Enabled
Persist Checkpoints Externally       Enabled (retain on cancellation)
Tolerable Failed Checkpoints         10
```

Are there other metrics I should look at—why else would tasks fail 
acknowledgement in unaligned mode? Is it something about the implementation 
details of the window function that I am not considering? My main hunch is that 
it is something to do with the source.

Best,
Mason

> On Dec 16, 2021, at 12:25 AM, Piotr Nowojski  wrote:
> 
> Hi Mason,
> 
> In Flink 1.14 we have also changed the timeout behavior from checking against 
> the alignment duration, to simply checking how old is the checkpoint barrier 
> (so it would also account for the start delay) [1]. It was done in order to 
> solve problems as you are describing. Unfortunately we can not backport this 
> change to 1.13.x as it's a breaking change.
> 
> Anyway, from our experience I would recommend going all in with the unaligned 
> checkpoints, so setting the timeout back to the default value of 0ms. With 
> timeouts you are gaining very little (a tiny bit smaller state size if there 
> is no backpressure - tiny bit because without backpressure, even with timeout 
> set to 0ms, the amount of captured inflight data is basically insignificant), 
> while in practise you slow down the checkpoint barriers propagation time by 
> quite a lot.
> 
> Best,
> Piotrek
> 
> [1] https://issues.apache.org/jira/browse/FLINK-23041 
> <https://issues.apache.org/jira/browse/FLINK-23041>
> wt., 14 gru 2021 o 22:04 Mason Chen  <mailto:mas.chen6...@gmail.com>> napisał(a):
> Hi all,
> 
> I'm using Flink 1.13 and my job is experiencing high start delay, more so 
> than high alignment time. (our flip 27 kafka source is heavily 
> backpressured). Since our alignment timeout is set to 1s, the unaligned 
> checkpoint never triggers since alignment delay is always below the threshold.
> 
> It's seems there is only a configuration for alignment timeout but should 
> there also be one for start delay timeout: 
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/#aligned-checkpoint-timeout
>  
> <https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/#aligned-checkpoint-timeout>
> 
> I'm interested to know the reas

unaligned checkpoint for job with large start delay

2021-12-14 Thread Mason Chen
Hi all,

I'm using Flink 1.13 and my job is experiencing high start delay, more so
than high alignment time (our FLIP-27 Kafka source is heavily
backpressured). Since our alignment timeout is set to 1s, the unaligned
checkpoint never triggers, because the alignment delay is always below the
threshold.
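
For reference, in flink-conf.yaml terms our setup is roughly the following
(1.13 option names, if I recall them correctly):

```
execution.checkpointing.unaligned: true
execution.checkpointing.alignment-timeout: 1 s
```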

It seems there is only a configuration for the alignment timeout, but should
there also be one for a start delay timeout:
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/#aligned-checkpoint-timeout

I'm interested to know the reasoning why there isn't a timeout for start
delay as well--was it because it was deemed too complex for the user to
configure two parameters for unaligned checkpoints?

I'm aware of buffer debloating in 1.14 that could help but I'm trying to
see how far unaligned checkpointing can take me.

Best,
Mason


Log level for insufficient task slots message

2021-12-03 Thread Mason Chen
Hi all,

java.util.concurrent.CompletionException: 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Could not acquire the minimum required resources.

This is an exception/message that is thrown when the user misconfigures the job 
with insufficient task slots. Currently, it is logged at INFO level in Flink 
1.13.3, making it hard to find the error in our Splunk logging infrastructure.

What do people think about changing this logging to ERROR level?

Best,
Mason

Re: How do I configure commit offsets on checkpoint in new KafkaSource api?

2021-11-19 Thread Mason Chen
Hi Marco,

>
https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kafka/#additional-properties
In the new KafkaSource, you can configure it in your properties. You can
take a look at `KafkaSourceOptions#COMMIT_OFFSETS_ON_CHECKPOINT` for the
specific config, which defaults to true if you have checkpointing enabled.
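
For example, a sketch of setting it explicitly (the topic, group, and broker
address are placeholders; note that a group.id is needed for the commit to go
through):

```
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

KafkaSource<String> source =
        KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("my-topic")
                .setGroupId("my-group") // required for the offset commit to take effect
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                // default is already true when checkpointing is enabled
                .setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "true")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
```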

Best,
Mason

On Fri, Nov 19, 2021 at 7:45 AM Marco Villalobos 
wrote:

> The FlinkKafkaConsumer that will be deprecated has the method
> "setCommitOffsetsOnCheckpoints(boolan)" method.
>
> However, that functionality is not the new KafkaSource class.
>
> How is this behavior / functionality configured in the new API?
>
> -Marco A. Villalobos
>
>
>


Re: Kafka Source Recovery Behavior

2021-11-10 Thread Mason Chen
Hi all,

Any update on this?

Best,
Mason

On Sat, Oct 30, 2021 at 5:56 AM Arvid Heise  wrote:

> This seems to be a valid concern but I'm not deep enough to clearly say
> that this is indeed a bug. @renqschn  could you
> please double-check?
>
> On Thu, Oct 28, 2021 at 8:39 PM Mason Chen  wrote:
>
>> Hi all,
>>
>> I noticed that the KafkaSourceReader did not have a pointer to the
>> KafkaSubscriber, so I was wondering if this could be a bug:
>>
>> 1. User has a flink job with topic set A and takes savepoint
>> 2. User modifies flink job to read from topic set B; however, splits are
>> still read from topic set A (since there’s no logic to filter/remove
>> splits).
>>
>> To clarify, I haven’t tested this scenario out myself, but just through
>> reading the code, there was no logic to filter/remove splits. Just adding
>> this for completeness.
>>
>> Best,
>> Mason
>>
>


Re: How to refresh topics to ingest with KafkaSource?

2021-11-02 Thread Mason Chen
Hi Arvid,

I have some bandwidth to contribute to this task and am familiar with the code. 
Could you or another committer assign me this ticket?

Thanks,
Mason

> On Oct 30, 2021, at 5:24 AM, Arvid Heise  wrote:
> 
> Hi Mason,
> 
> thanks for creating that.
> 
> We are happy to take contributions (I flagged it as a starter task).
> 
> On Wed, Oct 27, 2021 at 2:36 AM Mason Chen <mason.c...@apple.com> wrote:
> Hi all,
> 
> I have a similar requirement to Preston. I created
> https://issues.apache.org/jira/browse/FLINK-24660 to track this effort.
> 
> Best,
> Mason
> 
>> On Oct 18, 2021, at 1:59 AM, Arvid Heise <ar...@apache.org> wrote:
>> 
>> Hi Preston,
>> 
>> if you still need to set KafkaSubscriber explicitly, could you please create
>> a feature request for that? For now, you probably have to resort to
>> reflection hacks and build against the non-public KafkaSubscriber.
>> 
>> On Fri, Oct 15, 2021 at 4:03 AM Prasanna kumar
>> <prasannakumarram...@gmail.com> wrote:
>> Yes you are right.
>> 
>> We tested recently to find that the flink jobs do not pick up the new topics 
>> that got created with the same pattern provided to flink kafka consumer.  
>> The topics are set only during the start of the jobs. 
>> 
>> Prasanna.
>> 
>> On Fri, 15 Oct 2021, 05:44 Preston Price <nacro...@gmail.com> wrote:
>> Okay so topic discovery is possible with topic patterns, and maybe topic 
>> lists. However I don't believe it's possible to change the configured topic 
>> list, or topic pattern after the source is created.
>> 
>> On Thu, Oct 14, 2021, 3:52 PM Denis Nutiu <denis.nu...@gmail.com> wrote:
>> There is a setting for dynamic topic discovery:
>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#topic-and-partition-discovery
>> Best,
>> 
>> Denis
>> 
>> 
>> On Fri, Oct 15, 2021 at 12:48 AM Denis Nutiu <denis.nu...@gmail.com> wrote:
>> Hi,
>> 
>> In my experience with the librdkafka client and the Go wrapper, the 
>> topic-pattern subscribe is reactive. The Flink Kafka connector might behave 
>> similarly. 
>> 
>> Best,
>> Denis
>> 
>> On Fri, Oct 15, 2021 at 12:34 AM Preston Price <nacro...@gmail.com> wrote:
>> No, the topic-pattern won't work for my case. Topics that I should subscribe 
>> to can be enabled/disabled based on settings I read from another system, so 
>> there's no way to craft a single regular expression that would fit the state 
>> of all potential topics. Additionally the documentation you linked seems to 
>> suggest that the regular expression is evaluated only once "when the job 
>> starts running". My understanding is it would not pick up new topics that 
>> match the pattern after the job starts.
>> 
>> 
>> On Wed, Oct 13, 2021 at 8:51 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>> Hi!
>> 
>> I suppose you want to read from different topics every now and then? Does 
>> the topic-pattern option [1] in Table API Kafka connector meet your needs?
>> 
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/kafka/#topic-pattern
>> Preston Price <nacro...@gmail.com> wrote on Thu, Oct 14, 2021 at 1:34 AM:
>> The KafkaSource, and KafkaSourceBuilder appear to prevent users from 
>> providing their own KafkaSubscriber. Am I overlooking something?
>> 
>> In my case I have an external system that controls which topics we should be 
>> ingesting, and it can change over time. I need to add, and remove topics as 
>> we refresh configuration from this external system without having to stop 
>> and start our Flink job. Initially it appeared I could accomplish this by 
>> providing my own implementation of the `KafkaSubscriber` interface, which 
>> would be invoked periodically as configured by the 
>> `partition.discovery.interval.ms`
>> property. However there is no way to provide my implementation to the 
>> KafkaSource since the constructor for KafkaSource is package protected, and 
>> the KafkaSourceBuilder does not supply a way to provide the 
>> `KafkaSubscriber`.
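
For illustration, a minimal sketch of the built-in pattern-based subscription
plus periodic discovery referred to above (KafkaSource builder API; the broker
address, pattern, group id, and interval are placeholders, and whether topics
created after startup are picked up is exactly the open question in this thread):

```
import java.util.regex.Pattern;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")              // placeholder broker address
        .setTopicPattern(Pattern.compile("events-.*"))      // placeholder topic pattern
        .setGroupId("discovery-example")                    // placeholder group id
        .setValueOnlyDeserializer(new SimpleStringSchema())
        // A positive interval makes the enumerator re-run discovery periodically.
        .setProperty("partition.discovery.interval.ms", "60000")
        .build();
```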
>> 
>> How can I accomplish a periodic refresh of the topics to ingest?
>> 
>> Thanks
>> 
>> 
>> 
>> 
>> -- 
>> Regards,
>> Denis Nutiu
>> 
>> 
>> -- 
>> Regards,
>> Denis Nutiu
> 



Kafka Source Recovery Behavior

2021-10-28 Thread Mason Chen
Hi all,

I noticed that the KafkaSourceReader did not have a pointer to the
KafkaSubscriber, so I was wondering if this could be a bug:

1. User has a flink job with topic set A and takes savepoint
2. User modifies flink job to read from topic set B; however, splits are
still read from topic set A (since there’s no logic to filter/remove
splits).

To clarify, I haven’t tested this scenario out myself, but just through
reading the code, there was no logic to filter/remove splits. Just adding
this for completeness.

Best,
Mason


Re: How to refresh topics to ingest with KafkaSource?

2021-10-26 Thread Mason Chen
Hi all,

I have a similar requirement to Preston. I created
https://issues.apache.org/jira/browse/FLINK-24660 to track this effort.

Best,
Mason

> On Oct 18, 2021, at 1:59 AM, Arvid Heise  wrote:
> 
> Hi Preston,
> 
> if you still need to set KafkaSubscriber explicitly, could you please create
> a feature request for that? For now, you probably have to resort to
> reflection hacks and build against the non-public KafkaSubscriber.
> 
> On Fri, Oct 15, 2021 at 4:03 AM Prasanna kumar  > wrote:
> Yes you are right.
> 
> We tested recently to find that the flink jobs do not pick up the new topics 
> that got created with the same pattern provided to flink kafka consumer.  The 
> topics are set only during the start of the jobs. 
> 
> Prasanna.
> 
> On Fri, 15 Oct 2021, 05:44 Preston Price,  > wrote:
> Okay so topic discovery is possible with topic patterns, and maybe topic 
> lists. However I don't believe it's possible to change the configured topic 
> list, or topic pattern after the source is created.
> 
> On Thu, Oct 14, 2021, 3:52 PM Denis Nutiu  > wrote:
> There is a setting for dynamic topic discovery 
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#topic-and-partition-discovery
>  
> 
> Best,
> 
> Denis
> 
> 
> On Fri, Oct 15, 2021 at 12:48 AM Denis Nutiu  > wrote:
> Hi,
> 
> In my experience with the librdkafka client and the Go wrapper, the 
> topic-pattern subscribe is reactive. The Flink Kafka connector might behave 
> similarly. 
> 
> Best,
> Denis
> 
> On Fri, Oct 15, 2021 at 12:34 AM Preston Price  > wrote:
> No, the topic-pattern won't work for my case. Topics that I should subscribe 
> to can be enabled/disabled based on settings I read from another system, so 
> there's no way to craft a single regular expression that would fit the state 
> of all potential topics. Additionally the documentation you linked seems to 
> suggest that the regular expression is evaluated only once "when the job 
> starts running". My understanding is it would not pick up new topics that 
> match the pattern after the job starts.
> 
> 
> On Wed, Oct 13, 2021 at 8:51 PM Caizhi Weng  > wrote:
> Hi!
> 
> I suppose you want to read from different topics every now and then? Does the 
> topic-pattern option [1] in Table API Kafka connector meet your needs?
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/kafka/#topic-pattern
> Preston Price <nacro...@gmail.com> wrote on Thu, Oct 14, 2021 at 1:34 AM:
> The KafkaSource, and KafkaSourceBuilder appear to prevent users from 
> providing their own KafkaSubscriber. Am I overlooking something?
> 
> In my case I have an external system that controls which topics we should be 
> ingesting, and it can change over time. I need to add, and remove topics as 
> we refresh configuration from this external system without having to stop and 
> start our Flink job. Initially it appeared I could accomplish this by 
> providing my own implementation of the `KafkaSubscriber` interface, which 
> would be invoked periodically as configured by the 
> `partition.discovery.interval.ms`
> property. However there is no way to provide my implementation to the 
> KafkaSource since the constructor for KafkaSource is package protected, and 
> the KafkaSourceBuilder does not supply a way to provide the `KafkaSubscriber`.
> 
> How can I accomplish a periodic refresh of the topics to ingest?
> 
> Thanks
> 
> 
> 
> 
> -- 
> Regards,
> Denis Nutiu
> 
> 
> -- 
> Regards,
> Denis Nutiu



FlinkKafkaConsumer -> KafkaSource State Migration

2021-10-26 Thread Mason Chen
Hi all,

I read these instructions for migrating to the KafkaSource:
https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
.

Do we need to employ any uid/allowNonRestoredState tricks if our Flink job
is also stateful outside of the source? Or what is the mechanism that
resolves the topic/partition/offsets in the stateful upgrade? Will
restoring from FlinkKafkaConsumer cause an exception due to incompatibility
of the union state to the current (what is it again)?

Best,
Mason


Re: SplitEnumeratorContext callAsync() cleanup

2021-10-26 Thread Mason Chen
Hi Fabian,

Unfortunately, I don't have the log since I was just testing it out on my
local setup. I can try to reproduce it later in the week.

Best,
Mason

On Mon, Oct 25, 2021 at 8:09 AM Fabian Paul 
wrote:

> Hi Mason,
>
> Thanks for opening the ticket. Can you also share the log with us when the
> KafkaEnumerator closed before the async call finished?
>
> Best,
> Fabian


Re: SplitEnumeratorContext callAsync() cleanup

2021-10-22 Thread Mason Chen
Hi Fabian,

Here we are: https://issues.apache.org/jira/browse/FLINK-24622 


Feel free to modify the description as I lazily copied and pasted our 
discussion here.

Best,
Mason

> On Oct 22, 2021, at 3:31 AM, Fabian Paul  wrote:
> 
> Hi Mason,
> 
> This seems to be a bug with the current KafkaSource and also the unified
> Sources in general. Can you open a bug ticket in Jira? I think the enumerator
> should take care of first joining all the async threads before closing the
> enumerator.
> 
> Best,
> Fabian



SplitEnumeratorContext callAsync() cleanup

2021-10-21 Thread Mason Chen
Hi all,

I was wondering how to cancel a task that is enqueued by the callAsync()
method, the one that takes in a time interval. For example, the KafkaSource
uses this for topic partition discovery. It would be straightforward if the
API returned the underlying future so that a process can cancel it.
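
For context, a rough sketch (hypothetical names, not the actual KafkaSource
code) of the periodic callAsync() in question; the call returns void, so the
caller gets no handle to cancel the scheduled task:

```
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;

class PeriodicDiscoverySketch {
    static <SplitT extends SourceSplit> void schedule(SplitEnumeratorContext<SplitT> context) {
        context.callAsync(
                () -> Arrays.asList("topic-a", "topic-b"),  // stand-in for partition discovery
                (topics, error) -> {
                    // Runs on the coordinator thread; a failure surfaced here fails the job.
                    if (error != null) {
                        throw new RuntimeException("discovery failed", error);
                    }
                },
                0L,        // initial delay in ms
                30_000L);  // period in ms
    }
}
```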

For Kafka, the enumerator shutdown seems to be unclean since it only closes
the admin client and kafka consumer but not the topic partition discovery
task. Furthermore, exceptions from that task will cause job failure and can
potentially happen if the task is still running with the admin client
closed. How can we address this?

Best,
Mason


SplitFetcherManager custom error handler

2021-10-18 Thread Mason Chen
Hi all,

I am implementing a Kafka connector with some custom error handling
that is aligned with our internal infrastructure. `SplitFetcherManager` has
a hardcoded error handler in the constructor and I was wondering if it
could be exposed by the classes that extend it. Happy to contribute if
people are interested.

Best,
Mason


Removing metrics

2021-10-14 Thread Mason Chen
Hi all,

Suppose I have a short lived process within a UDF that defines metrics.
After the process has completed, the underlying resources should be cleaned
up. Is there an API to remove/unregister metrics?

Best,
Mason


Kafka Partition Discovery

2021-09-22 Thread Mason Chen
Hi all,

We are sometimes facing a connection issue with Kafka when a broker restarts

```
java.lang.RuntimeException:
org.apache.kafka.common.errors.TimeoutException: Timeout expired while
fetching topic metadata
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:846)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:828)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired
while fetching topic metadata
```

Can a retry be added to the partition discovery mechanism?

Best,
Mason


Re: Tracking Total Metrics Reported

2021-09-15 Thread Mason Chen
For it to be most useful, the user should be able to obtain the total
number of counters, gauges, meters, and histograms, separately.

On Wed, Sep 15, 2021 at 6:23 PM Mason Chen  wrote:

> Hi all,
>
> Does Flink have any sort of feature to track the total number of metrics
> reported by the Flink job? Ideally, the total would be reported by the job
> manager. Even if there is a log that exposes this information, that would
> be helpful!
>
> Best,
> Mason
>


Tracking Total Metrics Reported

2021-09-15 Thread Mason Chen
Hi all,

Does Flink have any sort of feature to track the total number of metrics
reported by the Flink job? Ideally, the total would be reported by the job
manager. Even if there is a log that exposes this information, that would
be helpful!

Best,
Mason


Obtaining Flink Conf in User Code

2021-09-03 Thread Mason Chen
Hi all,

Is it possible to obtain the Flink configuration in the user code? I've
tried the Configuration parameter in the open method of rich functions and
StreamExecutionEnvironment.getConfig().getGlobalJobParameters()--neither
gives the configs from the Flink configuration.

Best,
Mason


Re: Kafka Metrics

2021-08-23 Thread Mason Chen
Sweet, I suspected it but I thought I might ask anyway.

Consequently, I've implemented a deny list feature for my reporter (based
on groupNameKey and metricName). The reporter will skip reporting metrics
if a metric's variables set contains keys that map to the groupNameKey and
if the metric has a name equal to the specified metricName.

Configurations are specified as follows:
`groupNameKey1:metricName1;groupNameKey2:metricName2`. Thus, I can deny-list
KafkaConsumer (group name key) and committed_offsets (metric name),
which correspond to the legacy Kafka metrics.

Would Flink appreciate this as a contribution? I can see this being used
generically over all reporters.
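
A rough sketch of the idea (not the actual implementation; config handling is
omitted and the filter criteria below are illustrative), following the
delegating-reporter approach suggested below:

```
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;

public class DenyListReporter implements MetricReporter {

    private final MetricReporter delegate;
    private final String deniedVariableKey;  // e.g. "<topic>" -- illustrative
    private final String deniedMetricName;   // e.g. "committed_offsets" -- illustrative

    public DenyListReporter(MetricReporter delegate, String deniedVariableKey, String deniedMetricName) {
        this.delegate = delegate;
        this.deniedVariableKey = deniedVariableKey;
        this.deniedMetricName = deniedMetricName;
    }

    @Override
    public void open(MetricConfig config) {
        delegate.open(config);
    }

    @Override
    public void close() {
        delegate.close();
    }

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        // Skip reporting when the metric's variables contain the denied key and the name matches.
        if (group.getAllVariables().containsKey(deniedVariableKey)
                && metricName.equals(deniedMetricName)) {
            return;
        }
        delegate.notifyOfAddedMetric(metric, metricName, group);
    }

    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
        delegate.notifyOfRemovedMetric(metric, metricName, group);
    }
}
```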

Best,
Mason

On Mon, Aug 23, 2021 at 8:21 AM Arvid Heise  wrote:

> Hi Mason,
>
> I'm afraid it's an all-or-nothing. Either you get the proxied metrics with
> all partitions or none.
>
> You could also implement a custom MetricReporter that delegates to your
> actual reporter and filters the respective metrics.
>
> Best,
>
> Arvid
>
> On Fri, Aug 20, 2021 at 8:16 AM Mason Chen  wrote:
>
>> FYI, I'm referring to the legacy offsets metric gauges.
>>
>> On Thu, Aug 19, 2021 at 4:53 PM Mason Chen 
>> wrote:
>>
>>> Hi all,
>>>
>>> We have found that the per partition Kafka metrics contributes to a lot
>>> of metrics being indexed by our metrics system.
>>>
>>> We would still like to have the proxied kafka metrics from the kafka
>>> clients library. Is there a flag to only exclude Flink's additional Kafka
>>> metrics?
>>>
>>> Best,
>>> Mason
>>>
>>>


Re: Kafka Metrics

2021-08-20 Thread Mason Chen
FYI, I'm referring to the legacy offsets metric gauges.

On Thu, Aug 19, 2021 at 4:53 PM Mason Chen  wrote:

> Hi all,
>
> We have found that the per partition Kafka metrics contributes to a lot of
> metrics being indexed by our metrics system.
>
> We would still like to have the proxied kafka metrics from the kafka
> clients library. Is there a flag to only exclude Flink's additional Kafka
> metrics?
>
> Best,
> Mason
>
>


Kafka Metrics

2021-08-19 Thread Mason Chen
Hi all,

We have found that the per-partition Kafka metrics contribute to a lot of
metrics being indexed by our metrics system.

We would still like to have the proxied kafka metrics from the kafka
clients library. Is there a flag to only exclude Flink's additional Kafka
metrics?

Best,
Mason


1.13 Flamegraphs

2021-08-06 Thread Mason Chen
Hi all,

Does the sampling process also sample threads that do not belong to the
Flink framework? For example, a background thread that is created by and
managed by the user?

Best,
Mason


Re: as-variable configuration for state ac

2021-07-27 Thread Mason Chen
+ user mailing list

I don't have permission to assign to you, but here is the JIRA ticket:
https://issues.apache.org/jira/browse/FLINK-23519

Thanks!

On Tue, Jul 27, 2021 at 4:40 AM Yun Tang  wrote:

> Hi Mason,
>
> I think this request is reasonable and you could create a JIRA ticket so
> that we could resolve it later.
>
>
> Best,
> Yun Tang
> ------
> *From:* Mason Chen 
> *Sent:* Tuesday, July 27, 2021 15:15
> *To:* Yun Tang 
> *Cc:* Mason Chen ; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: as-variable configuration for state ac
>
> Yup, your understanding is correct—that was the analogy I was trying to make!
>
> On Jul 26, 2021, at 7:57 PM, Yun Tang  wrote:
>
> Hi Mason,
>
> In RocksDB, one state corresponds to a column family and we could
> aggregate all RocksDB native metrics per column family. If my understanding
> is right, are you hoping that all state latency metrics for a particular
> state could be aggregated per state level?
>
>
> Best
> Yun Tang
> --
> *From:* Mason Chen 
> *Sent:* Tuesday, July 27, 2021 4:24
> *To:* user@flink.apache.org 
> *Subject:* as-variable configuration for state ac
>
> We have been using the state backend latency tracking metrics from Flink
> 1.13. To make metrics aggregation easier, could there be a config to expose
> something like `state.backend.rocksdb.metrics.column-family-as-variable`
> that rocksdb provides to do aggregation across column families.
>
> In this case, it would be the various components of state.
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#state-backend-rocksdb-metrics-column-family-as-variable
>
>
>


Re: as-variable configuration for state ac

2021-07-27 Thread Mason Chen
Yup, your understanding is correct—that was the analogy I was trying to make!

> On Jul 26, 2021, at 7:57 PM, Yun Tang  wrote:
> 
> Hi Mason,
> 
> In RocksDB, one state corresponds to a column family and we could 
> aggregate all RocksDB native metrics per column family. If my understanding 
> is right, are you hoping that all state latency metrics for a particular 
> state could be aggregated per state level? 
> 
> 
> Best
> Yun Tang
> From: Mason Chen 
> Sent: Tuesday, July 27, 2021 4:24
> To: user@flink.apache.org 
> Subject: as-variable configuration for state ac
>  
> We have been using the state backend latency tracking metrics from Flink 
> 1.13. To make metrics aggregation easier, could there be a config to expose 
> something like `state.backend.rocksdb.metrics.column-family-as-variable` that 
> rocksdb provides to do aggregation across column families.
> 
> In this case, it would be the various components of state.
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#state-backend-rocksdb-metrics-column-family-as-variable


as-variable configuration for state ac

2021-07-26 Thread Mason Chen
We have been using the state backend latency tracking metrics from Flink
1.13. To make metrics aggregation easier, could there be a config to expose
something like `state.backend.rocksdb.metrics.column-family-as-variable`,
which RocksDB provides to do aggregation across column families?

In this case, it would be the various components of state.

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#state-backend-rocksdb-metrics-column-family-as-variable
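
For reference, a sketch of the existing RocksDB option as it would appear in
flink-conf.yaml; the analogous switch requested here for the latency tracking
metrics does not exist at the time of this thread:

```
# Existing RocksDB metrics option (per the docs link above):
state.backend.rocksdb.metrics.column-family-as-variable: true
```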


Re: User Classpath from Plugin

2021-07-13 Thread Mason Chen
I've read this page (
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/filesystems/plugins/),
but would like to know more about modifying the whitelist so I can read the
class.

On Tue, Jul 13, 2021 at 2:54 PM Mason Chen  wrote:

> Hi all,
>
> How can I read the user classpath from a Flink plugin (e.g. one of the
> metric reporters)?
>
> Best,
> Mason
>


User Classpath from Plugin

2021-07-13 Thread Mason Chen
Hi all,

How can I read the user classpath from a Flink plugin (e.g. one of the
metric reporters)?

Best,
Mason


Flink Metric Reporting from Job Manager

2021-07-07 Thread Mason Chen
Hi all,

Does Flink support reporting metrics from the main method that is run on the 
Job Manager? In this case, we want to report a failure to add an operator to 
the Job Graph.

Best,
Mason

Re: Flink exported metrics scope configuration

2021-06-03 Thread Mason Chen
Hi Kai,

You can use the excluded variables config for the reporter:
metrics.reporter.<reporter>.scope.variables.excludes: (optional) A semi-colon (;)
separated list of variables that should be ignored by tag-based reporters (e.g.,
Prometheus, InfluxDB).

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/metric_reporters/#reporter
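
For example, a flink-conf.yaml sketch (the reporter name "prom" and the
excluded variable names are assumptions for illustration):

```
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.scope.variables.excludes: task_name;task_attempt_id
```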
 


Best,
Mason

> On Jun 3, 2021, at 9:31 PM, Kai Fu  wrote:
> 
> Hi team,
> 
> We noticed that the Prometheus metrics exporter exports all of the metrics at the
> most fine-grained level, which is tremendous for the Prometheus server,
> especially when the parallelism is high. The metrics volume crawled from a
> single host (parallelism 8) is around 40MB for us currently. This is due to
> the task_name attribute in the metrics generated by the engine being very long.
> The task_name attribute is auto-generated from the SQL job, and it seems it's
> attaching all field names onto it.
> 
> We want to reduce the metrics volume by either dropping task_name or reporting
> at some more coarse-grained level. But I cannot find any related documents about
> this, any advice on that?
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/metric_reporters/
>  
> 
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/metrics/#list-of-all-variables
>  
> 
> 
> -- 
> Best wishes,
> - Kai



Re: Prometheus Reporter Enhancement

2021-06-02 Thread Mason Chen
Hi Chesnay,

I would like to take on https://issues.apache.org/jira/browse/FLINK-17495
as a contribution to OSS,
if that’s alright with the team. We can discuss implementation details here or 
in the ticket, but I was thinking about modifying the ReporterScopedSettings to 
enable this generic tag support.

Best,
Mason

> On May 20, 2021, at 4:36 AM, Chesnay Schepler  wrote:
> 
> There is no plan to generally exclude label keys from the metric 
> identifier/logical scope. They ensure that the label set for a given 
> identifier/scope is unique, i.e., you can't have 2 metrics called 
> "numRecordsIn" with different label sets. Changing this would also break all 
> existing setups, so if anything it would have to be an opt-in feature.
> 
> What I envision more is for the user to have more control over the metric 
> identifier/logical scope via the scope formats. They are currently quite 
> limited by only controlling part of the final identifier, while the 
> logical scope isn't controllable at all.
> 
> Generally though, there's a fair bit of internal re-structuring that we'd 
> like to do before extending the metric system further, because we've been 
> tacking on more and more things since it was released in 1.3.0 (!!!) but 
> barely refactored things to properly fit together.
> 
> On 5/20/2021 12:58 AM, Mason Chen wrote:
>> Are there any plans to rework some of the metric name formulations 
>> (getMetricIdentifier or getLogicalScope)? Currently, the label keys and/or 
>> label values are concatenated in the metric name and the information is 
>> redundant and makes the metric names longer.
>> 
>> Would it make sense to remove the tag related information 
>> (getAllVariables())?
>> 
>>> On May 18, 2021, at 3:45 PM, Chesnay Schepler <ches...@apache.org> wrote:
>>> 
>>> There is already a ticket for this. Note that this functionality should be 
>>> implemented in a generic fashion to be usable for all reporters.
>>> 
>>> https://issues.apache.org/jira/browse/FLINK-17495
>>> 
>>> On 5/18/2021 8:16 PM, Andrew Otto wrote:
>>>> Sounds useful!
>>>> 
>>>> On Tue, May 18, 2021 at 2:02 PM Mason Chen <mason.c...@apple.com> wrote:
>>>> Hi all,
>>>> 
>>>> Would people appreciate enhancements to the prometheus reporter to include 
>>>> extra labels via a configuration, as a contribution to Flink? I can see it 
>>>> being useful for adding labels that are not job specific, but infra 
>>>> specific.
>>>> 
>>>> The change would be nicely integrated with the Flink’s ConfigOptions and 
>>>> unit tested.
>>>> 
>>>> Best,
>>>> Mason
>>> 
>> 
> 



Re: Flink Metrics Naming

2021-06-01 Thread Mason Chen
Upon further inspection, it seems like the user scope is not universal (i.e. 
comes through the connectors and not UDFs (like rich map function)), but the 
question still stands if the process makes sense.

> On Jun 1, 2021, at 10:38 AM, Mason Chen  wrote:
> 
> Makes sense. We are primarily concerned with removing the metric labels from 
> the names as the user metrics get too long. i.e. the groups from `addGroup` 
> are concatenated in the metric name.
> 
> Do you think there would be any issues with removing the group information in 
> the metric name and putting them into a label instead? It seems like most 
> metrics internally, don’t use `addGroup` to create group information but 
> rather by creating another subclass of metric group.
> 
> Perhaps, I should ONLY apply this custom logic to metrics with the “user” 
> scope? Other scoped metrics (e.g. operator, task operator, etc.) shouldn’t 
> have these group names in the metric names in my experience...
> 
> An example just for clarity, 
> flink__group1_group2_metricName{group1=…, group2=…, flink tags}
> 
> =>
> 
> flink__metricName{group_info=group1_group2, group1=…, group2=…, 
> flink tags}
> 
>> On Jun 1, 2021, at 9:57 AM, Chesnay Schepler <ches...@apache.org> wrote:
>> 
>> The uniqueness of metrics and the naming of the Prometheus reporter are 
>> somewhat related but also somewhat orthogonal.
>> 
>> Prometheus works similar to JMX in that the metric name (e.g., 
>> taskmanager.job.task.operator.numRecordsIn) is more or less a _class_ of 
>> metrics, with tags/labels allowing you to select a specific instance of that 
>> metric.
>> 
>> Restricting metric names to 1 level of the hierarchy would present a few 
>> issues:
>> a) Effectively, all metric names that Flink uses effectively become reserved 
>> keywords that users must not use, which will lead to headaches when adding 
>> more metrics or forwarding metrics from libraries (e.g., kafka), because we 
>> could always break existing user-defined metrics.
>> b) You'd need a cluster-wide lookup that is aware of all hierarchies to 
>> ensure consistency across all processes.
>> 
>> In the end, there are significantly easier ways to solve the issue of the 
>> metric name being too long, i.e., give the user more control over the 
>> logical scope (taskmanager.job.task.operator), be it shortening the names 
>> (t.j.t.o), limiting the depth (e.g, operator.numRecordsIn), removing it 
>> outright (but I'd prefer some context to be present for clarity) or 
>> supporting something similar to scope formats.
>> I'm reasonably certain there are some tickets already in this direction, we 
>> just don't get around to doing them because for the most part the metric 
>> system works good enough and there are bigger fish to fry.
>> 
>> On 6/1/2021 3:39 PM, Till Rohrmann wrote:
>>> Hi Mason,
>>> 
>>> The idea is that a metric is not uniquely identified by its name alone but 
>>> instead by its path. The groups in which it is defined specify this path 
>>> (similar to directories). That's why it is valid to specify two metrics 
>>> with the same name if they reside in different groups.
>>> 
>>> I think Prometheus does not support such a tree structure and that's why 
>>> the path is exposed via labels if I am not mistaken. So long story short, 
>>> what you are seeing is a combination of how Flink organizes metrics and 
>>> what can be reported to Prometheus. 
>>> 
>>> I am also pulling in Chesnay who is more familiar with this part of the 
>>> code.
>>> 
>>> Cheers,
>>> Till
>>> 
>>> On Fri, May 28, 2021 at 7:33 PM Mason Chen <mason.c...@apple.com> wrote:
>>> Can anyone give insight as to why Flink allows 2 metrics with the same 
>>> “name”?
>>> 
>>> For example,
>>> 
>>> getRuntimeContext.addGroup(“group”, “group1”).counter(“myMetricName”);
>>> 
>>> And
>>> 
>>> getRuntimeContext.addGroup(“other_group”, 
>>> “other_group1”).counter(“myMetricName”);
>>> 
>>> Are totally valid.
>>> 
>>> 
>>> It seems that it has led to some not-so-great implementations—the 
>>> prometheus reporter and attaching the labels to the metric name, making the 
>>> name quite verbose.
>>> 
>>> 
>> 
> 



Re: Flink Metrics Naming

2021-06-01 Thread Mason Chen
Makes sense. We are primarily concerned with removing the metric labels from 
the names as the user metrics get too long. i.e. the groups from `addGroup` are 
concatenated in the metric name.

Do you think there would be any issues with removing the group information in 
the metric name and putting them into a label instead? It seems like most 
metrics internally, don’t use `addGroup` to create group information but rather 
by creating another subclass of metric group.

Perhaps, I should ONLY apply this custom logic to metrics with the “user” 
scope? Other scoped metrics (e.g. operator, task operator, etc.) shouldn’t have 
these group names in the metric names in my experience...

An example just for clarity, 
flink__group1_group2_metricName{group1=…, group2=…, flink tags}

=>

flink__metricName{group_info=group1_group2, group1=…, group2=…, 
flink tags}

> On Jun 1, 2021, at 9:57 AM, Chesnay Schepler  wrote:
> 
> The uniqueness of metrics and the naming of the Prometheus reporter are 
> somewhat related but also somewhat orthogonal.
> 
> Prometheus works similar to JMX in that the metric name (e.g., 
> taskmanager.job.task.operator.numRecordsIn) is more or less a _class_ of 
> metrics, with tags/labels allowing you to select a specific instance of that 
> metric.
> 
> Restricting metric names to 1 level of the hierarchy would present a few 
> issues:
> a) Effectively, all metric names that Flink uses effectively become reserved 
> keywords that users must not use, which will lead to headaches when adding 
> more metrics or forwarding metrics from libraries (e.g., kafka), because we 
> could always break existing user-defined metrics.
> b) You'd need a cluster-wide lookup that is aware of all hierarchies to 
> ensure consistency across all processes.
> 
> In the end, there are significantly easier ways to solve the issue of the 
> metric name being too long, i.e., give the user more control over the logical 
> scope (taskmanager.job.task.operator), be it shortening the names (t.j.t.o), 
> limiting the depth (e.g, operator.numRecordsIn), removing it outright (but 
> I'd prefer some context to be present for clarity) or supporting something 
> similar to scope formats.
> I'm reasonably certain there are some tickets already in this direction, we 
> just don't get around to doing them because for the most part the metric 
> system works good enough and there are bigger fish to fry.
> 
> On 6/1/2021 3:39 PM, Till Rohrmann wrote:
>> Hi Mason,
>> 
>> The idea is that a metric is not uniquely identified by its name alone but 
>> instead by its path. The groups in which it is defined specify this path 
>> (similar to directories). That's why it is valid to specify two metrics with 
>> the same name if they reside in different groups.
>> 
>> I think Prometheus does not support such a tree structure and that's why the 
>> path is exposed via labels if I am not mistaken. So long story short, what 
>> you are seeing is a combination of how Flink organizes metrics and what can 
>> be reported to Prometheus. 
>> 
>> I am also pulling in Chesnay who is more familiar with this part of the code.
>> 
>> Cheers,
>> Till
>> 
>> On Fri, May 28, 2021 at 7:33 PM Mason Chen <mason.c...@apple.com> wrote:
>> Can anyone give insight as to why Flink allows 2 metrics with the same 
>> “name”?
>> 
>> For example,
>> 
>> getRuntimeContext.addGroup(“group”, “group1”).counter(“myMetricName”);
>> 
>> And
>> 
>> getRuntimeContext.addGroup(“other_group”, 
>> “other_group1”).counter(“myMetricName”);
>> 
>> Are totally valid.
>> 
>> 
>> It seems that it has led to some not-so-great implementations—the 
>> prometheus reporter and attaching the labels to the metric name, making the 
>> name quite verbose.
>> 
>> 
> 



Flink Metrics Naming

2021-05-28 Thread Mason Chen
Can anyone give insight as to why Flink allows 2 metrics with the same “name”?

For example,

getRuntimeContext.addGroup(“group”, “group1”).counter(“myMetricName”);

And

getRuntimeContext.addGroup(“other_group”, 
“other_group1”).counter(“myMetricName”);

Are totally valid.


It seems that it has led to some not-so-great implementations—the prometheus 
reporter and attaching the labels to the metric name, making the name quite 
verbose.




Re: Prometheus Reporter Enhancement

2021-05-19 Thread Mason Chen
Are there any plans to rework some of the metric name formulations 
(getMetricIdentifier or getLogicalScope)? Currently, the label keys and/or 
label values are concatenated in the metric name and the information is 
redundant and makes the metric names longer.

Would it make sense to remove the tag related information (getAllVariables())?

> On May 18, 2021, at 3:45 PM, Chesnay Schepler  wrote:
> 
> There is already a ticket for this. Note that this functionality should be 
> implemented in a generic fashion to be usable for all reporters.
> 
> https://issues.apache.org/jira/browse/FLINK-17495
> 
> On 5/18/2021 8:16 PM, Andrew Otto wrote:
>> Sounds useful!
>> 
>> On Tue, May 18, 2021 at 2:02 PM Mason Chen <mason.c...@apple.com> wrote:
>> Hi all,
>> 
>> Would people appreciate enhancements to the prometheus reporter to include 
>> extra labels via a configuration, as a contribution to Flink? I can see it 
>> being useful for adding labels that are not job specific, but infra specific.
>> 
>> The change would be nicely integrated with the Flink’s ConfigOptions and 
>> unit tested.
>> 
>> Best,
>> Mason
> 



Prometheus Reporter Enhancement

2021-05-18 Thread Mason Chen
Hi all,

Would people appreciate enhancements to the prometheus reporter to include 
extra labels via a configuration, as a contribution to Flink? I can see it 
being useful for adding labels that are not job specific, but infra specific.

The change would be nicely integrated with the Flink’s ConfigOptions and unit 
tested.

Best,
Mason