Re: [DISCUSS] FLIP-319: Integrating with Kafka’s proper support for 2PC participation (KIP-939).

2023-08-31 Thread Tzu-Li (Gordon) Tai
Thanks for taking a look at the proposals, Gyula and Alex!

@Alexander Sorokoumov  please see my responses
below. I've tagged each bullet point to match the tag of each of your
questions.


***KIP-939 specific questions:***
(1.) Your understanding is correct. The completeTransaction method
introduced by KIP-939 is essentially a conditional commit-or-abort. KIP-939
requires this additional variant as a means to validate that dual writes of
events to other destinations match the transactional state in Kafka: if
they match, Kafka rolls the current transaction forward, otherwise it rolls
the transaction back. Note that Flink doesn't really need this variant -
see point (3) below in the FLIP-319 specific responses section.

(1.2) Consider an atomic dual write to Kafka + some other DB. For every
write, an app would write to Kafka, prepare the txn, and then write the
obtained PreparedTxnState to the DB with a transactional write. The
PreparedTxnState committed in the DB would mismatch the one in Kafka if the
DB write failed or the app failed before the DB write completed.
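
To make the failure window concrete, here is a minimal sketch of the
dual-write pattern, assuming KIP-939's proposed producer methods
(initTransactions(boolean), prepareTransaction(), completeTransaction(
PreparedTxnState)); the DB helper calls are hypothetical stand-ins:

    // Normal path: write to Kafka, prepare, persist the prepared state, commit.
    producer.beginTransaction();
    producer.send(record);
    PreparedTxnState state = producer.prepareTransaction(); // phase 1 (KIP-939 proposal)
    db.writeTransactionally(businessRow, state);            // hypothetical atomic DB write
    producer.commitTransaction();                           // phase 2

    // Recovery path: decide the in-doubt txn's fate from what the DB committed.
    producer.initTransactions(true);                        // keep the prepared txn pending
    PreparedTxnState persisted = db.readPreparedTxnState(); // stale if the DB write failed
    producer.completeTransaction(persisted);                // matches Kafka's state -> commit,
                                                            // otherwise -> abort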

(5.) transaction.two.phase.commit.enable is a client-side only property,
not a broker property. When set on the client, the InitProducerId RPC
request will have a few fields set to indicate that the initiating client
instance intends this to be a 2PC transaction. These fields are only
recognized by brokers running versions that support KIP-939.
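
For illustration, opting a producer into 2PC would look roughly like this
(the config key is the one proposed by KIP-939; the rest is standard
producer setup):

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker:9092");
    props.setProperty("transactional.id", "my-app-txn-1");
    // KIP-939 client-side opt-in; sets the new fields on the InitProducerId
    // request, which only KIP-939-aware brokers recognize.
    props.setProperty("transaction.two.phase.commit.enable", "true");
    Producer<byte[], byte[]> producer =
            new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());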


***FLIP-319 specific questions:***
(2.) That is correct. All previous txns would have been either committed or
aborted before the Flink job starts processing data.

(3.) Flink doesn’t actually need to use completeTransaction at all - I
probably should touch on this in the FLIP to clarify. It **could** use it
for an extra layer of validation, but it should never fail, otherwise
there’s a fundamental bug with Flink’s checkpointing / KafkaSink is not
integrated with the checkpointing lifecycle properly. Furthermore, if we
want to use completeTransaction, we would need to store PreparedTxnState in
the checkpoints, which is a tad bit more overhead (though quite trivial).

(4.) That is not required. The new KafkaSink can completely remove the
FlinkKafkaInternalProducer and get rid of Java reflections for good. Users
upgrading to the new KafkaSink are expected to do the following:

a) First, complete an upgrade of their Kafka clusters to a minimal version
that supports KIP-939.
b) Then, take a savepoint of their existing job (using stop-with-savepoint),
which still uses an old KafkaSink version based on FlinkKafkaInternalProducer.
c) Restore from that savepoint with an upgraded version of the job that uses
the new version of KafkaSink, which does not use FlinkKafkaInternalProducer.

The act of taking a savepoint ---> restoring is essentially a state
migration, from the previous Committable schema of (tid, pid, epoch) to the
new schema, which is just (tid). The "not so nice bit" is that you can only
upgrade the Flink job once the Kafka cluster is fully migrated. If you
upgrade the Flink job early and the new KafkaSink's Kafka clients hit an
old-version broker, the job will fail.
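
To illustrate the committable state migration (class and field names below
are hypothetical, not the actual classes in flink-connector-kafka):

    // Old committable: pid/epoch were needed to resume txns via reflection hacks.
    final class LegacyKafkaCommittable {
        final String transactionalId;
        final long producerId;
        final short epoch;
        LegacyKafkaCommittable(String tid, long pid, short epoch) {
            this.transactionalId = tid;
            this.producerId = pid;
            this.epoch = epoch;
        }
    }

    // New committable: with KIP-939, initTransactions(true) plus
    // commitTransaction() can resume a prepared txn from the tid alone.
    final class KafkaCommittable {
        final String transactionalId;
        KafkaCommittable(String tid) {
            this.transactionalId = tid;
        }
    }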

(6.) transaction.two.phase.commit.enable should NOT be user configurable
when a user uses the new KafkaSink - the sink always overwrites that
property before instantiating the producer Kafka client. Its value depends
on whether EOS is desired: if yes, it's always true, otherwise it's false.
So far, we've treated overwriting Kafka client properties passed in by the
user by simply logging a warning - do you think that is sufficient, or does
this specific case justify throwing an exception?
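
For reference, the current warn-and-overwrite treatment would look something
like this sketch (method name and logger wiring are illustrative):

    static Properties enforceTwoPhaseCommit(Properties userProps, boolean exactlyOnce) {
        Properties props = new Properties();
        props.putAll(userProps);
        String key = "transaction.two.phase.commit.enable";
        String forced = Boolean.toString(exactlyOnce); // true iff EOS is desired
        String given = props.getProperty(key);
        if (given != null && !forced.equals(given)) {
            // Today's convention elsewhere in the sink: warn instead of failing hard.
            LOG.warn("Overwriting user-provided {}={} with {}.", key, given, forced);
        }
        props.setProperty(key, forced);
        return props;
    }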


Hope this helps clarify things!

Best regards,
Gordon

On Thu, Aug 31, 2023 at 3:30 PM Alexander Sorokoumov
 wrote:

> Hi Gordon!
>
> Thank you for publishing this FLIP! I would like to ask several questions
> and confirm my understanding of several aspects of this proposal. Even
> though this discussion is focused on FLIP-319, as it is based on top of
> KIP-939, my questions will also cover the KIP.
>
>
>
>    1. Why does KafkaProducer have both commitTransaction and
>    completeTransaction methods? Why can't we fuse them?
>       1. Is it correct that commitTransaction and abortTransaction bring
>       the transaction to its final state (committed/aborted) while
>       completeTransaction is a recovery method that rolls a txn
>       back/forward, depending on the prepared state?
>       2. In what situations is PreparedTransactionState in the state store
>       different from PreparedTransactionState in Kafka?
>    2. Why does the pre-commit phase initialize the producer with
>    currentProducer.initTransaction(false)?
>       1. Is it because "false" means that we do not expect any prepared txn
>       to be there, and if there is one, we should either roll it forward or
>       abort it? In the pre-commit phase with a new producer, there
>       shouldn't be any dangling txns.
>    3. Shouldn't we call completeTransaction on 

Re: [DISCUSS] FLIP-357: Deprecate Iteration API of DataStream

2023-08-31 Thread Dong Lin
Thanks Wencong for initiating the discussion.

+1 for the proposal.

On Fri, Sep 1, 2023 at 12:00 PM Wencong Liu  wrote:

> Hi devs,
>
> I would like to start a discussion on FLIP-357: Deprecate Iteration API of
> DataStream [1].
>
> Currently, the Iteration API of DataStream is incomplete. For instance, it
> lacks support
> for iteration in sync mode and exactly once semantics. Additionally, it
> does not offer the
> ability to set iteration termination conditions. As a result, it's hard
> for developers to build an iteration pipeline with DataStream in
> practical applications such as machine learning.
>
> FLIP-176: Unified Iteration to Support Algorithms [2] has introduced a
> unified iteration library
> in the Flink ML repository. This library addresses all the issues present
> in the Iteration API of
> DataStream and could provide a solution for all iteration use cases.
> However, maintaining two
> separate implementations of iteration in both the Flink repository and the
> Flink ML repository
> would introduce unnecessary complexity and make it difficult to maintain
> the Iteration API.
>
> As such, I propose deprecating the Iteration API of DataStream and removing
> it completely in the next
> major version. In the future, if other modules in the Flink repository
> require the use of the
> Iteration API, we can consider extracting all Iteration implementations
> from the Flink ML repository
> into an independent module.
>
> Looking forward to your feedback.
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-357%3A+Deprecate+Iteration+API+of+DataStream
> [2]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=184615300
>
> Best regards,
>
> Wencong Liu


[DISCUSS] FLIP-357: Deprecate Iteration API of DataStream

2023-08-31 Thread Wencong Liu
Hi devs,

I would like to start a discussion on FLIP-357: Deprecate Iteration API of 
DataStream [1].

Currently, the Iteration API of DataStream is incomplete. For instance, it 
lacks support
for iteration in sync mode and exactly once semantics. Additionally, it does 
not offer the
ability to set iteration termination conditions. As a result, it's hard for
developers to build an iteration pipeline with DataStream in practical
applications such as machine learning.

FLIP-176: Unified Iteration to Support Algorithms [2] has introduced a unified 
iteration library
in the Flink ML repository. This library addresses all the issues present in 
the Iteration API of
DataStream and could provide a solution for all iteration use cases. However,
maintaining two
separate implementations of iteration in both the Flink repository and the 
Flink ML repository
would introduce unnecessary complexity and make it difficult to maintain the 
Iteration API.

As such, I propose deprecating the Iteration API of DataStream and removing it
completely in the next
major version. In the future, if other modules in the Flink repository require 
the use of the
Iteration API, we can consider extracting all Iteration implementations from 
the Flink ML repository
into an independent module.

Looking forward to your feedback.


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-357%3A+Deprecate+Iteration+API+of+DataStream
[2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=184615300

Best regards,

Wencong Liu

[jira] [Created] (FLINK-33014) flink jobmanager raises java.io.IOException: Connection reset by peer

2023-08-31 Thread zhu (Jira)
zhu created FLINK-33014:
---

 Summary: flink jobmanager raises java.io.IOException: Connection reset by peer
 Key: FLINK-33014
 URL: https://issues.apache.org/jira/browse/FLINK-33014
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.17.1
 Environment:
blob.server.port: 6124
classloader.resolve-order: parent-first
jobmanager.execution.failover-strategy: region
jobmanager.memory.heap.size: 2228014280b
jobmanager.memory.jvm-metaspace.size: 536870912b
jobmanager.memory.jvm-overhead.max: 322122552b
jobmanager.memory.jvm-overhead.min: 322122552b
jobmanager.memory.off-heap.size: 134217728b
jobmanager.memory.process.size: 3gb
jobmanager.rpc.address: naf-flink-ms-flink-manager-1-59m7w
jobmanager.rpc.port: 6123
parallelism.default: 1
query.server.port: 6125
rest.address: 0.0.0.0
rest.bind-address: 0.0.0.0
rest.connection-timeout: 6
rest.server.numThreads: 8
slot.request.timeout: 300
state.backend.rocksdb.localdir: /home/nafplat/data/flinkStateStore
state.backend.type: rocksdb
taskmanager.bind-host: 0.0.0.0
taskmanager.host: 0.0.0.0
taskmanager.memory.framework.off-heap.batch-shuffle.size: 256mb
taskmanager.memory.framework.off-heap.size: 512mb
taskmanager.memory.managed.fraction: 0.4
taskmanager.memory.network.fraction: 0.2
taskmanager.memory.process.size: 5gb
taskmanager.memory.task.off-heap.size: 268435456bytes
taskmanager.numberOfTaskSlots: 2
taskmanager.runtime.large-record-handler: true
web.submit.enable: true
web.tmpdir: /tmp/flink-web-c1b57e2b-5426-4fb8-a9ce-5acd1cceefc9
web.upload.dir: /opt/flink/nafJar
Reporter: zhu


 
The Flink cluster was deployed on Kubernetes in standalone mode using the
Flink 1.17.1 Java 8 Docker image. After deployment, the jobmanager printed
this error at intervals, while the taskmanager did not print any errors.

There are currently no jobs running.
{code:java}
2023-09-01 11:34:14,293 WARN  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [] - Unhandled exception
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:1.8.0_372]
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[?:1.8.0_372]
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[?:1.8.0_372]
    at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_372]
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) ~[?:1.8.0_372]
    at org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:258) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:357) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151) [flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) [flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) [flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) [flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) [flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) [flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [flink-dist-1.17.1.jar:1.17.1]
    at java.lang.Thread.run(Thread.java:750) [?:1.8.0_372]
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33013) Shade flink-connector-base into flink-sql-connector-connector

2023-08-31 Thread Jiabao Sun (Jira)
Jiabao Sun created FLINK-33013:
--

 Summary: Shade flink-connector-base into flink-sql-connector-connector
 Key: FLINK-33013
 URL: https://issues.apache.org/jira/browse/FLINK-33013
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / MongoDB
Affects Versions: mongodb-1.0.2
Reporter: Jiabao Sun


Shade flink-connector-base into flink-sql-connector-connector



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[DISCUSS] FLIP-331: Support EndOfStreamWindows and isOutputOnEOF operator attribute to optimize task deployment

2023-08-31 Thread Dong Lin
Hi all,

Jinhao (cc'ed) and I are opening this thread to discuss FLIP-331: Support
EndOfStreamWindows and isOutputOnEOF operator attribute to optimize task
deployment. The design doc can be found at
https://cwiki.apache.org/confluence/display/FLINK/FLIP-331%3A+Support+EndOfStreamWindows++and+isOutputOnEOF+operator+attribute+to+optimize+task+deployment
.

This FLIP introduces the isOutputOnEOF operator attribute that the
JobManager can use to optimize task deployment and resource utilization. In
addition, it also adds EndOfStreamWindows, which can be used with the
DataStream APIs (e.g. cogroup, aggregate) to significantly increase
throughput and reduce resource utilization.
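
To give a feel for the API, here is a minimal sketch; EndOfStreamWindows and
its exact location/signature are whatever the FLIP finally specifies, and the
imports (CoGroupFunction, Collector, Tuple2) are standard Flink classes:

    DataStream<Tuple2<String, Integer>> a = env.fromElements(Tuple2.of("k", 1));
    DataStream<Tuple2<String, Integer>> b = env.fromElements(Tuple2.of("k", 2));

    // All records of the bounded inputs land in one window that fires at end
    // of input, so the runtime can execute the cogroup with batch-style
    // blocking shuffles instead of keeping every record in streaming state.
    a.coGroup(b)
     .where(t -> t.f0)
     .equalTo(t -> t.f0)
     .window(EndOfStreamWindows.get())
     .apply(new CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
         @Override
         public void coGroup(Iterable<Tuple2<String, Integer>> first,
                             Iterable<Tuple2<String, Integer>> second,
                             Collector<String> out) {
             out.collect("joined group for one key");
         }
     })
     .print();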

We would greatly appreciate any comment or feedback you may have on this
proposal.

Cheers,
Dong


Re: [DISCUSS] FLIP-328: Allow source operators to determine isProcessingBacklog based on watermark lag

2023-08-31 Thread Xuannan Su
Hi Hang,

Thanks for the review.

This is a good question. It appears that this particular piece of
information is absent from the FLIP document. If I understand
correctly, the term "From the source" refers to a source that
determines the backlog status based on its state. Both backlog
statuses are determined at the source operator: one based on the
watermark lag and one based on the source state. Only when both backlog
statuses are false does the source operator send the signal
"isBacklog=false" downstream. Essentially, the backlog statuses are
combined using a logical OR operation. I updated the configuration
description to describe this behavior.
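
In other words (a trivial sketch of the combination logic; the names are
mine, not the FLIP's):

    /** The source reports backlog iff either signal indicates backlog. */
    static boolean isProcessingBacklog(boolean backlogFromSourceState,
                                       boolean backlogFromWatermarkLag) {
        return backlogFromSourceState || backlogFromWatermarkLag;
    }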

I hope that addresses your question.

Best,
Xuannan


On Wed, Aug 30, 2023 at 4:31 PM Hang Ruan  wrote:
>
> Hi, Xuannan.
>
> Thanks for preparing the FLIP.
>
> After this FLIP, we will have two ways to report isProcessingBacklog: 1.
> From the source; 2. Judged by the watermark lag. What is the priority
> between them?
> For example, what is the status isProcessingBacklog when the source report
> `isProcessingBacklog=false` and the watermark lag exceeds the threshold?
>
> Best,
> Hang
>
> On Wed, Aug 30, 2023 at 10:06, Xuannan Su wrote:
>
> > Hi Jing,
> >
> > Thank you for the suggestion.
> >
> > The definition of watermark lag is the same as the watermarkLag metric in
> > FLIP-33[1]. More specifically, the watermark lag calculation is computed at
> > the time when a watermark is emitted downstream in the following way:
> > watermarkLag = CurrentTime - Watermark. I have added this description to
> > the FLIP.
> >
> > I hope this addresses your concern.
> >
> > Best,
> > Xuannan
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
> >
> >
> > > On Aug 28, 2023, at 01:04, Jing Ge  wrote:
> > >
> > > Hi Xuannan,
> > >
> > > Thanks for the proposal. +1 for me.
> > >
> > > There is one tiny thing that I am not sure if I understand it correctly.
> > > Since there will be many different WatermarkStrategies and different
> > > WatermarkGenerators. Could you please update the FLIP and add the
> > > description of how the watermark lag is calculated exactly? E.g.
> > Watermark
> > > lag = A - B with A is the timestamp of the watermark emitted to the
> > > downstream and B is(this is the part I am not really sure after
> > reading
> > > the FLIP).
> > >
> > > Best regards,
> > > Jing
> > >
> > >
> > > On Mon, Aug 21, 2023 at 9:03 AM Xuannan Su 
> > wrote:
> > >
> > >> Hi Jark,
> > >>
> > >> Thanks for the comments.
> > >>
> > >> I agree that the current solution cannot support jobs that cannot
> > >> define watermarks. However, after considering the pending-record-based
> > >> solution, I believe the current solution is superior for the target
> > >> use case as it is more intuitive for users. The backlog status gives
> > >> users the ability to balance between throughput and latency. Making
> > >> this trade-off decision based on the watermark lag is more intuitive
> > >> from the user's perspective. For instance, a user can decide that if
> > >> the job lags behind the current time by more than 1 hour, the result
> > >> is not usable. In that case, we can optimize for throughput when the
> > >> data lags behind by more than an hour. With the pending-record-based
> > >> solution, it's challenging for users to determine when to optimize for
> > >> throughput and when to prioritize latency.
> > >>
> > >> Regarding the limitations of the watermark-based solution:
> > >>
> > >> 1. The current solution can support jobs with sources that have event
> > >> time. Users can always define a watermark at the source operator, even
> > >> if it's not used by downstream operators, such as streaming join and
> > >> unbounded aggregate.
> > >>
> > >> 2. I don't believe it's accurate to say that the watermark lag will
> > >> keep increasing if no data is generated in Kafka. The watermark lag
> > >> and backlog status are determined at the moment when the watermark is
> > >> emitted to the downstream operator. If no data is emitted from the
> > >> source, the watermark lag and backlog status will not be updated. If
> > >> the WatermarkStrategy with idleness is used, the source becomes
> > >> non-backlog when it becomes idle.
> > >>
> > >> 3. I think watermark lag is more intuitive to determine if a job is
> > >> processing backlog data. Even when using pending records, it faces a
> > >> similar issue. For example, if the source has 1K pending records, those
> > >> records can span from 1 day to 1 hour to 1 second. If the records span
> > >> 1 day, it's probably best to optimize for throughput. If they span 1
> > >> hour, it depends on the business logic. If they span 1 second,
> > >> optimizing for latency is likely the better choice.
> > >>
> > >> In summary, I believe the watermark-based solution is a superior choice
> > >> for the target use case where watermark/event time can be defined.
> > >> Additionally, I haven't 

Re: [DISCUSS] FLIP-356: Support Nested Fields Filter Pushdown

2023-08-31 Thread Venkatakrishnan Sowrirajan
Gentle ping on the vote for FLIP-356: Support Nested Fields Filter Pushdown.

Regards
Venkata krishnan


On Tue, Aug 29, 2023 at 9:18 PM Venkatakrishnan Sowrirajan 
wrote:

> Sure, will reference this discussion to resume where we started as part of
> the flip to refactor SupportsProjectionPushDown.
>
> On Tue, Aug 29, 2023, 7:22 PM Jark Wu  wrote:
>
>> I'm fine with this. `ReferenceExpression` and `SupportsProjectionPushDown`
>> can be another FLIP. However, could you summarize the design of this part
>> in the future-work part of the FLIP? That would make it easier to get
>> started with in the future.
>>
>>
>> Best,
>> Jark
>>
>>
>> On Wed, 30 Aug 2023 at 02:45, Venkatakrishnan Sowrirajan <
>> vsowr...@asu.edu>
>> wrote:
>>
>> > Thanks Jark. Sounds good.
>> >
>> > One more thing, earlier in my summary I mentioned,
>> >
>> > Introduce a new *ReferenceExpression* (or *BaseReferenceExpression*)
>> > > abstract class which will be extended by both
>> *FieldReferenceExpression*
>> > >  and *NestedFieldReferenceExpression* (to be introduced as part of
>> this
>> > > FLIP)
>> >
>> > This can be punted for now and can be handled as part of refactoring
>> > SupportsProjectionPushDown.
>> >
>> > Also, I made minor changes to *NestedFieldReferenceExpression*: instead
>> > of *fieldIndexArray*, we can just use a *fieldNames* array that includes
>> > the field name at every level of the nested field.
>> >
>> > Updated the FLIP-356
>> > <
>> >
>> https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/FLINK/FLIP-356*3A*Support*Nested*Fields*Filter*Pushdown__;JSsrKysr!!IKRxdwAv5BmarQ!YAk6kV4CYvUSPfpoUDQRs6VlbmJXVX8KOKqFxKbNDkUWKzShvwpkLRGkAV1tgV3EqClNrjGS-Ij86Q$
>> > >
>> > wiki as well.
>> >
>> > Regards
>> > Venkata krishnan
>> >
>> >
>> > On Tue, Aug 29, 2023 at 5:21 AM Jark Wu  wrote:
>> >
>> > > Hi Venkata,
>> > >
>> > > Your summary looks good to me. +1 to start a vote.
>> > >
>> > > I think we don't need "inputIndex" in NestedFieldReferenceExpression.
>> > > Actually, I think it is also not needed in FieldReferenceExpression,
>> > > and we should try to remove it (another topic). The RexInputRef in
>> > Calcite
>> > > also doesn't require an inputIndex because the field index should
>> > represent
>> > > index of the field in the underlying row type. Field references
>> shouldn't
>> > > be
>> > >  aware of the number of inputs.
>> > >
>> > > Best,
>> > > Jark
>> > >
>> > >
>> > > On Tue, 29 Aug 2023 at 02:24, Venkatakrishnan Sowrirajan <
>> > vsowr...@asu.edu
>> > > >
>> > > wrote:
>> > >
>> > > > Hi Jinsong,
>> > > >
>> > > > Thanks for your comments.
>> > > >
>> > > > What is inputIndex in NestedFieldReferenceExpression?
>> > > >
>> > > >
>> > > > I haven't looked at it before. Do you mean, given that it is now
>> only
>> > > used
>> > > > to push filters it won't be subsequently used in further
>> > > > planning/optimization and therefore it is not required at this time?
>> > > >
>> > > > So if NestedFieldReferenceExpression doesn't need inputIndex, is
>> there
>> > > > > a need to introduce a base class `ReferenceExpression`?
>> > > >
>> > > > For SupportsFilterPushDown itself, *ReferenceExpression* base class
>> is
>> > > not
>> > > > needed. But there were discussions around cleaning up and
>> standardizing
>> > > the
>> > > > API for Supports*PushDown. SupportsProjectionPushDown currently
>> pushes
>> > > the
>> > > > projects as a 2-d array, instead it would be better to use the
>> standard
>> > > API
>> > > > which seems to be the *ResolvedExpression*. For
>> > > SupportsProjectionPushDown
>> > > > either FieldReferenceExpression (top level fields) or
>> > > > NestedFieldReferenceExpression (nested fields) is enough, in order
>> to
>> > > > provide a single API that handles both top level and nested fields,
>> > > > ReferenceExpression will be introduced as a base class.
>> > > >
>> > > > Eventually, *SupportsProjectionPushDown#applyProjections* would
>> > > > evolve as applyProjection(List<ReferenceExpression> projectedFields),
>> > > > and nested fields would be pushed only if
>> > > > *supportsNestedProjections* returns true.
>> > > >
>> > > > Regards
>> > > > Venkata krishnan
>> > > >
>> > > >
>> > > > On Sun, Aug 27, 2023 at 11:12 PM Jingsong Li <
>> jingsongl...@gmail.com>
>> > > > wrote:
>> > > >
>> > > > > So if NestedFieldReferenceExpression doesn't need inputIndex, is
>> > there
>> > > > > a need to introduce a base class `ReferenceExpression`?
>> > > > >
>> > > > > Best,
>> > > > > Jingsong
>> > > > >
>> > > > > On Mon, Aug 28, 2023 at 2:09 PM Jingsong Li <
>> jingsongl...@gmail.com>
>> > > > > wrote:
>> > > > > >
>> > > > > > Hi thanks all for your discussion.
>> > > > > >
>> > > > > > What is inputIndex in NestedFieldReferenceExpression?
>> > > > > >
>> > > > > > I know inputIndex has special usage in FieldReferenceExpression,
>> > but
>> > > > > > it is only for Join operators, and it is only for SQL
>> optimization.
>> > > 

Re: [DISCUSS] FLIP-319: Integrating with Kafka’s proper support for 2PC participation (KIP-939).

2023-08-31 Thread Alexander Sorokoumov
Hi Gordon!

Thank you for publishing this FLIP! I would like to ask several questions
and confirm my understanding of several aspects of this proposal. Even
though this discussion is focused on FLIP-319, as it is based on top of
KIP-939, my questions will also cover the KIP.



   1. Why does KafkaProducer have both commitTransaction and
   completeTransaction methods? Why can't we fuse them?
  1. Is it correct that commitTransaction and abortTransaction bring
  the transaction to its final state (committed/aborted) while
  completeTransaction is a recovery method that rolls a txn back/forward,
  depending on the prepared state?
  2. In what situations is PreparedTransactionState in the state store
  different from PreparedTransactionState in Kafka?
   2. Why does the pre-commit phase initialize the producer with
   currentProducer.initTransaction(false)?
  1. Is it because "false" means that we do not expect any prepared txn
  to be there, and if there is one, we should either roll it
forward or abort
  it? In the pre-commit phase with a new producer, there shouldn't be any
  dangling txns.
   3. Shouldn't we call completeTransaction on restore instead of
   commitTransaction? In what situations would the Flink Kafka connector abort
   the transaction?
   4. Do we need to keep the current KafkaInternalProducer for a while to
   remain compatible with older Kafka versions that do not support KIP-939?
   5. How will the connector handle
   transaction.two.phase.commit.enable=false on the broker (not client) level?
   6. Does it make sense for the connector users to override
   transaction.two.phase.commit.enable? If it does not make sense, would the
   connector ignore the config or throw an exception when it is passed?


Best regards,
Alex

On Wed, Aug 23, 2023 at 6:09 AM Gyula Fóra  wrote:

> Hi Gordon!
>
> Thank you for preparing the detailed FLIP, I think this is a huge
> improvement that enables the exactly-once Kafka sink in many environments /
> use-cases where this was previously unfeasible due to the limitations
> highlighted in the FLIP.
>
> Big +1
>
> Cheers,
> Gyula
>
> On Fri, Aug 18, 2023 at 7:54 PM Tzu-Li (Gordon) Tai 
> wrote:
>
> > Hi Flink devs,
> >
> > I’d like to officially start a discussion for FLIP-319: Integrating with
> > Kafka’s proper support for 2PC participation (KIP-939) [1].
> >
> > This is the “sister” joint FLIP for KIP-939 [2] [3]. It has been a
> > long-standing issue that Flink’s Kafka connector doesn’t work fully
> > correctly under exactly-once mode due to lack of distributed transaction
> > support in the Kafka transaction protocol. This has led to subpar hacks
> in
> > the connector such as Java reflections to workaround the protocol's
> > limitations (which causes a bunch of problems on its own, e.g. long
> > recovery times for the connector), while still having corner case
> scenarios
> > that can lead to data loss.
> >
> > This joint effort with the Kafka community attempts to address this so
> that
> > the Flink Kafka connector can finally work against public Kafka APIs,
> which
> > should result in a much more robust integration between the two systems,
> > and for Flink developers, easier maintainability of the code.
> >
> > Obviously, actually implementing this FLIP relies on the joint KIP being
> > implemented and released first. Nevertheless, I'd like to start the
> > discussion for the design as early as possible so we can benefit from the
> > new Kafka changes as soon as it is available.
> >
> > Looking forward to feedback and comments on the proposal!
> >
> > Thanks,
> > Gordon
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710
> > [2]
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
> > [3] https://lists.apache.org/thread/wbs9sqs3z1tdm7ptw5j4o9osmx9s41nf
> >
>


Re: Azure pipelines CI view permissions

2023-08-31 Thread Jing Ge
Hi Ryan,

It seems we are facing some issues on the Azure side [1]. In the meantime, as a
workaround, you can go to the "Runs" tab and find your build via filter.

Best regards,
Jing

[1]
https://developercommunity.visualstudio.com/t/Pipeline-runs-are-not-visible-for-non-ad/10455034

On Thu, Aug 31, 2023 at 2:55 PM Ryan Skraba 
wrote:

> Hey everyone,
>
> Has something changed in the configuration for the CI page?
>
> I can see the pipelines, but we can't see the list of recent / individual
> runs:
>
> https://dev.azure.com/apache-flink/apache-flink/_build
>
> That being said, we can still see the results of SUCCESS or FAILURE
> when a PR runs, so I'm wondering if this is an intentional change!
>
> All my best, Ryan
>


Re: [DISCUSS] FLIP-358: flink-avro enhancement and cleanup

2023-08-31 Thread Jing Ge
Hi Becket,

It is a very useful proposal, thanks for driving it. +1. I'd like to ask
some questions to make sure I understand your thoughts correctly:

1. "For the batch cases, currently the BulkFormat for DataStream is
missing" - true, and there is another option to leverage
StreamFormatAdapter[1]
2. "The following two interfaces should probably be marked as Public for
now and Deprecated once we deprecate the InputFormat / OutputFormat" -
would you like to share some background info of the deprecation of the
InputFormat / OutputFormat? It is for me a little bit weird to mark APIs as
public that are now known to be deprecated.
3. "Remove the PublicEvolving annotation for the following deprecated
classes. It does not make sense for an API to be PublicEvolving and
Deprecated at the same time" - this is very common in the Flink code base
to have PublicEvolving and Deprecated at the same time. APIs that do not
survive the PublicEvolving phase will be marked as deprecated in addition.
Removing PublicEvolving in this case will break Flink API graduation rule.

Best regards,
Jing



[1]
https://github.com/apache/flink/blob/1d1247d4ae6d4313f7d952c4b2d66351314c9432/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StreamFormatAdapter.java#L61

On Thu, Aug 31, 2023 at 4:16 PM Becket Qin  wrote:

> Hi Ryan, thanks for the reply.
>
> Verifying the component with the schemas you have would be super helpful.
>
> I think enum is actually a type that is generally useful. Although it is
> not a part of ANSI SQL, MySQL and some other databases have this type.
> BTW, ENUM_STRING proposed in this FLIP is actually not a type by itself.
> The ENUM_STRING is just syntax sugar which actually creates a "new
> AtomicDataType(new VarCharType(Integer.MAX_VALUE), ENUM_CLASS)".  So, we
> are not really introducing a new type here. However, in order to make the
> VARCHAR to ENUM conversion work, the ENUM class has to be considered as a
> ConversionClass of the VARCHAR type, and a StringToEnum converter is
> required.
>
> And yes, AvroSchemaUtils should be annotated as @PublicEvolving.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Thu, Aug 31, 2023 at 5:22 PM Ryan Skraba 
> wrote:
>
> > Hey -- I have a certain knowledge of Avro, and I'd be willing to help
> > out with some of these enhancements, writing tests and reviewing.  I
> > have a *lot* of Avro schemas available for validation!
> >
> > The FLIP looks pretty good and covers the possible cases pretty
> > rigorously. I wasn't aware of some of the gaps you've pointed out
> > here!
> >
> > How useful do you think the new ENUM_STRING DataType would be outside
> > of the Avro use case?  It seems like a good enough addition that would
> > solve the problem here.
> >
> > A small note: I assume the AvroSchemaUtils is meant to be annotated
> > @PublicEvolving as well.
> >
> > All my best, Ryan
> >
> >
> > On Tue, Aug 29, 2023 at 4:35 AM Becket Qin  wrote:
> > >
> > > Hi folks,
> > >
> > > I would like to start the discussion about FLIP-358 [1] which proposes
> to
> > > clean up and enhance the Avro support in Flink. More specifically, it
> > > proposes to:
> > >
> > > 1. Make it clear what are the public APIs in flink-avro components.
> > > 2. Fix a few buggy cases in flink-avro
> > > 3. Add more supported Avro use cases out of the box.
> > >
> > > Feedbacks are welcome!
> > >
> > > Thanks
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-358%3A+flink-avro+enhancement+and+cleanup
> >
>


Re: [DISCUSS] FLIP-327: Support stream-batch unified operator to improve job throughput when processing backlog data

2023-08-31 Thread Jan Lukavský

Hi,

some keywords in this thread caught my attention, so sorry for jumping in
late, but I'd like to understand the nature of the proposal.


I'll try to summarize my understanding:

The goal of the FLIP is to support automatic switching between streaming
and batch processing, leveraging the fact that batch processing is more
computationally efficient. This makes perfect sense.


Looking at the streaming vs. batch semantics, switching from streaming 
to batch means the following:


 a) generally, watermarks are not propagated in batch; the watermark moves
from -inf to +inf in one step at the end of the batch input, which might
(and probably will) skip many invocations of timers


 b) grouping by key (and window) can be done efficiently, because it 
can be done by sort-group and ideally parallelized by window (with some 
caveats)


The switch also has some conditions, namely:

 i) batch mode does not do checkpoints, inputs must be accessible 
repeatedly (forever)


 ii) due to failures in batch mode, inputs might be reprocessed and 
thus must be immutable or all sub-results computed in all branches of 
the computation (even possibly unaffected by the failure) have to be 
discarded and recomputed from scratch


Obviously, in the case of the switch from batch to streaming, property
a) has to be modified so the watermark does not move to +inf, but to
min(streaming watermark). Given these properties, it should be possible
to exchange batch and streaming processing without any cooperation from
the application logic itself. Is my understanding correct?


If so, there is still one open question to efficiency, though. The 
streaming operator _might_ need sorting by timestamp (e.g. processing 
time-series data, or even sequential data). In that case simply 
switching streaming semantics to batch processing does not yield 
efficient processing, because the operator still needs to buffer and 
manually sort all the input data (batch data is always unordered). On 
the other hand, the batch runner already does sorting (for grouping by 
key), so adding additional sorting criterion is very cheap. In Apache 
Beam, we introduced a property of a stateful PTransform (DoFn) called 
@RequiresTimeSortedInput [1], which can then be implemented efficiently 
by batch engines.
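
For reference, using that annotation looks like this minimal Beam sketch
(the DoFn body is purely illustrative):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    class TimeSortedFn extends DoFn<KV<String, Long>, String> {
        @ProcessElement
        @RequiresTimeSortedInput
        public void process(@Element KV<String, Long> element, OutputReceiver<String> out) {
            // Per key, elements arrive ordered by event timestamp; a batch
            // runner can piggyback this guarantee on its sort-group phase.
            out.output(element.getKey() + ":" + element.getValue());
        }
    }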


Does the FLIP somehow work with conditions i) and ii)? I can imagine, for
instance, that if data is read from, say, Kafka, and the backlog gets
sufficiently large, then even the batch processing can take substantial
time, and if it fails after long processing, some of the original data
might already have been rolled out of the Kafka topic.


In the FLIP there are some proposed changes to sources to emit metadata
about whether the records come from backlog. What is the line of thought
behind why this is needed? From my point of view, streaming engines are
_always_ processing backlog; the only question is "how delayed are the
currently processed events after HEAD", or more specifically in this
case "how many elements can we expect to process if the source would
immediately stop receiving more data?". This should be configurable
using a simple option defining the difference between the current
processing time (JM) and the watermark of the source, or am I missing something?


Thanks for clarification and all the best,

 Jan

[1] 
https://beam.apache.org/releases/javadoc/2.50.0/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html


On 8/31/23 13:17, Xuannan Su wrote:

Hi all,

I would like to share some updates on FLIP-327. Dong and I have had a
series of discussions and have made several refinements to the FLIP.

The major change to the FLIP is to allow the input of the one-input
operator to be automatically sorted during backlog processing. When
combined with the state backend optimization introduced in FLIP-325 [1],
all the keyed single-input operators can achieve similar performance as in
batch mode during backlog processing without any code change to the
operator. We also implemented a POC[2] and conducted benchmark[3] using the
KeyedStream#reduce operation. The benchmark results demonstrate the
performance gains that this FLIP can offer.

I am looking forward to any comments or feedback you may have on this FLIP.

Best,
Xuannan

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-325%3A+Introduce+LRU+cache+to+accelerate+state+backend+access
[2] https://github.com/Sxnan/flink/tree/FLIP-327-demo
[3]
https://github.com/Sxnan/flink/blob/d77d0d3fb268de0a1939944ea4796a112e2d68c0/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/backlog/ReduceBacklogBenchmark.java




On Aug 18, 2023, at 21:28, Dong Lin  wrote:

Hi Piotr,

Thanks for the explanation.

To recap our offline discussion, there is a concern regarding the
capability to dynamically switch between stream and batch modes. This
concern is around unforeseen behaviors such as bugs or performance
regressions, which we might not yet be aware of yet. The reason for this

Re: [DISCUSS] FLIP-358: flink-avro enhancement and cleanup

2023-08-31 Thread Becket Qin
Hi Ryan, thanks for the reply.

Verifying the component with the schemas you have would be super helpful.

I think enum is actually a type that is generally useful. Although it is
not a part of ANSI SQL, MySQL and some other databases have this type.
BTW, ENUM_STRING proposed in this FLIP is actually not a type by itself.
The ENUM_STRING is just syntax sugar which actually creates a "new
AtomicDataType(new VarCharType(Integer.MAX_VALUE), ENUM_CLASS)".  So, we
are not really introducing a new type here. However, in order to make the
VARCHAR to ENUM conversion work, the ENUM class has to be considered as a
ConversionClass of the VARCHAR type, and a StringToEnum converter is
required.
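
As a small sketch of that equivalence (Color is a hypothetical enum used
only for illustration):

    import org.apache.flink.table.types.AtomicDataType;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.logical.VarCharType;

    enum Color { RED, GREEN, BLUE }

    // What the proposed ENUM_STRING sugar would expand to, per the text above:
    DataType enumString =
            new AtomicDataType(new VarCharType(Integer.MAX_VALUE), Color.class);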

And yes, AvroSchemaUtils should be annotated as @PublicEvolving.

Thanks,

Jiangjie (Becket) Qin



On Thu, Aug 31, 2023 at 5:22 PM Ryan Skraba 
wrote:

> Hey -- I have a certain knowledge of Avro, and I'd be willing to help
> out with some of these enhancements, writing tests and reviewing.  I
> have a *lot* of Avro schemas available for validation!
>
> The FLIP looks pretty good and covers the possible cases pretty
> rigorously. I wasn't aware of some of the gaps you've pointed out
> here!
>
> How useful do you think the new ENUM_STRING DataType would be outside
> of the Avro use case?  It seems like a good enough addition that would
> solve the problem here.
>
> A small note: I assume the AvroSchemaUtils is meant to be annotated
> @PublicEvolving as well.
>
> All my best, Ryan
>
>
> On Tue, Aug 29, 2023 at 4:35 AM Becket Qin  wrote:
> >
> > Hi folks,
> >
> > I would like to start the discussion about FLIP-358 [1] which proposes to
> > clean up and enhance the Avro support in Flink. More specifically, it
> > proposes to:
> >
> > 1. Make it clear what are the public APIs in flink-avro components.
> > 2. Fix a few buggy cases in flink-avro
> > 3. Add more supported Avro use cases out of the box.
> >
> > Feedbacks are welcome!
> >
> > Thanks
> >
> > Jiangjie (Becket) Qin
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-358%3A+flink-avro+enhancement+and+cleanup
>


Azure pipelines CI view permissions

2023-08-31 Thread Ryan Skraba
Hey everyone,

Has something changed in the configuration for the CI page?

I can see the pipelines, but we can't see the list of recent / individual runs:

https://dev.azure.com/apache-flink/apache-flink/_build

That being said, we can still see the results of SUCCESS or FAILURE
when a PR runs, so I'm wondering if this is an intentional change!

All my best, Ryan


Re: [VOTE] FLIP-348: Make expanding behavior of virtual metadata columns configurable

2023-08-31 Thread Jark Wu
+1 (binding)

Best,
Jark

> On Aug 31, 2023, at 18:54, Jing Ge wrote:
> 
> +1(binding)
> 
> On Thu, Aug 31, 2023 at 11:22 AM Sergey Nuyanzin 
> wrote:
> 
>> +1 (binding)
>> 
>> On Thu, Aug 31, 2023 at 9:28 AM Benchao Li  wrote:
>> 
>>> +1 (binding)
>>> 
>>> On Thu, Aug 31, 2023 at 15:24, Martijn Visser wrote:
 
 +1 (binding)
 
 On Thu, Aug 31, 2023 at 9:09 AM Timo Walther 
>> wrote:
 
> Hi everyone,
> 
> I'd like to start a vote on FLIP-348: Make expanding behavior of
>>> virtual
> metadata columns configurable [1] which has been discussed in this
> thread [2].
> 
> The vote will be open for at least 72 hours unless there is an
>>> objection
> or not enough votes.
> 
> [1] https://cwiki.apache.org/confluence/x/_o6zDw
> [2] https://lists.apache.org/thread/zon967w7synby8z6m1s7dj71dhkh9ccy
> 
> Cheers,
> Timo
> 
>>> 
>>> 
>>> 
>>> --
>>> 
>>> Best,
>>> Benchao Li
>>> 
>> 
>> 
>> --
>> Best regards,
>> Sergey
>> 



Re: [DISCUSS] FLIP-327: Support stream-batch unified operator to improve job throughput when processing backlog data

2023-08-31 Thread Xuannan Su
Hi all,

I would like to share some updates on FLIP-327. Dong and I have had a
series of discussions and have made several refinements to the FLIP.

The major change to the FLIP is to allow the input of the one-input
operator to be automatically sorted during backlog processing. When
combined with the state backend optimization introduced in FLIP-325 [1],
all the keyed single-input operators can achieve similar performance as in
batch mode during backlog processing without any code change to the
operator. We also implemented a POC [2] and conducted a benchmark [3] using the
KeyedStream#reduce operation. The benchmark results demonstrate the
performance gains that this FLIP can offer.

I am looking forward to any comments or feedback you may have on this FLIP.

Best,
Xuannan

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-325%3A+Introduce+LRU+cache+to+accelerate+state+backend+access
[2] https://github.com/Sxnan/flink/tree/FLIP-327-demo
[3]
https://github.com/Sxnan/flink/blob/d77d0d3fb268de0a1939944ea4796a112e2d68c0/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/backlog/ReduceBacklogBenchmark.java



> On Aug 18, 2023, at 21:28, Dong Lin  wrote:
>
> Hi Piotr,
>
> Thanks for the explanation.
>
> To recap our offline discussion, there is a concern regarding the
> capability to dynamically switch between stream and batch modes. This
> concern is around unforeseen behaviors such as bugs or performance
> regressions, which we might not yet be aware of yet. The reason for this
> concern is that this feature involves a fundamental impact on the Flink
> runtime's behavior.
>
> Due to the above concern, I agree it is reasonable to annotate related
APIs
> as experimental. This step would provide us with the flexibility to modify
> these APIs if issues arise in the future. This annotation also serves as a
> note to users that this functionality might not perform well as expected.
>
> Though I believe that we can ensure the reliability of this feature
through
> good design and code reviews, comprehensive unit tests, and thorough
> integration testing, I agree that it is reasonable to be extra cautious in
> this case. Also, it should be OK to delay making these APIs as
> non-experimental by 1-2 releases.
>
> I have updated FLIP-327, FLIP-328, and FLIP-331 to mark APIs in these docs
> as experimental. Please let me know if you think any other API should also
> be marked as experimental.
>
> Thanks!
> Dong
>
> On Wed, Aug 16, 2023 at 10:39 PM Piotr Nowojski 
> wrote:
>
>> Hi Dong,
>>
>> Operators API is unfortunately also our public facing API and I mean the
>> APIs that we will add there should also be marked `@Experimental` IMO.
>>
>> The config options should also be marked as experimental (both
>> annotated @Experimental and noted the same thing in the docs,
>> if @Experimental annotation is not automatically mentioned in the docs).
>>
>>> Alternatively, how about we add a doc for
>> checkpointing.interval-during-backlog explaining its impact/concern as
>> discussed above?
>>
>> We should do this independently from marking the APIs/config options as
>> `@Experimental`
>>
>> Best,
>> Piotrek
>>
>> pt., 11 sie 2023 o 14:55 Dong Lin  napisał(a):
>>
>>> Hi Piotr,
>>>
>>> Thanks for the reply!
>>>
>>> On Fri, Aug 11, 2023 at 4:44 PM Piotr Nowojski >>
>>> wrote:
>>>
 Hi,

 Sorry for the long delay in responding!

> Given that it is an optional feature that can be
> turned off by users, it might be OK to just let users try it out and
>> we
 can
> fix performance issues once we detect any of them. What do you think?

 I think it's fine. It would be best to mark this feature as
>> experimental,
 and
 we say that the config keys or the default values might change in the
 future.

>>>
>>> In general I agree we can mark APIs that determine "whether to enable
>>> dynamic switching between stream/batch mode" as experimental.
>>>
>>> However, I am not sure we have such an API yet. The APIs added in this
>> FLIP
>>> are intended to be used by operator developers rather than end users.
End
>>> users can enable this capability by setting
>>> execution.checkpointing.interval-during-backlog = Long.MAX and uses a
>>> source which might implicitly set backlog statu (e.g. HybridSource). So
>>> execution.checkpointing.interval-during-backlog is the only user-facing
>>> APIs that can always control whether this feature can be used.
>>>
>>> However, execution.checkpointing.interval-during-backlog itself is not
>> tied
>>> to FLIP-327.
>>>
>>> Do you mean we should set checkpointing.interval-during-backlog as
>>> experimental? Alternatively, how about we add a doc for
>>> checkpointing.interval-during-backlog explaining its impact/concern as
>>> discussed above?
>>>
>>> Best,
>>> Dong
>>>
>>>
> Maybe we can revisit the need for such a config when we
>>> introduce/discuss
> the capability to switch backlog from false to true in the 

Re: [VOTE] FLIP-348: Make expanding behavior of virtual metadata columns configurable

2023-08-31 Thread Jing Ge
+1(binding)

On Thu, Aug 31, 2023 at 11:22 AM Sergey Nuyanzin 
wrote:

> +1 (binding)
>
> On Thu, Aug 31, 2023 at 9:28 AM Benchao Li  wrote:
>
> > +1 (binding)
> >
> > > On Thu, Aug 31, 2023 at 15:24, Martijn Visser wrote:
> > >
> > > +1 (binding)
> > >
> > > On Thu, Aug 31, 2023 at 9:09 AM Timo Walther 
> wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > I'd like to start a vote on FLIP-348: Make expanding behavior of
> > virtual
> > > > metadata columns configurable [1] which has been discussed in this
> > > > thread [2].
> > > >
> > > > The vote will be open for at least 72 hours unless there is an
> > objection
> > > > or not enough votes.
> > > >
> > > > [1] https://cwiki.apache.org/confluence/x/_o6zDw
> > > > [2] https://lists.apache.org/thread/zon967w7synby8z6m1s7dj71dhkh9ccy
> > > >
> > > > Cheers,
> > > > Timo
> > > >
> >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>
>
> --
> Best regards,
> Sergey
>


[jira] [Created] (FLINK-33012) LeaderElectionTest.testHasLeadership fails on AZP

2023-08-31 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-33012:
---

 Summary: LeaderElectionTest.testHasLeadership fails on AZP
 Key: FLINK-33012
 URL: https://issues.apache.org/jira/browse/FLINK-33012
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.19.0
Reporter: Sergey Nuyanzin


This build
[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=52871&view=logs&j=675bf62c-8558-587e-2555-dcad13acefb5&t=5878eed3-cc1e-5b12-1ed0-9e7139ce0992&l=7151]
fails with:
{noformat}
Aug 31 01:04:17 Caused by: 
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException$ConnectionLossException:
 KeeperErrorCode = ConnectionLoss
Aug 31 01:04:17 at 
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
Aug 31 01:04:17 at 
org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:885)
Aug 31 01:04:17 at 
org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:1025)
Aug 31 01:04:17 at 
org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:986)
Aug 31 01:04:17 at 
org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:97)
Aug 31 01:04:17 at 
org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:376)
Aug 31 01:04:17 at 
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
Aug 31 01:04:17 at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
Aug 31 01:04:17 at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
Aug 31 01:04:17 at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
Aug 31 01:04:17 at java.base/java.lang.Thread.run(Thread.java:833)
Aug 31 01:04:17 Suppressed: 
org.apache.flink.shaded.curator5.org.apache.curator.CuratorConnectionLossException:
 KeeperErrorCode = ConnectionLoss
Aug 31 01:04:17 at 
org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:1031)
Aug 31 01:04:17 ... 8 more

{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33011) Operator deletes HA data unexpectedly

2023-08-31 Thread Ruibin Xing (Jira)
Ruibin Xing created FLINK-33011:
---

 Summary: Operator deletes HA data unexpectedly
 Key: FLINK-33011
 URL: https://issues.apache.org/jira/browse/FLINK-33011
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.6.0, 1.17.1
 Environment: Flink: 1.17.1

Flink Kubernetes Operator: 1.6.0
Reporter: Ruibin Xing
 Attachments: flink_operator_logs_0831.csv

We encountered a problem where the operator unexpectedly deleted HA data.

The timeline is as follows:

12:08 We submitted the first spec, which suspended the job with savepoint 
upgrade mode.

12:08 The job was suspended, while the HA data was preserved, and the log 
showed the observed job deployment status was MISSING.

12:10 We submitted the second spec, which deployed the job with the last state 
upgrade mode.

12:10 Logs showed the operator deleted both the Flink deployment and the HA 
data again.

12:10 The job failed to start because the HA data was missing.

According to the log, the deletion was triggered by 
https://github.com/apache/flink-kubernetes-operator/blob/a728ba768e20236184e2b9e9e45163304b8b196c/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java#L168

I think this would only be triggered if the job deployment status wasn't 
MISSING. But the log before the deletion showed the observed job status was 
MISSING at that moment.

Related logs:

 
{code:java}
2023-08-30 12:08:48.190 + o.a.f.k.o.s.AbstractFlinkService [INFO 
][default/pipeline-pipeline-se-3] Cluster shutdown completed.
2023-08-30 12:10:27.010 + o.a.f.k.o.o.d.ApplicationObserver [INFO 
][default/pipeline-pipeline-se-3] Observing JobManager deployment. Previous 
status: MISSING
2023-08-30 12:10:27.533 + o.a.f.k.o.l.AuditUtils         [INFO 
][default/pipeline-pipeline-se-3] >>> Event  | Info    | SPECCHANGED     | 
UPGRADE change(s) detected (Diff: FlinkDeploymentSpec[image : 
docker-registry.randomcompany.com/octopus/pipeline-pipeline-online:0835137c-362 
-> 
docker-registry.randomcompany.com/octopus/pipeline-pipeline-online:23db7ae8-365,
 podTemplate.metadata.labels.app.kubernetes.io~1version : 
0835137cd803b7258695eb53a6ec520cb62a48a7 -> 
23db7ae84bdab8d91fa527fe2f8f2fce292d0abc, job.state : suspended -> running, 
job.upgradeMode : last-state -> savepoint, restartNonce : 1545 -> 1547]), 
starting reconciliation.
2023-08-30 12:10:27.679 + o.a.f.k.o.s.NativeFlinkService [INFO 
][default/pipeline-pipeline-se-3] Deleting JobManager deployment and HA 
metadata.
{code}
A more complete log file is attached. Thanks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33010) NPE when using GREATEST() in Flink SQL

2023-08-31 Thread Hector Rios (Jira)
Hector Rios created FLINK-33010:
---

 Summary: NPE when using GREATEST() in Flink SQL
 Key: FLINK-33010
 URL: https://issues.apache.org/jira/browse/FLINK-33010
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API, Table SQL / Planner
Affects Versions: 1.16.2, 1.16.1
Reporter: Hector Rios


Hi,

I see NPEs in Flink 1.14 and Flink 1.16 when running queries with GREATEST()
and timestamps. Below is an example to help reproduce the issue.
{code:java}
CREATE TEMPORARY VIEW Positions AS
SELECT
SecurityId,
ccy1,
CAST(publishTimestamp AS TIMESTAMP(3)) as publishTimestamp
FROM (VALUES
(1, 'USD', '2022-01-01'),
(2, 'GBP', '2022-02-02'),
(3, 'GBX', '2022-03-03'),
(4, 'GBX', '2022-04-4'))
AS ccy(SecurityId, ccy1, publishTimestamp);

CREATE TEMPORARY VIEW Benchmarks AS
SELECT
SecurityId,
ccy1,
CAST(publishTimestamp AS TIMESTAMP(3)) as publishTimestamp
FROM (VALUES
(3, 'USD', '2023-01-01'),
(4, 'GBP', '2023-02-02'),
(5, 'GBX', '2023-03-03'),
(6, 'GBX', '2023-04-4'))
AS ccy(SecurityId, ccy1, publishTimestamp);

SELECT *,
GREATEST(
IFNULL(Positions.publishTimestamp, CAST('1970-1-1' AS TIMESTAMP(3))),
IFNULL(Benchmarks.publishTimestamp, CAST('1970-1-1' AS TIMESTAMP(3)))
)
FROM Positions
FULL JOIN Benchmarks ON Positions.SecurityId = Benchmarks.SecurityId {code}
 

Using "IF" is a workaround at the moment instead of using "GREATEST"

  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-348: Make expanding behavior of virtual metadata columns configurable

2023-08-31 Thread Sergey Nuyanzin
+1 (binding)

On Thu, Aug 31, 2023 at 9:28 AM Benchao Li  wrote:

> +1 (binding)
>
> On Thu, Aug 31, 2023 at 15:24, Martijn Visser wrote:
> >
> > +1 (binding)
> >
> > On Thu, Aug 31, 2023 at 9:09 AM Timo Walther  wrote:
> >
> > > Hi everyone,
> > >
> > > I'd like to start a vote on FLIP-348: Make expanding behavior of
> virtual
> > > metadata columns configurable [1] which has been discussed in this
> > > thread [2].
> > >
> > > The vote will be open for at least 72 hours unless there is an
> objection
> > > or not enough votes.
> > >
> > > [1] https://cwiki.apache.org/confluence/x/_o6zDw
> > > [2] https://lists.apache.org/thread/zon967w7synby8z6m1s7dj71dhkh9ccy
> > >
> > > Cheers,
> > > Timo
> > >
>
>
>
> --
>
> Best,
> Benchao Li
>


-- 
Best regards,
Sergey


Re: [DISCUSS] FLIP-358: flink-avro enhancement and cleanup

2023-08-31 Thread Ryan Skraba
Hey -- I have a certain knowledge of Avro, and I'd be willing to help
out with some of these enhancements, writing tests and reviewing.  I
have a *lot* of Avro schemas available for validation!

The FLIP looks pretty good and covers the possible cases pretty
rigorously. I wasn't aware of some of the gaps you've pointed out
here!

How useful do you think the new ENUM_STRING DataType would be outside
of the Avro use case?  It seems like a good enough addition that would
solve the problem here.

A small note: I assume the AvroSchemaUtils is meant to be annotated
@PublicEvolving as well.

All my best, Ryan


On Tue, Aug 29, 2023 at 4:35 AM Becket Qin  wrote:
>
> Hi folks,
>
> I would like to start the discussion about FLIP-358 [1] which proposes to
> clean up and enhance the Avro support in Flink. More specifically, it
> proposes to:
>
> 1. Make it clear what are the public APIs in flink-avro components.
> 2. Fix a few buggy cases in flink-avro
> 3. Add more supported Avro use cases out of the box.
>
> Feedbacks are welcome!
>
> Thanks
>
> Jiangjie (Becket) Qin
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-358%3A+flink-avro+enhancement+and+cleanup


[jira] [Created] (FLINK-33009) tools/release/update_japicmp_configuration.sh

2023-08-31 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-33009:
-

 Summary: tools/release/update_japicmp_configuration.sh
 Key: FLINK-33009
 URL: https://issues.apache.org/jira/browse/FLINK-33009
 Project: Flink
  Issue Type: Bug
  Components: Release System
Affects Versions: 1.19.0
Reporter: Matthias Pohl


According to Flink's API compatibility constraints, we only support binary
compatibility between versions. In
[apache-flink:pom.xml:2246|https://github.com/apache/flink/blob/aa8d93ea239f5be79066b7e5caad08d966c86ab2/pom.xml#L2246]
we have binary compatibility enabled even in {{master}}. This doesn't comply
with the rules. We should have this flag disabled in {{master}}. The
{{tools/release/update_japicmp_configuration.sh}} script should enable this
flag in the release branch as part of the release process.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-348: Make expanding behavior of virtual metadata columns configurable

2023-08-31 Thread Benchao Li
+1 (binding)

On Thu, Aug 31, 2023 at 15:24, Martijn Visser wrote:
>
> +1 (binding)
>
> On Thu, Aug 31, 2023 at 9:09 AM Timo Walther  wrote:
>
> > Hi everyone,
> >
> > I'd like to start a vote on FLIP-348: Make expanding behavior of virtual
> > metadata columns configurable [1] which has been discussed in this
> > thread [2].
> >
> > The vote will be open for at least 72 hours unless there is an objection
> > or not enough votes.
> >
> > [1] https://cwiki.apache.org/confluence/x/_o6zDw
> > [2] https://lists.apache.org/thread/zon967w7synby8z6m1s7dj71dhkh9ccy
> >
> > Cheers,
> > Timo
> >



-- 

Best,
Benchao Li


Re: [VOTE] FLIP-348: Make expanding behavior of virtual metadata columns configurable

2023-08-31 Thread Martijn Visser
+1 (binding)

On Thu, Aug 31, 2023 at 9:09 AM Timo Walther  wrote:

> Hi everyone,
>
> I'd like to start a vote on FLIP-348: Make expanding behavior of virtual
> metadata columns configurable [1] which has been discussed in this
> thread [2].
>
> The vote will be open for at least 72 hours unless there is an objection
> or not enough votes.
>
> [1] https://cwiki.apache.org/confluence/x/_o6zDw
> [2] https://lists.apache.org/thread/zon967w7synby8z6m1s7dj71dhkh9ccy
>
> Cheers,
> Timo
>


[jira] [Created] (FLINK-33008) Use KubernetesClient from JOSDK context

2023-08-31 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-33008:
--

 Summary: Use KubernetesClient from JOSDK context
 Key: FLINK-33008
 URL: https://issues.apache.org/jira/browse/FLINK-33008
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Gyula Fora


We are currently manually creating and passing around the KubernetesClient 
instances. 

This is now accessible directly from the JOSDK Context, so we should use it 
from there to simplify the code.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33007) Integrate autoscaler config validation into the general validator flow

2023-08-31 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-33007:
--

 Summary: Integrate autoscaler config validation into the general 
validator flow
 Key: FLINK-33007
 URL: https://issues.apache.org/jira/browse/FLINK-33007
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Gyula Fora
 Fix For: kubernetes-operator-1.7.0


Currently, autoscaler configs are not validated at all; invalid configs cause
runtime failures of the autoscaler mechanism.

We should create a custom autoscaler config validator plugin and hook it into
the core validation flow.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[VOTE] FLIP-348: Make expanding behavior of virtual metadata columns configurable

2023-08-31 Thread Timo Walther

Hi everyone,

I'd like to start a vote on FLIP-348: Make expanding behavior of virtual 
metadata columns configurable [1] which has been discussed in this 
thread [2].


The vote will be open for at least 72 hours unless there is an objection 
or not enough votes.


[1] https://cwiki.apache.org/confluence/x/_o6zDw
[2] https://lists.apache.org/thread/zon967w7synby8z6m1s7dj71dhkh9ccy

Cheers,
Timo


[jira] [Created] (FLINK-33006) Add e2e test for Kubernetes Operator HA

2023-08-31 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-33006:
--

 Summary: Add e2e test for Kubernetes Operator HA
 Key: FLINK-33006
 URL: https://issues.apache.org/jira/browse/FLINK-33006
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Gyula Fora
 Fix For: kubernetes-operator-1.7.0


There is currently no proper test coverage for operator HA



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33005) Upgrade JOSDK to 4.4.2 and Fabric8 to 6.8.1

2023-08-31 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-33005:
--

 Summary: Upgrade JOSDK to 4.4.2 and Fabric8 to 6.8.1
 Key: FLINK-33005
 URL: https://issues.apache.org/jira/browse/FLINK-33005
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Gyula Fora
 Fix For: kubernetes-operator-1.7.0


The JOSDK / fabric8 dependencies haven't been upgraded in a while. We should
upgrade these critical clients to the latest stable versions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)