Re: [DISCUSS][2.0] Deprecating Accumulator in favor of MetricsGroup

2023-09-01 Thread Matthias Pohl
Thanks for the input. David is right: Beam also uses accumulators [1]. In
this sense you're right that this would require a broader discussion about
whether other users would be affected as well. I will give it some more
thought.

[1] https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java#L167


Re: [DISCUSS][2.0] Deprecating Accumulator in favor of MetricsGroup

2023-08-28 Thread David Morávek
AFAIK Apache Beam also uses accumulators for metric collection, which is
indeed a major use case.

I'm not convinced that MetricGroup fully replaces what accumulators have
to offer, though. OperatorCoordinators might be able to replace the
remaining capabilities, but this needs a bit more thought. The missing part
there would be that accumulators are part of the JobResult.


Re: [DISCUSS][2.0] Deprecating Accumulator in favor of MetricsGroup

2023-08-28 Thread Xintong Song
Thanks for bringing this up, Matthias.

One thing that a user may achieve with an accumulator but not with a metric
group is to programmatically fetch the job execution result, rather than
outputting the results to an external sink, in attached mode. This can also
be achieved by using CollectSink, which is still @Experimental and
internally uses accumulators. So I guess it depends on 1) how stable we
think CollectSink is now, and 2) how many users directly use accumulators
rather than CollectSink and whether their requirements can be fully covered
by CollectSink. For 2), we probably also need to involve the user@ ML.
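
To make the point about fetching results programmatically concrete, here is
a minimal plain-Java sketch. It is not Flink code: JobResultSketch,
executeAttached and mergeSubtaskResults are hypothetical names, and the
sketch only models the idea that each parallel subtask keeps local
accumulator values which are merged by name and handed back to the caller
once an attached execution finishes. In Flink itself the client would read
such values from the job execution result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model; class and method names are not Flink's. */
final class JobResultSketch {

    /** Merges the per-subtask accumulator values by name (here: by summing). */
    static Map<String, Long> mergeSubtaskResults(List<Map<String, Long>> subtaskResults) {
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> local : subtaskResults) {
            for (Map.Entry<String, Long> entry : local.entrySet()) {
                merged.merge(entry.getKey(), entry.getValue(), Long::sum);
            }
        }
        return merged;
    }

    /** Simulates an attached execution: one subtask per partition, merged result returned. */
    static Map<String, Long> executeAttached(List<List<String>> partitions) {
        List<Map<String, Long>> subtaskResults = new ArrayList<>();
        for (List<String> partition : partitions) {
            Map<String, Long> local = new HashMap<>();
            for (String record : partition) {
                local.merge("records", 1L, Long::sum);
                if (record.isEmpty()) {
                    local.merge("empty-records", 1L, Long::sum);
                }
            }
            subtaskResults.add(local);
        }
        return mergeSubtaskResults(subtaskResults);
    }

    /** Fixed two-partition input used for demonstration. */
    static Map<String, Long> demo() {
        return executeAttached(Arrays.asList(
                Arrays.asList("a", "", "b"),
                Arrays.asList("", "c")));
    }

    public static void main(String[] args) {
        Map<String, Long> result = demo();
        System.out.println(result.get("records") + " records, "
                + result.get("empty-records") + " empty");
    }
}
```

The caller gets the merged values as a return value of the execution, with
no external sink involved. A metric, by contrast, is exposed through a
reporter while the job runs and is not part of what the execution returns.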

Best,

Xintong




[DISCUSS][2.0] Deprecating Accumulator in favor of MetricsGroup

2023-08-23 Thread Matthias Pohl
Hi everyone,
I was looking into serializing the ArchivedExecutionGraph for another FLIP
and came across Accumulators [1] (not to be confused with the window
accumulators of the Table/SQL API). Accumulators were introduced in Flink
quite a while ago in Stratosphere PR #340 [2].

I had a brief chat about it with Chesnay, who pointed out that in the past
there was an intention to use accumulators for collecting metrics. The
Accumulator JavaDoc hints that it was inspired by Hadoop's Counter concept
[3], which also sounds more or less equivalent to Flink's metrics.

The Accumulator is currently accessible through the RuntimeContext
interface, which provides addAccumulator [4] and getAccumulator [5]. Usages
of these methods appear in the following classes:
- CollectSinkFunction [6]: Here it's used to collect the final data when
closing the function. This feels like a misuse of the feature. Instead, the
CollectSink could block the close call until all data has been fetched by
the client program.
- DataSet.collect() [7]: Uses CollectHelper, which utilizes
SerializedListAccumulator to collect the final data, similarly to
CollectSinkFunction.
- EmptyFieldsCountAccumulator [8] is an example program that counts empty
fields. This could be migrated to MetricGroup.
- ChecksumHashCodeHelper [9] is used in DataSetUtils, where the calling
method is already marked as deprecated for 2.0.
- CollectOutputFormat [10] uses SerializedListAccumulator analogously to
DataSet.collect(). This class will be removed with the removal of the Scala
API in 2.0.
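
For readers unfamiliar with the pattern, the register-then-look-up flow
described above can be sketched in plain Java. This is not Flink code:
SimpleAccumulator, IntCounter and ContextSketch are hypothetical stand-ins
that only borrow the method names addAccumulator and getAccumulator, and
the counting logic is loosely modeled on the empty-fields example.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-ins; these are not Flink's actual classes. */
final class AccumulatorRegistrySketch {

    /** Minimal accumulator contract: add values locally, read the local result. */
    interface SimpleAccumulator<V, R> {
        void add(V value);

        R getLocalValue();
    }

    /** Sums the increments passed to add(). */
    static final class IntCounter implements SimpleAccumulator<Integer, Integer> {
        private int count;

        @Override
        public void add(Integer value) {
            count += value;
        }

        @Override
        public Integer getLocalValue() {
            return count;
        }
    }

    /** Stand-in for the accumulator-related part of a runtime context. */
    static final class ContextSketch {
        private final Map<String, SimpleAccumulator<?, ?>> accumulators = new HashMap<>();

        /** Registers an accumulator under the given name. */
        <V, R> void addAccumulator(String name, SimpleAccumulator<V, R> accumulator) {
            accumulators.put(name, accumulator);
        }

        /** Looks up a previously registered accumulator; the caller picks the type. */
        @SuppressWarnings("unchecked")
        <V, R> SimpleAccumulator<V, R> getAccumulator(String name) {
            return (SimpleAccumulator<V, R>) accumulators.get(name);
        }
    }

    /** Counts null or blank fields by registering a counter and adding to it per field. */
    static int countEmptyFields(String[][] rows) {
        ContextSketch ctx = new ContextSketch();
        ctx.addAccumulator("empty-fields", new IntCounter());
        SimpleAccumulator<Integer, Integer> counter = ctx.getAccumulator("empty-fields");
        for (String[] row : rows) {
            for (String field : row) {
                if (field == null || field.trim().isEmpty()) {
                    counter.add(1);
                }
            }
        }
        return counter.getLocalValue();
    }

    public static void main(String[] args) {
        String[][] rows = {{"a", "", "c"}, {"", " ", "f"}};
        System.out.println(countEmptyFields(rows));
    }
}
```

A plain counter like this is the kind of usage that looks replaceable by a
metric, whereas the list-collecting usages above carry processed data
rather than a measurement.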

The initial investigation leads me to conclude that we can remove the
Accumulator feature in favor of Metrics and proper collect implementations.
That would also help clean up the (Archived)ExecutionGraph: IMHO, we should
have a clear separation between Metrics (which are part of the
ExecutionGraph) and processed data (which shouldn't be part of the
ExecutionGraph).

I'm curious what others think about this. Did I miss a scenario where
Accumulators are actually needed? Or is this already part of some other 2.0
effort [11] which I missed? I would suggest that removing it could be a
nice-to-have item for 2.0.

Best,
Matthias



[1] https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java
[2] https://github.com/stratosphere/stratosphere/pull/340
[3] https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapreduce/Counters.html
[4] https://github.com/apache/flink/blob/63ee60859cac64f2bc6cfe2c5015ceb1199cea9c/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java#L156
[5] https://github.com/apache/flink/blob/63ee60859cac64f2bc6cfe2c5015ceb1199cea9c/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java#L165
[6] https://github.com/apache/flink/blob/5ae8cb0503449b07f76d0ab621c3e81734496b26/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java#L304
[7] https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-java/src/main/java/org/apache/flink/api/java/Utils.java#L145
[8] https://github.com/apache/flink/blob/aa98c18d2ba975479fcfa4930b0139fa575d303e/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java#L156
[9] https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-java/src/main/java/org/apache/flink/api/java/Utils.java#L256
[10] https://github.com/apache/flink/blob/91d81c427aa6312841ca868d54e8ce6ea721cd60/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/sinks/CollectTableSink.scala#L70
[11] https://cwiki.apache.org/confluence/display/FLINK/2.0+Release
