[jira] [Created] (FLINK-33191) Kafka Connector should directly depend on 3rd-party libs instead of flink-shaded repo

2023-10-04 Thread Jing Ge (Jira)
Jing Ge created FLINK-33191:
---

 Summary: Kafka Connector should directly depend on 3rd-party libs 
instead of flink-shaded repo
 Key: FLINK-33191
 URL: https://issues.apache.org/jira/browse/FLINK-33191
 Project: Flink
  Issue Type: Sub-task
Reporter: Jing Ge






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33190) Externalized Connectors should directly depend on 3rd-party libs instead of shaded repo

2023-10-04 Thread Jing Ge (Jira)
Jing Ge created FLINK-33190:
---

 Summary: Externalized Connectors should directly depend on 
3rd-party libs instead of shaded repo 
 Key: FLINK-33190
 URL: https://issues.apache.org/jira/browse/FLINK-33190
 Project: Flink
  Issue Type: Improvement
  Components: Build System
Affects Versions: 1.18.0
Reporter: Jing Ge


Connectors shouldn't depend on flink-shaded.
The overhead and/or risks of doing/supporting that right now far
outweigh the benefits.
(Because we either have to encode the full version for all dependencies
into the package, or accept the risk of minor/patch dependency clashes.)
Connectors are small enough in scope that depending directly on
guava/jackson/etc. is a fine approach, and they have plenty of other
dependencies that they need to manage anyway; let's treat these the same
way.

 

https://lists.apache.org/thread/mtypmprz2b5p20gj064d0wsz3k0ofpco
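For illustration, the difference shows up directly in connector code at the import
level. The sketch below is hedged: the relocated package name in the comment varies
with the flink-shaded version and is only an example, not a statement about any
particular connector.

// Today: going through flink-shaded ties the connector to a specific
// flink-shaded release (the relocated package name embeds the guava version).
// import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;

// Proposed: depend on the third-party library directly and manage its
// version in the connector's own build, like any other dependency.
import com.google.common.collect.ImmutableList;

public class DirectDependencySketch {
    public static void main(String[] args) {
        ImmutableList<String> topics = ImmutableList.of("orders", "payments");
        System.out.println(topics);
    }
}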



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [Discuss] FLIP-362: Support minimum resource limitation

2023-10-04 Thread xiangyu feng
Hi David,

Glad to hear back from you!

> Agreed; in my mind, this boils down to the ability to quickly allocate new
slots (TMs). This might differ between environments though.

Yes, for interactive queries cold-start is a very tricky issue to deal
with;
we should consider not only allocating new resources ASAP but also
warming up the newly added TaskManagers.
Internally, we have done lots of work to address this problem.
The minimum resource limitation is the first step, and we will
keep working on this.

Appreciate your feedback again.

Regards,
Xiangyu


David Morávek  于2023年10月4日周三 22:58写道:

> > If not, what is the difference between the spare resources and redundant
> taskmanagers?
>
> I wasn't aware of this one; good catch! The main difference is that you
> don't express the spare resources in terms of slots but in terms of task
> managers. Also, those options serve slightly different purposes, and users
> configuring the slot manager might not look for another option somewhere else.
>
> > Secondly, IMHO the difference between min-reserved resource and spare
> resources is that we could configure a rather large min-reserved resource
>
> Agreed; in my mind, this boils down to the ability to quickly allocate new
> slots (TMs). This might differ between environments though. In most cases,
> there should be some time between interactive queries unless they're
> submitted programmatically. I can see the value of having both (min + slots
> to keep around).
>
> All in all, I don't have a strong opinion here, it's a significant
> improvement either way. This was just the first thing that I thought about
> after reading the flip.
>
> Best,
> D.
>
> On Tue, Oct 3, 2023 at 2:10 PM xiangyu feng  wrote:
>
> > Hi David,
> >
> > Thx for your feedback.
> >
> > First of all, for keeping some spare resources around, do you mean
> > 'Redundant TaskManagers'[1]? If not, what is the difference between the
> > spare resources and redundant taskmanagers?
> >
> > Secondly, IMHO the difference between min-reserved resource and spare
> > resources is that we could configure a rather large min-reserved resource
> > for user cases submitting lots of short-lived jobs concurrently, but we
> > don't want to configure a large spare resource since this might double
> the
> > total resource usage and lead to resource waste.
> >
> > Looking forward to hearing from you.
> >
> > Regards,
> > Xiangyu
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-18625
> >
> > David Morávek  于2023年10月3日周二 05:00写道:
> >
> > > Hi Xiangyu,
> > >
> > > The sentiment of the FLIP makes sense, but I keep wondering whether
> this
> > > is the best way to think about the problem. I assume that "interactive
> > > session cluster" users always want to keep some spare resources around
> > (up
> > > to a configured threshold) to reduce cold start instead of statically
> > > configuring the minimum.
> > >
> > > It's just a tiny change from the original proposal, but it could make
> all
> > > the difference (eliminate overprovisioning, maintain latencies with a
> > > growing # of jobs, ..)
> > >
> > > WDYT?
> > >
> > > Best,
> > > D.
> > >
> > > On Mon, Sep 25, 2023 at 5:11 PM Jing Ge 
> > > wrote:
> > >
> > >> Hi Yangze,
> > >>
> > >> Thanks for the clarification. The example of two batch jobs teaming up
> with
> > >> one streaming job is interesting.
> > >>
> > >> Best regards,
> > >> Jing
> > >>
> > >> On Wed, Sep 20, 2023 at 7:19 PM Yangze Guo 
> wrote:
> > >>
> > >> > Thanks for the comments, Jing.
> > >> >
> > >> > > Will the minimum resource configuration also take effect for
> > streaming
> > >> > jobs in application mode?
> > >> > > Since it is not recommended to configure
> > >> slotmanager.number-of-slots.max
> > >> > for streaming jobs, does it make sense to disable it for common
> > >> streaming
> > >> > jobs? At least disable the check for avoiding the oscillation?
> > >> >
> > >> > Yes. The minimum resource configuration will only be disabled in
> > >> > standalone clusters atm. I agree it makes sense to disable it for a
pure
> > >> > streaming job, however:
> > >> > - By default, the minimum resource is configured to 0. If users do
> not
> > >> > proactively set it, either the oscillation check or the minimum
> > >> > restriction can be considered as disabled.
> > >> > - The minimum resource is a cluster-level configuration rather than
> a
> > >> > job-level configuration. If a user has an application with two batch
> > >> > jobs preceding the streaming job, they may also require this
> > >> > configuration to accelerate the execution of batch jobs.
> > >> >
> > >> > WDYT?
> > >> >
> > >> > Best,
> > >> > Yangze Guo
> > >> >
> > >> > On Thu, Sep 21, 2023 at 4:49 AM Jing Ge  >
> > >> > wrote:
> > >> > >
> > >> > > Hi Xiangyu,
> > >> > >
> > >> > > Thanks for driving it! There is one thing I am not really sure if
> I
> > >> > > understand you correctly.
> > >> > >
> > >> > > According to the FLIP: "The minimum resource limitation will be
> > >> > implemented
> 

Re: [DISCUSS] [FLINK-32873] Add a config to allow disabling Query hints

2023-10-04 Thread Jing Ge
Hi Dawid,

Thanks for the clarification. If you go through the discussion, you
will see that the focus has moved from "disable" to "ignore".
There was an alignment only on "ignore hints". Your suggestion bypassed the
alignment and mixed everything together. That confused me a bit. On one
hand, there is no clear reason why we should disable (i.e. throw an exception)
hints globally, and on the other hand, some functionalities, e.g. the lookup
join pointed out by Ron, depend on them. Would you like to elaborate on the
must-have requirement for the "disabled" scenario? Thanks!

Best regards,
Jing

On Thu, Oct 5, 2023 at 12:23 AM Sergey Nuyanzin  wrote:

> Hi Dawid,
>
> Thanks for bringing this up.
> I would agree with the enum approach;
> the IGNORED option would allow us to follow Oracle's behavior as well.
>
> >table.optimizer.query-options = ENABLED/DISABLED/IGNORED
>
> nit: Can we have "hint" in the config option name,
> e.g. table.optimizer.query-options-hints ?
>
>
> On Tue, Oct 3, 2023 at 5:58 PM Dawid Wysakowicz 
> wrote:
>
> > Hey all,
> > My understanding was that from the first message we were discussing
> > throwing an exception. Oracle was only shown as an example of a system
> > that has a flag for hint behaviour.
> >
> > Let's get back to the discussion and agree on the behavior. My
> > suggestion is to introduce an enum instead of a boolean flag since
> > apparently there are different requirements. My take is that it is worth
> > having an option to throw an exception if hints are disabled but are
> > provided in the SQL query. This is the same behavior as disabling the
> > OPTIONS hint we already have[1]
> >
> > Since you both @Jing and @Sergey would rather like to have an option to
> > ignore them we can introduce
> >
> > table.optimizer.query-options = ENABLED/DISABLED/IGNORED
> >
> > ENABLED: hints just work
> >
> > DISABLED: throw an exception
> >
> > IGNORED: ignore hints
> >
> > Are you two fine with that option @Jing @Sergey?
> >
> > Since this thread had a few misunderstandings already, I'd suggest to
> > convert it to a FLIP and follow with a VOTE shortly. @Bonnie would you
> > like to help with that?
> >
> > Best,
> >
> > Dawid
> >
> > [1]
> >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/#table-dynamic-table-options-enabled
> >
> > On 02/10/2023 18:18, Jing Ge wrote:
> > > Hi,
> > >
> > > I have the same concern as Sergey and would like to make sure the
> purpose
> > > of this discussion is to globally ignore hints without changing any
> other
> > > behaviours, if I am not mistaken. Thanks!
> > >
> > > Best regards,
> > > Jing
> > >
> > > On Mon, Oct 2, 2023 at 3:40 PM Sergey Nuyanzin 
> > wrote:
> > >
> > >> Hi Bonnie,
> > >>
> > >> I think changing it to something like .enabled
> > >> could lead to misunderstanding
> > >> for instance when we set this flag to false what should it mean?
> > >> 1. Just ignore hints and execute a query like the same query however
> > with
> > >> removed hints
> > >> 2. Fail on validation because hints are disabled
> > >> 3. something else
> > >>
> > >> I tend to think that we are talking about just ignoring hints, so
> > option 1
> > >> (and Oracle follows option 1 as well as mentioned at [1]).
> > >> So I would suggest to keep ignore in property name to make it more
> clear
> > >>
> > >> Please let me know if I miss anything
> > >>
> > >> [1]
> > >>
> > >>
> >
> https://docs.oracle.com/en/database/oracle/oracle-database/23/refrn/OPTIMIZER_IGNORE_HINTS.html#GUID-D62CA6D8-D0D8-4A20-93EA-EEB4B3144347
> > >>
> > >> On Fri, Sep 29, 2023 at 7:20 PM Bonnie Arogyam Varghese
> > >>  wrote:
> > >>
> > >>> Hi Jark,
> > >>>   A minor suggestion. Would it be ok if we changed the config name
> to `
> > >>> table.optimizer.query-options.enabled`?
> > >>> This is inline with other existing configurations such as:
> > >>>
> > >>> table.dynamic-table-options.enabled
> > >>> table.optimizer.distinct-agg.split.enabled
> > >>> table.optimizer.dynamic-filtering.enabled
> > >>>
> > >>>
> > >>> On Wed, Sep 27, 2023 at 9:57 AM Bonnie Arogyam Varghese <
> > >>> bvargh...@confluent.io> wrote:
> > >>>
> >  Hi Martjin,
> >  Yes, the suggestion for the configuration name made by Jark sounds
> > >> good.
> >  Also, thanks to everyone who participated in this discussion.
> > 
> >  On Tue, Sep 19, 2023 at 2:40 PM Martijn Visser <
> > >> martijnvis...@apache.org
> >  wrote:
> > 
> > > Hey Jark,
> > >
> > > Sounds fine to me.
> > > @Bonnie does that also work for you?
> > >
> > > Best regards,
> > >
> > > Martijn
> > >
> > > On Fri, Sep 15, 2023 at 1:28 PM Jark Wu  wrote:
> > >> Hi Martijn,
> > >>
> > >> Thanks for the investigation. I found the blog[1] shows a use case
> > >> that they use "OPTIMIZER_IGNORE_HINTS" to check if embedded
> > >> hints can be removed in legacy code. I think this is a useful tool
> > >> to
> > >> improve queries without complex hints strewn 

[jira] [Created] (FLINK-33189) FsCompletedCheckpointStorageLocation#disposeStorageLocation non-recursively deletes a directory

2023-10-04 Thread Vlado Vojdanovski (Jira)
Vlado Vojdanovski created FLINK-33189:
-

 Summary: 
FsCompletedCheckpointStorageLocation#disposeStorageLocation non-recursively 
deletes a directory
 Key: FLINK-33189
 URL: https://issues.apache.org/jira/browse/FLINK-33189
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Reporter: Vlado Vojdanovski


FsCompletedCheckpointStorageLocation attempts to non-recursively delete a 
directory
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCompletedCheckpointStorageLocation.java#L74]

However, per the documentation of Flink's FileSystem Interface, such attempts 
are supposed to fail. 
"   * @param recursive if path is a directory and set to true, the 
directory is
     *     deleted else throws an exception. In case of a file the recursive 
can be set to either
     *     true or false "
[https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java#L689]

I am sure there is a non-negligible chance I am missing some Flink internals
here, considering the class has not been touched since 2018, but my read is that the
above is either a bug, or it would be nice to update the FileSystem#delete docs.
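For illustration, here is a minimal Java sketch of the contract quoted above. The
checkpoint directory path is made up for the example, and this is not the
FsCompletedCheckpointStorageLocation code; it only demonstrates the recursive flag
of FileSystem#delete.

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class DeleteContractSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical exclusive checkpoint directory, for illustration only.
        Path checkpointDir = new Path("file:///tmp/checkpoints/chk-42");
        FileSystem fs = checkpointDir.getFileSystem();

        // Per the quoted javadoc, deleting a directory with recursive=false
        // is expected to throw, which is what the issue above is about:
        // fs.delete(checkpointDir, false);

        // The documented way to remove a directory is recursive=true:
        fs.delete(checkpointDir, true);
    }
}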

Thanks for taking a look :) !



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] [FLINK-32873] Add a config to allow disabling Query hints

2023-10-04 Thread Sergey Nuyanzin
Hi Dawid,

Thanks for bringing this up.
I would agree with the enum approach;
the IGNORED option would allow us to follow Oracle's behavior as well.

>table.optimizer.query-options = ENABLED/DISABLED/IGNORED

nit: Can we have "hint" in the config option name,
e.g. table.optimizer.query-options-hints ?


On Tue, Oct 3, 2023 at 5:58 PM Dawid Wysakowicz 
wrote:

> Hey all,
> My understanding was that from the first message we were discussing
> throwing an exception. Oracle was only shown as an example of a system
> that has a flag for hint behaviour.
>
> Let's get back to the discussion and agree on the behavior. My
> suggestion is to introduce an enum instead of a boolean flag since
> apparently there are different requirements. My take is that it is worth
> having an option to throw an exception if hints are disabled but are
> provided in the SQL query. This is the same behavior as disabling the
> OPTIONS hint we already have[1]
>
> Since you both @Jing and @Sergey would rather like to have an option to
> ignore them we can introduce
>
> table.optimizer.query-options = ENABLED/DISABLED/IGNORED
>
> ENABLED: hints just work
>
> DISABLED: throw an exception
>
> IGNORED: ignore hints
>
> Are you two fine with that option @Jing @Sergey?
>
> Since this thread had a few misunderstandings already, I'd suggest to
> convert it to a FLIP and follow with a VOTE shortly. @Bonnie would you
> like to help with that?
>
> Best,
>
> Dawid
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/#table-dynamic-table-options-enabled
>
> On 02/10/2023 18:18, Jing Ge wrote:
> > Hi,
> >
> > I have the same concern as Sergey and would like to make sure the purpose
> > of this discussion is to globally ignore hints without changing any other
> > behaviours, if I am not mistaken. Thanks!
> >
> > Best regards,
> > Jing
> >
> > On Mon, Oct 2, 2023 at 3:40 PM Sergey Nuyanzin 
> wrote:
> >
> >> Hi Bonnie,
> >>
> >> I think changing it to something like .enabled
> >> could lead to misunderstanding
> >> for instance when we set this flag to false what should it mean?
> >> 1. Just ignore hints and execute a query like the same query however
> with
> >> removed hints
> >> 2. Fail on validation because hints are disabled
> >> 3. something else
> >>
> >> I tend to think that we are talking about just ignoring hints, so
> option 1
> >> (and Oracle follows option 1 as well as mentioned at [1]).
> >> So I would suggest to keep ignore in property name to make it more clear
> >>
> >> Please let me know if I miss anything
> >>
> >> [1]
> >>
> >>
> https://docs.oracle.com/en/database/oracle/oracle-database/23/refrn/OPTIMIZER_IGNORE_HINTS.html#GUID-D62CA6D8-D0D8-4A20-93EA-EEB4B3144347
> >>
> >> On Fri, Sep 29, 2023 at 7:20 PM Bonnie Arogyam Varghese
> >>  wrote:
> >>
> >>> Hi Jark,
> >>>   A minor suggestion. Would it be ok if we changed the config name to `
> >>> table.optimizer.query-options.enabled`?
> >>> This is inline with other existing configurations such as:
> >>>
> >>> table.dynamic-table-options.enabled
> >>> table.optimizer.distinct-agg.split.enabled
> >>> table.optimizer.dynamic-filtering.enabled
> >>>
> >>>
> >>> On Wed, Sep 27, 2023 at 9:57 AM Bonnie Arogyam Varghese <
> >>> bvargh...@confluent.io> wrote:
> >>>
>  Hi Martjin,
>  Yes, the suggestion for the configuration name made by Jark sounds
> >> good.
>  Also, thanks to everyone who participated in this discussion.
> 
>  On Tue, Sep 19, 2023 at 2:40 PM Martijn Visser <
> >> martijnvis...@apache.org
>  wrote:
> 
> > Hey Jark,
> >
> > Sounds fine to me.
> > @Bonnie does that also work for you?
> >
> > Best regards,
> >
> > Martijn
> >
> > On Fri, Sep 15, 2023 at 1:28 PM Jark Wu  wrote:
> >> Hi Martijn,
> >>
> >> Thanks for the investigation. I found the blog[1] shows a use case
> >> that they use "OPTIMIZER_IGNORE_HINTS" to check if embedded
> >> hints can be removed in legacy code. I think this is a useful tool
> >> to
> >> improve queries without complex hints strewn throughout the code.
> >> Therefore, I'm fine to support this now.
> >>
> >> Maybe we can follow Oracle to name the config
> >> "table.optimizer.ignore-query-hints=false"?
> >>
> >> Best,
> >> Jark
> >>
> >> [1]: https://dbaora.com/optimizer_ignore_hints-oracle-database-18c/
> >>
> >> On Fri, 15 Sept 2023 at 17:57, Martijn Visser <
> >>> martijnvis...@apache.org
> >> wrote:
> >>
> >>> Hi Jark,
> >>>
> >>> Oracle has/had a generic "OPTIMIZER_IGNORE_HINTS" [1]. It looks
> >> like
> > MSSQL
> >>> has configuration options to disable specific hints [2] instead
> >> of a
> >>> generic solution.
> >>>
> >>> [1]
> >>>
> >>>
> >>
> https://docs.oracle.com/en/database/oracle/oracle-database/23/refrn/OPTIMIZER_IGNORE_HINTS.html#GUID-D62CA6D8-D0D8-4A20-93EA-EEB4B3144347
> >>> [2]
> >>>
> >>>
> >>
> 
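For illustration, a rough sketch of the enum-based option discussed in this thread,
written against Flink's ConfigOptions API. The option key, enum name and default
below are assumptions taken from the discussion, not a decided design.

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class QueryHintsOptionSketch {
    /** Proposed behaviours from the discussion above. */
    public enum QueryHintsBehavior { ENABLED, DISABLED, IGNORED }

    // Illustrative only: the final key is still being discussed
    // ("table.optimizer.query-options-hints" was also suggested).
    public static final ConfigOption<QueryHintsBehavior> QUERY_HINTS =
            ConfigOptions.key("table.optimizer.query-options")
                    .enumType(QueryHintsBehavior.class)
                    .defaultValue(QueryHintsBehavior.ENABLED)
                    .withDescription(
                            "ENABLED: hints are applied; DISABLED: an exception is thrown "
                                    + "when a hint is present; IGNORED: hints are silently ignored.");
}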

PyFlink MapState with Types.ROW() throws exception

2023-10-04 Thread Elkhan Dadashov
Hi Flinkers,

I'm trying to use MapState, where the value will be a list of Row-type elements.

Wanted to check if anyone else faced the same issue while trying to use
MapState in PyFlink with complex types.

Here is the code:

from pyflink.common import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import (
KeyedCoProcessFunction,
KeySelector,
RuntimeContext,
)
from pyflink.datastream.state import (
MapStateDescriptor,
StateTtlConfig,
ValueStateDescriptor,
ListStateDescriptor
)
from pyflink.table import DataTypes, StreamTableEnvironment


class MyKeyedCoProcessFunction(KeyedCoProcessFunction):
def __init__(self):
self.my_map_state = None

def open(self, runtime_context: RuntimeContext):
state_ttl_config = (
StateTtlConfig.new_builder(Time.seconds(1))
.set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite)
.disable_cleanup_in_background()
.build()
)

my_map_state_descriptor = MapStateDescriptor(
"my_map_state",
Types.SQL_TIMESTAMP(),
Types.LIST(Types.ROW([
Types.STRING(),
Types.STRING(),
Types.STRING(),
Types.STRING(),
Types.STRING(),
Types.STRING(),
Types.STRING(),
Types.STRING(),
Types.SQL_TIMESTAMP(),
Types.SQL_TIMESTAMP(),
Types.SQL_TIMESTAMP(),
Types.BIG_INT()
]))
)
my_map_state_descriptor.enable_time_to_live(state_ttl_config)
self.my_map_state =
runtime_context.get_map_state(my_map_state_descriptor)

But while running this code, it fails with this exception at job startup
(at runtime_context.get_map_state(my_map_state_descriptor)), even without
trying to add anything to the state.

File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 249, in
pyflink.fn_execution.beam.beam_operations_fast.StatefulFunctionOperation
.__init__
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 132, in
pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
File
"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/operations.py",
line 127, in open
self.open_func()
File
"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/operations.py",
line 296, in open_func
process_function.open(runtime_context)
File "/tmp/ipykernel_83481/1603226134.py", line 57, in open
File
"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/runtime_context.py",
line 125, in get_map_state
map_coder = from_type_info(state_descriptor.type_info) # type: MapCoder
File
"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py",
line 812, in from_type_info
from_type_info(type_info._key_type_info),
from_type_info(type_info._value_type_info))
File
"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py",
line 809, in from_type_info
return GenericArrayCoder(from_type_info(type_info.elem_type))
File
"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py",
line 819, in from_type_info
[f for f in type_info.get_field_names()])
File "/usr/local/lib64/python3.9/site-packages/pyflink/common/typeinfo.py",
line 377, in get_field_names
j_field_names = self.get_java_type_info().getFieldNames()
File "/usr/local/lib64/python3.9/site-packages/pyflink/common/typeinfo.py",
line 391, in get_java_type_info
j_types_array = get_gateway()\
File "/usr/local/lib64/python3.9/site-packages/pyflink/java_gateway.py",
line 62, in get_gateway
_gateway = launch_gateway()
File "/usr/local/lib64/python3.9/site-packages/pyflink/java_gateway.py",
line 86, in launch_gateway
raise Exception("It's launching the PythonGatewayServer during Python UDF
execution "
Exception: It's launching the PythonGatewayServer during Python UDF
execution which is unexpected. It usually happens when the job codes are in
the top level of the Python script file and are not enclosed in a `if name
== 'main'` statement.

If I switch from Types.ROW to Types.TUPLE() it works without any exception.

This works:

my_map_state_descriptor = MapStateDescriptor(
"my_map_state",
Types.SQL_TIMESTAMP(),
Types.LIST(Types.TUPLE([
Types.STRING(),
Types.STRING(),
Types.STRING(),
Types.STRING(),
Types.STRING(),
Types.STRING(),
Types.STRING(),
Types.STRING(),
Types.SQL_TIMESTAMP(),
Types.SQL_TIMESTAMP(),
Types.SQL_TIMESTAMP(),
Types.BIG_INT()
]))
)

Also created Jira FLINK-33188.


Thanks.


[jira] [Created] (FLINK-33188) PyFlink MapState with Types.ROW() throws exception

2023-10-04 Thread Elkhan Dadashov (Jira)
Elkhan Dadashov created FLINK-33188:
---

 Summary: PyFlink MapState with Types.ROW() throws exception
 Key: FLINK-33188
 URL: https://issues.apache.org/jira/browse/FLINK-33188
 Project: Flink
  Issue Type: Bug
  Components: API / Python, API / Type Serialization System
Affects Versions: 1.17.1
Reporter: Elkhan Dadashov


I'm trying to use MapState, where the value will be a list of Row-type elements.
 
Wanted to check if anyone else faced the same issue while trying to use 
MapState in PyFlink with complex types.
 
Here is the code:
 
from pyflink.common import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import (
    KeyedCoProcessFunction,
    KeySelector,
    RuntimeContext,
)
from pyflink.datastream.state import (
    MapStateDescriptor,
    StateTtlConfig,
    ValueStateDescriptor,
    ListStateDescriptor
)
from pyflink.table import DataTypes, StreamTableEnvironment


class MyKeyedCoProcessFunction(KeyedCoProcessFunction):
    def __init__(self):
        self.my_map_state = None

    def open(self, runtime_context: RuntimeContext):
        state_ttl_config = (
            StateTtlConfig.new_builder(Time.seconds(1))
            .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite)
            .disable_cleanup_in_background()
            .build()
        )

        my_map_state_descriptor = MapStateDescriptor(
            "my_map_state",
            Types.SQL_TIMESTAMP(),
            Types.LIST(Types.ROW([
                Types.STRING(), 
                Types.STRING(), 
                Types.STRING(), 
                Types.STRING(), 
                Types.STRING(), 
                Types.STRING(), 
                Types.STRING(), 
                Types.STRING(), 
                Types.SQL_TIMESTAMP(), 
                Types.SQL_TIMESTAMP(), 
                Types.SQL_TIMESTAMP(), 
                Types.BIG_INT() 
            ]))
        )
        my_map_state_descriptor.enable_time_to_live(state_ttl_config)
        self.my_map_state = 
runtime_context.get_map_state(my_map_state_descriptor)
 
But while running this code, it fails with this exception at job startup (at 
runtime_context.get_map_state(my_map_state_descriptor)), even without trying to 
add anything to the state.
 
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 249, in 
pyflink.fn_execution.beam.beam_operations_fast.StatefulFunctionOperation.__init__
File"pyflink/fn_execution/beam/beam_operations_fast.pyx", line 132, in 
pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
File"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/operations.py",
 line 127, in open
self.open_func()
File"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/operations.py",
 line 296, in open_func
process_function.open(runtime_context)
File"/tmp/ipykernel_83481/1603226134.py", line 57, in open
File"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/runtime_context.py",
 line 125, in get_map_state
map_coder = from_type_info(state_descriptor.type_info) # type: MapCoder
File"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py", 
line 812, in from_type_info
from_type_info(type_info._key_type_info), 
from_type_info(type_info._value_type_info))
File"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py", 
line 809, in from_type_info
returnGenericArrayCoder(from_type_info(type_info.elem_type))
File"/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py", 
line 819, in from_type_info
[f for f in type_info.get_field_names()])
File"/usr/local/lib64/python3.9/site-packages/pyflink/common/typeinfo.py", line 
377, in get_field_names
j_field_names = self.get_java_type_info().getFieldNames()
File"/usr/local/lib64/python3.9/site-packages/pyflink/common/typeinfo.py", line 
391, in get_java_type_info
j_types_array = get_gateway()\
File"/usr/local/lib64/python3.9/site-packages/pyflink/java_gateway.py", line 
62, in get_gateway
_gateway = launch_gateway()
File"/usr/local/lib64/python3.9/site-packages/pyflink/java_gateway.py", line 
86, in launch_gateway
raise Exception("It's launching the PythonGatewayServer during Python UDF 
execution "
Exception: It's launching the PythonGatewayServer during Python UDF execution 
which is unexpected. It usually happens when the job codes are in the top level 
of the Python script file and are not enclosed in a `if name == 'main'` 
statement.If I switch from Tupes.ROW to Types.TUPLE() it works without any 
exception.
 
This works:
 
my_map_state_descriptor = MapStateDescriptor(
            "my_map_state",
            Types.SQL_TIMESTAMP(),
            Types.LIST(Types.TUPLE([
                Types.STRING(),
                Types.STRING(),
              

Re: [DISCUSS] FLIP-371: SinkV2 Committer imporvements

2023-10-04 Thread Martijn Visser
Hi all,

Peter, Marton, Gordon and I had an offline sync on SinkV2 and I'm
happy with this first FLIP on the topic. +1

Best regards,

Martijn

On Wed, Oct 4, 2023 at 5:48 PM Márton Balassi  wrote:
>
> Thanks, Peter. I agree that this is needed for Iceberg and beneficial for
> other connectors too.
>
> +1
>
> On Wed, Oct 4, 2023 at 3:56 PM Péter Váry 
> wrote:
>
> > Hi Team,
> >
> > In my previous email[1] I have described our challenges migrating the
> > existing Iceberg SinkFunction based implementation to the new SinkV2 based
> > implementation.
> >
> > As a result of the discussion around that topic, I have created the first
> > [2] of the FLIPs addressing the missing features there.
> >
> > The main goal of FLIP-371 is to extend the currently existing Committer
> > API by providing an initial context on Committer creation. This context
> > will contain - among other, less interesting data -
> > the SinkCommitterMetricGroup, which could be used to store the generic
> > commit-related metrics, and also provide a way for the Committer to publish
> > its own metrics.
> >
> > The feature has already been requested through FLINK-25857 [3].
> >
> > Please use this thread to discuss the FLIP related questions, proposals,
> > feedback.
> >
> > Thanks,
> > Peter
> >
> > - [1] https://lists.apache.org/thread/h3kg7jcgjstpvwlhnofq093vk93ylgsn
> > - [2]
> >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-371%3A+SinkV2+Committer+imporvements
> > - [3] https://issues.apache.org/jira/browse/FLINK-25857
> >
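For illustration, a rough sketch of what the quoted proposal describes: a committer
that registers its own metric through a metric group handed in at creation time.
The CommitterInitContext interface and its metricGroup() accessor are assumed names
for this sketch only; the actual API is defined by FLIP-371, not here.

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

public class CommitterMetricsSketch {

    // Hypothetical init context carrying the committer metric group.
    interface CommitterInitContext {
        MetricGroup metricGroup();
    }

    // A connector-style committer registering its own commit-related metric.
    static class IcebergLikeCommitter {
        private final Counter committedDataFiles;

        IcebergLikeCommitter(CommitterInitContext context) {
            this.committedDataFiles = context.metricGroup().counter("committedDataFiles");
        }

        void commitOnce() {
            // ... perform the actual commit, then record it ...
            committedDataFiles.inc();
        }
    }
}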


[jira] [Created] (FLINK-33187) Don't send duplicate event to Kafka if no change

2023-10-04 Thread Clara Xiong (Jira)
Clara Xiong created FLINK-33187:
---

 Summary: Don't send duplicate event to Kafka if no change
 Key: FLINK-33187
 URL: https://issues.apache.org/jira/browse/FLINK-33187
 Project: Flink
  Issue Type: Improvement
  Components: Autoscaler
Affects Versions: 1.17.1
Reporter: Clara Xiong


Problem:
Some events are sent to Kafka repeatedly, such as ScalingReport when autoscaling
is not enabled, which makes up 99% of all Kafka events in our prod env. This
wastes Kafka resources and causes performance issues downstream.

Proposal:
Suppress duplicate events within an interval defined by a new operator-level dynamic
config "suppress-event.interval" in seconds, defaulting to 30 minutes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33186) CheckpointAfterAllTasksFinishedITCase.testRestoreAfterSomeTasksFinished fails on AZP

2023-10-04 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-33186:
---

 Summary:  
CheckpointAfterAllTasksFinishedITCase.testRestoreAfterSomeTasksFinished fails 
on AZP
 Key: FLINK-33186
 URL: https://issues.apache.org/jira/browse/FLINK-33186
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.19.0
Reporter: Sergey Nuyanzin


This build 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=53509=logs=baf26b34-3c6a-54e8-f93f-cf269b32f802=8c9d126d-57d2-5a9e-a8c8-ff53f7b35cd9=8762
fails as
{noformat}
Sep 28 01:23:43 Caused by: 
org.apache.flink.runtime.checkpoint.CheckpointException: Task local checkpoint 
failure.
Sep 28 01:23:43 at 
org.apache.flink.runtime.checkpoint.PendingCheckpoint.abort(PendingCheckpoint.java:550)
Sep 28 01:23:43 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2248)
Sep 28 01:23:43 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2235)
Sep 28 01:23:43 at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$null$9(CheckpointCoordinator.java:817)
Sep 28 01:23:43 at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
Sep 28 01:23:43 at 
java.util.concurrent.FutureTask.run(FutureTask.java:266)
Sep 28 01:23:43 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
Sep 28 01:23:43 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
Sep 28 01:23:43 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
Sep 28 01:23:43 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Sep 28 01:23:43 at java.lang.Thread.run(Thread.java:748)

{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33185) HybridShuffleITCase fails with TimeoutException: Pending slot request timed out in slot pool.

2023-10-04 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-33185:
---

 Summary: HybridShuffleITCase fails with TimeoutException: Pending 
slot request timed out in slot pool.
 Key: FLINK-33185
 URL: https://issues.apache.org/jira/browse/FLINK-33185
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.19.0
Reporter: Sergey Nuyanzin


This build 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=53519=logs=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3=0c010d0c-3dec-5bf1-d408-7b18988b1b2b=8641
fails as 
{noformat}
Sep 29 05:13:54 Caused by: java.util.concurrent.CompletionException: 
java.util.concurrent.CompletionException: 
java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: 
Pending slot request with SlotRequestId{b6e57c09274f4edc50697300bc8859a8} has 
been released.
Sep 29 05:13:54 at 
org.apache.flink.runtime.scheduler.DefaultExecutionDeployer.lambda$assignResource$4(DefaultExecutionDeployer.java:226)
Sep 29 05:13:54 ... 36 more
Sep 29 05:13:54 Caused by: java.util.concurrent.CompletionException: 
java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: 
Pending slot request with SlotRequestId{b6e57c09274f4edc50697300bc8859a8} has 
been released.
Sep 29 05:13:54 at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
Sep 29 05:13:54 at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
Sep 29 05:13:54 at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
Sep 29 05:13:54 at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
Sep 29 05:13:54 ... 34 more
Sep 29 05:13:54 Caused by: org.apache.flink.util.FlinkException: 
org.apache.flink.util.FlinkException: Pending slot request with 
SlotRequestId{b6e57c09274f4edc50697300bc8859a8} has been released.
Sep 29 05:13:54 at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.releaseSlot(DeclarativeSlotPoolBridge.java:373)
Sep 29 05:13:54 ... 30 more
Sep 29 05:13:54 Caused by: java.util.concurrent.TimeoutException: 
java.util.concurrent.TimeoutException: Pending slot request timed out in slot 
pool.
Sep 29 05:13:54 ... 30 more

{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Java Record support

2023-10-04 Thread Peter Huang
+1 for the convenience of users.

On Wed, Oct 4, 2023 at 8:05 AM Matthias Pohl 
wrote:

> +1 Sounds like a good idea.
>
> On Wed, Oct 4, 2023 at 5:04 PM Gyula Fóra  wrote:
>
> > I will share my initial implementation soon; it seems to be pretty
> > straightforward.
> >
> > The biggest challenge so far is setting up tests so we can still compile
> > against older versions but have tests for records. But I have a working
> > proposal for that as well.
> >
> > Gyula
> >
> > On Wed, 4 Oct 2023 at 16:45, Chesnay Schepler 
> wrote:
> >
> > > Kryo isn't required for this; newer versions do support records but we
> > > want something like a PojoSerializer for records to be performant.
> > >
> > > The core challenges are
> > > a) detecting records during type extraction
> > > b) ensuring parameters are passed to the constructor in the right
> order.
> > >
> > >  From what I remember from my own experiments this shouldn't exactly be
> > > /difficult/, but just a bit tedious to integrate into the Type
> > > extraction stack.
> > >
> > > On 04/10/2023 16:14, Őrhidi Mátyás wrote:
> > > > +1 This would be great
> > > >
> > > > On Wed, Oct 4, 2023 at 7:04 AM Gyula Fóra 
> > wrote:
> > > >
> > > > Hi All!
> > > >
> > > > Flink 1.18 contains experimental Java 17 support but it misses
> out
> > > > on Java
> > > > records which can be one of the nice benefits of actually using
> > > > newer java
> > > > versions.
> > > >
> > > > There is already a Jira to track this feature [1] but I am not
> > > > aware of any
> > > > previous efforts so far.
> > > >
> > > > Since records have pretty strong guarantees and many users would
> > > > probably
> > > > want to migrate from their POJOs, I think we should enhance the
> > > > current
> > > > Pojo TypeInfo/Serializer to accommodate records.
> > > >
> > > > I experimented with this locally and the changes are not huge as
> > > > we only
> > > > need to allow instantiating records through the constructor
> instead
> > > of
> > > > setters. This would mean that the serialization format is
> basically
> > > > equivalent to the same non-record pojo, giving us backward
> > > > compatibility
> > > > and all the features of the Pojo serializer for basically free.
> > > >
> > > > We should make sure to not introduce any performance regression
> in
> > > the
> > > > PojoSerializer but I am happy to open a preview PR if there is
> > > > interest.
> > > >
> > > > There were mentions of upgrading Kryo to support this but I think
> > > that
> > > > would add unnecessary complexity.
> > > >
> > > > What do you all think?
> > > >
> > > > Cheers,
> > > > Gyula
> > > >
> > > > [1] https://issues.apache.org/jira/browse/FLINK-32380
> > > >
> > >
> >
>
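For illustration, a small Java 17 reflection sketch of the two challenges Chesnay
lists above (detecting records, and respecting the canonical constructor's
parameter order). It is illustrative only and not the Flink type-extraction code;
the Order record is a made-up example type.

import java.lang.reflect.Constructor;
import java.lang.reflect.RecordComponent;
import java.util.Arrays;

public class RecordReflectionSketch {
    // Example record standing in for a user type previously modelled as a POJO.
    record Order(String id, long amount) {}

    public static void main(String[] args) throws Exception {
        Class<Order> clazz = Order.class;

        // (a) detecting records during type extraction
        System.out.println("is record: " + clazz.isRecord());

        // (b) the canonical constructor's parameters follow the record component order
        RecordComponent[] components = clazz.getRecordComponents();
        Class<?>[] paramTypes = Arrays.stream(components)
                .map(RecordComponent::getType)
                .toArray(Class<?>[]::new);
        Constructor<Order> canonical = clazz.getDeclaredConstructor(paramTypes);

        // A serializer can read fields in component order and re-create the
        // instance via the canonical constructor instead of using setters.
        Order restored = canonical.newInstance("order-1", 42L);
        System.out.println(restored);
    }
}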


[jira] [Created] (FLINK-33184) HybridShuffleITCase fails with exception in resource cleanup of task Map

2023-10-04 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-33184:
---

 Summary: HybridShuffleITCase fails with exception in resource 
cleanup of task Map
 Key: FLINK-33184
 URL: https://issues.apache.org/jira/browse/FLINK-33184
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.19.0
Reporter: Sergey Nuyanzin


This build fails 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=53548=logs=baf26b34-3c6a-54e8-f93f-cf269b32f802=8c9d126d-57d2-5a9e-a8c8-ff53f7b35cd9=8710
{noformat} 
Map (5/10)#0] ERROR org.apache.flink.runtime.taskmanager.Task   
 [] - FATAL - exception in resource cleanup of task Map (5/10)#0 
(159f887fbd200ea7cfa4aaeb1127c4ab_0a448493b4782967b150582570326227_4_0)
.
java.lang.IllegalStateException: Leaking buffers.
at 
org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) 
~[flink-core-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManagerImpl.release(TieredStorageMemoryManagerImpl.java:236)
 ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
at java.util.ArrayList.forEach(ArrayList.java:1259) ~[?:1.8.0_292]
at 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry.clearResourceFor(TieredStorageResourceRegistry.java:59)
 ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartition.releaseInternal(TieredResultPartition.java:195)
 ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.partition.ResultPartition.release(ResultPartition.java:262)
 ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.partition.ResultPartitionManager.releasePartition(ResultPartitionManager.java:88)
 ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.partition.ResultPartition.fail(ResultPartition.java:284)
 ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
at 
org.apache.flink.runtime.taskmanager.Task.failAllResultPartitions(Task.java:1004)
 ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
at 
org.apache.flink.runtime.taskmanager.Task.releaseResources(Task.java:990) 
~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:838) 
[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) 
[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
01:17:22,375 [flink-pekko.actor.default-dispatcher-5] INFO  
org.apache.flink.runtime.taskmanager.Task[] - Task Sink: 
Unnamed (3/10)#0 is already in state CANCELING
01:17:22,375 [Map (5/10)#0] ERROR 
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - FATAL - 
exception in resource cleanup of task Map (5/10)#0 
(159f887fbd200ea7cfa4aaeb1127c4ab_0a448493b4782967b150582570326227_4_0)
.
java.lang.IllegalStateException: Leaking buffers.
at 
org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) 
~[flink-core-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManagerImpl.release(TieredStorageMemoryManagerImpl.java:236)
 ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
at java.util.ArrayList.forEach(ArrayList.java:1259) ~[?:1.8.0_292]
at 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry.clearResourceFor(TieredStorageResourceRegistry.java:59)
 ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartition.releaseInternal(TieredResultPartition.java:195)
 ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.partition.ResultPartition.release(ResultPartition.java:262)
 ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.partition.ResultPartitionManager.releasePartition(ResultPartitionManager.java:88)
 ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.partition.ResultPartition.fail(ResultPartition.java:284)
 ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
at 
org.apache.flink.runtime.taskmanager.Task.failAllResultPartitions(Task.java:1004)
 ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
at 
org.apache.flink.runtime.taskmanager.Task.releaseResources(Task.java:990) 
[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:838) 

Re: [ANNOUNCE] Release 1.18.0, release candidate #0

2023-10-04 Thread Jing Ge
Hi David,

First of all, we should have enough time to wait for those issues to
be resolved. Secondly, it makes less sense to block the upstream release on
downstream build issues. In case those issues need more time, we
should move forward with the Flink release without waiting for them. WDYT?

Best regards,
Jing

On Wed, Oct 4, 2023 at 6:15 PM David Radley  wrote:

> Hi ,
> As release 1.18 removes  the kafka connector from the core Flink
> repository, I assume we will wait until the kafka connector nightly build
> issues https://issues.apache.org/jira/browse/FLINK-33104 and
> https://issues.apache.org/jira/browse/FLINK-33017 are resolved before
> releasing 1.18?
>
>  Kind regards, David.
>
>
> From: Jing Ge 
> Date: Wednesday, 27 September 2023 at 15:11
> To: dev@flink.apache.org 
> Subject: [EXTERNAL] Re: [ANNOUNCE] Release 1.18.0, release candidate #0
> Hi Folks,
>
> @Ryan FYI: CI passed and the PR has been merged. Thanks!
>
> If there are no more other concerns, I will start publishing 1.18-rc1.
>
> Best regards,
> Jing
>
> On Mon, Sep 25, 2023 at 1:40 PM Jing Ge  wrote:
>
> > Hi Ryan,
> >
> > Thanks for reaching out. It is fine to include it but we need to wait
> > until the CI passes. I am not sure how long it will take, since there
> seems
> > to be some infra issues.
> >
> > Best regards,
> > Jing
> >
> > On Mon, Sep 25, 2023 at 11:34 AM Ryan Skraba
> 
> > wrote:
> >
> >> Hello!  There's a security fix that probably should be applied to 1.18
> >> in the next RC1 : https://github.com/apache/flink/pull/23461  (bump to
> >> snappy-java).  Do you think this would be possible to include?
> >>
> >> All my best, Ryan
> >>
> >> [1]: https://issues.apache.org/jira/browse/FLINK-33149  "Bump
> >> snappy-java to 1.1.10.4"
> >>
> >>
> >>
> >> On Mon, Sep 25, 2023 at 3:54 PM Jing Ge 
> >> wrote:
> >> >
> >> > Thanks Zakelly for the update! Appreciate it!
> >> >
> >> > @Piotr Nowojski  If you do not have any other
> >> > concerns, I will move forward to create 1.18 rc1 and start voting.
> WDYT?
> >> >
> >> > Best regards,
> >> > Jing
> >> >
> >> > On Mon, Sep 25, 2023 at 2:20 AM Zakelly Lan 
> >> wrote:
> >> >
> >> > > Hi Jing and everyone,
> >> > >
> >> > > I have conducted three rounds of benchmarking with Java11, comparing
> >> > > release 1.18 (commit: deb07e99560[1]) with commit 6d62f9918ea[2].
> The
> >> > > results are attached[3]. Most of the tests show no obvious
> regression.
> >> > > However, I did observe significant change in several tests. Upon
> >> > > reviewing the historical results from the previous pipeline, I also
> >> > > discovered a substantial variance in those tests, as shown in the
> >> > > timeline pictures included in the sheet[3]. I believe this variance
> >> > > has existed for a long time and requires further investigation, and
> >> > > fully measuring the variance requires more rounds (15 or more). I
> >> > > think for now it is not a blocker for release 1.18. WDYT?
> >> > >
> >> > >
> >> > > Best,
> >> > > Zakelly
> >> > >
> >> > > [1]
> >> > >
> >>
> https://github.com/apache/flink/commit/deb07e99560b45033a629afc3f90666ad0a32feb
> >> > > [2]
> >> > >
> >>
> https://github.com/apache/flink/commit/6d62f9918ea2cbb8a10c705a25a4ff6deab60711
> >> > > [3]
> >> > >
> >>
> https://docs.google.com/spreadsheets/d/1V0-duzNTgu7H6R7kioF-TAPhlqWl7Co6Q9ikTBuaULo/edit?usp=sharing
> >> > >
> >> > > On Sun, Sep 24, 2023 at 11:29 AM ConradJam 
> >> wrote:
> >> > > >
> >> > > > +1 for testing with Java 17
> >> > > >
> >> > > > Jing Ge  于2023年9月24日周日 09:40写道:
> >> > > >
> >> > > > > +1 for testing with Java 17 too. Thanks Zakelly for your effort!
> >> > > > >
> >> > > > > Best regards,
> >> > > > > Jing
> >> > > > >
> >> > > > > On Fri, Sep 22, 2023 at 1:01 PM Zakelly Lan <
> >> zakelly@gmail.com>
> >> > > wrote:
> >> > > > >
> >> > > > > > Hi Jing,
> >> > > > > >
> >> > > > > > I agree we could wait for the result with Java 11. And it
> >> should be
> >> > > > > > available next Monday.
> >> > > > > > Additionally, I could also build a pipeline with Java 17 later
> >> since
> >> > > > > > it is supported in 1.18[1].
> >> > > > > >
> >> > > > > >
> >> > > > > > Best regards,
> >> > > > > > Zakelly
> >> > > > > >
> >> > > > > > [1]
> >> > > > > >
> >> > > > >
> >> > >
> >>
> https://github.com/apache/flink/commit/9c1318ca7fa5b2e7b11827068ad1288483aaa464#diff-8310c97396d60e96766a936ca8680f1e2971ef486cfc2bc55ec9ca5a5333c47fR53
> >> > > > > >
> >> > > > > > On Fri, Sep 22, 2023 at 5:57 PM Jing Ge
> >> 
> >> > > > > > wrote:
> >> > > > > > >
> >> > > > > > > Hi Zakelly,
> >> > > > > > >
> >> > > > > > > Thanks for your effort and the update! Since Java 8 has been
> >> > > > > > deprecated[1],
> >> > > > > > > let's wait for the result with Java 11. It should be
> available
> >> > > after
> >> > > > > the
> >> > > > > > > weekend and there should be no big surprise. WDYT?
> >> > > > > > >
> >> > > > > > > Best regards,
> >> > > > > > > Jing
> >> > > > > > >
> >> > > > > > > [1]
> >> > > > > > >

RE: [ANNOUNCE] Release 1.18.0, release candidate #0

2023-10-04 Thread David Radley
Hi ,
As release 1.18 removes  the kafka connector from the core Flink repository, I 
assume we will wait until the kafka connector nightly build issues 
https://issues.apache.org/jira/browse/FLINK-33104 and 
https://issues.apache.org/jira/browse/FLINK-33017 are resolved before releasing 
1.18?

 Kind regards, David.


From: Jing Ge 
Date: Wednesday, 27 September 2023 at 15:11
To: dev@flink.apache.org 
Subject: [EXTERNAL] Re: [ANNOUNCE] Release 1.18.0, release candidate #0
Hi Folks,

@Ryan FYI: CI passed and the PR has been merged. Thanks!

If there are no more other concerns, I will start publishing 1.18-rc1.

Best regards,
Jing

On Mon, Sep 25, 2023 at 1:40 PM Jing Ge  wrote:

> Hi Ryan,
>
> Thanks for reaching out. It is fine to include it but we need to wait
> until the CI passes. I am not sure how long it will take, since there seems
> to be some infra issues.
>
> Best regards,
> Jing
>
> On Mon, Sep 25, 2023 at 11:34 AM Ryan Skraba 
> wrote:
>
>> Hello!  There's a security fix that probably should be applied to 1.18
>> in the next RC1 : https://github.com/apache/flink/pull/23461  (bump to
>> snappy-java).  Do you think this would be possible to include?
>>
>> All my best, Ryan
>>
>> [1]: https://issues.apache.org/jira/browse/FLINK-33149  "Bump
>> snappy-java to 1.1.10.4"
>>
>>
>>
>> On Mon, Sep 25, 2023 at 3:54 PM Jing Ge 
>> wrote:
>> >
>> > Thanks Zakelly for the update! Appreciate it!
>> >
>> > @Piotr Nowojski  If you do not have any other
>> > concerns, I will move forward to create 1.18 rc1 and start voting. WDYT?
>> >
>> > Best regards,
>> > Jing
>> >
>> > On Mon, Sep 25, 2023 at 2:20 AM Zakelly Lan 
>> wrote:
>> >
>> > > Hi Jing and everyone,
>> > >
>> > > I have conducted three rounds of benchmarking with Java11, comparing
>> > > release 1.18 (commit: deb07e99560[1]) with commit 6d62f9918ea[2]. The
>> > > results are attached[3]. Most of the tests show no obvious regression.
>> > > However, I did observe significant change in several tests. Upon
>> > > reviewing the historical results from the previous pipeline, I also
>> > > discovered a substantial variance in those tests, as shown in the
>> > > timeline pictures included in the sheet[3]. I believe this variance
>> > > has existed for a long time and requires further investigation, and
>> > > fully measuring the variance requires more rounds (15 or more). I
>> > > think for now it is not a blocker for release 1.18. WDYT?
>> > >
>> > >
>> > > Best,
>> > > Zakelly
>> > >
>> > > [1]
>> > >
>> https://github.com/apache/flink/commit/deb07e99560b45033a629afc3f90666ad0a32feb
>> > > [2]
>> > >
>> https://github.com/apache/flink/commit/6d62f9918ea2cbb8a10c705a25a4ff6deab60711
>> > > [3]
>> > >
>> https://docs.google.com/spreadsheets/d/1V0-duzNTgu7H6R7kioF-TAPhlqWl7Co6Q9ikTBuaULo/edit?usp=sharing
>> > >
>> > > On Sun, Sep 24, 2023 at 11:29 AM ConradJam 
>> wrote:
>> > > >
>> > > > +1 for testing with Java 17
>> > > >
>> > > > Jing Ge  于2023年9月24日周日 09:40写道:
>> > > >
>> > > > > +1 for testing with Java 17 too. Thanks Zakelly for your effort!
>> > > > >
>> > > > > Best regards,
>> > > > > Jing
>> > > > >
>> > > > > On Fri, Sep 22, 2023 at 1:01 PM Zakelly Lan <
>> zakelly@gmail.com>
>> > > wrote:
>> > > > >
>> > > > > > Hi Jing,
>> > > > > >
>> > > > > > I agree we could wait for the result with Java 11. And it
>> should be
>> > > > > > available next Monday.
>> > > > > > Additionally, I could also build a pipeline with Java 17 later
>> since
>> > > > > > it is supported in 1.18[1].
>> > > > > >
>> > > > > >
>> > > > > > Best regards,
>> > > > > > Zakelly
>> > > > > >
>> > > > > > [1]
>> > > > > >
>> > > > >
>> > >
>> https://github.com/apache/flink/commit/9c1318ca7fa5b2e7b11827068ad1288483aaa464#diff-8310c97396d60e96766a936ca8680f1e2971ef486cfc2bc55ec9ca5a5333c47fR53
>> > > > > >
>> > > > > > On Fri, Sep 22, 2023 at 5:57 PM Jing Ge
>> 
>> > > > > > wrote:
>> > > > > > >
>> > > > > > > Hi Zakelly,
>> > > > > > >
>> > > > > > > Thanks for your effort and the update! Since Java 8 has been
>> > > > > > deprecated[1],
>> > > > > > > let's wait for the result with Java 11. It should be available
>> > > after
>> > > > > the
>> > > > > > > weekend and there should be no big surprise. WDYT?
>> > > > > > >
>> > > > > > > Best regards,
>> > > > > > > Jing
>> > > > > > >
>> > > > > > > [1]
>> > > > > > >
>> > > > > >
>> > > > >
>> > >
>> https://nightlies.apache.org/flink/flink-docs-master/release-notes/flink-1.15/#jdk-upgrade
>> > > > > > >
>> > > > > > > On Fri, Sep 22, 2023 at 11:26 AM Zakelly Lan <
>> > > zakelly@gmail.com>
>> > > > > > wrote:
>> > > > > > >
>> > > > > > > > Hi everyone,
>> > > > > > > >
>> > > > > > > > I want to provide an update on the benchmark results that I
>> have
>> > > been
>> > > > > > > > working on. After spending some time preparing the
>> environment
>> > > and
>> > > > > > > > adjusting the benchmark script, I finally got a comparison
>> > > between
>> > > > > > > > release 1.18 (commit: 2aeb99804ba[1]) and the 

Re: [DISCUSS] FLIP-371: SinkV2 Committer imporvements

2023-10-04 Thread Márton Balassi
Thanks, Peter. I agree that this is needed for Iceberg and beneficial for
other connectors too.

+1

On Wed, Oct 4, 2023 at 3:56 PM Péter Váry 
wrote:

> Hi Team,
>
> In my previous email[1] I have described our challenges migrating the
> existing Iceberg SinkFunction based implementation to the new SinkV2 based
> implementation.
>
> As a result of the discussion around that topic, I have created the first
> [2] of the FLIPs addressing the missing features there.
>
> The main goal of FLIP-371 is to extend the currently existing Committer
> API by providing an initial context on Committer creation. This context
> will contain - among other, less interesting data -
> the SinkCommitterMetricGroup, which could be used to store the generic
> commit-related metrics, and also provide a way for the Committer to publish
> its own metrics.
>
> The feature has already been requested through FLINK-25857 [3].
>
> Please use this thread to discuss the FLIP related questions, proposals,
> feedback.
>
> Thanks,
> Peter
>
> - [1] https://lists.apache.org/thread/h3kg7jcgjstpvwlhnofq093vk93ylgsn
> - [2]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-371%3A+SinkV2+Committer+imporvements
> - [3] https://issues.apache.org/jira/browse/FLINK-25857
>


Kafka Connector

2023-10-04 Thread David Radley
Hi,
I was looking at the PR backlog in the Flink repository and realised that there
are 51 hits on the search
https://github.com/apache/flink/pulls?q=is%3Apr+is%3Aopen+kafka-connector.

And 25 hits on
https://github.com/apache/flink/pulls?q=is%3Apr+is%3Aopen+kafka-connector+label%3Acomponent%3DConnectors%2FKafka

I am updating the PRs on the Kafka connector component, indicating that each PR
is no longer relevant to this repo.
I suggest we close out these PRs, as they cannot be merged, with a pointer to
the other repo to port any required code changes.

Once the PRs have been closed out, I suggest we also remove the connector/kafka
component from the core Flink repo – so no new PRs come in with it.

What do you think?
Kind regards, David.






Unless otherwise stated above:

IBM United Kingdom Limited
Registered in England and Wales with number 741598
Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU


RE: FW: RE: Close orphaned/stale PRs

2023-10-04 Thread David Radley
Hi Ryan,

I agree that good communication is key to determining what can be worked on.

In terms of metrics, we can use the gh CLI to list PRs and we can export
issues from Jira. With a view across them, you could join on the Flink issue key (at the
start of the PR comment) and the Flink issue itself – you could then see which
PRs have an assigned Jira and would be expected to be reviewed. There is no
explicit reviewer field in the Jira issue; I am not sure if we can easily get
this info without having a custom field (which others have tried).

In terms of what prs a committer could / should review – I would think that 
component ownership helps scope the subset of prs to review / merge.

Kind regards, David.


From: Ryan Skraba 
Date: Wednesday, 4 October 2023 at 15:09
To: dev@flink.apache.org 
Subject: [EXTERNAL] Re: FW: RE: Close orphaned/stale PRs
Hey, this has been an interesting discussion -- this is something that
has been on my mind as an open source contributor and committer (I'm
not a Flink committer).

A large number of open PRs doesn't _necessarily_ mean a project is
unhealthy or has technical debt. If it's fun and easy to get your
contribution accepted and committed, even for a small fix, you're more
likely to raise another PR, and another.  I wouldn't be surprised if
there's a natural equilibrium where adding capacity to smoothly review
and manage more PRs cause more PRs to be submitted.  Everyone wins!

I don't think there's a measure for the "average PR lifetime", or
"time to first comment", but those would be more interesting things to
know and those are the worrisome ones.

As a contributor, I'm pretty willing to wait as long as necessary (and
rebase and fix merge conflicts) if there's good communication in
place. I'm pretty patient, especially if I knew that the PR would be
looked at and merged for a specific fix version (for example).  I'd
expect simple and obvious fixes with limited scope to take less time
than a more complex, far-reaching change.  I'd probably appreciate
that the boring-cyborg welcomes me on my first PR, but I'd be pretty
irritated if any PR were closed without any human interaction.

As a reviewer or committer, it's just overwhelming to see the big
GitHub list, and sometimes it feels random just "picking one near the
top" to look at.  In projects where I have the committer role, I
sometimes feel more badly about work I'm *not* doing than the work I'm
getting done! This isn't sustainable either.  A lot of people on the
project are volunteering after hours, and grooming, reviewing and
commenting PRs shouldn't be a thankless, unending job to feel bad
about.

As a contributor, one "magic" solution that I'd love to see is a
better UI that could show (for example) tentative "review dates", like
the number at a butcher shop, and proposed reviewers.

If I was committing to reviewing a PR every day, it would be great if
I could know which ones were the best "next" candidates to review: the
one waiting longest, or a new, critical fix in my domain.  As it
stands, there's next to no chance that the PRs in the middle of the
list are going to get any attention, but closing them stands to lose
valuable work or (worse) turn off a potential contributor forever.

Taking a look at some open PRs that I authored or interacted with: I
found one that should have been closed, one that was waiting for MY
attention for a merge-squash-rebase (oops), another where I made some
requested changes and it's back in review limbo.  Unfortunately, I
don't think any of these would have been brought to my attention by a
nag-bot. I don't think I'm alone; people simply don't give automated emails
much attention.

OK, one more thing to think about: some underrepresented groups in
tech can find it difficult to demand attention, through constant
pinging and commenting and reminding...  So finding ways to make sure
that non-squeaky wheels also get some love is a really fair goal.

There's some pretty good ideas in this conversation, and I'm really
glad to hear it being brought up!  I'd love to hear any other
brainstorming for ideas, and get the virtuous circle that David
mentioned!

All my best, Ryan







On Wed, Oct 4, 2023 at 12:03 PM David Radley  wrote:
>
> Hi,
> To add, I agree with Martijn’s insights; I think we are saying similar things: 
> to progress agreed-upon work, and not blanket-close all stale prs.
>   Kind regards, David.
>
> From: David Radley 
> Date: Wednesday, 4 October 2023 at 10:59
> To: dev@flink.apache.org 
> Subject: [EXTERNAL] RE: Close orphaned/stale PRs
> Hi ,
> I agree Venkata this issue is bigger than closing out stale prs.
>
> We can see that issues are being raised at a rate way above the resolution 
> time. 
> 

Re: [DISCUSS] Java Record support

2023-10-04 Thread Matthias Pohl
+1 Sounds like a good idea.

On Wed, Oct 4, 2023 at 5:04 PM Gyula Fóra  wrote:

> I will share my initial implementation soon, it seems to be pretty
> straightforward.
>
> The biggest challenge so far is setting up tests so we can still compile against
> older versions but have tests for records. But I have a working proposal for
> that as well.
>
> Gyula
>
> On Wed, 4 Oct 2023 at 16:45, Chesnay Schepler  wrote:
>
> > Kryo isn't required for this; newer versions do support records but we
> > want something like a PojoSerializer for records to be performant.
> >
> > The core challenges are
> > a) detecting records during type extraction
> > b) ensuring parameters are passed to the constructor in the right order.
> >
> >  From what I remember from my own experiments this shouldn't exactly be
> > /difficult/, but just a bit tedious to integrate into the Type
> > extraction stack.
> >
> > On 04/10/2023 16:14, Őrhidi Mátyás wrote:
> > > +1 This would be great
> > >
> > > On Wed, Oct 4, 2023 at 7:04 AM Gyula Fóra 
> wrote:
> > >
> > > Hi All!
> > >
> > > Flink 1.18 contains experimental Java 17 support but it misses out
> > > on Java
> > > records which can be one of the nice benefits of actually using
> > > newer java
> > > versions.
> > >
> > > There is already a Jira to track this feature [1] but I am not
> > > aware of any
> > > previous efforts so far.
> > >
> > > Since records have pretty strong guarantees and many users would
> > > probably
> > > want to migrate from their POJOs, I think we should enhance the
> > > current
> > > Pojo TypeInfo/Serializer to accommodate for the records.
> > >
> > > I experimented with this locally and the changes are not huge as
> > > we only
> > > need to allow instantiating records through the constructor instead
> > of
> > > setters. This would mean that the serialization format is basically
> > > equivalent to the same non-record pojo, giving us backward
> > > compatibility
> > > and all the features of the Pojo serializer for basically free.
> > >
> > > We should make sure to not introduce any performance regression in
> > the
> > > PojoSerializer but I am happy to open a preview PR if there is
> > > interest.
> > >
> > > There were mentions of upgrading Kryo to support this but I think
> > that
> > > would add unnecessary complexity.
> > >
> > > What do you all think?
> > >
> > > Cheers,
> > > Gyula
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-32380
> > >
> >
>


Re: [DISCUSS] Java Record support

2023-10-04 Thread Gyula Fóra
I will share my initial implementation soon, it seems to be pretty
straightforward.

The biggest challenge so far is setting up tests so we can still compile against
older versions but have tests for records. But I have a working proposal for
that as well.

Gyula

On Wed, 4 Oct 2023 at 16:45, Chesnay Schepler  wrote:

> Kryo isn't required for this; newer versions do support records but we
> want something like a PojoSerializer for records to be performant.
>
> The core challenges are
> a) detecting records during type extraction
> b) ensuring parameters are passed to the constructor in the right order.
>
>  From what I remember from my own experiments this shouldn't exactly be
> /difficult/, but just a bit tedious to integrate into the Type
> extraction stack.
>
> On 04/10/2023 16:14, Őrhidi Mátyás wrote:
> > +1 This would be great
> >
> > On Wed, Oct 4, 2023 at 7:04 AM Gyula Fóra  wrote:
> >
> > Hi All!
> >
> > Flink 1.18 contains experimental Java 17 support but it misses out
> > on Java
> > records which can be one of the nice benefits of actually using
> > newer java
> > versions.
> >
> > There is already a Jira to track this feature [1] but I am not
> > aware of any
> > previous efforts so far.
> >
> > Since records have pretty strong guarantees and many users would
> > probably
> > want to migrate from their POJOs, I think we should enhance the
> > current
> > Pojo TypeInfo/Serializer to accommodate for the records.
> >
> > I experimented with this locally and the changes are not huge as
> > we only
> > need to allow instantiating records through the constructor instead
> of
> > setters. This would mean that the serialization format is basically
> > equivalent to the same non-record pojo, giving us backward
> > compatibility
> > and all the features of the Pojo serializer for basically free.
> >
> > We should make sure to not introduce any performance regression in
> the
> > PojoSerializer but I am happy to open a preview PR if there is
> > interest.
> >
> > There were mentions of upgrading Kryo to support this but I think
> that
> > would add unnecessary complexity.
> >
> > What do you all think?
> >
> > Cheers,
> > Gyula
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-32380
> >
>


Re: [Discuss] FLIP-362: Support minimum resource limitation

2023-10-04 Thread David Morávek
> If not, what is the difference between the spare resources and redundant
taskmanagers?

I wasn't aware of this one; good catch! The main difference is that you
don't express the spare resources in terms of slots but in terms of task
managers. Also, those options serve slightly different purposes, and users
configuring the slot manager might not look for another option somewhere else.

> Secondly, IMHO the difference between min-reserved resource and spare
resources is that we could configure a rather large min-reserved resource

Agreed; in my mind, this boils down to the ability to quickly allocate new
slots (TMs). This might differ between environments though. In most cases,
there should be some time between interactive queries unless they're
submitted programmatically. I can see the value of having both (min + slots
to keep around).
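
To make the comparison concrete, a minimal sketch of how both knobs could sit side by side in a session cluster configuration (the redundant-TaskManager and max-slots keys already exist today; the minimum key below is only an illustration of what FLIP-362 proposes – its real name and granularity are defined by the FLIP, not by this sketch):

```
import org.apache.flink.configuration.Configuration;

public class SessionClusterResourceSettings {

    public static Configuration build() {
        Configuration conf = new Configuration();

        // Existing option (FLINK-18625): keep spare TaskManagers around to speed up failover.
        conf.setString("slotmanager.redundant-taskmanager-num", "2");

        // Existing option: upper bound on the total number of slots.
        conf.setString("slotmanager.number-of-slots.max", "100");

        // Proposed by FLIP-362 (illustrative key name): lower bound of slots the
        // session cluster keeps allocated even when no job currently needs them.
        conf.setString("slotmanager.number-of-slots.min", "20");

        return conf;
    }
}
```

The first two cover the "spare resources" idea; the last one covers the statically reserved minimum.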

All in all, I don't have a strong opinion here, it's a significant
improvement either way. This was just the first thing that I thought about
after reading the flip.

Best,
D.

On Tue, Oct 3, 2023 at 2:10 PM xiangyu feng  wrote:

> Hi David,
>
> Thx for your feedback.
>
> First of all, for keeping some spare resources around, do you mean
> 'Redundant TaskManagers'[1]? If not, what is the difference between the
> spare resources and redundant taskmanagers?
>
> Secondly, IMHO the difference between min-reserved resource and spare
> resources is that we could configure a rather large min-reserved resource
> for user cases submitting lots of short-lived jobs concurrently, but we
> don't want to configure a large spare resource since this might double the
> total resource usage and lead to resource waste.
>
> Looking forward to hearing from you.
>
> Regards,
> Xiangyu
>
> [1] https://issues.apache.org/jira/browse/FLINK-18625
>
> David Morávek  于2023年10月3日周二 05:00写道:
>
> > Hi Xiangyu,
> >
> > The sentiment of the FLIP makes sense, but I keep wondering whether this
> > is the best way to think about the problem. I assume that "interactive
> > session cluster" users always want to keep some spare resources around
> (up
> > to a configured threshold) to reduce cold start instead of statically
> > configuring the minimum.
> >
> > It's just a tiny change from the original proposal, but it could make all
> > the difference (eliminate overprovisioning, maintain latencies with a
> > growing # of jobs, ..)
> >
> > WDYT?
> >
> > Best,
> > D.
> >
> > On Mon, Sep 25, 2023 at 5:11 PM Jing Ge 
> > wrote:
> >
> >> Hi Yangze,
> >>
> >> Thanks for the clarification. The example of two batch jobs team up with
> >> one streaming job is interesting.
> >>
> >> Best regards,
> >> Jing
> >>
> >> On Wed, Sep 20, 2023 at 7:19 PM Yangze Guo  wrote:
> >>
> >> > Thanks for the comments, Jing.
> >> >
> >> > > Will the minimum resource configuration also take effect for
> streaming
> >> > jobs in application mode?
> >> > > Since it is not recommended to configure
> >> slotmanager.number-of-slots.max
> >> > for streaming jobs, does it make sense to disable it for common
> >> streaming
> >> > jobs? At least disable the check for avoiding the oscillation?
> >> >
> >> > Yes. The minimum resource configuration will only be disabled in
> >> > standalone clusters atm. I agree it makes sense to disable it for a pure
> >> > streaming job, however:
> >> > - By default, the minimum resource is configured to 0. If users do not
> >> > proactively set it, either the oscillation check or the minimum
> >> > restriction can be considered as disabled.
> >> > - The minimum resource is a cluster-level configuration rather than a
> >> > job-level configuration. If a user has an application with two batch
> >> > jobs preceding the streaming job, they may also require this
> >> > configuration to accelerate the execution of batch jobs.
> >> >
> >> > WDYT?
> >> >
> >> > Best,
> >> > Yangze Guo
> >> >
> >> > On Thu, Sep 21, 2023 at 4:49 AM Jing Ge 
> >> > wrote:
> >> > >
> >> > > Hi Xiangyu,
> >> > >
> >> > > Thanks for driving it! There is one thing I am not really sure if I
> >> > > understand you correctly.
> >> > >
> >> > > According to the FLIP: "The minimum resource limitation will be
> >> > implemented
> >> > > in the DefaultResourceAllocationStrategy of FineGrainedSlotManager.
> >> > >
> >> > > Each time when SlotManager needs to reconcile the cluster resources
> or
> >> > > fulfill job resource requirements, the
> >> DefaultResourceAllocationStrategy
> >> > > will check if the minimum resource requirement has been fulfilled.
> If
> >> it
> >> > is
> >> > > not, DefaultResourceAllocationStrategy will request new
> >> > PendingTaskManagers
> >> > > and FineGrainedSlotManager will allocate new worker resources
> >> > accordingly."
> >> > >
> >> > > "To avoid this oscillation, we need to check the worker number
> derived
> >> > from
> >> > > minimum and maximum resource configuration is consistent before
> >> starting
> >> > > SlotManager."
> >> > >
> >> > > Will the minimum resource configuration also take effect for
> streaming
> >> > 

Re: [DISCUSS] Java Record support

2023-10-04 Thread Chesnay Schepler
Kryo isn't required for this; newer versions do support records but we 
want something like a PojoSerializer for records to be performant.


The core challenges are
a) detecting records during type extraction
b) ensuring parameters are passed to the constructor in the right order.

From what I remember from my own experiments this shouldn't exactly be 
/difficult/, but just a bit tedious to integrate into the Type 
extraction stack.
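
To make (a) and (b) concrete, here is a minimal reflection sketch, independent of Flink's type extraction stack (class and method names below are illustrative, not the proposed Flink API): Class#isRecord answers (a), and Class#getRecordComponents returns the components in declaration order, which matches the parameter order of the canonical constructor needed for (b).

```
import java.lang.reflect.Constructor;
import java.lang.reflect.RecordComponent;

public final class RecordReflectionSketch {

    /** (a) Detecting a record class during type extraction. */
    public static boolean isRecordType(Class<?> clazz) {
        return clazz.isRecord();
    }

    /** (b) Instantiating a record from field values given in declaration order. */
    public static <T> T instantiate(Class<T> recordClass, Object[] valuesInDeclarationOrder)
            throws ReflectiveOperationException {
        RecordComponent[] components = recordClass.getRecordComponents();
        Class<?>[] parameterTypes = new Class<?>[components.length];
        for (int i = 0; i < components.length; i++) {
            // Components are reported in declaration order, which is exactly the
            // parameter order of the canonical constructor.
            parameterTypes[i] = components[i].getType();
        }
        Constructor<T> canonical = recordClass.getDeclaredConstructor(parameterTypes);
        canonical.setAccessible(true);
        return canonical.newInstance(valuesInDeclarationOrder);
    }
}
```

A serializer could buffer the deserialized field values into such an array and call the canonical constructor at the end, instead of mutating a pre-created instance as it does for POJOs.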


On 04/10/2023 16:14, Őrhidi Mátyás wrote:

+1 This would be great

On Wed, Oct 4, 2023 at 7:04 AM Gyula Fóra  wrote:

Hi All!

Flink 1.18 contains experimental Java 17 support but it misses out
on Java
records which can be one of the nice benefits of actually using
newer java
versions.

There is already a Jira to track this feature [1] but I am not
aware of any
previous efforts so far.

Since records have pretty strong guarantees and many users would
probably
want to migrate from their POJOs, I think we should enhance the
current
Pojo TypeInfo/Serializer to accommodate for the records.

I experimented with this locally and the changes are not huge as
we only
need to allow instantiating records through the constructor instead of
setters. This would mean that the serialization format is basically
equivalent to the same non-record pojo, giving us backward
compatibility
and all the features of the Pojo serializer for basically free.

We should make sure to not introduce any performance regression in the
PojoSerializer but I am happy to open a preview PR if there is
interest.

There were mentions of upgrading Kryo to support this but I think that
would add unnecessary complexity.

What do you all think?

Cheers,
Gyula

[1] https://issues.apache.org/jira/browse/FLINK-32380



Re: Flink and Flink shaded dependency

2023-10-04 Thread Chesnay Schepler

There is no "monolithic" flink-shaded dependency.
Connectors shouldn't depend on anything that Flink provides, but be 
self-contained as Martijn pointed out.


Connectors shouldn't depend on flink-shaded.
The overhead and/or risks of doing/supporting that right now far 
outweigh the benefits.
( Because we either have to encode the full version for all dependencies 
into the package, or accept the risk of minor/patch dependency clashes)
Connectors are small enough in scope that depending directly on 
guava/jackson/etc. is a fine approach, and they have plenty of other 
dependencies that they need to manage anyway; let's treat these the same 
way.
Naturally this is also an argument against flink-shaded-connectors; on 
top of that we already experience repo creep and managing releases is 
difficult enough as-is.


As for class-loading, there has been a long-standing goal of each 
connector being loaded in its own classloader. That still is the north 
star and the only reasonable way to ensure that multiple connectors can 
be safely used with SQL.


On 02/10/2023 18:32, Jing Ge wrote:

Hi Sergey,

Thanks for sharing your thoughts. It could help somewhat, but it doesn't get to
the root of this issue.

According to the documentation, Flink shaded is used to provide a single
instance of a shaded dependency across sub-modules in Flink repo. Shaded
namespaces should be used where shaded dependencies are configured. After
connectors have been externalized, it ends up with more repos depending on
one shaded jar, e.g. guava. This is a "monolithic" dependency setup that
makes it difficult to change the root(flink-shade), because any changes of
the root have to be propagated to all downstream repos. Even worse is that
not every downstream repo is known while modifying the root.

Since all externalized connectors have their own repos and are not
sub-modules of Flink anymore, I would suggest the following upgrade:

1. Connectors should use their own classloader instead of Flink's
classloader. This will break the monolithic dependency. Connectors and
Flink can use different versions of flink-shaded.
2. [optional] It would be even better that all connector repos depend on
their own individual shaded repo, e.g. flink-connector-shaded. flink-shaded
should only be used by Flink.

WDYT?

Best regards,
Jing


On Thu, Sep 14, 2023 at 11:28 PM Sergey Nuyanzin 
wrote:


Yes, that's a reasonable question, thanks for raising it.

I think this is not only about flink-shaded, rather about dependencies in
general

I guess there is no rule of thumb, or at least I'm not aware of
Here are my thoughts
1. If bumping dependency doesn't require breaking changes and passes
existing tests then just bump it
2. In case there are breaking changes, we could consider doing this within the
next major release.
For a minor release:
 a. try to answer the question whether it impacts Flink or not
 b. in case it impacts Flink and the fix itself is relatively small, then to
avoid a breaking change
we could copy classes with the fixes into the Flink repo, like it usually
happens with Calcite-related fixes.
The problem with this approach is that I guess it will not work for
non-JVM deps like e.g. RocksDB
 c. in case there is no way to do it without breaking changes for a minor release,
then we probably need a sort of announcement motivating users to move to another major
version where the issue is fixed

looking forward to seeing other opinions about that

On Wed, Sep 13, 2023 at 9:47 PM Jing Ge 
wrote:


Hi Sergey,

Thanks for doing the analysis and providing the great insight. I did my own
analysis and got the same conclusion. I just wanted to use this example to
kick off a discussion and check if there is a common guideline or concept
in the community to handle such cases, since it seems any bump-up might
have a big impact. This time, we are kind of lucky. What if CVE related
code has been used in Flink? I do see it is an issue that upgrading a lib
in the flink-shaded repo is not recommended because of the complexity.
IMHO, we don't want to put the cart before the horse.

Best regards,
Jing

On Wed, Sep 13, 2023 at 9:11 PM Sergey Nuyanzin 
wrote:


Thanks for raising this

I would suggest trying to double check whether it actually impacts Flink or not.

For instance from one side Calcite between 1.22.0..1.31.0 has a critical CVE-2022-39135 [1]
from another side Flink does not use this functionality and is not impacted by this.

Regarding guava
After closer look at Guava's high CVE you've mentioned [2]
Based on Github issue describing the problem (exists since 2016) [3]
there is a security issue with class com.google.common.io.FileBackedOutputStream

While looking at source code for com.google.common.io.FileBackedOutputStream usage I was not able to find such.
Also I was not able to find usage of org.apache.flink.shaded.guava30.com.google.common.io.Files#createTempDir
which was fixed within commit [4]
Also I was not able to find other Guava classes which use the 

[jira] [Created] (FLINK-33183) Enable metadata columns in NduAnalyzer with retract if non-virtual

2023-10-04 Thread Timo Walther (Jira)
Timo Walther created FLINK-33183:


 Summary: Enable metadata columns in NduAnalyzer with retract if 
non-virtual
 Key: FLINK-33183
 URL: https://issues.apache.org/jira/browse/FLINK-33183
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Timo Walther


Currently, the NduAnalyzer is very strict about metadata columns in updating 
sources. Compared to append and upsert sources (see also FLINK-33182), retract 
sources are tricky. And the analyzer is actually correct.

However, for retract sources we should expose more functionality to the user 
and add a warning to the documentation that retract mode could potentially 
cause NDU problems if not enough attention is paid. We should only throw an 
error on virtual metadata columns. Persisted metadata columns can be considered 
“safe“. When a metadata column is persisted, we can assume that an upstream 
Flink job fills its content and thus likely also fills it correctly for retractions.
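
For readers less familiar with the distinction, a small illustrative DDL (the connector name, metadata keys and columns are made up): a virtual metadata column is read-only and ignored on write, while a persisted (non-virtual) one is also written by the sink, which is what makes the assumption above plausible.

```
public class MetadataColumnExample {
    // Illustrative DDL only; the connector name, metadata keys and columns are made up.
    static final String DDL =
            "CREATE TABLE t (\n"
                    + "  id BIGINT,\n"
                    // Virtual metadata column: readable in queries, never written on INSERT.
                    + "  partition_id INT METADATA VIRTUAL,\n"
                    // Persisted (non-virtual) metadata column: also written by the sink, so an
                    // upstream Flink job that produced the row has filled it, including for
                    // its retraction messages.
                    + "  event_time TIMESTAMP(3) METADATA FROM 'timestamp'\n"
                    + ") WITH (\n"
                    + "  'connector' = '...'\n"
                    + ")";
}
```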



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Java Record support

2023-10-04 Thread Őrhidi Mátyás
+1 This would be great

On Wed, Oct 4, 2023 at 7:04 AM Gyula Fóra  wrote:

> Hi All!
>
> Flink 1.18 contains experimental Java 17 support but it misses out on Java
> records which can be one of the nice benefits of actually using newer java
> versions.
>
> There is already a Jira to track this feature [1] but I am not aware of any
> previous efforts so far.
>
> Since records have pretty strong guarantees and many users would probably
> want to migrate from their POJOs, I think we should enhance the current
> Pojo TypeInfo/Serializer to accommodate for the records.
>
> I experimented with this locally and the changes are not huge as we only
> need to allow instantiating records through the constructor instead of
> setters. This would mean that the serialization format is basically
> equivalent to the same non-record pojo, giving us backward compatibility
> and all the features of the Pojo serializer for basically free.
>
> We should make sure to not introduce any performance regression in the
> PojoSerializer but I am happy to open a preview PR if there is interest.
>
> There were mentions of upgrading Kryo to support this but I think that
> would add unnecessary complexity.
>
> What do you all think?
>
> Cheers,
> Gyula
>
> [1]  https://issues.apache.org/jira/browse/FLINK-32380
>


[jira] [Created] (FLINK-33182) Allow metadata columns in NduAnalyzer with ChangelogNormalize

2023-10-04 Thread Timo Walther (Jira)
Timo Walther created FLINK-33182:


 Summary: Allow metadata columns in NduAnalyzer with 
ChangelogNormalize
 Key: FLINK-33182
 URL: https://issues.apache.org/jira/browse/FLINK-33182
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Timo Walther


Currently, the NduAnalyzer is very strict about metadata columns in updating 
sources. However, for upsert sources (like Kafka) that contain an incomplete 
changelog, the planner always adds a ChangelogNormalize node. 
ChangelogNormalize will make sure that metadata columns can be considered 
deterministic. So the NduAnalyzer should be satisfied in this case. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: FW: RE: Close orphaned/stale PRs

2023-10-04 Thread Ryan Skraba
Hey, this has been an interesting discussion -- this is something that
has been on my mind as an open source contributor and committer (I'm
not a Flink committer).

A large number of open PRs doesn't _necessarily_ mean a project is
unhealthy or has technical debt. If it's fun and easy to get your
contribution accepted and committed, even for a small fix, you're more
likely to raise another PR, and another.  I wouldn't be surprised if
there's a natural equilibrium where adding capacity to smoothly review
and manage more PRs causes more PRs to be submitted.  Everyone wins!

I don't think there's a measure for the "average PR lifetime", or
"time to first comment", but those would be more interesting things to
know and those are the worrisome ones.

As a contributor, I'm pretty willing to wait as long as necessary (and
rebase and fix merge conflicts) if there's good communication in
place. I'm pretty patient, especially if I knew that the PR would be
looked at and merged for a specific fix version (for example).  I'd
expect simple and obvious fixes with limited scope to take less time
than a more complex, far-reaching change.  I'd probably appreciate
that the boring-cyborg welcomes me on my first PR, but I'd be pretty
irritated if any PR were closed without any human interaction.

As a reviewer or committer, it's just overwhelming to see the big
GitHub list, and sometimes it feels random just "picking one near the
top" to look at.  In projects where I have the committer role, I
sometimes feel more badly about work I'm *not* doing than the work I'm
getting done! This isn't sustainable either.  A lot of people on the
project are volunteering after hours, and grooming, reviewing and
commenting PRs shouldn't be a thankless, unending job to feel bad
about.

As a contributor, one "magic" solution that I'd love to see is a
better UI that could show (for example) tentative "review dates", like
the number at a butcher shop, and proposed reviewers.

If I was committing to reviewing a PR every day, it would be great if
I could know which ones were the best "next" candidates to review: the
one waiting longest, or a new, critical fix in my domain.  As it
stands, there's next to no chance that the PRs in the middle of the
list are going to get any attention, but closing them stands to lose
valuable work or (worse) turn off a potential contributor forever.

Taking a look at some open PRs that I authored or interacted with: I
found one that should have been closed, one that was waiting for MY
attention for a merge-squash-rebase (oops), another where I made some
requested changes and it's back in review limbo.  Unfortunately, I
don't think any of these would have been brought to my attention by a
nag-bot. I don't think I'm alone; people simply don't give automated emails
much attention.

OK, one more thing to think about: some underrepresented groups in
tech can find it difficult to demand attention, through constant
pinging and commenting and reminding...  So finding ways to make sure
that non-squeaky wheels also get some love is a really fair goal.

There's some pretty good ideas in this conversation, and I'm really
glad to hear it being brought up!  I'd love to hear any other
brainstorming for ideas, and get the virtuous circle that David
mentioned!

All my best, Ryan







On Wed, Oct 4, 2023 at 12:03 PM David Radley  wrote:
>
> Hi,
> To add I agree with Martijn’s insights; I think we are saying similar things. 
> To progress agreed upon work, and not blanket close all stale prs,
>   Kind regards, David.
>
> From: David Radley 
> Date: Wednesday, 4 October 2023 at 10:59
> To: dev@flink.apache.org 
> Subject: [EXTERNAL] RE: Close orphaned/stale PRs
> Hi ,
> I agree Venkata this issue is bigger than closing out stale prs.
>
> We can see that issues are being raised at a rate way above the resolution 
> time. 
> https://issues.apache.org/jira/secure/ConfigureReport.jspa?projectOrFilterId=project-12315522=daily=90=true=major=12315522=com.atlassian.jira.jira-core-reports-plugin%3Acreatedvsresolved-report_token=A5KQ-2QAV-T4JA-FDED_19ff17decb93662bafa09e4b3ffb3a385c202015_lin=Next
> Gaining over 500 issues to the backlog every 3 months.
>
> We have over 1000 open prs. This is a lot of technical debt. I came across a 
> 6 month old pr recently that had not been merged. A second Jira issue was 
> raised  for the same problem and a second pr fixed the issue (identically). 
> The first pr was still on the backlog until we noticed it.
>
> I am looking to contribute to the community to be able to identify issues I 
> can work on and then be reasonably certain they will be reviewed and merged 
> so I can build on contributions. I have worked as a maintainer and committer 
> in other communities and managed to spend some of the week addressing 
> incoming work; I am happy to do this in some capacity with the support of 
> committer(s) for Flink.  It seems to me it is virtuous circle to enable 

[DISCUSS] Java Record support

2023-10-04 Thread Gyula Fóra
Hi All!

Flink 1.18 contains experimental Java 17 support but it misses out on Java
records which can be one of the nice benefits of actually using newer java
versions.

There is already a Jira to track this feature [1] but I am not aware of any
previous efforts so far.

Since records have pretty strong guarantees and many users would probably
want to migrate from their POJOs, I think we should enhance the current
Pojo TypeInfo/Serializer to accommodate for the records.

I experimented with this locally and the changes are not huge as we only
need to allow instantiating records through the constructor instead of
setters. This would mean that the serialization format is basically
equivalent to the same non-record pojo, giving us backward compatibility
and all the features of the Pojo serializer for basically free.

We should make sure to not introduce any performance regression in the
PojoSerializer but I am happy to open a preview PR if there is interest.

There were mentions of upgrading Kryo to support this but I think that
would add unnecessary complexity.

What do you all think?

Cheers,
Gyula

[1]  https://issues.apache.org/jira/browse/FLINK-32380
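
As a made-up illustration of the migration this would allow (the names are invented), the following POJO and record declare the same fields in the same order, so the on-the-wire format could stay identical while only the instantiation path changes from setters to the canonical constructor:

```
// Today's POJO style: default constructor plus mutable fields and setters.
class UserPojo {
    private String name;
    private long visits;

    public UserPojo() {}

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public long getVisits() { return visits; }
    public void setVisits(long visits) { this.visits = visits; }
}

// Record equivalent: same field names and types, but instances can only be
// created through the canonical constructor, hence the constructor-based path
// the serializer would need.
record UserRecord(String name, long visits) {}
```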


[DISCUSS] FLIP-371: SinkV2 Committer imporvements

2023-10-04 Thread Péter Váry
Hi Team,

In my previous email[1] I have described our challenges migrating the
existing Iceberg SinkFunction based implementation, to the new SinkV2 based
implementation.

As a result of the discussion around that topic, I have created the first
[2] of the FLIP-s addressing the missing features there.

The main goal of FLIP-371 is to extend the currently existing Committer
API by providing an initial context on Committer creation. This context
will contain - among other, less interesting data -
the SinkCommitterMetricGroup which could be used to store the generic
commit related metrics, and also provide a way for the Committer to publish
its own metrics.

The feature has already been requested through FLINK-25857 [3].

Please use this thread to discuss the FLIP related questions, proposals,
feedback.

Thanks,
Peter

- [1] https://lists.apache.org/thread/h3kg7jcgjstpvwlhnofq093vk93ylgsn
- [2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-371%3A+SinkV2+Committer+imporvements
- [3] https://issues.apache.org/jira/browse/FLINK-25857
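
For readers who have not opened the FLIP yet, the rough shape being discussed is a context object handed to the committer at creation time; the sketch below is only illustrative (the actual interface and method names are defined in FLIP-371, not here):

```
import org.apache.flink.metrics.MetricGroup;

// Illustrative shape only; see FLIP-371 for the real API.
interface CommitterInitContextSketch {

    // Metric group scoped to the committer: used for the generic commit metrics
    // and available for the committer to register its own metrics.
    MetricGroup metricGroup();

    // Other creation-time data could be exposed here as well,
    // e.g. the subtask this committer runs in (illustrative).
    int subtaskId();
}
```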


FW: RE: Close orphaned/stale PRs

2023-10-04 Thread David Radley
Hi,
To add, I agree with Martijn’s insights; I think we are saying similar things: 
to progress agreed-upon work, and not blanket-close all stale prs.
  Kind regards, David.

From: David Radley 
Date: Wednesday, 4 October 2023 at 10:59
To: dev@flink.apache.org 
Subject: [EXTERNAL] RE: Close orphaned/stale PRs
Hi ,
I agree Venkata this issue is bigger than closing out stale prs.

We can see that issues are being raised at a rate way above the resolution 
time. 
https://issues.apache.org/jira/secure/ConfigureReport.jspa?projectOrFilterId=project-12315522=daily=90=true=major=12315522=com.atlassian.jira.jira-core-reports-plugin%3Acreatedvsresolved-report_token=A5KQ-2QAV-T4JA-FDED_19ff17decb93662bafa09e4b3ffb3a385c202015_lin=Next
Gaining over 500 issues to the backlog every 3 months.

We have over 1000 open prs. This is a lot of technical debt. I came across a 6 
month old pr recently that had not been merged. A second Jira issue was raised  
for the same problem and a second pr fixed the issue (identically). The first 
pr was still on the backlog until we noticed it.

I am looking to contribute to the community to be able to identify issues I can 
work on and then be reasonably certain they will be reviewed and merged so I 
can build on contributions. I have worked as a maintainer and committer in 
other communities and managed to spend some of the week addressing incoming 
work; I am happy to do this in some capacity with the support of committer(s) 
for Flink.  It seems to me it is a virtuous circle: enabling more contributions 
gets more committers, and builds up committers who can help merge and review 
the backlog.

Some thoughts (I am new to this – so apologies if I have misunderstood 
something or am unaware of other existing mechanisms):

  1.  If there is an issue that a committer has assigned to a contributor as 
per the process , 
and there is a pr then it should be with the committer to review the pr, or 
return it to the work queue. I do not know how many prs are like this. It seems 
to me that if a committer assigns an issue, they are indicating they will 
review, unassign themselves or merge. I do not think these prs should be closed 
as stale.
  2.  Could we have a Git action to notify committers (tagged in the pr?) if a 
pr (that has an assigned Jira)  has not been reviewed in a certain period (7 
days?) then subsequent nags if there has been no response . In this way busy 
committers can see that a pr needs looking at.
  3.  Other prs have been raised without a committer saying that they will fix 
it.  In this case there is likely to be value, but the merging and review work 
has not been taken on by anyone. I notice spelling mistake prs that have not 
been merged (there are 8 with this query 
https://github.com/apache/flink/pulls?q=is%3Apr+is%3Aopen+spelling), these 
are typical newbie prs as they are simple but useful improvements; it would be 
great if these simpler ones could just be merged – maybe they should be marked 
as a [hotfix] to indicate they should be merged.  If simpler prs are not merged 
– it is very difficult for new contributors to gain eminence to get towards 
being a committer.
  4.  There are also issues that have been raised by people who do not want to 
fix them. It seems to me that we need a “triaged” state to indicate the issue 
looks valid and reasonable, so could be picked up by someone – at which time 
they would need to agree with a committer to get the associated pr reviewed and 
merged. This triaged state would be a pool of issues that new contributors to 
choose from



I am happy to help improve things once we have consensus.



Kind regards, David.




From: Venkatakrishnan Sowrirajan 
Date: Wednesday, 4 October 2023 at 00:36
To: dev@flink.apache.org 
Subject: [EXTERNAL] Re: Close orphaned/stale PRs
Gentle ping to surface this up for more discussions.

Regards
Venkata krishnan


On Tue, Sep 26, 2023 at 4:59 PM Venkatakrishnan Sowrirajan 
wrote:

> Hi Martijn,
>
> Agree with your point that closing a PR without any review feedback even
> after 'X' days is discouraging to a new contributor. I understand that this
> is a capacity problem. Capacity problem cannot be solved by this proposal
> and it is beyond the scope of this proposal.
>
> Regarding your earlier question,
> > What's the added value of
> closing these PRs
>
>- Having lots of inactive PRs lingering around shows the project is
>less active. I am not saying this is the only way to determine how active a
>project is, but this is one of the key factors.
>- A large number of PRs open can be discouraging for (new)
>contributors but on the other hand I agree closing an inactive PR without
>any reviews can also drive contributors away.
>
> Having said all of that, I agree closing PRs that don't have any reviews
> to start with should be avoided from the final proposal.
>
> > I'm +1 for (automatically) closing up 

RE: Close orphaned/stale PRs

2023-10-04 Thread David Radley
Hi ,
I agree Venkata this issue is bigger than closing out stale prs.

We can see that issues are being raised at a rate way above the resolution 
time. 
https://issues.apache.org/jira/secure/ConfigureReport.jspa?projectOrFilterId=project-12315522=daily=90=true=major=12315522=com.atlassian.jira.jira-core-reports-plugin%3Acreatedvsresolved-report_token=A5KQ-2QAV-T4JA-FDED_19ff17decb93662bafa09e4b3ffb3a385c202015_lin=Next
Gaining over 500 issues to the backlog every 3 months.

We have over 1000 open prs. This is a lot of technical debt. I came across a 6 
month old pr recently that had not been merged. A second Jira issue was raised  
for the same problem and a second pr fixed the issue (identically). The first 
pr was still on the backlog until we noticed it.

I am looking to contribute to the community to be able to identify issues I can 
work on and then be reasonably certain they will be reviewed and merged so I 
can build on contributions. I have worked as a maintainer and committer in 
other communities and managed to spend some of the week addressing incoming 
work; I am happy to do this in some capacity with the support of committer(s) 
for Flink.  It seems to me it is a virtuous circle: enabling more contributions 
gets more committers, and builds up committers who can help merge and review 
the backlog.

Some thoughts (I am new to this – so apologies if I have misunderstood 
something or am unaware of other existing mechanisms):

  1.  If there is an issue that a committer has assigned to a contributor as 
per the process , 
and there is a pr then it should be with the committer to review the pr, or 
return it to the work queue. I do not know how many prs are like this. It seems 
to me that if a committer assigns an issue, they are indicating they will 
review, unassign themselves or merge. I do not think these prs should be closed 
as stale.
  2.  Could we have a Git action to notify committers (tagged in the pr?) if a 
pr (that has an assigned Jira)  has not been reviewed in a certain period (7 
days?) then subsequent nags if there has been no response . In this way busy 
committers can see that a pr needs looking at.
  3.  Other prs have been raised without a committer saying that they will fix 
it.  In this case there is likely to be value, but the merging and review work 
has not been taken on by anyone. I notice spelling mistake prs that have not 
been merged (there are 8 with this query 
https://github.com/apache/flink/pulls?q=is%3Apr+is%3Aopen+spelling), these are 
typical newbie prs as they are simple but useful improvements; it would be 
great if these simpler ones could just be merged – maybe they should be marked 
as a [hotfix] to indicate they should be merged.  If simpler prs are not merged 
– it is very difficult for new contributors to gain eminence to get towards 
being a committer.
  4.  There are also issues that have been raised by people who do not want to 
fix them. It seems to me that we need a “triaged” state to indicate the issue 
looks valid and reasonable, so could be picked up by someone – at which time 
they would need to agree with a committer to get the associated pr reviewed and 
merged. This triaged state would be a pool of issues for new contributors to 
choose from.



I am happy to help improve things once we have consensus.



Kind regards, David.




From: Venkatakrishnan Sowrirajan 
Date: Wednesday, 4 October 2023 at 00:36
To: dev@flink.apache.org 
Subject: [EXTERNAL] Re: Close orphaned/stale PRs
Gentle ping to surface this up for more discussions.

Regards
Venkata krishnan


On Tue, Sep 26, 2023 at 4:59 PM Venkatakrishnan Sowrirajan 
wrote:

> Hi Martijn,
>
> Agree with your point that closing a PR without any review feedback even
> after 'X' days is discouraging to a new contributor. I understand that this
> is a capacity problem. Capacity problem cannot be solved by this proposal
> and it is beyond the scope of this proposal.
>
> Regarding your earlier question,
> > What's the added value of
> closing these PRs
>
>- Having lots of inactive PRs lingering around shows the project is
>less active. I am not saying this is the only way to determine how active a
>project is, but this is one of the key factors.
>- A large number of PRs open can be discouraging for (new)
>contributors but on the other hand I agree closing an inactive PR without
>any reviews can also drive contributors away.
>
> Having said all of that, I agree closing PRs that don't have any reviews
> to start with should be avoided from the final proposal.
>
> > I'm +1 for (automatically) closing up PRs after X days which:
> a) Don't have a CI that has passed
> b) Don't follow the code contribution guide (like commit naming
> conventions)
> c) Have changes requested but aren't being followed-up by the contributor
>
> In general, I'm largely +1 on your above proposal except for the
> implementation 

[jira] [Created] (FLINK-33181) Table using `kinesis` connector can not be used for both read & write operations if it's defined with unsupported sink property

2023-10-04 Thread Khanh Vu (Jira)
Khanh Vu created FLINK-33181:


 Summary: Table using `kinesis` connector can not be used for both 
read & write operations if it's defined with unsupported sink property
 Key: FLINK-33181
 URL: https://issues.apache.org/jira/browse/FLINK-33181
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kinesis, Table SQL / Runtime
Affects Versions: 1.15.4
Reporter: Khanh Vu


First, I define a table which uses the `kinesis` connector with an unsupported 
sink property, e.g. `scan.stream.initpos`:

```
%flink.ssql(type=update)

-- Create input

DROP TABLE IF EXISTS `kds_input`;
CREATE TABLE `kds_input` (
  `some_string` STRING,
  `some_int` BIGINT,
  `time` AS PROCTIME()
) WITH (
  'connector' = 'kinesis',
  'stream' = 'ExampleInputStream',
  'aws.region' = 'us-east-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv'
);
```

I can read from my table (kds_input) without any issue, but it will throw an 
exception if I try to write to the table:
```
%flink.ssql(type=update)
--  Use to generate data in the input table

DROP TABLE IF EXISTS connector_cve_datagen;
CREATE TABLE connector_cve_datagen(
  `some_string` STRING,
  `some_int` BIGINT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.some_string.length' = '2');
INSERT INTO kds_input SELECT some_string, some_int from connector_cve_datagen
```

Exception observed:
```
Caused by: org.apache.flink.table.api.ValidationException: Unsupported options 
found for 'kinesis'.

Unsupported options:

scan.stream.initpos

Supported options:

aws.region
connector
csv.allow-comments
csv.array-element-delimiter
csv.disable-quote-character
csv.escape-character
csv.field-delimiter
csv.ignore-parse-errors
csv.null-literal
csv.quote-character
format
property-version
sink.batch.max-size
sink.fail-on-error
sink.flush-buffer.size
sink.flush-buffer.timeout
sink.partitioner
sink.partitioner-field-delimiter
sink.producer.collection-max-count (deprecated)
sink.producer.collection-max-size (deprecated)
sink.producer.fail-on-error (deprecated)
sink.producer.record-max-buffered-time (deprecated)
sink.requests.max-buffered
sink.requests.max-inflight
stream
at 
org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:624)
at 
org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validate(FactoryUtil.java:914)
at 
org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:978)
at 
org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validateExcept(FactoryUtil.java:938)
at 
org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validateExcept(FactoryUtil.java:978)
at 
org.apache.flink.connector.kinesis.table.KinesisDynamicTableSinkFactory.createDynamicTableSink(KinesisDynamicTableSinkFactory.java:65)
at 
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:259)
... 36 more
```
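
Not part of the report, but a possible workaround sketch under the assumption that the option validation is strictly per source/sink factory: keep `kds_input` for reading and define a second table over the same stream for writing, copying the schema but not the options via `CREATE TABLE ... LIKE ... (EXCLUDING OPTIONS)` so the scan option never reaches the sink factory (the table name `kds_output` is made up):

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KinesisSinkWorkaroundSketch {
    public static void main(String[] args) {
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Source table exactly as in the report; the scan option is fine for reading.
        env.executeSql(
                "CREATE TABLE kds_input ("
                        + "  some_string STRING,"
                        + "  some_int BIGINT,"
                        + "  `time` AS PROCTIME()"
                        + ") WITH ("
                        + "  'connector' = 'kinesis',"
                        + "  'stream' = 'ExampleInputStream',"
                        + "  'aws.region' = 'us-east-1',"
                        + "  'scan.stream.initpos' = 'LATEST',"
                        + "  'format' = 'csv')");

        // Write-side table over the same stream: copy the schema but not the options,
        // and re-declare only what the sink needs.
        env.executeSql(
                "CREATE TABLE kds_output WITH ("
                        + "  'connector' = 'kinesis',"
                        + "  'stream' = 'ExampleInputStream',"
                        + "  'aws.region' = 'us-east-1',"
                        + "  'format' = 'csv'"
                        + ") LIKE kds_input (EXCLUDING OPTIONS)");

        // INSERT INTO kds_output ... then no longer trips over 'scan.stream.initpos'.
    }
}
```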



--
This message was sent by Atlassian Jira
(v8.20.10#820010)