[jira] [Created] (FLINK-22957) Rank TTL should use enableTimeToLive of state instead of timer

2021-06-09 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-22957:


 Summary: Rank TTL should use enableTimeToLive of state instead of 
timer
 Key: FLINK-22957
 URL: https://issues.apache.org/jira/browse/FLINK-22957
 Project: Flink
  Issue Type: Sub-task
Reporter: Jingsong Lee
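
For context, the state-TTL approach these two tickets refer to is configured
per state descriptor via StateTtlConfig rather than by registering cleanup
timers per key. A minimal sketch (the descriptor name and TTL duration are
illustrative, not taken from the tickets):

{code:java}
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

// Let the state backend expire entries after the retention time instead of
// cleaning them up with a per-key timer.
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(1))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();

ValueStateDescriptor<Long> rankState =
        new ValueStateDescriptor<>("rank-value", Long.class);
rankState.enableTimeToLive(ttlConfig);
{code}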








[jira] [Created] (FLINK-22956) Stream Over TTL should use enableTimeToLive of state instead of timer

2021-06-09 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-22956:


 Summary: Stream Over TTL should use enableTimeToLive of state 
instead of timer
 Key: FLINK-22956
 URL: https://issues.apache.org/jira/browse/FLINK-22956
 Project: Flink
  Issue Type: Sub-task
Reporter: Jingsong Lee
 Fix For: 1.14.0








[jira] [Created] (FLINK-22955) Lookup join filter push down results in mismatched function signature

2021-06-09 Thread Cooper Luan (Jira)
Cooper Luan created FLINK-22955:
---

 Summary: Lookup join filter push down results in mismatched function signature
 Key: FLINK-22955
 URL: https://issues.apache.org/jira/browse/FLINK-22955
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.12.4, 1.13.1, 1.11.3
 Environment: Flink 1.13.1

how to reproduce: patch file attached
Reporter: Cooper Luan
 Attachments: 
0001-try-to-produce-lookup-join-filter-pushdown-expensive.patch

A SQL statement like the following may result in a lookup function signature mismatch exception when the SQL is explained:
{code:sql}
CREATE TEMPORARY VIEW v_vvv AS
SELECT * FROM MyTable AS T
JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
ON T.a = D.id;

SELECT a,b,id,name
FROM v_vvv
WHERE age = 10;{code}
the lookup function is
{code:scala}
class AsyncTableFunction1 extends AsyncTableFunction[RowData] {
  def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): 
Unit = {
  }
}{code}
exec plan is
{code:java}
LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], 
fields=[a, b, id, name])
+- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], 
joinType=[InnerJoin], async=[true], lookup=[age=10, id=a], where=[(age = 10)], 
select=[a, b, id, name])
   +- Calc(select=[a, b])
  +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
{code}
the "lookup=[age=10, id=a]" result to mismatch signature mismatch

 

But if I add one more INSERT, it works well:
{code:sql}
SELECT a,b,id,name
FROM v_vvv
WHERE age = 30
{code}
exec plan is
{code:java}
== Optimized Execution Plan ==
LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], 
joinType=[InnerJoin], async=[true], lookup=[id=a], select=[a, b, c, proctime, 
rowtime, id, name, age, ts])(reuse_id=[1])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, 
rowtime])LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], 
fields=[a, b, id, name])
+- Calc(select=[a, b, id, name], where=[(age = 10)])
   +- 
Reused(reference_id=[1])LegacySink(name=[`default_catalog`.`default_database`.`appendSink2`],
 fields=[a, b, id, name])
+- Calc(select=[a, b, id, name], where=[(age = 30)])
   +- Reused(reference_id=[1])

{code}
 The LookupJoin node uses "lookup=[id=a]" (right), not "lookup=[age=10, id=a]" (wrong).

 

so, in "multi insert" case, planner works great

in "single insert" case, planner throw exception





Re: [DISCUSS] Definition of idle partitions

2021-06-09 Thread Tzu-Li (Gordon) Tai
Forgot to provide the link to the [1] reference:

[1] https://issues.apache.org/jira/browse/FLINK-5017

On Thu, Jun 10, 2021 at 11:43 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi everyone,
>
> Sorry for chiming in late here.
>
> Regarding the topic of changing the definition of StreamStatus and
> changing the name as well:
> After digging into some of the roots of this implementation [1], initially
> the StreamStatus was actually defined to mark "watermark idleness", and not
> "record idleness" (in fact, the alternative name "WatermarkStatus" was
> considered at the time).
>
> The concern at the time causing us to alter the definition to be "record
> idleness" in the final implementation was due to the existence of periodic
> timestamp / watermark generators within the pipeline. Those would continue
> to generate non-increasing watermarks in the absence of any input records
> from upstream. In this scenario, downstream operators would not be able to
> consider that channel as idle and therefore watermark progress is locked.
> We could consider a timeout-based approach on those specific operators to
> toggle watermark idleness if the values remain constant for a period of
> time, but then again, this is very ill-defined and most likely wrong.
>
> I have not followed the newest changes to the watermark generator
> operators and am not sure if this issue is still relevant.
> Otherwise, I don't see other problems with changing the definition here.
>
> Thanks,
> Gordon
>
> On Wed, Jun 9, 2021 at 3:06 PM Arvid Heise  wrote:
>
>> Hi Eron,
>>
>> again to recap from the other thread:
>> - You are right that idleness is correct with static assignment and fully
>> active partitions. In this case, the source defines idleness. (case A)
>> - For the more pressing use cases of idle, assigned partitions, the user
>> defines an idleness threshold, and it becomes potentially incorrect, when
>> the partition becomes active again. (case B)
>> - Same holds for dynamic assignment of splits. If a source without a split
>> gets a split assigned dynamically, there is a realistic chance that the
>> watermark advanced past the first record of the newly assigned split.
>> (case
>> C)
>> You can certainly insist that only the first case is valid (as it's
>> correct) but we know that users use it in other ways and that was also the
>> intent of the devs.
>>
>> Now the question could be if it makes sense to distinguish these cases.
>> Would you treat the idleness information differently (especially in the
>> sink/source that motivated FLIP-167) if you knew that the idleness is
>> guaranteed correct?
>> We could have some WatermarkStatus with ACTIVE, IDLE (case A), TIMEOUT
>> (case B).
>>
>> However, that would still leave case C, which probably would need to be
>> solved completely differently. I could imagine that a source with dynamic
>> assignments should never have IDLE subtasks and rather manage the idleness
>> itself. For example, it could emit a watermark per second/minute that is
>> directly fetched from the source system. I'm just not sure if the current
>> WatermarkAssigner interface suffices in that regard...
>>
>>
>> On Wed, Jun 9, 2021 at 7:31 AM Piotr Nowojski 
>> wrote:
>>
>> > Hi Eron,
>> >
>> > Can you elaborate a bit more what do you mean? I don’t understand what
>> do
>> > you mean by more general solution.
>> >
>> > As of now, stream is marked idle by a source/watermark generator, which
>> > has an effect of temporarily ignoring this stream/partition from
>> > calculating min watermark in the downstream tasks. However stream is
>> > switching back to active when any record is emitted. This is what’s
>> causing
>> > problems described by Arvid.
>> >
>> > The core of our proposal is very simple. Keep everything as it is except
>> > stating that stream will be changed back to active only once a
>> watermark is
>> > emitted again - not record. In other words disconnecting idleness from
>> > presence of records and connecting it only to presence or lack of
>> > watermarks and allowing to emit records while “stream status” is “idle”
>> >
>> > Piotrek
>> >
>> >
>> > > Wiadomość napisana przez Eron Wright > .invalid>
>> > w dniu 09.06.2021, o godz. 06:01:
>> > >
>> > > It seems to me that idleness was introduced to deal with a very
>> specific
>> > > issue.  In the pipeline, watermarks are aggregated not on a per-split
>> > basis
>> > > but on a per-subtask basis.  This works well when each subtask has
>> > exactly
>> > > one split.  When a sub-task has multiple splits, various complications
>> > > occur involving the commingling of watermarks.  And when a sub-task
>> has
>> > no
>> > > splits, the pipeline stalls altogether.  To deal with the latter
>> problem,
>> > > idleness was introduced.  The sub-task simply declares itself to be
>> idle
>> > to
>> > > be taken out of consideration for purposes of watermark aggregation.
>> > >
>> > > If we're looking for a more general solution, I would suggest we
>> discuss
>> > > how to 

Re: [DISCUSS] Definition of idle partitions

2021-06-09 Thread Tzu-Li (Gordon) Tai
Hi everyone,

Sorry for chiming in late here.

Regarding the topic of changing the definition of StreamStatus and changing
the name as well:
After digging into some of the roots of this implementation [1], initially
the StreamStatus was actually defined to mark "watermark idleness", and not
"record idleness" (in fact, the alternative name "WatermarkStatus" was
considered at the time).

The concern at the time causing us to alter the definition to be "record
idleness" in the final implementation was due to the existence of periodic
timestamp / watermark generators within the pipeline. Those would continue
to generate non-increasing watermarks in the absence of any input records
from upstream. In this scenario, downstream operators would not be able to
consider that channel as idle and therefore watermark progress is locked.
We could consider a timeout-based approach on those specific operators to
toggle watermark idleness if the values remain constant for a period of
time, but then again, this is very ill-defined and most likely wrong.

I have not followed the newest changes to the watermark generator operators
and am not sure if this issue is still relevant.
Otherwise, I don't see other problems with changing the definition here.

Thanks,
Gordon

On Wed, Jun 9, 2021 at 3:06 PM Arvid Heise  wrote:

> Hi Eron,
>
> again to recap from the other thread:
> - You are right that idleness is correct with static assignment and fully
> active partitions. In this case, the source defines idleness. (case A)
> - For the more pressing use cases of idle, assigned partitions, the user
> defines an idleness threshold, and it becomes potentially incorrect, when
> the partition becomes active again. (case B)
> - Same holds for dynamic assignment of splits. If a source without a split
> gets a split assigned dynamically, there is a realistic chance that the
> watermark advanced past the first record of the newly assigned split. (case
> C)
> You can certainly insist that only the first case is valid (as it's
> correct) but we know that users use it in other ways and that was also the
> intent of the devs.
>
> Now the question could be if it makes sense to distinguish these cases.
> Would you treat the idleness information differently (especially in the
> sink/source that motivated FLIP-167) if you knew that the idleness is
> guaranteed correct?
> We could have some WatermarkStatus with ACTIVE, IDLE (case A), TIMEOUT
> (case B).
>
> However, that would still leave case C, which probably would need to be
> solved completely differently. I could imagine that a source with dynamic
> assignments should never have IDLE subtasks and rather manage the idleness
> itself. For example, it could emit a watermark per second/minute that is
> directly fetched from the source system. I'm just not sure if the current
> WatermarkAssigner interface suffices in that regard...
>
>
> On Wed, Jun 9, 2021 at 7:31 AM Piotr Nowojski 
> wrote:
>
> > Hi Eron,
> >
> > Can you elaborate a bit more what do you mean? I don’t understand what do
> > you mean by more general solution.
> >
> > As of now, stream is marked idle by a source/watermark generator, which
> > has an effect of temporarily ignoring this stream/partition from
> > calculating min watermark in the downstream tasks. However stream is
> > switching back to active when any record is emitted. This is what’s
> causing
> > problems described by Arvid.
> >
> > The core of our proposal is very simple. Keep everything as it is except
> > stating that stream will be changed back to active only once a watermark
> is
> > emitted again - not record. In other words disconnecting idleness from
> > presence of records and connecting it only to presence or lack of
> > watermarks and allowing to emit records while “stream status” is “idle”
> >
> > Piotrek
> >
> >
> > > Wiadomość napisana przez Eron Wright 
> > w dniu 09.06.2021, o godz. 06:01:
> > >
> > > It seems to me that idleness was introduced to deal with a very
> specific
> > > issue.  In the pipeline, watermarks are aggregated not on a per-split
> > basis
> > > but on a per-subtask basis.  This works well when each subtask has
> > exactly
> > > one split.  When a sub-task has multiple splits, various complications
> > > occur involving the commingling of watermarks.  And when a sub-task has
> > no
> > > splits, the pipeline stalls altogether.  To deal with the latter
> problem,
> > > idleness was introduced.  The sub-task simply declares itself to be
> idle
> > to
> > > be taken out of consideration for purposes of watermark aggregation.
> > >
> > > If we're looking for a more general solution, I would suggest we
> discuss
> > > how to track watermarks on a per-split basis.  Or, as Till mentioned
> > > recently, an alternate solution may be to dynamically adjust the
> > > parallelism of the task.
> > >
> > > I don't agree with the notion that idleness involves a correctness
> > > tradeoff.  The facility I described above has no impact on 
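
(The per-source idleness threshold mentioned as case B above is what
WatermarkStrategy#withIdleness configures today. A minimal sketch, where
MyEvent and the durations are illustrative:)

{code:java}
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

WatermarkStrategy<MyEvent> strategy =
        WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                // Mark the split/partition idle when no records arrive for one minute,
                // so it is ignored in the downstream min-watermark calculation.
                .withIdleness(Duration.ofMinutes(1));
{code}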



Re: [DISCUSS]FLIP-170 Adding Checkpoint Rejection Mechanism

2021-06-09 Thread Senhong Liu
Hi Piotrek,

Thanks for your feedback!

1. Why not ExternallyInducedSourceReader/ExternallyInducedSource?
a. `org.apache.flink.api.connector.source.ExternallyInducedSourceReader` and
`org.apache.flink.api.connector.source.ExternallyInducedSource` seem to play
the role of the checkpoint coordinator. Once one of them is implemented, the
source reader needs to implement logic similar to what the checkpoint
coordinator does. I think it would be better to let the checkpoint
coordinator play its own role of triggering the checkpoint.
b. The new interface can be implemented not only in the source operator but
also in other operators. However, I don't have a solid use case for
implementing it in a downstream operator so far, so basically it is there for
future compatibility.

2. Why not an exception?
Actually, I don't think rejecting a checkpoint is an exceptional situation.
Just like the soft failure I introduced in the FLIP, the rejection and the
resulting checkpoint failure can be acceptable to the user. The tolerable
checkpoint failure number, however, should only count failures that are NOT
acceptable to the user, e.g., checkpoint expiration.
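
For reference, the alternative Piotrek mentions below (an exception thrown from
the reader's snapshotState combined with the tolerable-failure setting) can be
configured as in the following minimal sketch; the checkpoint interval and
failure count are illustrative only:

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TolerableCheckpointFailureExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L);
        // Tolerate up to 3 checkpoint failures before the job fails over.
        // A source reader that cannot take a snapshot could then simply throw from
        // snapshotState(), and the resulting checkpoint failure would be absorbed here.
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
    }
}
{code}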



Piotr Nowojski  于2021年6月9日周三 下午3:37写道:

> Hi Senhong,
>
> Thanks for the proposal. I have a couple of questions.
>
> Have you seen
> `org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource` (for
> the legacy SourceFunction) and
> `org.apache.flink.api.connector.source.ExternallyInducedSourceReader` (for
> FLIP-27) interfaces? They work the other way around, by letting the source
> to trigger/initiate a checkpoint, instead of declining it. Could it be made
> to work in your use case? If not, can you explain why?
>
> Regarding declining/failing the checkpoint (without blocking the barrier
> waiting for snapshot availability), can not you achieve the same thing by a
> combination of throwing an exception in for example
> `org.apache.flink.api.connector.source.SourceReader#snapshotState` call and
> setting the tolerable checkpoint failure number? [1]
>
> Best, Piotrek
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/environment/CheckpointConfig.html#setTolerableCheckpointFailureNumber-int-
>
> śr., 9 cze 2021 o 09:11 Senhong Liu  napisał(a):
>
> > Here is some brief context about the new feature.
> >
> > 1. Active checkpoint rejection by the operator. Following the current
> > checkpoint mechanism, one more preliminary step is added to help the
> > operator determine whether it is able to take a snapshot. The preliminary
> > step is a new API provided to users/developers. The new API will be
> > implemented in the Source API (the new one based on FLIP-27) for the CDC
> > implementation. The new API can also be implemented in other operators if
> > necessary.
> >
> > 2. Handling the failure returned from the operator. If the checkpoint is
> > rejected by the operator, an appropriate failure reason needs to be
> > returned from the operator as well. In the current design, two failure
> > reasons are listed: soft failure and hard failure. The former would be
> > ignored by Flink and the latter would be counted as a continuous
> > checkpoint failure according to the current checkpoint failure manager
> > mechanism.
> >
> > 3. To prevent the operator from reporting soft failures indefinitely, so
> > that no checkpoint can complete for a long time, we introduce a new
> > configuration, the tolerable checkpoint failure timeout, which is a
> > timer that starts with the checkpoint scheduler. The timer is reset if
> > and only if a checkpoint completes. Otherwise, it does nothing until the
> > tolerable timeout is hit. If the timer fires, it triggers the current
> > checkpoint failover.
> >
> > Questions:
> > a. According to the current design, checkpoints might keep failing for a
> > possibly long time, for example with a large checkpoint interval. Is
> > there a better way to make a checkpoint more likely to succeed, e.g.,
> > triggering a checkpoint immediately after the last one is rejected? That
> > seems inappropriate, though, because it would increase the overhead.
> > b. Is there a better idea for handling the soft failure?
> >
> >
> >
> >
> >
> > --
> > Sent from:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
> >
>


[jira] [Created] (FLINK-22954) Consuming update and delete changes is not supported when using a table function that does not contain a table field

2021-06-09 Thread hehuiyuan (Jira)
hehuiyuan created FLINK-22954:
-

 Summary: Consuming update and delete changes is not supported when using a table function that does not contain a table field
 Key: FLINK-22954
 URL: https://issues.apache.org/jira/browse/FLINK-22954
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Reporter: hehuiyuan


{code:java}
Exception in thread "main" org.apache.flink.table.api.TableException: Table 
sink 'default_catalog.default_database.kafkaTableSink' doesn't support 
consuming update and delete changes which is produced by node 
Join(joinType=[LeftOuterJoin], where=[true], select=[name, word], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])Exception in thread 
"main" org.apache.flink.table.api.TableException: Table sink 
'default_catalog.default_database.kafkaTableSink' doesn't support consuming 
update and delete changes which is produced by node 
Join(joinType=[LeftOuterJoin], where=[true], select=[name, word], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:382)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:265)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:341)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:330)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:329)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.immutable.Range.foreach(Range.scala:160) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:329)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:279)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:341)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:330)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:329)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.immutable.Range.foreach(Range.scala:160) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:329)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:125)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.optimize(FlinkChangelogModeInferenceProgram.scala:50)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.optimize(FlinkChangelogModeInferenceProgram.scala:39)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
 at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
 at 

Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource Requirements

2021-06-09 Thread Yangze Guo
Thanks all for the discussion. I've updated the FLIP accordingly; the
key changes are:
- Introduce SlotSharingGroup, which contains the resource spec of a slot
sharing group, instead of ResourceSpec
- Introduce two interfaces for specifying the SlotSharingGroup:
#slotSharingGroup(SlotSharingGroup) and
StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup).

If there is no more feedback, I'd start a vote next week.

Best,
Yangze Guo
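
To make the proposal concrete, here is a rough sketch of how the two
interfaces could be used; the builder methods and exact signatures follow the
FLIP draft and are still subject to the vote, so treat the names as
illustrative rather than final:

{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Proposed: a SlotSharingGroup that carries its own resource spec.
SlotSharingGroup ssg1 = SlotSharingGroup.newBuilder("ssg-1")
        .setCpuCores(2.0)
        .setTaskHeapMemoryMB(512)
        .build();

// Option 1: register the group on the environment and refer to it by name.
env.registerSlotSharingGroup(ssg1);
DataStream<Long> source = env.fromSequence(1, 100).slotSharingGroup("ssg-1");

// Option 2: pass the SlotSharingGroup object directly to the operator.
source.map(x -> x * 2).slotSharingGroup(ssg1);
{code}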

On Wed, Jun 9, 2021 at 10:46 AM Yangze Guo  wrote:
>
> Thanks for the valuable suggestion, Arvid.
>
> 1) Yes, we can add a new SlotSharingGroup which includes the name and
> its resource. After that, we have two interfaces for configuring the
> slot sharing group of an operator:
> - #slotSharingGroup(String name) // its resource can be
> configured through StreamExecutionEnvironment#registerSlotSharingGroup
> - #slotSharingGroup(SlotSharingGroup ssg) // directly configure the resource
> And one interface to configure the resource of an SSG:
> - StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup)
> We can also define the priority of the above two approaches, e.g. the
> resource registered in the StreamExecutionEnvironment will always be
> respected in case of a conflict. That would be well documented.
>
> 2) Yes, I originally added this interface as a shortcut. It seems
> unnecessary now. I will remove it.
>
> 3) I don't think we need to expose the ExternalResource. In the
> builder of SlotSharingGroup, we can introduce a
> #withExternalResource(String name, double value). Also, this interface
> needs to be annotated as evolving.
>
> 4) Actually, I've mentioned it in the FLIP. Maybe it would be good to
> elaborate on the Builder for the SlotSharingGroup.
>
> WDYT?
>
> Best,
> Yangze Guo
>
> On Tue, Jun 8, 2021 at 6:51 PM Arvid Heise  wrote:
> >
> > Hi Yangze,
> >
> > I like the general approach to bind requirements to slotsharing groups. I
> > think the current approach is also flexible enough that a user could simply
> > use ParameterTool or similar to use config values and wire that with their
> > slotgroups, such that different requirements can be tested without
> > recompilation. So I don't see an immediate need to provide a generic
> > solution for yaml configuration for now.
> >
> > Looking at the programmatic interface though, I think we could improve by
> > quite a bit and I haven't seen these alternatives being considered in the
> > FLIP:
> > 1) Add new class SlotSharingGroup that incorporates all ResourceSpec
> > properties. Instead of using group names, the user could directly configure
> > such an object.
> >
> > SlotSharingGroup ssg1 = new SlotSharingGroup("ssg-1"); // name
> > could also be omitted and auto-generated
> > ssg1.setCPUCores(4);
> > ...
> > DataStream> grades =
> > GradeSource
> > .getSource(env, rate)
> >
> > .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
> > .slotSharingGroup(ssg1);
> > DataStream> salaries =
> > SalarySource.getSource(env, rate)
> >
> > .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
> > .slotSharingGroup(ssg2);
> >
> > // run the actual window join program with the same slot sharing
> > group as grades
> > DataStream> joinedStream =
> > runWindowJoin(grades, salaries,
> > windowSize).slotSharingGroup(ssg1);
> >
> > Note that we could make it backward compatible by changing the proposed
> > StreamExecutionEnvironment#setSlotSharingGroupResource to
> > StreamExecutionEnvironment#registerSlotSharingGroup(SlotSharingGroup) and
> > then use the string name for further reference.
> >
> > 2) I'm also not sure on the StreamExecutionEnvironment#
> > setSlotSharingGroupResources. What's the benefit of the Map version over
> > having the simple setter? Even if the user has a map
> > slotSharingGroupResources, he could simply do
> > slotSharingGroupResources.forEach(env::setSlotSharingGroupResource);
> >
> > 3) Is defining the ExternalResource part of this FLIP? I don't see a
> > Public* class yet. I'd be also fine to cut the scope of this FLIP and
> > remove it for now and annotate ResourceSpec/SlotSharingGroup evolving.
> >
> > 4) We should probably use a builder pattern around
> > ResourceSpec/SlotSharingGroup as in the current ResourceSpec. I don't think
> > we need to fully specify that in the FLIP but it would be good to at least
> > say how they should be created by the user.
> >
> >
> >
> > On Tue, Jun 8, 2021 at 10:58 AM Yangze Guo  wrote:
> >
> > > @Yang
> > > In short, the external resources will participate in resource
> > > deduction and be logically ensured, but requesting an external
> > > resource must still be done through config options with the current
> > > default resource allocation strategy.
> > > In FLIP-56, we abstract the logic of resource allocation to the
> > > 

Re: [VOTE] Watermark propagation with Sink API

2021-06-09 Thread Tzu-Li (Gordon) Tai
+1

On Thu, Jun 10, 2021 at 9:04 AM jincheng sun 
wrote:

> +1 (binding)  // Sorry for the late reply.
>
> Best,
> Jincheng
>
>
> Piotr Nowojski  于2021年6月9日周三 下午10:04写道:
>
> > Thanks for driving this Eron, and sorry for causing the delay.
> >
> > +1 (binding) from my side
> >
> > Piotrek
> >
> > wt., 8 cze 2021 o 23:48 Eron Wright 
> > napisał(a):
> >
> > > Voting is re-open for FLIP-167 as-is (without idleness support as was
> the
> > > point of contention).
> > >
> > > On Fri, Jun 4, 2021 at 10:45 AM Eron Wright 
> > > wrote:
> > >
> > > > Little update on this, more good discussion over the last few days,
> and
> > > > the FLIP will probably be amended to incorporate idleness.   Voting
> > will
> > > > remain open until, let's say, mid-next week.
> > > >
> > > > On Thu, Jun 3, 2021 at 8:00 AM Piotr Nowojski 
> > > > wrote:
> > > >
> > > >> I would like to ask you to hold on with counting the votes until I
> get
> > > an
> > > >> answer for my one question in the dev mailing list thread (sorry if
> it
> > > was
> > > >> already covered somewhere).
> > > >>
> > > >> Best, Piotrek
> > > >>
> > > >> czw., 3 cze 2021 o 16:12 Jark Wu  napisał(a):
> > > >>
> > > >> > +1 (binding)
> > > >> >
> > > >> > Best,
> > > >> > Jark
> > > >> >
> > > >> > On Thu, 3 Jun 2021 at 21:34, Dawid Wysakowicz <
> > dwysakow...@apache.org
> > > >
> > > >> > wrote:
> > > >> >
> > > >> > > +1 (binding)
> > > >> > >
> > > >> > > Best,
> > > >> > >
> > > >> > > Dawid
> > > >> > >
> > > >> > > On 03/06/2021 03:50, Zhou, Brian wrote:
> > > >> > > > +1 (non-binding)
> > > >> > > >
> > > >> > > > Thanks Eron, looking forward to seeing this feature soon.
> > > >> > > >
> > > >> > > > Thanks,
> > > >> > > > Brian
> > > >> > > >
> > > >> > > > -Original Message-
> > > >> > > > From: Arvid Heise 
> > > >> > > > Sent: Wednesday, June 2, 2021 15:44
> > > >> > > > To: dev
> > > >> > > > Subject: Re: [VOTE] Watermark propagation with Sink API
> > > >> > > >
> > > >> > > >
> > > >> > > > [EXTERNAL EMAIL]
> > > >> > > >
> > > >> > > > +1 (binding)
> > > >> > > >
> > > >> > > > Thanks Eron for driving this effort; it will open up new
> > exciting
> > > >> use
> > > >> > > cases.
> > > >> > > >
> > > >> > > > On Tue, Jun 1, 2021 at 7:17 PM Eron Wright <
> > > ewri...@streamnative.io
> > > >> > > .invalid>
> > > >> > > > wrote:
> > > >> > > >
> > > >> > > >> After some good discussion about how to enhance the Sink API
> to
> > > >> > > >> process watermarks, I believe we're ready to proceed with a
> > vote.
> > > >> > > >> Voting will be open until at least Friday, June 4th, 2021.
> > > >> > > >>
> > > >> > > >> Reference:
> > > >> > > >>
> > > >> > > >>
> > > >> >
> > > https://urldefense.com/v3/__https://cwiki.apache.org/confluence/displa
> > > >> > > >>
> > > >>
> y/FLINK/FLIP-167*3A*Watermarks*for*Sink*API__;JSsrKys!!LpKI!zkBBhuqEEi
> > > >> > > >> fxF_fDQqAjqsbuWWFmnAvwRfEAWxeC63viFWXPLul-GCBb-PTq$
> > > >> > > >> [cwiki[.]apache[.]org]
> > > >> > > >>
> > > >> > > >> Discussion thread:
> > > >> > > >>
> > > >> > > >>
> > > >> >
> > > https://urldefense.com/v3/__https://lists.apache.org/thread.html/r5194
> > > >> > > >>
> > > >>
> e1cf157d1fd5ba7ca9b567cb01723bd582aa12dda57d25bca37e*40*3Cdev.flink.ap
> > > >> > > >> ache.org
> > > >> > *3E__;JSUl!!LpKI!zkBBhuqEEifxF_fDQqAjqsbuWWFmnAvwRfEAWxeC63viF
> > > >> > > >> WXPLul-GJXlxwqN$ [lists[.]apache[.]org]
> > > >> > > >>
> > > >> > > >> Implementation Issue:
> > > >> > > >>
> > > >> >
> > > https://urldefense.com/v3/__https://issues.apache.org/jira/browse/FLIN
> > > >> > > >>
> > > >>
> K-22700__;!!LpKI!zkBBhuqEEifxF_fDQqAjqsbuWWFmnAvwRfEAWxeC63viFWXPLul-G
> > > >> > > >> N6AJm4h$ [issues[.]apache[.]org]
> > > >> > > >>
> > > >> > > >> Thanks,
> > > >> > > >> Eron Wright
> > > >> > > >> StreamNative
> > > >> > > >>
> > > >> > >
> > > >> > >
> > > >> >
> > > >>
> > > >
> > >
> >
>


[jira] [Created] (FLINK-22953) Using Types.LIST(Types.STRING()) in DataStream results in crash

2021-06-09 Thread awayne (Jira)
awayne created FLINK-22953:
--

 Summary: Using Types.LIST(Types.STRING()) in DataStream results in crash
 Key: FLINK-22953
 URL: https://issues.apache.org/jira/browse/FLINK-22953
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.13.1
 Environment: python==3.7.5

flink==1.13.1
Reporter: awayne


In our business, we need to deserialize JSON like this:
{code:java}
{
  ...,
  "texts": ["a", "b", "c"],
  ...
}{code}
The number of strings in the list isn't fixed.

After defining the Row type using Types.LIST(Types.STRING()), the job crashes:
{code:java}
  File 
"/home/ubuntu/pyflenv/lib/python3.7/site-packages/pyflink/datastream/stream_execution_environment.py",
 line 645, in execute  File 
"/home/ubuntu/pyflenv/lib/python3.7/site-packages/pyflink/datastream/stream_execution_environment.py",
 line 645, in execute    return 
JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
  File "/home/ubuntu/pyflenv/lib/python3.7/site-packages/py4j/java_gateway.py", 
line 1286, in __call__    answer, self.gateway_client, self.target_id, 
self.name)  File 
"/home/ubuntu/pyflenv/lib/python3.7/site-packages/pyflink/util/exceptions.py", 
line 146, in deco    return f(*a, **kw)  File 
"/home/ubuntu/pyflenv/lib/python3.7/site-packages/py4j/protocol.py", line 328, 
in get_return_value    format(target_id, ".", name), 
value)py4j.protocol.Py4JJavaError: An error occurred while calling 
o31.execute.: org.apache.flink.runtime.client.JobExecutionException: Job 
execution failed. at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
 at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
 at 
java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
 at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
 at 
java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
 at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
 at 
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
 at 
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
 at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
 at 
java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
 at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1081)
 at akka.dispatch.OnComplete.internal(Future.scala:264) at 
akka.dispatch.OnComplete.internal(Future.scala:261) at 
akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) at 
akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) at 
scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
 at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44) 
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252) 
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572) at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
 at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
 at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436) at 
scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435) at 
scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) at 
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
 at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
 at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
 at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
 at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) at 
akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) at 
akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
 at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}





[jira] [Created] (FLINK-22952) docs_404_check fail on azure due to ruby version not available

2021-06-09 Thread Xintong Song (Jira)
Xintong Song created FLINK-22952:


 Summary: docs_404_check fail on azure due to ruby version not 
available
 Key: FLINK-22952
 URL: https://issues.apache.org/jira/browse/FLINK-22952
 Project: Flink
  Issue Type: Bug
  Components: Test Infrastructure
Affects Versions: 1.12.4
Reporter: Xintong Song


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18852=logs=6dc02e5c-5865-5c6a-c6c5-92d598e3fc43=404fcc1b-71ae-54f6-61c8-430a6aeff2b5

{code}
Starting: UseRubyVersion
==
Task : Use Ruby version
Description  : Use the specified version of Ruby from the tool cache, 
optionally adding it to the PATH
Version  : 0.186.0
Author   : Microsoft Corporation
Help : 
https://docs.microsoft.com/azure/devops/pipelines/tasks/tool/use-ruby-version
==
##[error]Version spec = 2.4 for architecture %25s did not match any version in 
Agent.ToolsDirectory.
Available versions: /opt/hostedtoolcache
2.5.9,2.6.7,2.7.3,3.0.1
If this is a Microsoft-hosted agent, check that this image supports 
side-by-side versions of Ruby at https://aka.ms/hosted-agent-software.
If this is a self-hosted agent, see how to configure side-by-side Ruby versions 
at https://go.microsoft.com/fwlink/?linkid=2005989.
Finishing: UseRubyVersion
{code}





[jira] [Created] (FLINK-22951) First parameter of ROW() function must be constant

2021-06-09 Thread Fu Kai (Jira)
Fu Kai created FLINK-22951:
--

 Summary: First parameter of ROW() function must be constant
 Key: FLINK-22951
 URL: https://issues.apache.org/jira/browse/FLINK-22951
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Affects Versions: 1.13.1
Reporter: Fu Kai
 Fix For: 1.14.0


We found that the first parameter passed to the ROW() function must be a 
constant; it cannot be a variable or a function call, and an error is reported 
otherwise. An example is below.
{code:java}
Flink SQL> select ROW(now(), 0));
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "(" at line 1, 
column 15.
Was expecting one of:
")" ...
"," ...
{code}
If we change the query to the following, it works fine.
{code:java}
select ROW(0, now()){code}





[jira] [Created] (FLINK-22950) Back-pressure report for final sink operator is not accurate

2021-06-09 Thread Fu Kai (Jira)
Fu Kai created FLINK-22950:
--

 Summary: Back-pressure report for final sink operator is not accurate
 Key: FLINK-22950
 URL: https://issues.apache.org/jira/browse/FLINK-22950
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Metrics
Affects Versions: 1.13.1
Reporter: Fu Kai
 Fix For: 1.14.0


We noticed that the back-pressure status of the final sink on the job graph is 
not accurate. The final sink operator should apparently receive back-pressure 
from the sink storage (Elasticsearch/Kafka), yet it is always shown as not 
back-pressured (value of 0), even though it is always shown as 100% busy.

This is confirmed by switching to a more powerful sink without changing any 
other configuration; the record publish rate then increases significantly.





Re: [VOTE] Watermark propagation with Sink API

2021-06-09 Thread jincheng sun
+1 (binding)  // Sorry for the late reply.

Best,
Jincheng


Piotr Nowojski  于2021年6月9日周三 下午10:04写道:

> Thanks for driving this Eron, and sorry for causing the delay.
>
> +1 (binding) from my side
>
> Piotrek
>
> wt., 8 cze 2021 o 23:48 Eron Wright 
> napisał(a):
>
> > Voting is re-open for FLIP-167 as-is (without idleness support as was the
> > point of contention).
> >
> > On Fri, Jun 4, 2021 at 10:45 AM Eron Wright 
> > wrote:
> >
> > > Little update on this, more good discussion over the last few days, and
> > > the FLIP will probably be amended to incorporate idleness.   Voting
> will
> > > remain open until, let's say, mid-next week.
> > >
> > > On Thu, Jun 3, 2021 at 8:00 AM Piotr Nowojski 
> > > wrote:
> > >
> > >> I would like to ask you to hold on with counting the votes until I get
> > an
> > >> answer for my one question in the dev mailing list thread (sorry if it
> > was
> > >> already covered somewhere).
> > >>
> > >> Best, Piotrek
> > >>
> > >> czw., 3 cze 2021 o 16:12 Jark Wu  napisał(a):
> > >>
> > >> > +1 (binding)
> > >> >
> > >> > Best,
> > >> > Jark
> > >> >
> > >> > On Thu, 3 Jun 2021 at 21:34, Dawid Wysakowicz <
> dwysakow...@apache.org
> > >
> > >> > wrote:
> > >> >
> > >> > > +1 (binding)
> > >> > >
> > >> > > Best,
> > >> > >
> > >> > > Dawid
> > >> > >
> > >> > > On 03/06/2021 03:50, Zhou, Brian wrote:
> > >> > > > +1 (non-binding)
> > >> > > >
> > >> > > > Thanks Eron, looking forward to seeing this feature soon.
> > >> > > >
> > >> > > > Thanks,
> > >> > > > Brian
> > >> > > >
> > >> > > > -Original Message-
> > >> > > > From: Arvid Heise 
> > >> > > > Sent: Wednesday, June 2, 2021 15:44
> > >> > > > To: dev
> > >> > > > Subject: Re: [VOTE] Watermark propagation with Sink API
> > >> > > >
> > >> > > >
> > >> > > > [EXTERNAL EMAIL]
> > >> > > >
> > >> > > > +1 (binding)
> > >> > > >
> > >> > > > Thanks Eron for driving this effort; it will open up new
> exciting
> > >> use
> > >> > > cases.
> > >> > > >
> > >> > > > On Tue, Jun 1, 2021 at 7:17 PM Eron Wright <
> > ewri...@streamnative.io
> > >> > > .invalid>
> > >> > > > wrote:
> > >> > > >
> > >> > > >> After some good discussion about how to enhance the Sink API to
> > >> > > >> process watermarks, I believe we're ready to proceed with a
> vote.
> > >> > > >> Voting will be open until at least Friday, June 4th, 2021.
> > >> > > >>
> > >> > > >> Reference:
> > >> > > >>
> > >> > > >>
> > >> >
> > https://urldefense.com/v3/__https://cwiki.apache.org/confluence/displa
> > >> > > >>
> > >> y/FLINK/FLIP-167*3A*Watermarks*for*Sink*API__;JSsrKys!!LpKI!zkBBhuqEEi
> > >> > > >> fxF_fDQqAjqsbuWWFmnAvwRfEAWxeC63viFWXPLul-GCBb-PTq$
> > >> > > >> [cwiki[.]apache[.]org]
> > >> > > >>
> > >> > > >> Discussion thread:
> > >> > > >>
> > >> > > >>
> > >> >
> > https://urldefense.com/v3/__https://lists.apache.org/thread.html/r5194
> > >> > > >>
> > >> e1cf157d1fd5ba7ca9b567cb01723bd582aa12dda57d25bca37e*40*3Cdev.flink.ap
> > >> > > >> ache.org
> > >> > *3E__;JSUl!!LpKI!zkBBhuqEEifxF_fDQqAjqsbuWWFmnAvwRfEAWxeC63viF
> > >> > > >> WXPLul-GJXlxwqN$ [lists[.]apache[.]org]
> > >> > > >>
> > >> > > >> Implementation Issue:
> > >> > > >>
> > >> >
> > https://urldefense.com/v3/__https://issues.apache.org/jira/browse/FLIN
> > >> > > >>
> > >> K-22700__;!!LpKI!zkBBhuqEEifxF_fDQqAjqsbuWWFmnAvwRfEAWxeC63viFWXPLul-G
> > >> > > >> N6AJm4h$ [issues[.]apache[.]org]
> > >> > > >>
> > >> > > >> Thanks,
> > >> > > >> Eron Wright
> > >> > > >> StreamNative
> > >> > > >>
> > >> > >
> > >> > >
> > >> >
> > >>
> > >
> >
>


[jira] [Created] (FLINK-22949) java.io.InvalidClassException With Flink Kafka Beam

2021-06-09 Thread Ravikiran Borse (Jira)
Ravikiran Borse created FLINK-22949:
---

 Summary: java.io.InvalidClassException With Flink Kafka Beam
 Key: FLINK-22949
 URL: https://issues.apache.org/jira/browse/FLINK-22949
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.12.0
Reporter: Ravikiran Borse
 Fix For: 1.12.0


Beam: 2.30.0

Flink: 1.12.0

Kafka: 2.6.0



ERROR:root:java.io.InvalidClassException: 
org.apache.flink.streaming.api.graph.StreamConfig$NetworkInputConfig; local 
class incompatible: stream classdesc serialVersionUID = 3698633776553163849, 
local class serialVersionUID = -3137689219135046939

 

In Flink Logs

KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), 
KafkaIO.ReadSourceDescriptors} (1/1)#0 (b0c31371874208adb0ccaff85b971883) 
switched from RUNNING to FAILED.

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not 
deserialize inputs

        at 
org.apache.flink.streaming.api.graph.StreamConfig.getInputs(StreamConfig.java:265)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]

        at 
org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn(StreamConfig.java:280)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]

        at 
org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn1(StreamConfig.java:271)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]

        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.wrapOperatorIntoOutput(OperatorChain.java:639)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]

        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:591)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]

        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]

        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:164)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]

        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]

        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) 
~[flink-dist_2.12-1.12.0.jar:1.12.0]

        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) 
[flink-dist_2.12-1.12.0.jar:1.12.0]

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) 
[flink-dist_2.12-1.12.0.jar:1.12.0]

        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]

Caused by: java.io.InvalidClassException: 
org.apache.flink.streaming.api.graph.StreamConfig$NetworkInputConfig; local 
class incompatible: stream classdesc serialVersionUID = 3698633776553163849, 
local class serialVersionUID = -3137689219135046939





Re: [DISCUSS] FLIP-171: Async Sink

2021-06-09 Thread Piotr Nowojski
Hi Steffen,

Thanks for writing down the proposal. Back when the new Sink API was being
discussed, I was proposing to add our usual `CompletableFuture
isAvailable()` pattern to make sinks non-blocking. You can see the
discussion starting here [1], and continuing for a couple of more posts
until here [2]. Back then, the outcome was that it would give very little
benefit, at the expense of making the API more complicated. Could you maybe
relate your proposal to that discussion from last year?

I see that your proposal is going much further than just adding the
availability method, could you also motivate this a bit further? Could you
maybe reference/show some sinks that:
1. are already implemented using FLIP-143
2. that have some code duplication...
3. ...this duplication would be solved by FLIP-171

Best,
Piotrek

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-143-Unified-Sink-API-tp44602p44872.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-143-Unified-Sink-API-tp44602p44930.html
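
For readers who did not follow that earlier thread, the `CompletableFuture
isAvailable()` pattern mentioned above boils down to the caller awaiting a
future before handing the writer more elements, instead of blocking inside the
write call. A toy illustration (the interface and names are purely
illustrative, not Flink's actual API):

{code:java}
import java.util.concurrent.CompletableFuture;

// Illustrative only: a writer exposes its readiness as a future, so the
// runtime can defer feeding it more elements instead of blocking a thread.
interface AsyncCapableWriter<T> {

    /** Completes once the writer can accept more elements without blocking. */
    CompletableFuture<Void> isAvailable();

    void write(T element);
}
{code}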

śr., 9 cze 2021 o 09:49 Hausmann, Steffen 
napisał(a):

> Hi there,
>
> We would like to start a discussion thread on "FLIP-171: Async Sink" [1],
> where we propose to create a common abstraction for destinations that
> support async requests. This abstraction will make it easier to add
> destinations to Flink by implementing a lightweight shim, while it avoids
> maintaining dozens of independent sinks.
>
> Looking forward to your feedback.
>
> Cheers, Steffen
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
>
>
>
> Amazon Web Services EMEA SARL
> 38 avenue John F. Kennedy, L-1855 Luxembourg
> Sitz der Gesellschaft: L-1855 Luxemburg
> eingetragen im Luxemburgischen Handelsregister unter R.C.S. B186284
>
> Amazon Web Services EMEA SARL, Niederlassung Deutschland
> Marcel-Breuer-Str. 12, D-80807 Muenchen
> Sitz der Zweigniederlassung: Muenchen
> eingetragen im Handelsregister des Amtsgerichts Muenchen unter HRB 242240,
> USt-ID DE317013094
>
>
>
>


Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-06-09 Thread Arvid Heise
Hi Piot,

I'm fine with just doing it on the Sink. My responses were focused on the
API (the how) not on the concept (the if). Just keep the methods on the
different places in sync, such that it is easy to introduce a common
interface later.

Re name: drain is not a reinvention as it's used quite often throughout
Flink (e.g., Mailbox, stopWithSavepoint with drain flag). flush has no link
to life-cycles: you usually do it arbitrarily often.
I like `finish` very much as it relates to JobStatus FINISHED, has a clear
tie to life-cycles, and is crisp.
I also thought about `terminate` but I'd clearly favor `finish` as the
verbs cannot be exchanged in the following: the task may terminate its
operators but the operators should finish their thing first.
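
To make the naming discussion concrete, the kind of hook being debated would
sit roughly as follows on the user-facing sink interface; this is only a
sketch of the idea, with `finish` as the still-undecided name and a default
body chosen so that existing implementations keep compiling:

{code:java}
// Sketch only: a lifecycle hook (flush/drain/finish - naming TBD) on a
// user-facing sink function, called once no more input will arrive.
public interface SinkFunction<IN> extends java.io.Serializable {

    /** Writes one record to the external system. */
    void invoke(IN value) throws Exception;

    /** Flush any buffered data before the job reaches the FINISHED state. */
    default void finish() throws Exception {}
}
{code}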

On Wed, Jun 9, 2021 at 6:48 PM Piotr Nowojski  wrote:

> Hi,
>
> Arvid: What's the problem with providing `void flush()`/`void drain()` only
> in the `SinkFunction`? It would avoid the problem of typing. Why would one
> need to have it in the other `Rich***Function`s? For `flush()` to make
> sense, the entity which has this method, would need to buffer some data. I
> don't see this to be reasonable in anything but `SinkFunction`,
> `ProcessFunction` and operators.
>
> Re `flush()` vs `drain()`. Frankly, I wouldn't understand what `drain()` is
> all about without reading the java-doc, and afterwards, I would have an
> impression that someone wanted to reinvent a wheel :) `flush()` is kind of
> an industry standard for things like that. Furthermore I don't think
> `drain()` solves Till's concern (drain != stop + flush). `stopAndFlush()`
> would be better in this regard, but it also doesn't feel right. Maybe
> `finish()`?
>
> Piotrek
>
> śr., 9 cze 2021 o 11:51 Arvid Heise  napisał(a):
>
> > Hi Dawid,
> >
> > I see your point. I'd probably add drain only to Rich*Function where we
> > have the type bounds. Then we still need your Flushable interface in
> > Rich*Function<..., T> to call it efficiently but we at least avoid weird
> > type combinations. I'll have a rethink later.
> >
> > The proper solution is probably to add  to RichFunction and use Void
> > for RichSinkFunction but I'll have to understand the implications first.
> >
> > On Wed, Jun 9, 2021 at 11:37 AM Dawid Wysakowicz  >
> > wrote:
> >
> >> Hey,
> >>
> >> @Arvid The problem with adding the "drain/flush/stopProcessing" method
> to
> >> RichFunction is that it is not typed with the output type. At the same
> time
> >> we would most likely need a way to emit records from the method. That's
> >> originally thought about adding a typed interface which honestly I don't
> >> like that much either.
> >>
> >> On the UDF level we do not need to deprecate anything as you said. The
> >> close there already works as dispose on the Operator level. What we are
> >> suggesting is to unify that on the Operator level and deprecate the
> dispose
> >> there. @Yun I think we can already do that. We can either try to handle
> >> exceptions from the close in the case of a failure or just break it as
> it
> >> is a low-level, mostly internal API as Arvid said and also the migration
> >> would be really easy there.
> >>
> >> @Till @Arvid I am open for suggestions about the naming. I like the
> >> "drain" method.
> >>
> >> For now I'd go with @Piotr's proposal to add the "drain" method only to
> >> the SinkFunction. We think they are not immediately necessary for any of
> >> the other UDFs.
> >>
> >> Best,
> >>
> >> Dawid
> >> On 09/06/2021 11:20, Arvid Heise wrote:
> >>
> >> I have not followed the complete discussion and can't comment on the
> >> concepts. However, I have some ideas on the API changes:
> >>
> >> 1. If it's about adding additional life-cycle methods to UDFs, we should
> >> add the flush/endOfInput to RichFunction as this is the current way to
> >> define it. At this point, I don't see the need to add/change anything
> for
> >> UDFs. Since RichFunction does not have a dispose, do we even need to
> >> deprecate anything on UDF level? This would avoid having a new interface
> >> Flushable altogether (of which I'm not a big fan, see Piot's mail)
> >>
> >> 2. Further, I'd like to propose drain instead of flush as it would more
> >> align with the current nomenclature and makes the intent more obvious.
> >> However, that only works if there is no clash, so please double-check.
> >>
> >> 3. About changing methods on Operators: I'd say go ahead. It's
> >> experimental and not too hard to adjust on the user side. I also like
> the
> >> idea of beefing up ProcessFunction as a full replacement to custom
> >> operators but I'd keep that effort separate.
> >>
> >> On Wed, Jun 9, 2021 at 9:38 AM Till Rohrmann 
> >> wrote:
> >>
> >>> Thanks for the lively discussion everyone. I have to admit that I am
> not
> >>> really convinced that we should call the interface Flushable and the
> >>> method
> >>> flush. The problem is that this method should in the first place tell
> the
> >>> operator that it should stop processing and 

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-06-09 Thread Piotr Nowojski
Hi,

Arvid: What's the problem with providing `void flush()`/`void drain()` only
in the `SinkFunction`? It would avoid the problem of typing. Why would one
need to have it in the other `Rich***Function`s? For `flush()` to make
sense, the entity which has this method, would need to buffer some data. I
don't see this to be reasonable in anything but `SinkFunction`,
`ProcessFunction` and operators.

Re `flush()` vs `drain()`. Frankly, I wouldn't understand what `drain()` is
all about without reading the java-doc, and afterwards, I would have an
impression that someone wanted to reinvent a wheel :) `flush()` is kind of
an industry standard for things like that. Furthermore I don't think
`drain()` solves Till's concern (drain != stop + flush). `stopAndFlush()`
would be better in this regard, but it also doesn't feel right. Maybe
`finish()`?

Piotrek

śr., 9 cze 2021 o 11:51 Arvid Heise  napisał(a):

> Hi Dawid,
>
> I see your point. I'd probably add drain only to Rich*Function where we
> have the type bounds. Then we still need your Flushable interface in
> Rich*Function<..., T> to call it efficiently but we at least avoid weird
> type combinations. I'll have a rethink later.
>
> The proper solution is probably to add  to RichFunction and use Void
> for RichSinkFunction but I'll have to understand the implications first.
>
> On Wed, Jun 9, 2021 at 11:37 AM Dawid Wysakowicz 
> wrote:
>
>> Hey,
>>
>> @Arvid The problem with adding the "drain/flush/stopProcessing" method to
>> RichFunction is that it is not typed with the output type. At the same time
>> we would most likely need a way to emit records from the method. That's
>> originally thought about adding a typed interface which honestly I don't
>> like that much either.
>>
>> On the UDF level we do not need to deprecate anything as you said. The
>> close there already works as dispose on the Operator level. What we are
>> suggesting is to unify that on the Operator level and deprecate the dispose
>> there. @Yun I think we can already do that. We can either try to handle
>> exceptions from the close in the case of a failure or just break it as it
>> is a low-level, mostly internal API as Arvid said and also the migration
>> would be really easy there.
>>
>> @Till @Arvid I am open for suggestions about the naming. I like the
>> "drain" method.
>>
>> For now I'd go with @Piotr's proposal to add the "drain" method only to
>> the SinkFunction. We think they are not immediately necessary for any of
>> the other UDFs.
>>
>> Best,
>>
>> Dawid
>> On 09/06/2021 11:20, Arvid Heise wrote:
>>
>> I have not followed the complete discussion and can't comment on the
>> concepts. However, I have some ideas on the API changes:
>>
>> 1. If it's about adding additional life-cycle methods to UDFs, we should
>> add the flush/endOfInput to RichFunction as this is the current way to
>> define it. At this point, I don't see the need to add/change anything for
>> UDFs. Since RichFunction does not have a dispose, do we even need to
>> deprecate anything on UDF level? This would avoid having a new interface
>> Flushable altogether (of which I'm not a big fan, see Piot's mail)
>>
>> 2. Further, I'd like to propose drain instead of flush as it would more
>> align with the current nomenclature and makes the intent more obvious.
>> However, that only works if there is no clash, so please double-check.
>>
>> 3. About changing methods on Operators: I'd say go ahead. It's
>> experimental and not too hard to adjust on the user side. I also like the
>> idea of beefing up ProcessFunction as a full replacement to custom
>> operators but I'd keep that effort separate.
>>
>> On Wed, Jun 9, 2021 at 9:38 AM Till Rohrmann 
>> wrote:
>>
>>> Thanks for the lively discussion everyone. I have to admit that I am not
>>> really convinced that we should call the interface Flushable and the
>>> method
>>> flush. The problem is that this method should in the first place tell the
>>> operator that it should stop processing and flush all buffered data. The
>>> method "flush" alone does not convey this contract very well. Maybe a
>>> more
>>> explicit name like stopProcessingAndFlush (maybe only stopProcessing)
>>> would
>>> be better. Moreover, from the OutputStream.flush method, I would expect
>>> that I can call this method multiple times w/o changing the state of the
>>> stream. This is not the case here.
>>>
>>> Given that the stop processing and flush all results is such an essential
>>> lifecycle method of an operator/UDF, I am not sure whether we should
>>> offer
>>> it as an optional interface users can implement. The problem I see is
>>> that
>>> users are not aware of it when writing their own operators/UDFs. Making
>>> it
>>> part of the actual interfaces makes it more explicit and easier to
>>> discover. Maybe there is a way of adding it together with a default
>>> implementation, deprecating the other methods, and then eventually
>>> removing
>>> the old methods. The last step will break APIs, 

[jira] [Created] (FLINK-22948) Scala example for toDataStream does not compile

2021-06-09 Thread David Anderson (Jira)
David Anderson created FLINK-22948:
--

 Summary: Scala example for toDataStream does not compile
 Key: FLINK-22948
 URL: https://issues.apache.org/jira/browse/FLINK-22948
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Table SQL / API
Affects Versions: 1.13.1
Reporter: David Anderson


The Scala example at 
[https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api/#examples-for-todatastream]
does not compile: {{User.class}} should be {{classOf[User]}}.

It would also be better to show the table DDL as

{code}
tableEnv.executeSql(
  """
  CREATE TABLE GeneratedTable (
    name STRING,
    score INT,
    event_time TIMESTAMP_LTZ(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
  )
  WITH ('connector'='datagen')
  """
)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] Watermark propagation with Sink API

2021-06-09 Thread Piotr Nowojski
Thanks for driving this Eron, and sorry for causing the delay.

+1 (binding) from my side

Piotrek

wt., 8 cze 2021 o 23:48 Eron Wright 
napisał(a):

> Voting is re-opened for FLIP-167 as-is (without idleness support, which was the
> point of contention).
>
> On Fri, Jun 4, 2021 at 10:45 AM Eron Wright 
> wrote:
>
> > Little update on this, more good discussion over the last few days, and
> > the FLIP will probably be amended to incorporate idleness.   Voting will
> > remain open until, let's say, mid-next week.
> >
> > On Thu, Jun 3, 2021 at 8:00 AM Piotr Nowojski 
> > wrote:
> >
> >> I would like to ask you to hold on with counting the votes until I get
> an
> >> answer for my one question in the dev mailing list thread (sorry if it
> was
> >> already covered somewhere).
> >>
> >> Best, Piotrek
> >>
> >> czw., 3 cze 2021 o 16:12 Jark Wu  napisał(a):
> >>
> >> > +1 (binding)
> >> >
> >> > Best,
> >> > Jark
> >> >
> >> > On Thu, 3 Jun 2021 at 21:34, Dawid Wysakowicz  >
> >> > wrote:
> >> >
> >> > > +1 (binding)
> >> > >
> >> > > Best,
> >> > >
> >> > > Dawid
> >> > >
> >> > > On 03/06/2021 03:50, Zhou, Brian wrote:
> >> > > > +1 (non-binding)
> >> > > >
> >> > > > Thanks Eron, looking forward to seeing this feature soon.
> >> > > >
> >> > > > Thanks,
> >> > > > Brian
> >> > > >
> >> > > > -Original Message-
> >> > > > From: Arvid Heise 
> >> > > > Sent: Wednesday, June 2, 2021 15:44
> >> > > > To: dev
> >> > > > Subject: Re: [VOTE] Watermark propagation with Sink API
> >> > > >
> >> > > >
> >> > > > [EXTERNAL EMAIL]
> >> > > >
> >> > > > +1 (binding)
> >> > > >
> >> > > > Thanks Eron for driving this effort; it will open up new exciting
> >> use
> >> > > cases.
> >> > > >
> >> > > > On Tue, Jun 1, 2021 at 7:17 PM Eron Wright <
> ewri...@streamnative.io
> >> > > .invalid>
> >> > > > wrote:
> >> > > >
> >> > > >> After some good discussion about how to enhance the Sink API to
> >> > > >> process watermarks, I believe we're ready to proceed with a vote.
> >> > > >> Voting will be open until at least Friday, June 4th, 2021.
> >> > > >>
> >> > > >> Reference:
> >> > > >>
> >> > > >>
> >> >
> https://urldefense.com/v3/__https://cwiki.apache.org/confluence/displa
> >> > > >>
> >> y/FLINK/FLIP-167*3A*Watermarks*for*Sink*API__;JSsrKys!!LpKI!zkBBhuqEEi
> >> > > >> fxF_fDQqAjqsbuWWFmnAvwRfEAWxeC63viFWXPLul-GCBb-PTq$
> >> > > >> [cwiki[.]apache[.]org]
> >> > > >>
> >> > > >> Discussion thread:
> >> > > >>
> >> > > >>
> >> >
> https://urldefense.com/v3/__https://lists.apache.org/thread.html/r5194
> >> > > >>
> >> e1cf157d1fd5ba7ca9b567cb01723bd582aa12dda57d25bca37e*40*3Cdev.flink.ap
> >> > > >> ache.org
> >> > *3E__;JSUl!!LpKI!zkBBhuqEEifxF_fDQqAjqsbuWWFmnAvwRfEAWxeC63viF
> >> > > >> WXPLul-GJXlxwqN$ [lists[.]apache[.]org]
> >> > > >>
> >> > > >> Implementation Issue:
> >> > > >>
> >> >
> https://urldefense.com/v3/__https://issues.apache.org/jira/browse/FLIN
> >> > > >>
> >> K-22700__;!!LpKI!zkBBhuqEEifxF_fDQqAjqsbuWWFmnAvwRfEAWxeC63viFWXPLul-G
> >> > > >> N6AJm4h$ [issues[.]apache[.]org]
> >> > > >>
> >> > > >> Thanks,
> >> > > >> Eron Wright
> >> > > >> StreamNative
> >> > > >>
> >> > >
> >> > >
> >> >
> >>
> >
>


[jira] [Created] (FLINK-22947) Reduce DataSet visibility in documentation

2021-06-09 Thread Seth Wiesman (Jira)
Seth Wiesman created FLINK-22947:


 Summary: Reduce DataSet visibility in documentation
 Key: FLINK-22947
 URL: https://issues.apache.org/jira/browse/FLINK-22947
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Seth Wiesman
Assignee: Seth Wiesman
 Fix For: 1.14.0


Now that the legacy planner has been dropped and bounded DataStream execution 
has landed, the primary remaining use cases for DataSet are gone; however, it is 
still featured prominently in our documentation.

 

In particular, it is listed second under Application Development, below 
DataStream but above Table and Python. Additionally, Application Development 
has top-level dropdowns for serialization and managing execution even though 
these sections are only relevant for the JVM DataStream and DataSet APIs.

 

We should

1) move dataset to the bottom of the list and add a legacy label

2) move serialization to be a subsection of datastream

3) evaluate which pages under managing execution are still relevant and see if 
there are more appropriate places to put them.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22946) Network buffer deadlock introduced by unaligned checkpoint

2021-06-09 Thread Guokuai Huang (Jira)
Guokuai Huang created FLINK-22946:
-

 Summary: Network buffer deadlock introduced by unaligned checkpoint
 Key: FLINK-22946
 URL: https://issues.apache.org/jira/browse/FLINK-22946
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.13.1, 1.13.0
Reporter: Guokuai Huang
 Attachments: Screen Shot 2021-06-09 at 6.39.47 PM.png, Screen Shot 
2021-06-09 at 7.02.04 PM.png

We recently encountered a deadlock when using unaligned checkpoints. Below are the 
two thread stacks involved in the deadlock:
```
"Channel state writer Join(xx) (34/256)#1":
    at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.notifyBufferAvailable(BufferManager.java:296)
    - waiting to lock <0x0007296dfa90> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.fireBufferAvailableNotification(LocalBufferPool.java:507)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:494)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:460)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
    at org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue.addExclusiveBuffer(BufferManager.java:399)
    at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:200)
    - locked <0x0007296bc450> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.write(ChannelStateCheckpointWriter.java:173)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.writeInput(ChannelStateCheckpointWriter.java:131)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$write$0(ChannelStateWriteRequest.java:63)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest$$Lambda$785/722492780.accept(Unknown Source)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$buildWriteRequest$2(ChannelStateWriteRequest.java:93)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest$$Lambda$786/1360749026.accept(Unknown Source)
    at org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequest.execute(ChannelStateWriteRequest.java:212)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:82)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatch(ChannelStateWriteRequestDispatcherImpl.java:59)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:96)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:75)
    at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl$$Lambda$253/502209879.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:745)

"Join(xx) (34/256)#1":
    at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.notifyBufferAvailable(BufferManager.java:296)
    - waiting to lock <0x0007296bc450> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.fireBufferAvailableNotification(LocalBufferPool.java:507)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:494)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:460)
    at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
    at

Re: [DISCUSS] Dashboard/HistoryServer authentication

2021-06-09 Thread Till Rohrmann
Thanks for the update Gabor. I'll take a look and respond in the document.

Cheers,
Till

On Wed, Jun 9, 2021 at 12:59 PM Gabor Somogyi 
wrote:

> Hi Till,
>
> Your proxy suggestion has been considered in-depth and updated the FLIP
> accordingly.
> We've considered 2 proxy implementation (Nginx and Squid) but according to
> our analysis and testing it's not suitable for the mentioned use-cases.
> Please take a look at the rejected alternatives for detailed explanation.
>
> Thanks for your time in advance!
>
> BR,
> G
>
>
> On Fri, Jun 4, 2021 at 3:31 PM Till Rohrmann  wrote:
>
>> As I've said I am not a security expert and that's why I have to ask for
>> clarification, Gabor. You are saying that if we configure a truststore for
>> the REST endpoint with a single trusted certificate which has been
>> generated by the operator of the Flink cluster, then the attacker can
>> generate a new certificate, sign it and then talk to the Flink cluster if
>> he has access to the node on which the REST endpoint runs? My understanding
>> was that you need the corresponding private key which in my proposed setup
>> would be under the control of the operator as well (e.g. stored in a
>> keystore on the same machine but guarded by some secret). That way (if I am
>> not mistaken), only the entity which has access to the keystore is able to
>> talk to the Flink cluster.
>>
>> Maybe we are also getting our wires crossed here and are talking about
>> different things.
>>
>> Thanks for listing the pros and cons of Kerberos. Concerning what other
>> authentication mechanisms are used in the industry, I am not 100% sure.
>>
>> Cheers,
>> Till
>>
>> On Fri, Jun 4, 2021 at 11:09 AM Gabor Somogyi 
>> wrote:
>>
>>> > I did not mean for the user to sign its own certificates but for the
>>> operator of the cluster. Once the user request hits the proxy, it should no
>>> longer be under his control. I think I do not fully understand yet why this
>>> would not work.
>>> I said it's not solving the authentication problem over any proxy. Even
>>> if the operator is signing the certificate, one can have access to an
>>> internal node.
>>> In such a case anybody can craft certificates which are accepted by the
>>> server. Once accepted, a bad actor can cancel jobs, causing huge impact.
>>>
>>> > Also, I am missing a bit the comparison of Kerberos to other
>>> authentication mechanisms and why they were rejected in favour of Kerberos.
>>> PROS:
>>> * Since it's not depending on cloud provider and/or k8s or bare-metal
>>> etc. deployment it's the biggest plus
>>> * Centralized with tools and no need to write tons of tools around
>>> * There are clients/tools on almost all OS-es and several languages
>>> * Super huge users are using it for years in production w/o huge issues
>>> * Provides cross-realm trust possibility amongst other features
>>> * Several open source components using it which could increase
>>> compatibility
>>>
>>> CONS:
>>> * Not everybody using kerberos
>>> * It would increase the code footprint but this is true for many
>>> features (as a side note I'm here to maintain it)
>>>
>>> Feel free to add your points because it only represents a single
>>> viewpoint.
>>> Also if you have any better option for strong authentication please
>>> share it and we can consider the pros/cons here.
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Fri, Jun 4, 2021 at 10:32 AM Till Rohrmann 
>>> wrote:
>>>
 I did not mean for the user to sign its own certificates but for the
 operator of the cluster. Once the user request hits the proxy, it should no
 longer be under his control. I think I do not fully understand yet why this
 would not work.

 What I would like to avoid is to add more complexity into Flink if
 there is an easy solution which fulfills the requirements. That's why I
 would like to exercise thoroughly through the different alternatives. Also,
 I am missing a bit the comparison of Kerberos to other authentication
 mechanisms and why they were rejected in favour of Kerberos.

 Cheers,
 Till

 On Fri, Jun 4, 2021 at 10:26 AM Gyula Fóra  wrote:

> Hi!
>
> I think there might be possible alternatives but it seems Kerberos on
> the rest endpoint ticks all the right boxes and provides a super clean and
> simple solution for strong authentication.
>
> I wouldn’t even consider sidecar proxies etc if we can solve it in
> such a simple way as proposed by G.
>
> Cheers
> Gyula
>
> On Fri, 4 Jun 2021 at 10:03, Till Rohrmann 
> wrote:
>
>> I am not saying that we shouldn't add a strong authentication
>> mechanism if there are good reasons for it. I primarily would like to
>> understand the context a bit better in order to give qualified feedback 
>> and
>> come to a good decision. In order to do this, I have the feeling that we
>> haven't fully considered all available options which are on the table, 
>> tbh.

Re: [DISCUSS][Statebackend][Runtime] Changelog Statebackend Configuration Proposal

2021-06-09 Thread Yuan Mei
Thank you everyone for replying!

Option 3 wins with dominating # of votes + mine.

This option works as a refined version of the original proposal in
FLIP-158: Generalized incremental checkpoints [1]:
  - Define consistent override and combination policy (flag + state
backend) in different config levels
  - Define explicitly the meaning of "enable flag" = true/false/unset
  - Hide ChangelogStateBackend from users

According to the discussion in this thread, we will go with
Option 3: Enable Changelog Statebackend through a Boolean Flag + W/O
ChangelogStateBackend Exposed
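
As a rough illustration of what Option 3 means for users (the concrete option key was not finalized in this thread, so the flag name below is a placeholder):

```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChangelogFlagSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // The local state backend is chosen exactly as before ...
        conf.setString("state.backend", "rocksdb");
        // ... and change logging is toggled by a boolean flag; this key name is
        // a placeholder, not the final configuration option from the discussion.
        conf.setBoolean("state.backend.changelog.enabled", true);

        // ChangelogStateBackend itself stays internal and is never referenced here.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.fromElements(1, 2, 3).print();
        env.execute("changelog-flag-sketch");
    }
}
```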

 [1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-158%3A+Generalized+incremental+checkpoints

Best
Yuan

On Tue, Jun 8, 2021 at 6:40 PM Yu Li  wrote:

> +1 for option 3.
>
> IMHO persisting (operator's) state data through change log is an
> independent mechanism which could co-work with all kinds of local state
> stores (heap and rocksdb). This mechanism is similar to the WAL
> (write-ahead-log) mechanism in the database system. Although implement-wise
> we're using wrapper (decorator) pattern and naming it as
> `ChangeLogStateBackend`, it's not really another type of state backend. For
> the same reason, ChangeLogStateBackend should be an internal class and not
> exposed to the end user. Users only need to know / control whether to
> enable change log or not, just like whether to enable WAL in the
> traditional database system.
>
> Thanks.
>
> Best Regards,
> Yu
>
>
> On Thu, 3 Jun 2021 at 22:50, Piotr Nowojski  wrote:
>
> > Hi,
> >
> > I would actually prefer option 6 (or 5/4), for the sake of configuration
> > being explicit and self explanatory. But at the same time I don't have
> very
> > hard preferences and from the remaining options, option 3 seems the most
> > reasonable.
> >
> > The question would be, do we want to expose to the users that
> > ChangeLogStateBackend is wrapping an inner state backend or not? If not,
> > option 3 is the best. If we do, if we want to teach the users and help
> them
> > build the understanding of how things are working underneath, option 5
> or 6
> > are better.
> >
> > Best,
> > Piotrek
> >
> > śr., 2 cze 2021 o 04:36 Yun Tang  napisał(a):
> >
> > > Hi Yuan, thanks for launching this discussion.
> > >
> > > I prefer option-3 as this is the easiest to understand for users.
> > >
> > >
> > > Best
> > > Yun Tang
> > > 
> > > From: Roman Khachatryan 
> > > Sent: Monday, May 31, 2021 16:53
> > > To: dev 
> > > Subject: Re: [DISCUSS][Statebackend][Runtime] Changelog Statebackend
> > > Configuration Proposal
> > >
> > > Hey Yuan, thanks for the proposal
> > >
> > > I think Option 3 is the simplest to use and exposes less details than
> any
> > > other.
> > > It's also consistent with the current way of configuring state
> > > backends, as long as we treat change logging as a common feature
> > > applicable to any state backend, like e.g.
> > > state.backend.local-recovery.
> > >
> > > Option 6 seems slightly less preferable as it exposes more details but
> > > I think is the most viable alternative.
> > >
> > > Regards,
> > > Roman
> > >
> > >
> > > On Mon, May 31, 2021 at 8:39 AM Yuan Mei 
> wrote:
> > > >
> > > > Hey all,
> > > >
> > > > We would like to start a discussion on how to enable/config Changelog
> > > > Statebackend.
> > > >
> > > > As part of FLIP-158[1], Changelog state backend wraps on top of
> > existing
> > > > state backend (HashMapStateBackend, EmbeddedRocksDBStateBackend and
> may
> > > > expect more) and delegates state changes to the underlying state
> > > backends.
> > > > This thread is to discuss the problem of how Changelog StateBackend
> > > should
> > > > be enabled and configured.
> > > >
> > > > Proposed options to enable/config state changelog is listed below:
> > > >
> > > > Option 1: Enable Changelog Statebackend through a Boolean Flag
> > > >
> > > > Option 2: Enable Changelog Statebackend through a Boolean Flag + a
> > > Special
> > > > Case
> > > >
> > > > Option 3: Enable Changelog Statebackend through a Boolean Flag + W/O
> > > > ChangelogStateBackend Exposed
> > > >
> > > > Option 4: Explicit Nested Configuration + “changelog.inner” prefix
> for
> > > > inner backend
> > > >
> > > > Option 5: Explicit Nested Configuration + inner state backend
> > > configuration
> > > > unchanged
> > > >
> > > > Option 6: Config Changelog and Inner Statebackend All-Together
> > > >
> > > > Details of each option can be found here:
> > > >
> > >
> >
> https://docs.google.com/document/d/13AaCf5fczYTDHZ4G1mgYL685FqbnoEhgo0cdwuJlZmw/edit?usp=sharing
> > > >
> > > > When considering these options, please consider these four
> dimensions:
> > > > 1 Consistency
> > > > API/config should follow a consistent model and should not have
> > > > contradicted logic beneath
> > > > 2 Simplicity
> > > > API should be easy to use and not introduce too much burden on users
> > > > 3. Explicitness
> > > > API/config should not contain implicit assumptions and 

[jira] [Created] (FLINK-22945) StackOverflowException can happen when a large scale job is CANCELED/FAILED

2021-06-09 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-22945:
---

 Summary: StackOverflowException can happen when a large scale job 
is CANCELED/FAILED
 Key: FLINK-22945
 URL: https://issues.apache.org/jira/browse/FLINK-22945
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.12.4, 1.13.1
Reporter: Zhu Zhu


The pending requests in ExecutionSlotAllocator are not cleared when a job 
transitions to CANCELING or FAILING, even though all vertices will be canceled and 
their assigned slots returned. A returned slot can then be used to fulfill the 
pending request of a CANCELED vertex; that assignment fails immediately, so the 
slot is returned again and used to fulfill yet another vertex, recursively. With 
many vertices this recursion can overflow the stack. A sample call stack is 
attached below.
To fix this problem, we should clear the pending requests in 
ExecutionSlotAllocator when a job is CANCELING or FAILING. However, I think 
it's better to also improve the call stack of slot assignment to avoid similar 
StackOverflowExceptions.
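
Before the sample call stack, a generic illustration (not Flink code) of that second direction, i.e. replacing the recursive fulfilment with an iterative loop so the call depth stays constant regardless of the number of vertices:

{code:java}
import java.util.ArrayDeque;
import java.util.Queue;

// Generic sketch only: returned resources are enqueued and processed in a loop
// instead of being handed to the next pending request from inside the release path.
final class IterativeFulfilment {
    private final Queue<String> returnedSlots = new ArrayDeque<>();
    private boolean draining = false;

    void onSlotReturned(String slot) {
        returnedSlots.add(slot);
        drain();
    }

    private void drain() {
        if (draining) {
            return; // already inside the loop; the new slot will be picked up there
        }
        draining = true;
        try {
            String slot;
            while ((slot = returnedSlots.poll()) != null) {
                // Trying to fulfil a pending request may immediately return the slot
                // again (e.g. the vertex was already CANCELED); that re-enters
                // onSlotReturned(), which only enqueues instead of recursing.
                tryFulfilPendingRequest(slot);
            }
        } finally {
            draining = false;
        }
    }

    private void tryFulfilPendingRequest(String slot) {
        // placeholder for the actual assignment logic
    }
}
{code}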


...
    at org.apache.flink.runtime.scheduler.SharedSlot.returnLogicalSlot(SharedSlot.java:234) ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
    at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$returnSlotToOwner$0(SingleLogicalSlot.java:203) ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
    at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705) ~[?:1.8.0_102]
    at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717) ~[?:1.8.0_102]
    at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010) ~[?:1.8.0_102]
    at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.returnSlotToOwner(SingleLogicalSlot.java:200) ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
    at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130) ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.releaseSlotIfPresent(DefaultScheduler.java:533) ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$8(DefaultScheduler.java:512) ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_102]
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) ~[?:1.8.0_102]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_102]
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_102]
    at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.fulfill(DeclarativeSlotPoolBridge.java:552) ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
    at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequestSlotMatching.fulfillPendingRequest(DeclarativeSlotPoolBridge.java:587) ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
    at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.newSlotsAreAvailable(DeclarativeSlotPoolBridge.java:171) ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
    at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.lambda$freeReservedSlot$0(DefaultDeclarativeSlotPool.java:316) ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
    at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_102]
    at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeReservedSlot(DefaultDeclarativeSlotPool.java:313) ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
    at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.releaseSlot(DeclarativeSlotPoolBridge.java:335) ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
    at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl.cancelSlotRequest(PhysicalSlotProviderImpl.java:112) ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
    at org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.releaseSharedSlot(SlotSharingExecutionSlotAllocator.java:242) ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
    at org.apache.flink.runtime.scheduler.SharedSlot.releaseExternally(SharedSlot.java:281) ~[flink-dist_2.11-1.13-vvr-4.0-SNAPSHOT.jar:1.13-vvr-4.0-SNAPSHOT]
    at

Re: [DISCUSS] Dashboard/HistoryServer authentication

2021-06-09 Thread Gabor Somogyi
Hi Till,

Your proxy suggestion has been considered in-depth and updated the FLIP
accordingly.
We've considered 2 proxy implementation (Nginx and Squid) but according to
our analysis and testing it's not suitable for the mentioned use-cases.
Please take a look at the rejected alternatives for detailed explanation.

Thanks for your time in advance!

BR,
G


On Fri, Jun 4, 2021 at 3:31 PM Till Rohrmann  wrote:

> As I've said I am not a security expert and that's why I have to ask for
> clarification, Gabor. You are saying that if we configure a truststore for
> the REST endpoint with a single trusted certificate which has been
> generated by the operator of the Flink cluster, then the attacker can
> generate a new certificate, sign it and then talk to the Flink cluster if
> he has access to the node on which the REST endpoint runs? My understanding
> was that you need the corresponding private key which in my proposed setup
> would be under the control of the operator as well (e.g. stored in a
> keystore on the same machine but guarded by some secret). That way (if I am
> not mistaken), only the entity which has access to the keystore is able to
> talk to the Flink cluster.
>
> Maybe we are also getting our wires crossed here and are talking about
> different things.
>
> Thanks for listing the pros and cons of Kerberos. Concerning what other
> authentication mechanisms are used in the industry, I am not 100% sure.
>
> Cheers,
> Till
>
> On Fri, Jun 4, 2021 at 11:09 AM Gabor Somogyi 
> wrote:
>
>> > I did not mean for the user to sign its own certificates but for the
>> operator of the cluster. Once the user request hits the proxy, it should no
>> longer be under his control. I think I do not fully understand yet why this
>> would not work.
>> I said it's not solving the authentication problem over any proxy. Even
>> if the operator is signing the certificate, one can have access to an
>> internal node.
>> In such a case anybody can craft certificates which are accepted by the server.
>> Once accepted, a bad actor can cancel jobs, causing huge impact.
>>
>> > Also, I am missing a bit the comparison of Kerberos to other
>> authentication mechanisms and why they were rejected in favour of Kerberos.
>> PROS:
>> * Since it's not depending on cloud provider and/or k8s or bare-metal
>> etc. deployment it's the biggest plus
>> * Centralized with tools and no need to write tons of tools around
>> * There are clients/tools on almost all OS-es and several languages
>> * Super huge users are using it for years in production w/o huge issues
>> * Provides cross-realm trust possibility amongst other features
>> * Several open source components using it which could increase
>> compatibility
>>
>> CONS:
>> * Not everybody using kerberos
>> * It would increase the code footprint but this is true for many features
>> (as a side note I'm here to maintain it)
>>
>> Feel free to add your points because it only represents a single
>> viewpoint.
>> Also if you have any better option for strong authentication please share
>> it and we can consider the pros/cons here.
>>
>> BR,
>> G
>>
>>
>> On Fri, Jun 4, 2021 at 10:32 AM Till Rohrmann 
>> wrote:
>>
>>> I did not mean for the user to sign its own certificates but for the
>>> operator of the cluster. Once the user request hits the proxy, it should no
>>> longer be under his control. I think I do not fully understand yet why this
>>> would not work.
>>>
>>> What I would like to avoid is to add more complexity into Flink if there
>>> is an easy solution which fulfills the requirements. That's why I would
>>> like to exercise thoroughly through the different alternatives. Also, I am
>>> missing a bit the comparison of Kerberos to other authentication mechanisms
>>> and why they were rejected in favour of Kerberos.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Jun 4, 2021 at 10:26 AM Gyula Fóra  wrote:
>>>
 Hi!

 I think there might be possible alternatives but it seems Kerberos on
 the rest endpoint ticks all the right boxes and provides a super clean and
 simple solution for strong authentication.

 I wouldn’t even consider sidecar proxies etc if we can solve it in such
 a simple way as proposed by G.

 Cheers
 Gyula

 On Fri, 4 Jun 2021 at 10:03, Till Rohrmann 
 wrote:

> I am not saying that we shouldn't add a strong authentication
> mechanism if there are good reasons for it. I primarily would like to
> understand the context a bit better in order to give qualified feedback 
> and
> come to a good decision. In order to do this, I have the feeling that we
> haven't fully considered all available options which are on the table, 
> tbh.
>
> Does the problem of certificate expiry also apply for self-signed
> certificates? If yes, then this should then also be a problem for the
> internal encryption of Flink's communication. If not, then one could use
> self-signed certificates with a 

[jira] [Created] (FLINK-22944) Optimize writing state changes

2021-06-09 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-22944:
-

 Summary: Optimize writing state changes
 Key: FLINK-22944
 URL: https://issues.apache.org/jira/browse/FLINK-22944
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Affects Versions: 1.14.0
Reporter: Roman Khachatryan
 Fix For: 1.14.0


An (incremental) improvement over the existing way of writing state changes, 
which currently writes the metadata name (UTF-8) and the key group for each change. 
Please see https://github.com/apache/flink/pull/16035#discussion_r642614515 for more 
details.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-06-09 Thread Arvid Heise
Hi Dawid,

I see your point. I'd probably add drain only to Rich*Function where we
have the type bounds. Then we still need your Flushable interface in
Rich*Function<..., T> to call it efficiently but we at least avoid weird
type combinations. I'll have a rethink later.

The proper solution is probably to add  to RichFunction and use Void
for RichSinkFunction but I'll have to understand the implications first.

On Wed, Jun 9, 2021 at 11:37 AM Dawid Wysakowicz 
wrote:

> Hey,
>
> @Arvid The problem with adding the "drain/flush/stopProcessing" method to
> RichFunction is that it is not typed with the output type. At the same time
> we would most likely need a way to emit records from the method. That's
> originally thought about adding a typed interface which honestly I don't
> like that much either.
>
> On the UDF level we do not need to deprecate anything as you said. The
> close there already works as dispose on the Operator level. What we are
> suggesting is to unify that on the Operator level and deprecate the dispose
> there. @Yun I think we can already do that. We can either try to handle
> exceptions from the close in the case of a failure or just break it as it
> is a low-level, mostly internal API as Arvid said and also the migration
> would be really easy there.
>
> @Till @Arvid I am open for suggestions about the naming. I like the
> "drain" method.
>
> For now I'd go with @Piotr's proposal to add the "drain" method only to
> the SinkFunction. We think they are not immediately necessary for any of
> the other UDFs.
>
> Best,
>
> Dawid
> On 09/06/2021 11:20, Arvid Heise wrote:
>
> I have not followed the complete discussion and can't comment on the
> concepts. However, I have some ideas on the API changes:
>
> 1. If it's about adding additional life-cycle methods to UDFs, we should
> add the flush/endOfInput to RichFunction as this is the current way to
> define it. At this point, I don't see the need to add/change anything for
> UDFs. Since RichFunction does not have a dispose, do we even need to
> deprecate anything on UDF level? This would avoid having a new interface
> Flushable altogether (of which I'm not a big fan, see Piot's mail)
>
> 2. Further, I'd like to propose drain instead of flush as it would more
> align with the current nomenclature and makes the intent more obvious.
> However, that only works if there is no clash, so please double-check.
>
> 3. About changing methods on Operators: I'd say go ahead. It's
> experimental and not too hard to adjust on the user side. I also like the
> idea of beefing up ProcessFunction as a full replacement to custom
> operators but I'd keep that effort separate.
>
> On Wed, Jun 9, 2021 at 9:38 AM Till Rohrmann  wrote:
>
>> Thanks for the lively discussion everyone. I have to admit that I am not
>> really convinced that we should call the interface Flushable and the
>> method
>> flush. The problem is that this method should in the first place tell the
>> operator that it should stop processing and flush all buffered data. The
>> method "flush" alone does not convey this contract very well. Maybe a more
>> explicit name like stopProcessingAndFlush (maybe only stopProcessing)
>> would
>> be better. Moreover, from the OutputStream.flush method, I would expect
>> that I can call this method multiple times w/o changing the state of the
>> stream. This is not the case here.
>>
>> Given that the stop processing and flush all results is such an essential
>> lifecycle method of an operator/UDF, I am not sure whether we should offer
>> it as an optional interface users can implement. The problem I see is that
>> users are not aware of it when writing their own operators/UDFs. Making it
>> part of the actual interfaces makes it more explicit and easier to
>> discover. Maybe there is a way of adding it together with a default
>> implementation, deprecating the other methods, and then eventually
>> removing
>> the old methods. The last step will break APIs, though :-(
>>
>> Cheers,
>> Till
>>
>> On Tue, Jun 8, 2021 at 6:27 PM Piotr Nowojski 
>> wrote:
>>
>> > Hi,
>> >
>> > Thanks for resuming this discussion. I think +1 for the proposal of
>> > dropping (deprecating) `dispose()`, and adding `flush()` to the
>> > `StreamOperator`/udfs. Semantically it would be more like new `close()`
>> is
>> > an equivalent of old `dispose()`. Old `close()` is an equivalent of new
>> > `flush() + close()`. I think it provides a relatively painless migration
>> > path (could we write down this migration?).
>> >
>> > However I have some doubts about the Flushable interface. First of
>> all,
>> > it wouldn't work for sinks - sinks have no output. Secondly, I don't
>> like
>> > that it opens a possibility for problems like this (incompatible output
>> > types):
>> > ```
>> > public class MyMap implements MapFunction,
>> Flushable
>> > { ...}
>> > ```
>> >
>> > Also after a quick offline discussion with Dawid, I'm not sure anymore
>> to
>> > which UDFs it actually makes sense to add 

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-06-09 Thread Dawid Wysakowicz
Hey,

@Arvid The problem with adding the "drain/flush/stopProcessing" method
to RichFunction is that it is not typed with the output type. At the
same time we would most likely need a way to emit records from the
method. That's originally thought about adding a typed interface which
honestly I don't like that much either.

On the UDF level we do not need to deprecate anything as you said. The
close there already works as dispose on the Operator level. What we are
suggesting is to unify that on the Operator level and deprecate the
dispose there. @Yun I think we can already do that. We can either try to
handle exceptions from the close in the case of a failure or just break
it as it is a low-level, mostly internal API as Arvid said and also the
migration would be really easy there.

@Till @Arvid I am open for suggestions about the naming. I like the
"drain" method.

For now I'd go with @Piotr's proposal to add the "drain" method only to
the SinkFunction. We think they are not immediately necessary for any of
the other UDFs.

Best,

Dawid

On 09/06/2021 11:20, Arvid Heise wrote:
> I have not followed the complete discussion and can't comment on the
> concepts. However, I have some ideas on the API changes:
>
> 1. If it's about adding additional life-cycle methods to UDFs, we
> should add the flush/endOfInput to RichFunction as this is the current
> way to define it. At this point, I don't see the need to add/change
> anything for UDFs. Since RichFunction does not have a dispose, do we
> even need to deprecate anything on UDF level? This would avoid having
> a new interface Flushable altogether (of which I'm not a big fan, see
> Piot's mail)
>
> 2. Further, I'd like to propose drain instead of flush as it would
> more align with the current nomenclature and makes the intent more
> obvious. However, that only works if there is no clash, so please
> double-check.
>
> 3. About changing methods on Operators: I'd say go ahead. It's
> experimental and not too hard to adjust on the user side. I also like
> the idea of beefing up ProcessFunction as a full replacement to custom
> operators but I'd keep that effort separate.
>
> On Wed, Jun 9, 2021 at 9:38 AM Till Rohrmann  > wrote:
>
> Thanks for the lively discussion everyone. I have to admit that I
> am not
> really convinced that we should call the interface Flushable and
> the method
> flush. The problem is that this method should in the first place
> tell the
> operator that it should stop processing and flush all buffered
> data. The
> method "flush" alone does not convey this contract very well.
> Maybe a more
> explicit name like stopProcessingAndFlush (maybe only
> stopProcessing) would
> be better. Moreover, from the OutputStream.flush method, I would
> expect
> that I can call this method multiple times w/o changing the state
> of the
> stream. This is not the case here.
>
> Given that the stop processing and flush all results is such an
> essential
> lifecycle method of an operator/UDF, I am not sure whether we
> should offer
> it as an optional interface users can implement. The problem I see
> is that
> users are not aware of it when writing their own operators/UDFs.
> Making it
> part of the actual interfaces makes it more explicit and easier to
> discover. Maybe there is a way of adding it together with a default
> implementation, deprecating the other methods, and then eventually
> removing
> the old methods. The last step will break APIs, though :-(
>
> Cheers,
> Till
>
> On Tue, Jun 8, 2021 at 6:27 PM Piotr Nowojski
> mailto:pnowoj...@apache.org>> wrote:
>
> > Hi,
> >
> > Thanks for resuming this discussion. I think +1 for the proposal of
> > dropping (deprecating) `dispose()`, and adding `flush()` to the
> > `StreamOperator`/udfs. Semantically it would be more like new
> `close()` is
> > an equivalent of old `dispose()`. Old `close()` is an equivalent
> of new
> > `flush() + close()`. I think it provides a relatively painless
> migration
> > path (could we write down this migration?).
> >
> > However I have some doubts about the Flushable interface.
> First of all,
> > it wouldn't work for sinks - sinks have no output. Secondly, I
> don't like
> > that it opens a possibility for problems like this (incompatible
> output
> > types):
> > ```
> > public class MyMap implements MapFunction,
> Flushable
> > { ...}
> > ```
> >
> > Also after a quick offline discussion with Dawid, I'm not sure
> anymore to
> > which UDFs it actually makes sense to add `flush`, as most of them
> > shouldn't buffer any data. Apart from Sinks, it's usually an
> operator that
> > is buffering the data (that holds true for AsyncFunction,
> ReduceFunction,
> > AggregateFunction, 

[jira] [Created] (FLINK-22943) java.lang.ClassCastException: java.time.Instant cannot be cast to java.sql.Timestamp

2021-06-09 Thread jack wang (Jira)
jack wang created FLINK-22943:
-

 Summary: java.lang.ClassCastException: java.time.Instant cannot be 
cast to java.sql.Timestamp
 Key: FLINK-22943
 URL: https://issues.apache.org/jira/browse/FLINK-22943
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Affects Versions: 1.13.1
Reporter: jack wang


Before Hive 3.1.2, getQueryCurrentTimestamp returns a java.sql.Timestamp, but in 
Hive 3.1.2 the reflective getCurrentTSMethod invocation returns a java.time.Instant. 
So the cast `(Timestamp) getCurrentTSMethod.invoke(sessionState)` results in a 
ClassCastException. The code should be made compatible with both cases.

When I use the Hive dialect to create a Hive table, it triggers this error:

Exception in thread "main" java.lang.ClassCastException: java.time.Instant cannot be cast to java.sql.Timestamp
    at org.apache.flink.table.planner.delegation.hive.HiveParser.setCurrentTimestamp(HiveParser.java:365)
    at org.apache.flink.table.planner.delegation.hive.HiveParser.startSessionState(HiveParser.java:350)
    at org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:218)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:722)
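
A minimal sketch of how the cast could be made tolerant of both return types (the names follow the reflective call mentioned above; this is illustrative, not the actual fix):

{code:java}
import java.lang.reflect.Method;
import java.sql.Timestamp;
import java.time.Instant;

class CurrentTimestampCompat {
    // getCurrentTSMethod / sessionState are the reflective handles from the description.
    static Timestamp currentTimestamp(Method getCurrentTSMethod, Object sessionState) throws Exception {
        Object ts = getCurrentTSMethod.invoke(sessionState);
        // Hive < 3.1.2 returns java.sql.Timestamp, Hive 3.1.2+ returns java.time.Instant.
        if (ts instanceof Instant) {
            return Timestamp.from((Instant) ts);
        }
        return (Timestamp) ts;
    }
}
{code}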



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-06-09 Thread Arvid Heise
I have not followed the complete discussion and can't comment on the
concepts. However, I have some ideas on the API changes:

1. If it's about adding additional life-cycle methods to UDFs, we should
add the flush/endOfInput to RichFunction as this is the current way to
define it. At this point, I don't see the need to add/change anything for
UDFs. Since RichFunction does not have a dispose, do we even need to
deprecate anything on UDF level? This would avoid having a new interface
Flushable altogether (of which I'm not a big fan, see Piot's mail)

2. Further, I'd like to propose drain instead of flush as it would more
align with the current nomenclature and makes the intent more obvious.
However, that only works if there is no clash, so please double-check.

3. About changing methods on Operators: I'd say go ahead. It's experimental
and not too hard to adjust on the user side. I also like the idea of
beefing up ProcessFunction as a full replacement to custom operators but
I'd keep that effort separate.

On Wed, Jun 9, 2021 at 9:38 AM Till Rohrmann  wrote:

> Thanks for the lively discussion everyone. I have to admit that I am not
> really convinced that we should call the interface Flushable and the method
> flush. The problem is that this method should in the first place tell the
> operator that it should stop processing and flush all buffered data. The
> method "flush" alone does not convey this contract very well. Maybe a more
> explicit name like stopProcessingAndFlush (maybe only stopProcessing) would
> be better. Moreover, from the OutputStream.flush method, I would expect
> that I can call this method multiple times w/o changing the state of the
> stream. This is not the case here.
>
> Given that the stop processing and flush all results is such an essential
> lifecycle method of an operator/UDF, I am not sure whether we should offer
> it as an optional interface users can implement. The problem I see is that
> users are not aware of it when writing their own operators/UDFs. Making it
> part of the actual interfaces makes it more explicit and easier to
> discover. Maybe there is a way of adding it together with a default
> implementation, deprecating the other methods, and then eventually removing
> the old methods. The last step will break APIs, though :-(
>
> Cheers,
> Till
>
> On Tue, Jun 8, 2021 at 6:27 PM Piotr Nowojski 
> wrote:
>
> > Hi,
> >
> > Thanks for resuming this discussion. I think +1 for the proposal of
> > dropping (deprecating) `dispose()`, and adding `flush()` to the
> > `StreamOperator`/udfs. Semantically it would be more like new `close()`
> is
> > an equivalent of old `dispose()`. Old `close()` is an equivalent of new
> > `flush() + close()`. I think it provides a relatively painless migration
> > path (could we write down this migration?).
> >
> > However I have some doubts about the Flushable interface. First of
> all,
> > it wouldn't work for sinks - sinks have no output. Secondly, I don't like
> > that it opens a possibility for problems like this (incompatible output
> > types):
> > ```
> > public class MyMap implements MapFunction,
> Flushable
> > { ...}
> > ```
> >
> > Also after a quick offline discussion with Dawid, I'm not sure anymore to
> > which UDFs it actually makes sense to add `flush`, as most of them
> > shouldn't buffer any data. Apart from Sinks, it's usually an operator
> that
> > is buffering the data (that holds true for AsyncFunction, ReduceFunction,
> > AggregateFunction, MapFunction, FilterFunction, ...). For those functions
> > it's difficult to buffer any data, as they have no means to control when
> to
> > emit this data. One notable exception might be (Co)ProcessFunction, as it
> > can register timers on it's own. In that case I would propose to do the
> > following thing:
> > 1. Add `flush() {}` to `Sink` function (FLIP-143 Writer interface already
> > has flush capabilities)
> > 2. Maybe add `flush(Collector)` to `(Co)ProcessFunction`, but maybe we
> > can postpone it
> > 3. Leave other functions alone.
> >
> > After all, we could add `flush()` to other functions in the future if we
> > really find a good motivating example to do so.
> >
> > About 2. Dawid is pitching an idea to convert `ProcessFunction` into a
> > proper `Public` API that would replace StreamOperator. We could change
> > `StreamOperator` to be purely `@Internal` class/interface, and add the
> > missing functionality to the `ProcessFunction` (InputSelectable,
> > BoundedInput, MailboxExecutor). With this, adding `flush()` to
> > `ProcessFunction` would make a lot of sense. But maybe that should be a
> > story for another day?
> >
> > Best,
> > Piotrek
> >
> > pt., 4 cze 2021 o 10:36 Yun Gao  napisał(a):
> >
> >> Hi all,
> >>
> >> Very thanks @Dawid for resuming the discussion and very thanks @Till for
> >> the summary ! (and very sorry for I missed the mail and do not response
> >> in time...)
> >>
> >> I also agree with that we could consider the global commits latter
> >> 

Re: State migration scenario's

2021-06-09 Thread Yun Tang
Hi Marlo,

One of the scenarios we're trying to improve is adding or removing a field in the 
state serializer.
Users might add or remove a field during schema evolution; the state processor API 
can handle this in a separate offline job, while state migration can handle it once 
the new job is restarted.

Best
Yun Tang

From: Marlo Ploemen 
Sent: Wednesday, June 9, 2021 15:57
To: dev@flink.apache.org 
Subject: State migration scenario's

Hi community,

I am looking into the data migration and schema evolution process for stateful 
streaming jobs. Currently, there is no orchestration support for performing 
these job evolutions and no in-job state migration or schema evolution syntax 
(as this is part of the separate state processor API). I am looking for 
examples (e.g. Github repositories) or scenarios of stateful streaming jobs 
where the orchestration of their state evolution process can improve 
development quality.

Best, Marlo


[jira] [Created] (FLINK-22942) Disable upsert into syntax in Flink SQL

2021-06-09 Thread Leonard Xu (Jira)
Leonard Xu created FLINK-22942:
--

 Summary: Disable upsert into syntax in Flink SQL
 Key: FLINK-22942
 URL: https://issues.apache.org/jira/browse/FLINK-22942
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: Leonard Xu


I found that we can write both *insert into* and *upsert into* in Flink SQL, but the 
semantics and behavior of the latter syntax were never discussed; currently both have 
the same implementation.

We should disable the latter one until we support *upsert into* with correct 
behavior.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


State migration scenario's

2021-06-09 Thread Marlo Ploemen
Hi community,
 
I am looking into the data migration and schema evolution process for stateful 
streaming jobs. Currently, there is no orchestration support for performing 
these job evolutions and no in-job state migration or schema evolution syntax 
(as this is part of the separate state processor API). I am looking for 
examples (e.g. Github repositories) or scenarios of stateful streaming jobs 
where the orchestration of their state evolution process can improve 
development quality.
 
Best, Marlo

[DISCUSS] FLIP-171: Async Sink

2021-06-09 Thread Hausmann, Steffen
Hi there,

We would like to start a discussion thread on "FLIP-171: Async Sink" [1], where 
we propose to create a common abstraction for destinations that support async 
requests. This abstraction will make it easier to add destinations to Flink by 
implementing a lightweight shim, while it avoids maintaining dozens of 
independent sinks.

Looking forward to your feedback.

Cheers, Steffen

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink



Amazon Web Services EMEA SARL
38 avenue John F. Kennedy, L-1855 Luxembourg
Sitz der Gesellschaft: L-1855 Luxemburg
eingetragen im Luxemburgischen Handelsregister unter R.C.S. B186284

Amazon Web Services EMEA SARL, Niederlassung Deutschland
Marcel-Breuer-Str. 12, D-80807 Muenchen
Sitz der Zweigniederlassung: Muenchen
eingetragen im Handelsregister des Amtsgerichts Muenchen unter HRB 242240, 
USt-ID DE317013094





Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-06-09 Thread Till Rohrmann
Thanks for the lively discussion everyone. I have to admit that I am not
really convinced that we should call the interface Flushable and the method
flush. The problem is that this method should in the first place tell the
operator that it should stop processing and flush all buffered data. The
method "flush" alone does not convey this contract very well. Maybe a more
explicit name like stopProcessingAndFlush (maybe only stopProcessing) would
be better. Moreover, from the OutputStream.flush method, I would expect
that I can call this method multiple times w/o changing the state of the
stream. This is not the case here.

Given that the stop processing and flush all results is such an essential
lifecycle method of an operator/UDF, I am not sure whether we should offer
it as an optional interface users can implement. The problem I see is that
users are not aware of it when writing their own operators/UDFs. Making it
part of the actual interfaces makes it more explicit and easier to
discover. Maybe there is a way of adding it together with a default
implementation, deprecating the other methods, and then eventually removing
the old methods. The last step will break APIs, though :-(

Cheers,
Till

On Tue, Jun 8, 2021 at 6:27 PM Piotr Nowojski  wrote:

> Hi,
>
> Thanks for resuming this discussion. I think +1 for the proposal of
> dropping (deprecating) `dispose()`, and adding `flush()` to the
> `StreamOperator`/udfs. Semantically it would be more like new `close()` is
> an equivalent of old `dispose()`. Old `close()` is an equivalent of new
> `flush() + close()`. I think it provides a relatively painless migration
> path (could we write down this migration?).
>
> However I have some doubts about the Flushable interface. First of all,
> it wouldn't work for sinks - sinks have no output. Secondly, I don't like
> that it opens a possibility for problems like this (incompatible output
> types):
> ```
> public class MyMap implements MapFunction, Flushable
> { ...}
> ```
>
> Also after a quick offline discussion with Dawid, I'm not sure anymore to
> which UDFs it actually makes sense to add `flush`, as most of them
> shouldn't buffer any data. Apart from Sinks, it's usually an operator that
> is buffering the data (that holds true for AsyncFunction, ReduceFunction,
> AggregateFunction, MapFunction, FilterFunction, ...). For those functions
> it's difficult to buffer any data, as they have no means to control when to
> emit this data. One notable exception might be (Co)ProcessFunction, as it
> can register timers on it's own. In that case I would propose to do the
> following thing:
> 1. Add `flush() {}` to `Sink` function (FLIP-143 Writer interface already
> has flush capabilities)
> 2. Maybe add `flush(Collector)` to `(Co)ProcessFunction`, but maybe we
> can postpone it
> 3. Leave other functions alone.
>
> After all, we could add `flush()` to other functions in the future if we
> really find a good motivating example to do so.
>
> About 2. Dawid is pitching an idea to convert `ProcessFunction` into a
> proper `Public` API that would replace StreamOperator. We could change
> `StreamOperator` to be purely `@Internal` class/interface, and add the
> missing functionality to the `ProcessFunction` (InputSelectable,
> BoundedInput, MailboxExecutor). With this, adding `flush()` to
> `ProcessFunction` would make a lot of sense. But maybe that should be a
> story for another day?
>
> Best,
> Piotrek
>
> pt., 4 cze 2021 o 10:36 Yun Gao  napisał(a):
>
>> Hi all,
>>
>> Very thanks @Dawid for resuming the discussion and very thanks @Till for
>> the summary ! (and very sorry for I missed the mail and do not response
>> in time...)
>>
>> I also agree with that we could consider the global commits latter
>> separately after we have addressed the final checkpoints, and also other
>> points as Till summarized.
>> Currently the only cases that use the cascade commit are the Table
>> FileSystem and Hive connectors. I checked the code and found that they
>> currently commit the last piece of data directly in endOfInput(). Although
>> this might emit repeated records if there is a failover during job
>> finishing, it avoids emitting the records in notifyCheckpointComplete()
>> after endOfInput(). Thus the modification to the operator lifecycle in
>> final checkpoints would not cause compatibility problems for these
>> connectors, and we do not need to modify them in the first place.
>>
>> 2. Regarding the operator lifecycle, I also agree with the proposed
>> changes. To sum up, I think the operator lifecycle would become
>>
>> endOfInput(1)
>> ...
>> endOfInput(n)
>> flush() --> call UDF's flush method
>> if some operator requires final checkpoints
>> snapshotState()
>> notifyCheckpointComplete()
>> end if
>> close() --> call UDF's close method
>>
>> Since currently the close() is only called in normal finish and dispose()
>> will be called in both failover and normal case, for compatibility, I think
>> we may
>> have to 

Re: [DISCUSS]FLIP-170 Adding Checkpoint Rejection Mechanism

2021-06-09 Thread Piotr Nowojski
Hi Senhong,

Thanks for the proposal. I have a couple of questions.

Have you seen
`org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource` (for
the legacy SourceFunction) and
`org.apache.flink.api.connector.source.ExternallyInducedSourceReader` (for
FLIP-27) interfaces? They work the other way around, by letting the source
to trigger/initiate a checkpoint, instead of declining it. Could it be made
to work in your use case? If not, can you explain why?

Regarding declining/failing the checkpoint (without blocking the barrier
waiting for snapshot availability), could you not achieve the same thing by a
combination of throwing an exception in, for example, the
`org.apache.flink.api.connector.source.SourceReader#snapshotState` call and
setting the tolerable checkpoint failure number? [1]
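
For illustration, a minimal sketch of that combination (MySplit,
readyToSnapshot() and currentSplits() are hypothetical placeholders; the
Flink calls themselves exist):

```
// Sketch of a FLIP-27 SourceReader that "declines" a checkpoint by failing
// its snapshot; MySplit, readyToSnapshot() and currentSplits() are made up.
public class DecliningSourceReader implements SourceReader<String, MySplit> {

    @Override
    public List<MySplit> snapshotState(long checkpointId) {
        if (!readyToSnapshot()) {
            // Failing here fails this checkpoint attempt for the subtask.
            throw new RuntimeException("Declining checkpoint " + checkpointId);
        }
        return currentSplits();
    }

    // ... the remaining SourceReader methods are omitted in this sketch ...
}

// Job side: tolerate a few such failures instead of failing over immediately,
// e.g. env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
```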

Best, Piotrek

[1]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/environment/CheckpointConfig.html#setTolerableCheckpointFailureNumber-int-

On Wed, Jun 9, 2021 at 9:11 AM Senhong Liu  wrote:

> Here is some brief context about the new feature.
>
> 1. Active checkpoint rejection by the operator. Following the current
> checkpoint mechanism, one more preliminary step is added to help the
> operator determine whether it is able to take a snapshot. The preliminary
> step is a new API provided to users/developers. The new API will be
> implemented in the Source API (the new one based on FLIP-27) for the CDC
> implementation. The new API can also be implemented in other operators if
> necessary.
>
> 2. Handling the failure returned from the operator. If the checkpoint is
> rejected by the operator, an appropriate failure reason needs to be
> returned from the operator as well. In the current design, two failure
> reasons are listed: soft failure and hard failure. The former would be
> ignored by Flink and the latter would be counted as a continuous checkpoint
> failure according to the current checkpoint failure manager mechanism.
>
> 3. To prevent the operator from reporting soft failures so frequently that
> no checkpoint can be completed for a long time, we introduce a new
> configuration for a tolerable checkpoint failure timeout, which is a
> timer that starts with the checkpoint scheduler. The timer is reset if and
> only if a checkpoint completes. Otherwise, it does nothing until the
> tolerable timeout is hit. If the timer fires, it then triggers the current
> checkpoint failover.
>
> Questions:
> a. Under the current design, checkpoints might keep failing for a possibly
> long time, for example with a large checkpoint interval. Is there a better
> way to make checkpoints more likely to succeed? For example, triggering the
> checkpoint immediately after the last one is rejected seems inappropriate
> because it would increase the overhead.
> b. Is there a better way to handle the soft failure?
>
>
>
>
>
> --
> Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
>


[jira] [Created] (FLINK-22941) support column comment in catalogTable column schema

2021-06-09 Thread peng wang (Jira)
peng wang created FLINK-22941:
-

 Summary: support column comment in catalogTable column schema
 Key: FLINK-22941
 URL: https://issues.apache.org/jira/browse/FLINK-22941
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Planner
Reporter: peng wang


We found that column comments are supported in the Flink DDL syntax, but the 
comment is dropped when the SqlCreateTable class is converted to the CatalogTable class.
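
For example, the comment in a DDL like the following (an illustrative table, 
not taken from the issue) is currently lost after the conversion:

{code:sql}
CREATE TABLE users (
  user_id BIGINT COMMENT 'primary key',
  user_name STRING COMMENT 'display name of the user'
) WITH (
  'connector' = 'datagen'
);
{code}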



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS]FLIP-170 Adding Checkpoint Rejection Mechanism

2021-06-09 Thread Senhong Liu
Here is some brief context about the new feature.

1. Active checkpoint rejection by the operator. Following the current
checkpoint mechanism, one more preliminary step is added to help the
operator determine whether it is able to take a snapshot. The preliminary
step is a new API provided to users/developers. The new API will be
implemented in the Source API (the new one based on FLIP-27) for the CDC
implementation. The new API can also be implemented in other operators if
necessary (a rough sketch of what such an API might look like follows point 3
below).

2. Handling the failure returned from the operator. If the checkpoint is
rejected by the operator, an appropriate failure reason needs to be returned
from the operator as well. In the current design, two failure reasons are
listed: soft failure and hard failure. The former would be ignored by Flink
and the latter would be counted as a continuous checkpoint failure according
to the current checkpoint failure manager mechanism.

3. To prevent the operator from reporting soft failures so frequently that
no checkpoint can be completed for a long time, we introduce a new
configuration for a tolerable checkpoint failure timeout, which is a timer
that starts with the checkpoint scheduler. The timer is reset if and only if
a checkpoint completes. Otherwise, it does nothing until the tolerable
timeout is hit. If the timer fires, it then triggers the current checkpoint
failover.
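
A rough sketch of what such a rejection hook might look like (everything here
is hypothetical; neither the interface, the method nor the enum exist in Flink
today):

```
// Sketch only: an operator or source reader could implement this to tell the
// checkpoint coordinator whether it can take part in a checkpoint right now.
public interface CheckpointAvailabilityProvider {

    enum CheckpointAvailability {
        AVAILABLE,     // take the snapshot as usual
        SOFT_FAILURE,  // skip this checkpoint without counting it as a failure
        HARD_FAILURE   // decline and count it against the tolerable failures
    }

    CheckpointAvailability isCheckpointAvailable(long checkpointId);
}
```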

Questions:
a. Under the current design, checkpoints might keep failing for a possibly
long time, for example with a large checkpoint interval. Is there a better
way to make checkpoints more likely to succeed? For example, triggering the
checkpoint immediately after the last one is rejected seems inappropriate
because it would increase the overhead.
b. Is there a better way to handle the soft failure?





--
Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/


Re: [DISCUSS] Definition of idle partitions

2021-06-09 Thread Arvid Heise
Hi Eron,

again to recap from the other thread:
- You are right that idleness is correct with static assignment and fully
active partitions. In this case, the source defines idleness. (case A)
- For the more pressing use cases of idle, assigned partitions, the user
defines an idleness threshold, and it becomes potentially incorrect when
the partition becomes active again. (case B)
- Same holds for dynamic assignment of splits. If a source without a split
gets a split assigned dynamically, there is a realistic chance that the
watermark advanced past the first record of the newly assigned split. (case
C)
You can certainly insist that only the first case is valid (as it's
correct) but we know that users use it in other ways and that was also the
intent of the devs.

Now the question could be whether it makes sense to distinguish these cases.
Would you treat the idleness information differently (especially in the
sink/source that motivated FLIP-167) if you knew that the idleness is
guaranteed correct?
We could have some WatermarkStatus with ACTIVE, IDLE (case A), TIMEOUT
(case B).
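
As a rough illustration (just a sketch, not an existing Flink class):

```
// Distinguishes idleness declared by the source itself from idleness that was
// merely inferred from a user-configured inactivity threshold.
public enum WatermarkStatus {
    ACTIVE,   // watermarks are being provided
    IDLE,     // the source declared the split/partition idle (case A, correct)
    TIMEOUT   // idleness inferred from a threshold (case B, best effort)
}
```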

However, that would still leave case C, which probably would need to be
solved completely differently. I could imagine that a source with dynamic
assignments should never have IDLE subtasks and rather manage the idleness
itself. For example, it could emit a watermark per second/minute that is
directly fetched from the source system. I'm just not sure if the current
WatermarkAssigner interface suffices in that regard...


On Wed, Jun 9, 2021 at 7:31 AM Piotr Nowojski 
wrote:

> Hi Eron,
>
> Can you elaborate a bit more on what you mean? I don't understand what you
> mean by a more general solution.
>
> As of now, a stream is marked idle by a source/watermark generator, which
> has the effect of temporarily excluding this stream/partition from the min
> watermark calculation in the downstream tasks. However, the stream switches
> back to active when any record is emitted. This is what's causing the
> problems described by Arvid.
>
> The core of our proposal is very simple. Keep everything as it is, except
> that the stream will be changed back to active only once a watermark is
> emitted again - not a record. In other words, disconnect idleness from the
> presence of records and connect it only to the presence or lack of
> watermarks, and allow emitting records while the “stream status” is “idle”.
>
> Piotrek
>
>
> > On 09.06.2021, at 06:01, Eron Wright  wrote:
> >
> > It seems to me that idleness was introduced to deal with a very specific
> > issue. In the pipeline, watermarks are aggregated not on a per-split basis
> > but on a per-subtask basis. This works well when each subtask has exactly
> > one split. When a sub-task has multiple splits, various complications
> > occur involving the commingling of watermarks. And when a sub-task has no
> > splits, the pipeline stalls altogether. To deal with the latter problem,
> > idleness was introduced. The sub-task simply declares itself to be idle to
> > be taken out of consideration for purposes of watermark aggregation.
> >
> > If we're looking for a more general solution, I would suggest we discuss
> > how to track watermarks on a per-split basis.  Or, as Till mentioned
> > recently, an alternate solution may be to dynamically adjust the
> > parallelism of the task.
> >
> > I don't agree with the notion that idleness involves a correctness
> > tradeoff.  The facility I described above has no impact on correctness.
> > Meanwhile, various watermark strategies rely on heuristics involving the
> > processing-time domain, and the term idleness seems to have found purchase
> > there too. The connection among the concepts seems tenuous.
> >
> > -E
> >
> >
> >
> >> On Tue, Jun 8, 2021 at 8:58 AM Piotr Nowojski 
> wrote:
> >>
> >> Hi Arvid,
> >>
> >> Thanks for writing down this summary and proposal. I think this was the
> >> foundation of the disagreement in FLIP-167 discussion. Dawid was arguing
> >> that idleness is intermittent, strictly a task local concept and as such
> >> shouldn't be exposed in, for example, sinks, while Eron and I thought
> >> that it's a concept strictly connected to watermarks.
> >>
> >> 1. I'm a big +1 for changing the StreamStatus definition to stream
> >> "providing watermark" and "not providing watermark". With respect to that,
> >> I agree with Dawid that record-bound idleness *(if we would ever need to
> >> define/expose it)* should be an intermittent concept, like, for example,
> >> the existing Task/runtime input availability (StreamTaskInput#isAvailable).
> >> 3. I agree that neither `StreamStatus` nor `IDLE` is a good name. But
> >> I also don't have any good ideas.
> >> `WatermarkStatus#WATERMARKING_PAUSED`? `#NO_WATERMARKS`?
> >>
> >> Best,
> >> Piotrek
> >>
> >> On Tue, Jun 8, 2021 at 4:35 PM Arvid Heise  wrote:
> >>
> >>> Hi devs,
> >>>
> >>> While discussing "Watermark propagation with Sink API" and during
> >>> 

Re: Request to open the contributor permission!

2021-06-09 Thread Yun Tang
Hi hapihu,

Welcome to Apache Flink community!
You don't need to ask for contributor permission for Flink JIRA issues now; you 
can comment on the issue you're interested in and ask to be assigned.

You could also find more details in [1]

[1] https://flink.apache.org/contributing/how-to-contribute.html

Best
Yun Tang




From: w.gh123 
Sent: Wednesday, June 9, 2021 0:53
To: dev 
Subject: Request to open the contributor permission!

Hi,


I want to contribute to Apache Flink. Would you please give me the 
contributor permission? My JIRA ID is hapihu


[jira] [Created] (FLINK-22940) Make SQL column max column width configurable

2021-06-09 Thread Svend Vanderveken (Jira)
Svend Vanderveken created FLINK-22940:
-

 Summary: Make SQL column max column width configurable
 Key: FLINK-22940
 URL: https://issues.apache.org/jira/browse/FLINK-22940
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Client
Affects Versions: 1.13.0
Reporter: Svend Vanderveken


When displaying results interactively with the Flink SQL client, each column is 
currently truncated based on its content type, up to a maximum of 30 
characters, which is controlled by the Java constant [1].

In case some result to be displayed is too wide, a ~ is appended at the end to 
indicate the truncation (in practice the cut happens at position 25), as 
visible below:
 
 
{code:java}
 SELECT
   metadata.true_as_of_timestamp_millis,
   member_user_id
 FROM some_table
  
 true_as_of_timestamp_mil~            member_user_id 
  1622811665919 45ca821f-c0fc-4114-bef8-~
  1622811665919 45ca821f-c0fc-4114-bef8-~
  1622118951005 b4734391-d3e1-417c-ad92-~
 {code}
 
I suggest making this max width configurable by adding a parameter that can 
be `SET`, added to [2].

I also suggest making the default width wide enough that 36 usable characters 
can be displayed, since UUIDs (which are 36 characters long when represented as 
text) are very commonly used as identifiers, and therefore as column values.
This seems like an easy code update; if it's useful I'm happy to work on the 
implementation.
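
A sketch of what the option could look like in SqlClientOptions (the option 
name and default value are only suggestions):

{code:java}
public static final ConfigOption<Integer> DISPLAY_MAX_COLUMN_WIDTH =
        ConfigOptions.key("sql-client.display.max-column-width")
                .intType()
                .defaultValue(40)
                .withDescription(
                        "Maximum column width when printing query results in the"
                                + " SQL client; wide enough for 36-character UUIDs.");
{code}

It could then be changed interactively, e.g. `SET 'sql-client.display.max-column-width' = '60';`.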

[1] 
[https://github.com/apache/flink/blob/6d8c02f90a5a3054015f2f1ee83be821d925ccd1/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java#L74]

[2] 
[https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/SqlClientOptions.java]

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)