[jira] [Created] (FLINK-20849) Improve JavaDoc and logging of new KafkaSource

2021-01-04 Thread Qingsheng Ren (Jira)
Qingsheng Ren created FLINK-20849:
-

 Summary: Improve JavaDoc and logging of new KafkaSource
 Key: FLINK-20849
 URL: https://issues.apache.org/jira/browse/FLINK-20849
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Reporter: Qingsheng Ren
 Fix For: 1.13.0, 1.12.1


Some JavaDoc and logging messages of the new KafkaSource should be more 
descriptive to provide more information to users. 





Re: [DISCUSS]Some thoughts about CatalogPartitionSpec

2021-01-04 Thread Jun Zhang
Hi Jark,
Thanks for your explanation. I am doing the integration of Flink and
Iceberg. The Iceberg partition needs to be of an accurate type, and I cannot
modify it.

I will follow your suggestion: get the column type from the schema, and then
do the cast.
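
Roughly, I plan to do something like the following (just a sketch; the helper
class is mine, not an existing Flink class, and only a few types are covered):

import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;

/** Hypothetical helper illustrating "get the type from the schema, then cast". */
public final class PartitionValueCaster {

    /** Converts the string value from a CatalogPartitionSpec into the declared type. */
    public static Object toTypedValue(CatalogTable table, String partitionColumn, String value) {
        TableSchema schema = table.getSchema();
        DataType dataType = schema
                .getFieldDataType(partitionColumn)
                .orElseThrow(() -> new IllegalArgumentException("Unknown column: " + partitionColumn));
        LogicalTypeRoot typeRoot = dataType.getLogicalType().getTypeRoot();
        switch (typeRoot) {
            case INTEGER:
                return Integer.valueOf(value); // e.g. "123" -> 123 for an int partition column
            case BIGINT:
                return Long.valueOf(value);
            case BOOLEAN:
                return Boolean.valueOf(value);
            default:
                return value; // fall back to the raw string for types not covered here
        }
    }
}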

Jark Wu  wrote on Tue, Jan 5, 2021 at 3:05 PM:

> Hi Jun,
>
> AFAIK, the main reason to use Map<String, String> is because it's easy for
> serialization and deserialization.
> For example, if we use Java `LocalDateTime` instead of String to represent
> a TIMESTAMP partition value,
> then users may deserialize it into a Java `Timestamp` when passing it to the
> Flink framework, which may cause problems.
>
> Re: "the system will throw an exception that the type does not match",
> could your system store partition values as string type?
>
> Best,
> Jark
>
> On Tue, 5 Jan 2021 at 14:09, Jun Zhang  wrote:
>
> > Hi Jark,
> >
> > If the partition type is int and we pass in a string type, the system will
> > throw an exception that the type does not match. We can indeed cast by
> > getting the schema, but I think if CatalogPartitionSpec#partitionSpec is of
> > type Map<String, Object>, there is no need to do the cast operation, and the
> > universality and compatibility would be better.
> >
> > Jark Wu  wrote on Tue, Jan 5, 2021 at 1:47 PM:
> >
> > > Hi Jun,
> > >
> > > I'm curious why it doesn't work when represented in string?
> > > You can get the field type from the CatalogTable#getSchema(),
> > > then parse/cast the partition value to the type you want.
> > >
> > > Best,
> > > Jark
> > >
> > >
> > > On Tue, 5 Jan 2021 at 13:43, Jun Zhang 
> > wrote:
> > >
> > > >  Hello dev:
> > > >  Now I encounter a problem when using the method
> > > > "Catalog#listPartitions(ObjectPath, CatalogPartitionSpec)".
> > > >  I found that the partitionSpec type in CatalogPartitionSpec is
> > > > Map<String, String>.
> > > >  This is no problem for HiveCatalog, but my subclass of Catalog needs
> > > > precise types. For example, if the partition is of int type, passing in
> > > > "123" will not work.
> > > >  So I think whether the partitionSpec field of Flink's
> > > > CatalogPartitionSpec is changed to Map<String, Object> type will be more
> > > > reasonable and universal?
> > > >
> > >
> >
>


Re: [DISCUSS]Some thoughts about CatalogPartitionSpec

2021-01-04 Thread Jark Wu
Hi Jun,

AFAIK, the main reason to use Map<String, String> is because it's easy for
serialization and deserialization.
For example, if we use Java `LocalDateTime` instead of String to represent
a TIMESTAMP partition value,
then users may deserialize it into a Java `Timestamp` when passing it to the
Flink framework, which may cause problems.

Re: "the system will throw an exception that the type does not match",
could your system store partition values as string type?

Best,
Jark

On Tue, 5 Jan 2021 at 14:09, Jun Zhang  wrote:

> Hi Jark,
>
> If the partition type is int and we pass in a string type, the system will
> throw an exception that the type does not match. We can indeed cast by
> getting the schema, but I think if CatalogPartitionSpec#partitionSpec is of
> type Map<String, Object>, there is no need to do the cast operation, and the
> universality and compatibility would be better.
>
> Jark Wu  wrote on Tue, Jan 5, 2021 at 1:47 PM:
>
> > Hi Jun,
> >
> > I'm curious why it doesn't work when represented in string?
> > You can get the field type from the CatalogTable#getSchema(),
> > then parse/cast the partition value to the type you want.
> >
> > Best,
> > Jark
> >
> >
> > On Tue, 5 Jan 2021 at 13:43, Jun Zhang 
> wrote:
> >
> > >  Hello dev:
> > >  Now I encounter a problem when using the method
> > > "Catalog#listPartitions(ObjectPath, CatalogPartitionSpec)".
> > >  I found that the partitionSpec type in CatalogPartitionSpec is
> > > Map<String, String>.
> > >  This is no problem for HiveCatalog, but my subclass of Catalog needs
> > > precise types. For example, if the partition is of int type, passing in
> > > "123" will not work.
> > >  So I think whether the partitionSpec field of Flink's
> > > CatalogPartitionSpec is changed to Map<String, Object> type will be more
> > > reasonable and universal?
> > >
> >
>


[jira] [Created] (FLINK-20848) Kafka consumer ID is not specified correctly in new KafkaSource

2021-01-04 Thread Qingsheng Ren (Jira)
Qingsheng Ren created FLINK-20848:
-

 Summary: Kafka consumer ID is not specified correctly in new 
KafkaSource
 Key: FLINK-20848
 URL: https://issues.apache.org/jira/browse/FLINK-20848
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Reporter: Qingsheng Ren
 Fix For: 1.13.0, 1.12.1


In the constructor of {{KafkaPartitionSplitReader}}, {{subtaskId}} should be 
assigned before invoking the {{createConsumerClientId}} method, because this 
method uses the subtask ID as the suffix of the Kafka consumer client ID.
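
A simplified sketch of the fix (not the actual Flink source; field and method
names are shortened for illustration):

import java.util.Properties;

// Simplified sketch, not the real KafkaPartitionSplitReader.
public class KafkaPartitionSplitReaderSketch {

    private final int subtaskId;
    private final String clientId;

    public KafkaPartitionSplitReaderSketch(Properties props, int subtaskId) {
        // Buggy order: deriving the client id first would read subtaskId before it is set.
        //   this.clientId = createConsumerClientId(props);
        //   this.subtaskId = subtaskId;

        // Fixed order: assign subtaskId first, then derive the client id from it.
        this.subtaskId = subtaskId;
        this.clientId = createConsumerClientId(props);
    }

    private String createConsumerClientId(Properties props) {
        String prefix = props.getProperty("client.id.prefix", "flink-kafka-source");
        // The subtask id is used as the suffix of the consumer client id.
        return prefix + "-" + subtaskId;
    }
}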





Re: Support local aggregate push down for Blink batch planner

2021-01-04 Thread Jingsong Li
Thanks for your proposal, Sebastian!

+1 for SupportsAggregatePushDown. The above wonderful discussion has solved
many of my concerns.

## Semantic problems

We may need to add some mechanisms or comments, because as far as I know,
the semantics of each database are actually different, which may need to be
reflected in your specific implementation.

For example, the AVG output types of various databases may be different.
For example, MySQL outputs double, which is different from Flink. What
should we do? (Luckily, AVG will be split into SUM and COUNT, but we also
need to care about DECIMAL and other types.)

## The phase of push-down rule

I strongly recommend that you do not put it in the Volcano phase, which may
make the cost calculation very troublesome.
So in PHYSICAL_REWRITE?

## About interface

For scalability, I slightly recommend that we introduce an `Aggregate`
interface that contains `List<CallExpression> aggregateExpressions, int[]
groupingFields, DataType producedDataType` fields. In this way, we can add
fields easily without breaking compatibility.
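
For example, roughly like this (just a sketch, not a final API):

import java.util.List;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.types.DataType;

/** Illustrative sketch only; names and shape are not a final API proposal. */
public interface SupportsAggregatePushDown {

    /** Container object, so new fields can be added later without breaking implementations. */
    interface Aggregate {
        List<CallExpression> getAggregateExpressions();
        int[] getGroupingFields();
        DataType getProducedDataType();
    }

    /**
     * Tries to push the local aggregation into the source. Returns true only if the source
     * can handle all of the given aggregates; otherwise nothing is pushed down.
     */
    boolean applyAggregates(Aggregate aggregate);
}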

I think the current design is very good, just put forward some ideas.

Best,
Jingsong

On Tue, Jan 5, 2021 at 1:55 PM Sebastian Liu  wrote:

> Hi Jark,
>
> Thx for your further feedback and help. The interface of
> SupportsAggregatePushDown may indeed need some adjustments.
>
> For (1) Agree: Yeah, the upstream only needs to know whether the TableSource
> can handle all of the aggregates.
> It's better to just return a boolean type to indicate whether the push-down
> of all aggregates was successful or not. [Resolved in proposal]
>
> For (2) Agree: The aggOutputDataType represents the produced data type of
> the new table source, to make sure that the new table source can
> connect with the related exchange node. The format of this
> aggOutputDataType is the groupedFields' types + the agg functions' return
> types. The reason for adding this parameter to this function is also to
> make it easier for the user to build the final output type. I have changed
> this parameter to producedDataType. [Resolved in proposal]
>
> For (3) Agree: Indeed, groupSet may mislead users, I have changed to use
> groupingFields. [Resolved in proposal]
>
> Thx again for the suggestion, looking forward to further discussion.
>
> Jark Wu  wrote on Tue, Jan 5, 2021 at 12:05 PM:
>
> > I'm also +1 for idea#2.
> >
> > Regarding to the updated interface,
> >
> > Result applyAggregates(List<CallExpression> aggregateExpressions,
> >     int[] groupSet, DataType aggOutputDataType);
> >
> > final class Result {
> >     private final List<CallExpression> acceptedAggregates;
> >     private final List<CallExpression> remainingAggregates;
> > }
> >
> > I have following comments:
> >
> > 1) Do we need the composite Result return type? Is a boolean return type
> > enough?
> > From my understanding, all of the aggregates should be accepted,
> > otherwise the pushdown should fail.
> > Therefore, users don't need to distinguish which aggregates are
> > "accepted".
> >
> > 2) Does the `aggOutputDataType` represent the produced data type of the
> > new source, or just the return type of all the agg functions?
> > I would prefer to `producedDataType` just like
> > `SupportsReadingMetadata` to reduce the effort for users to concat a
> final
> > output type.
> > The return type of each agg function can be obtained from the
> > `CallExpression`.
> >
> > 3) What do you think about renaming `groupSet` to `grouping` or
> > `groupedFields` ?
> > The `groupSet` may confuse users that it relates to "grouping sets".
> >
> >
> > What do you think?
> >
> > Best,
> > Jark
> >
> >
> >
> > On Tue, 5 Jan 2021 at 11:04, Kurt Young  wrote:
> >
> >> Sorry for the typo -_-!
> >> I meant idea #2.
> >>
> >> Best,
> >> Kurt
> >>
> >>
> >> On Tue, Jan 5, 2021 at 10:59 AM Sebastian Liu 
> >> wrote:
> >>
> >>> Hi Kurt,
> >>>
> >>> Thx a lot for your feedback. If local aggregation is more like a
> >>> physical operator rather than logical
> >>> operator, I think your suggestion should be idea #2 which handle all in
> >>> the physical optimization phase?
> >>>
> >>> Looking forward for the further discussion.
> >>>
> >>>
> >>> Kurt Young  wrote on Tue, Jan 5, 2021 at 9:52 AM:
> >>>
>  Local aggregation is more like a physical operator rather than logical
>  operator. I would suggest going with idea #1.
> 
>  Best,
>  Kurt
> 
> 
>  On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu 
>  wrote:
> 
>  > Hi Jark, Thx a lot for your quick reply and valuable suggestions.
>  > For (1): Agree: Since we are in the period of upgrading the new
> table
>  > source api,
>  > we really should consider the new interface for the new optimize
>  rule. If
>  > the new rule
>  > doesn't use the new api, we'll have to upgrade it sooner or later. I
>  have
>  > change to use
>  > the ability interface for the SupportsAggregatePushDown definition
> in
>  above
>  > proposal.
>  >
>  > For (2): Agree: Change to use CallExpression is a better choice, and
> 

Re: [DISCUSS]Some thoughts about CatalogPartitionSpec

2021-01-04 Thread Jun Zhang
Hi Jark,

If the partition type is int and we pass in a string type, the system will
throw an exception that the type does not match. We can indeed cast by
getting the schema, but I think if CatalogPartitionSpec#partitionSpec is of
type Map<String, Object>, there is no need to do the cast operation, and the
universality and compatibility would be better.

Jark Wu  wrote on Tue, Jan 5, 2021 at 1:47 PM:

> Hi Jun,
>
> I'm curious why it doesn't work when represented in string?
> You can get the field type from the CatalogTable#getSchema(),
> then parse/cast the partition value to the type you want.
>
> Best,
> Jark
>
>
> On Tue, 5 Jan 2021 at 13:43, Jun Zhang  wrote:
>
> >  Hello dev:
> >  Now I encounter a problem when using the method
> > "Catalog#listPartitions(ObjectPath, CatalogPartitionSpec)".
> >  I found that the partitionSpec type in CatalogPartitionSpec is
> > Map<String, String>.
> >  This is no problem for HiveCatalog, but my subclass of Catalog needs
> > precise types. For example, if the partition is of int type, passing in
> > "123" will not work.
> >  So I think whether the partitionSpec field of Flink's
> > CatalogPartitionSpec is changed to Map<String, Object> type will be more
> > reasonable and universal?
> >
>


Re: Support local aggregate push down for Blink batch planner

2021-01-04 Thread Sebastian Liu
Hi Jark,

Thx for your further feedback and help. The interface of
SupportsAggregatePushDown may indeed need some adjustments.

For (1) Agree: Yeah, the upstream only needs to know whether the TableSource
can handle all of the aggregates.
It's better to just return a boolean type to indicate whether the push-down
of all aggregates was successful or not. [Resolved in proposal]

For (2) Agree: The aggOutputDataType represents the produced data type of
the new table source, to make sure that the new table source can
connect with the related exchange node. The format of this
aggOutputDataType is the groupedFields' types + the agg functions' return
types. The reason for adding this parameter to this function is also to
make it easier for the user to build the final output type. I have changed
this parameter to producedDataType. [Resolved in proposal]

For (3) Agree: Indeed, groupSet may mislead users, I have changed to use
groupingFields. [Resolved in proposal]

Thx again for the suggestion, looking forward to further discussion.

Jark Wu  wrote on Tue, Jan 5, 2021 at 12:05 PM:

> I'm also +1 for idea#2.
>
> Regarding to the updated interface,
>
> Result applyAggregates(List<CallExpression> aggregateExpressions,
>     int[] groupSet, DataType aggOutputDataType);
>
> final class Result {
>     private final List<CallExpression> acceptedAggregates;
>     private final List<CallExpression> remainingAggregates;
> }
>
> I have following comments:
>
> 1) Do we need the composite Result return type? Is a boolean return type
> enough?
> From my understanding, all of the aggregates should be accepted,
> otherwise the pushdown should fail.
> Therefore, users don't need to distinguish which aggregates are
> "accepted".
>
> 2) Does the `aggOutputDataType` represent the produced data type of the
> new source, or just the return type of all the agg functions?
> I would prefer to `producedDataType` just like
> `SupportsReadingMetadata` to reduce the effort for users to concat a final
> output type.
> The return type of each agg function can be obtained from the
> `CallExpression`.
>
> 3) What do you think about renaming `groupSet` to `grouping` or
> `groupedFields` ?
> The `groupSet` may confuse users that it relates to "grouping sets".
>
>
> What do you think?
>
> Best,
> Jark
>
>
>
> On Tue, 5 Jan 2021 at 11:04, Kurt Young  wrote:
>
>> Sorry for the typo -_-!
>> I meant idea #2.
>>
>> Best,
>> Kurt
>>
>>
>> On Tue, Jan 5, 2021 at 10:59 AM Sebastian Liu 
>> wrote:
>>
>>> Hi Kurt,
>>>
>>> Thx a lot for your feedback. If local aggregation is more like a
>>> physical operator rather than logical
>>> operator, I think your suggestion should be idea #2 which handle all in
>>> the physical optimization phase?
>>>
>>> Looking forward for the further discussion.
>>>
>>>
>>> Kurt Young  wrote on Tue, Jan 5, 2021 at 9:52 AM:
>>>
 Local aggregation is more like a physical operator rather than logical
 operator. I would suggest going with idea #1.

 Best,
 Kurt


 On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu 
 wrote:

 > Hi Jark, Thx a lot for your quick reply and valuable suggestions.
 > For (1): Agree: Since we are in the period of upgrading the new table
 > source api,
 > we really should consider the new interface for the new optimize
 rule. If
 > the new rule
 > doesn't use the new api, we'll have to upgrade it sooner or later. I
 have
 > change to use
 > the ability interface for the SupportsAggregatePushDown definition in
 above
 > proposal.
 >
 > For (2): Agree: Change to use CallExpression is a better choice, and
 have
 > resolved this
 > comment in the proposal.
 >
 > For (3): I suggest we first support the JDBC connector, as we don't
 have
 > Druid connector
 > and ES connector just has sink api at present.
 >
 > But perhaps the biggest question may be whether we should use idea 1
 or
 > idea 2 in proposal.
 >
 > What do you think?  After we reach the agreement on the proposal, our
 team
 > can drive to
 > complete this feature.
 >
> Jark Wu  wrote on Tue, Dec 29, 2020 at 2:58 PM:
 >
 > > Hi Sebastian,
 > >
 > > Thanks for the proposal. I think this is a great improvement for
 Flink
 > SQL.
 > > I went through the design doc and have following thoughts:
 > >
 > > 1) Flink has deprecated the legacy TableSource in 1.11 and proposed
 a new
 > >  set of DynamicTableSource interfaces. Could you update your
 proposal to
 > > use the new interfaces?
 > >  Follow the existing ability interfaces, e.g.
 > > SupportsFilterPushDown, SupportsProjectionPushDown.
 > >
 > > 2) Personally, I think CallExpression would be a better
 representation
 > than
 > > separate `FunctionDefinition` and args. Because, it would be easier
 to
 > know
 > > what's the index and type of the arguments.
 > >
 > > 3) It would be better to list which connectors will be supported in

[Discussion] Let catalog manage temporary catalog objects

2021-01-04 Thread Rui Li
Hi Dev,

I'd like to start a discussion about whether we can let the catalog handle
temporary catalog objects. Currently a temporary table/view is managed by
CatalogManager, and a temporary function is managed by FunctionCatalog.

This causes problems when I try to support temporary Hive tables/functions.
For example, the Hive function class is different from what FunctionCatalog
expects. And a temporary Hive table needs a temporary data location, which
CatalogManager doesn't know how to assign.

Maybe we have two options to solve this problem.
1. Let the catalog manage temporary objects just as it does persistent ones.
CatalogManager/FunctionCatalog still needs to maintain temporary objects that
are associated with a non-existing catalog.
2. CatalogManager/FunctionCatalog can continue managing all the temporary
objects, but needs to somehow inform the catalog when a temporary object is
created/altered/dropped etc. (a rough sketch of this option follows below).

And either way, we have to make sure a temporary table/function is processed
by the TableFactory/FunctionDefinitionFactory associated with the catalog.
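
For example, option 2 could look roughly like this (a purely hypothetical
interface, only to illustrate the idea; it does not exist in Flink today):

import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;

/** Hypothetical callback interface for option 2; not an existing Flink API. */
public interface TemporaryObjectListener {

    /** Called by CatalogManager after a temporary table has been registered,
     * so the catalog can e.g. assign a temporary data location. */
    void onTemporaryTableCreated(ObjectPath path, CatalogBaseTable table);

    /** Called by CatalogManager before a temporary table is dropped,
     * so the catalog can clean up any resources it assigned. */
    void onTemporaryTableDropped(ObjectPath path);
}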

Looking forward to your suggestions.

-- 
Best regards!
Rui Li


Re: [DISCUSS]Some thoughts about CatalogPartitionSpec

2021-01-04 Thread Jark Wu
Hi Jun,

I'm curious why it doesn't work when represented in string?
You can get the field type from the CatalogTable#getSchema(),
then parse/cast the partition value to the type you want.

Best,
Jark


On Tue, 5 Jan 2021 at 13:43, Jun Zhang  wrote:

>  Hello dev:
>  Now I encounter a problem when using the method
> "Catalog#listPartitions(ObjectPath, CatalogPartitionSpec)".
>  I found that the partitionSpec type in CatalogPartitionSpec is
> Map<String, String>.
>  This is no problem for HiveCatalog, but my subclass of Catalog needs
> precise types. For example, if the partition is of int type, passing in
> "123" will not work.
>  So I think whether the partitionSpec field of Flink's
> CatalogPartitionSpec is changed to Map<String, Object> type will be more
> reasonable and universal?
>


[DISCUSS]Some thoughts about CatalogPartitionSpec

2021-01-04 Thread Jun Zhang
 Hello dev:
 Now I encounter a problem when using the method
"Catalog#listPartitions(ObjectPath, CatalogPartitionSpec)".
 I found that the partitionSpec type in CatalogPartitionSpec is
Map<String, String>.
 This is no problem for HiveCatalog, but my subclass of Catalog needs
precise types. For example, if the partition is of int type, passing in
"123" will not work.
 So I think whether the partitionSpec field of Flink's
CatalogPartitionSpec is changed to Map<String, Object> type will be more
reasonable and universal?


Re: [DISCUSS] Allow streaming operators to use managed memory

2021-01-04 Thread Jingsong Li
+1 for allowing streaming operators to use managed memory.

The memory use of stream operators requires some hierarchy, and the bottom
layer is undoubtedly the current StateBackend.
Letting the stream operators freely use the managed memory will unify the
memory management model and give the operators more freedom.

Xintong's proposal looks good to me. +1 to split `DATAPROC` into
`STATE_BACKEND` and `OPERATOR`.

Best,
Jingsong

On Tue, Jan 5, 2021 at 12:33 PM Jark Wu  wrote:

> +1 to Xingtong's proposal!
>
> Best,
> Jark
>
> On Tue, 5 Jan 2021 at 12:13, Xintong Song  wrote:
>
> > +1 for allowing streaming operators to use managed memory.
> >
> > As for the consumer names, I'm afraid using `DATAPROC` for both streaming
> > ops and state backends will not work. Currently, RocksDB state backend
> uses
> > a shared piece of memory for all the states within that slot. It's not
> the
> > operator's decision how much memory it uses for the states.
> >
> > I would suggest the following. (IIUC, the same as what Jark proposed)
> > > * `OPERATOR` for both streaming and batch operators
> > * `STATE_BACKEND` for state backends
> > * `PYTHON` for python processes
> > * `DATAPROC` as a legacy key for state backend or batch operators if
> > `STATE_BACKEND` or `OPERATOR` are not specified.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Tue, Jan 5, 2021 at 11:23 AM Jark Wu  wrote:
> >
> > > Hi Aljoscha,
> > >
> > > I think we may need to divide `DATAPROC` into `OPERATOR` and
> > > `STATE_BACKEND`, because they have different scope (slot vs. operator).
> > > But @Xintong Song  may have more insights on
> it.
> > >
> > > Best,
> > > Jark
> > >
> > >
> > > On Mon, 4 Jan 2021 at 20:44, Aljoscha Krettek 
> > wrote:
> > >
> > >> I agree, we should allow streaming operators to use managed memory for
> > >> other use cases.
> > >>
> > >> Do you think we need an additional "consumer" setting or that they
> would
> > >> just use `DATAPROC` and decide by themselves what to use the memory
> for?
> > >>
> > >> Best,
> > >> Aljoscha
> > >>
> > >> On 2020/12/22 17:14, Jark Wu wrote:
> > >> >Hi all,
> > >> >
> > >> >I found that currently the managed memory can only be used in 3
> > workloads
> > >> >[1]:
> > >> >- state backends for streaming jobs
> > >> >- sorting, hash tables for batch jobs
> > >> >- python UDFs
> > >> >
> > >> >And the configuration option
> > >> `taskmanager.memory.managed.consumer-weights`
> > >> >only allows values: PYTHON and DATAPROC (state in streaming or
> > algorithms
> > >> >in batch).
> > >> >I'm confused why it doesn't allow streaming operators to use managed
> > >> memory
> > >> >for purposes other than state backends.
> > >> >
> > >> >The background is that we are planning to use some batch algorithms
> > >> >(sorting & bytes hash table) to improve the performance of streaming
> > SQL
> > >> >operators, especially for the mini-batch operators.
> > >> >Currently, the mini-batch operators are buffering input records and
> > >> >accumulators in heap (i.e. Java HashMap) which is not efficient and
> > there
> > >> >are potential risks of full GC and OOM.
> > >> >With the managed memory, we can fully use the memory to buffer more
> > data
> > >> >without worrying about OOM and improve the performance a lot.
> > >> >
> > >> >What do you think about allowing streaming operators to use managed
> > >> memory
> > >> >and exposing it in configuration.
> > >> >
> > >> >Best,
> > >> >Jark
> > >> >
> > >> >[1]:
> > >> >
> > >>
> >
> https://ci.apache.org/projects/flink/flink-docs-master/deployment/memory/mem_setup_tm.html#managed-memory
> > >>
> > >
> >
>


-- 
Best, Jingsong Lee


Re: [DISCUSS] FLIP-153: Support state access in Python DataStream API

2021-01-04 Thread Dian Fu
Thanks Shuiqiang for driving this.

The design looks good to me. +1 to start the vote if there are no more comments.

Regards,
Dian

> On Jan 4, 2021, at 7:40 PM, Shuiqiang Chen  wrote:
> 
> Hi Yu,
> 
> Thanks a lot for your suggestions.
> 
> I have addressed your inline comments in the FLIP and also added a new
> section "State backend access synchronization" that explains the way to make
> sure there is no concurrent access to the state backend. Please have a look.
> 
> Best,
> Shuiqiang
> 
> 
> Yu Li  wrote on Mon, Jan 4, 2021 at 4:15 PM:
> 
>> Thanks for driving the discussion Shuiqiang, and sorry for chiming in late.
>> 
>> *bq. However, all the state access will be synchronized in the Java
>> operator and so there will be no concurrent access to the state backend.*
>> Could you add a section to explicitly mention this in the FLIP document? I
>> think single-threaded state access is an important prerequisite and it's
>> important for later contributors to know about this clearly, from both the
>> design doc and source codes.
>> 
>> The other parts LGTM, added some minor inline comments in the FLIP, please
>> take a look.
>> 
>> Thanks.
>> 
>> Best Regards,
>> Yu
>> 
>> 
>> On Fri, 18 Dec 2020 at 15:10, Shuiqiang Chen  wrote:
>> 
>>> Hi wei,
>>> 
>>> Big thanks for pointing out the mistakes! I have updated the FLIP
>>> according to your suggestions.
>>> 
>>> Best,
>>> Shuiqiang
>>> 
On Dec 18, 2020, at 2:37 PM, Wei Zhong  wrote:
 
 Hi Shuiqiang,
 
 Thanks for driving this. +1 for this feature, just a minor comment to
>>> the design doc.
 
 The interface of the `AppendingState` should be:
 
 class AppendingState(State, Generic[IN, OUT]):
 
  @abstractmethod
  def get(self) -> OUT:
  pass
 
  @abstractmethod
  def add(self, value: IN) -> None:
  pass
 
 The output type and the input type of the `AppendingState` may be
different. And the definition of the child classes should be:
 
 class MergingState(AppendingState[IN, OUT]):
   pass
 
 
 class ListState(MergingState[T, Iterable[T]]):
 
  @abstractmethod
  def update(self, values: List[T]) -> None:
  pass
 
  @abstractmethod
  def add_all(self, values: List[T]) -> None:
  pass
 
  def __iter__(self) -> Iterator[T]:
  return iter(self.get())
 
 Best,
 Wei
 
> On Dec 17, 2020, at 21:06, Shuiqiang Chen  wrote:
> 
> Hi Yun,
> 
> I highly appreciate your questions! I have the corresponding answers
> as below:
> 
> Re 1: You are right that the state access occurs in an async thread.
> However, all the state access will be synchronized in the Java operator and
> so there will be no concurrent access to the state backend.
> 
> Re 2: I think it could be handled well in Python DataStream API. In
>>> this case, there will be two operators and so also two keyed state
>> backend.
> 
> Re 3: Sure, you are right. We will store the current key which may be
>>> used by the timer.
> 
> Re 4: Good point. State migration is still not covered in the current
>>> FLIP. I'd like to cover it in a separate FLIP as it should be orthogonal
>> to
>>> this FLIP. I have updated the FLIP and added clear description for this.
> 
> Re 5: Good point. We may need to introduce a Python queryable state
> client if we want to support queryable state for Python operators. I'd like
> to cover it in a separate FLIP. I have updated the FLIP and added it as
> future work.
> 
> Best,
> Shuiqiang
> 
>> On Dec 17, 2020, at 12:08 PM, Yun Tang  wrote:
>> 
>> Hi Shuiqiang,
>> 
>> Thanks for driving this. I have several questions below:
>> 
>> 
>> 1.  Thread safety of state write-access. As you might know, state
>> access is not thread-safe [1] in Flink; we depend on single-threaded task
>> access. Since you move the state access to another async thread, can we
>> still ensure this? It also means not allowing users to access state in their
>> Java operator along with the bundled Python operator.
>> 2.  Number of keyed state backend per task. Flink would only have one
>>> keyed state-backend per operator and would only have one keyed state
>>> backend per operator chain (in the head operator if possible). However,
>>> once we use experimental features such as reinterpretAsKeyedStream [2],
>> we
>>> could have two keyed state-backend in one operator chain within one task.
>>> Can python datastream API could handle this well?
>> 3.  Time to set current key. As we still need the current key when
>> registering a timer [3], we need some place to hold the current key even if
>> it is not registered in the keyed state backend.
>> 4.  State migration. Flink supports migrating state automatically if the
>> newly provided serializer is compatible with the old serializer [4]. I'm
>> afraid that if the Python DataStream API wraps the user's serializer as
>>> 

Re: [DISCUSS] Allow streaming operators to use managed memory

2021-01-04 Thread Jark Wu
+1 to Xingtong's proposal!

Best,
Jark

On Tue, 5 Jan 2021 at 12:13, Xintong Song  wrote:

> +1 for allowing streaming operators to use managed memory.
>
> As for the consumer names, I'm afraid using `DATAPROC` for both streaming
> ops and state backends will not work. Currently, RocksDB state backend uses
> a shared piece of memory for all the states within that slot. It's not the
> operator's decision how much memory it uses for the states.
>
> I would suggest the following. (IIUC, the same as what Jark proposed)
> * `OPERATOR` for both streaming and batch operators
> * `STATE_BACKEND` for state backends
> * `PYTHON` for python processes
> * `DATAPROC` as a legacy key for state backend or batch operators if
> `STATE_BACKEND` or `OPERATOR` are not specified.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Jan 5, 2021 at 11:23 AM Jark Wu  wrote:
>
> > Hi Aljoscha,
> >
> > I think we may need to divide `DATAPROC` into `OPERATOR` and
> > `STATE_BACKEND`, because they have different scope (slot vs. operator).
> > But @Xintong Song  may have more insights on it.
> >
> > Best,
> > Jark
> >
> >
> > On Mon, 4 Jan 2021 at 20:44, Aljoscha Krettek 
> wrote:
> >
> >> I agree, we should allow streaming operators to use managed memory for
> >> other use cases.
> >>
> >> Do you think we need an additional "consumer" setting or that they would
> >> just use `DATAPROC` and decide by themselves what to use the memory for?
> >>
> >> Best,
> >> Aljoscha
> >>
> >> On 2020/12/22 17:14, Jark Wu wrote:
> >> >Hi all,
> >> >
> >> >I found that currently the managed memory can only be used in 3
> workloads
> >> >[1]:
> >> >- state backends for streaming jobs
> >> >- sorting, hash tables for batch jobs
> >> >- python UDFs
> >> >
> >> >And the configuration option
> >> `taskmanager.memory.managed.consumer-weights`
> >> >only allows values: PYTHON and DATAPROC (state in streaming or
> algorithms
> >> >in batch).
> >> >I'm confused why it doesn't allow streaming operators to use managed
> >> memory
> >> >for purposes other than state backends.
> >> >
> >> >The background is that we are planning to use some batch algorithms
> >> >(sorting & bytes hash table) to improve the performance of streaming
> SQL
> >> >operators, especially for the mini-batch operators.
> >> >Currently, the mini-batch operators are buffering input records and
> >> >accumulators in heap (i.e. Java HashMap) which is not efficient and
> there
> >> >are potential risks of full GC and OOM.
> >> >With the managed memory, we can fully use the memory to buffer more
> data
> >> >without worrying about OOM and improve the performance a lot.
> >> >
> >> >What do you think about allowing streaming operators to use managed
> >> memory
> >> >and exposing it in configuration.
> >> >
> >> >Best,
> >> >Jark
> >> >
> >> >[1]:
> >> >
> >>
> https://ci.apache.org/projects/flink/flink-docs-master/deployment/memory/mem_setup_tm.html#managed-memory
> >>
> >
>


Re: [DISCUSS] Allow streaming operators to use managed memory

2021-01-04 Thread Xintong Song
+1 for allowing streaming operators to use managed memory.

As for the consumer names, I'm afraid using `DATAPROC` for both streaming
ops and state backends will not work. Currently, RocksDB state backend uses
a shared piece of memory for all the states within that slot. It's not the
operator's decision how much memory it uses for the states.

I would suggest the following (IIUC, the same as what Jark proposed); a small
configuration sketch follows the list.
* `OPERATOR` for both streaming and batch operators
* `STATE_BACKEND` for state backends
* `PYTHON` for python processes
* `DATAPROC` as a legacy key for state backend or batch operators if
`STATE_BACKEND` or `OPERATOR` are not specified.
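
To make this concrete, the configuration could then look roughly like this
(the `OPERATOR` and `STATE_BACKEND` keys are only proposed here; today only
`DATAPROC` and `PYTHON` are valid, and the weights are made-up numbers):

import org.apache.flink.configuration.Configuration;

public class ConsumerWeightsSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Hypothetical weights using the proposed consumer names.
        conf.setString(
                "taskmanager.memory.managed.consumer-weights",
                "OPERATOR:30,STATE_BACKEND:60,PYTHON:10");
    }
}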

Thank you~

Xintong Song



On Tue, Jan 5, 2021 at 11:23 AM Jark Wu  wrote:

> Hi Aljoscha,
>
> I think we may need to divide `DATAPROC` into `OPERATOR` and
> `STATE_BACKEND`, because they have different scope (slot vs. operator).
> But @Xintong Song  may have more insights on it.
>
> Best,
> Jark
>
>
> On Mon, 4 Jan 2021 at 20:44, Aljoscha Krettek  wrote:
>
>> I agree, we should allow streaming operators to use managed memory for
>> other use cases.
>>
>> Do you think we need an additional "consumer" setting or that they would
>> just use `DATAPROC` and decide by themselves what to use the memory for?
>>
>> Best,
>> Aljoscha
>>
>> On 2020/12/22 17:14, Jark Wu wrote:
>> >Hi all,
>> >
>> >I found that currently the managed memory can only be used in 3 workloads
>> >[1]:
>> >- state backends for streaming jobs
>> >- sorting, hash tables for batch jobs
>> >- python UDFs
>> >
>> >And the configuration option
>> `taskmanager.memory.managed.consumer-weights`
>> >only allows values: PYTHON and DATAPROC (state in streaming or algorithms
>> >in batch).
>> >I'm confused why it doesn't allow streaming operators to use managed
>> memory
>> >for purposes other than state backends.
>> >
>> >The background is that we are planning to use some batch algorithms
>> >(sorting & bytes hash table) to improve the performance of streaming SQL
>> >operators, especially for the mini-batch operators.
>> >Currently, the mini-batch operators are buffering input records and
>> >accumulators in heap (i.e. Java HashMap) which is not efficient and there
>> >are potential risks of full GC and OOM.
>> >With the managed memory, we can fully use the memory to buffer more data
>> >without worrying about OOM and improve the performance a lot.
>> >
>> >What do you think about allowing streaming operators to use managed
>> memory
>> >and exposing it in configuration.
>> >
>> >Best,
>> >Jark
>> >
>> >[1]:
>> >
>> https://ci.apache.org/projects/flink/flink-docs-master/deployment/memory/mem_setup_tm.html#managed-memory
>>
>


Re: Support local aggregate push down for Blink batch planner

2021-01-04 Thread Sebastian Liu
Thanks for the clarification. I have resolved all of the comments and added
a conclusion section.

Looking forward to further feedback from the community. Once we reach
consensus on the design doc, I can push forward the implementation work.

Kurt Young  wrote on Tue, Jan 5, 2021 at 11:04 AM:

> Sorry for the typo -_-!
> I meant idea #2.
>
> Best,
> Kurt
>
>
> On Tue, Jan 5, 2021 at 10:59 AM Sebastian Liu 
> wrote:
>
>> Hi Kurt,
>>
>> Thx a lot for your feedback. If local aggregation is more like a physical
>> operator rather than logical
>> operator, I think your suggestion should be idea #2 which handle all in
>> the physical optimization phase?
>>
>> Looking forward for the further discussion.
>>
>>
>> Kurt Young  wrote on Tue, Jan 5, 2021 at 9:52 AM:
>>
>>> Local aggregation is more like a physical operator rather than logical
>>> operator. I would suggest going with idea #1.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu 
>>> wrote:
>>>
>>> > Hi Jark, Thx a lot for your quick reply and valuable suggestions.
>>> > For (1): Agree: Since we are in the period of upgrading the new table
>>> > source api,
>>> > we really should consider the new interface for the new optimize rule.
>>> If
>>> > the new rule
>>> > doesn't use the new api, we'll have to upgrade it sooner or later. I
>>> have
>>> > change to use
>>> > the ability interface for the SupportsAggregatePushDown definition in
>>> above
>>> > proposal.
>>> >
>>> > For (2): Agree: Change to use CallExpression is a better choice, and
>>> have
>>> > resolved this
>>> > comment in the proposal.
>>> >
>>> > For (3): I suggest we first support the JDBC connector, as we don't
>>> have
>>> > Druid connector
>>> > and ES connector just has sink api at present.
>>> >
>>> > But perhaps the biggest question may be whether we should use idea 1 or
>>> > idea 2 in proposal.
>>> >
>>> > What do you think?  After we reach the agreement on the proposal, our
>>> team
>>> > can drive to
>>> > complete this feature.
>>> >
>>> > Jark Wu  wrote on Tue, Dec 29, 2020 at 2:58 PM:
>>> >
>>> > > Hi Sebastian,
>>> > >
>>> > > Thanks for the proposal. I think this is a great improvement for
>>> Flink
>>> > SQL.
>>> > > I went through the design doc and have following thoughts:
>>> > >
>>> > > 1) Flink has deprecated the legacy TableSource in 1.11 and proposed
>>> a new
>>> > >  set of DynamicTableSource interfaces. Could you update your
>>> proposal to
>>> > > use the new interfaces?
>>> > >  Follow the existing ability interfaces, e.g.
>>> > > SupportsFilterPushDown, SupportsProjectionPushDown.
>>> > >
>>> > > 2) Personally, I think CallExpression would be a better
>>> representation
>>> > than
>>> > > separate `FunctionDefinition` and args. Because, it would be easier
>>> to
>>> > know
>>> > > what's the index and type of the arguments.
>>> > >
>>> > > 3) It would be better to list which connectors will be supported in
>>> the
>>> > > plan?
>>> > >
>>> > > Best,
>>> > > Jark
>>> > >
>>> > >
>>> > > On Tue, 29 Dec 2020 at 00:49, Sebastian Liu 
>>> > wrote:
>>> > >
>>> > > > Hi all,
>>> > > >
>>> > > > I'd like to discuss a new feature for the Blink Planner.
>>> > > > Aggregate operator of Flink SQL is currently fully done at Flink
>>> layer.
>>> > > > With the developing of storage, many downstream storage of Flink
>>> SQL
>>> > has
>>> > > > the ability to deal with Aggregation operator.
>>> > > > Pushing down Aggregate to data source layer will improve
>>> performance
>>> > from
>>> > > > the perspective of the network IO and computation overhead.
>>> > > >
>>> > > > I have drafted a design doc for this new feature.
>>> > > >
>>> > > >
>>> > >
>>> >
>>> https://docs.google.com/document/d/1kGwC_h4qBNxF2eMEz6T6arByOB8yilrPLqDN0QBQXW4/edit?usp=sharing
>>> > > >
>>> > > > Any comment or discussion is welcome.
>>> > > >
>>> > > > --
>>> > > >
>>> > > > *With kind regards
>>> > > > 
>>> > > > Sebastian Liu 刘洋
>>> > > > Institute of Computing Technology, Chinese Academy of Science
>>> > > > Mobile\WeChat: +86—15201613655
>>> > > > E-mail: liuyang0...@gmail.com 
>>> > > > QQ: 3239559*
>>> > > >
>>> > >
>>> >
>>> >
>>> > --
>>> >
>>> > *With kind regards
>>> > 
>>> > Sebastian Liu 刘洋
>>> > Institute of Computing Technology, Chinese Academy of Science
>>> > Mobile\WeChat: +86—15201613655
>>> > E-mail: liuyang0...@gmail.com 
>>> > QQ: 3239559*
>>> >
>>>
>>
>>
>> --
>>
>> *With kind regards
>> 
>> Sebastian Liu 刘洋
>> Institute of Computing Technology, Chinese Academy of Science
>> Mobile\WeChat: +86—15201613655
>> E-mail: liuyang0...@gmail.com 
>> QQ: 3239559*
>>
>>

-- 

*With kind regards

Sebastian Liu 刘洋
Institute of Computing Technology, Chinese Academy of Science
Mobile\WeChat: +86—15201613655
E-mail: liuyang0...@gmail.com 
QQ: 3239559*


Re: Support local aggregate push down for Blink batch planner

2021-01-04 Thread Jark Wu
I'm also +1 for idea#2.

Regarding to the updated interface,

Result applyAggregates(List<CallExpression> aggregateExpressions,
    int[] groupSet, DataType aggOutputDataType);

final class Result {
    private final List<CallExpression> acceptedAggregates;
    private final List<CallExpression> remainingAggregates;
}

I have following comments:

1) Do we need the composite Result return type? Is a boolean return type
enough?
From my understanding, all of the aggregates should be accepted,
otherwise the pushdown should fail.
Therefore, users don't need to distinguish which aggregates are
"accepted".

2) Does the `aggOutputDataType` represent the produced data type of the new
source, or just the return type of all the agg functions?
I would prefer to `producedDataType` just like
`SupportsReadingMetadata` to reduce the effort for users to concat a final
output type.
The return type of each agg function can be obtained from the
`CallExpression`.

3) What do you think about renaming `groupSet` to `grouping` or
`groupedFields` ?
The `groupSet` may confuse users that it relates to "grouping sets".
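
Putting 1) - 3) together, the signature could then look roughly like this
(just a sketch, not a final proposal):

import java.util.List;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.types.DataType;

/** Illustrative sketch combining comments 1) - 3) above; not a final API. */
public interface SupportsAggregatePushDown {

    /**
     * Returns true if the source accepts all of the given aggregates and will produce
     * rows of {@code producedDataType} (the grouping field types followed by the
     * aggregate function result types); returns false to skip the push-down entirely.
     */
    boolean applyAggregates(
            List<CallExpression> aggregateExpressions,
            int[] groupingFields,
            DataType producedDataType);
}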


What do you think?

Best,
Jark



On Tue, 5 Jan 2021 at 11:04, Kurt Young  wrote:

> Sorry for the typo -_-!
> I meant idea #2.
>
> Best,
> Kurt
>
>
> On Tue, Jan 5, 2021 at 10:59 AM Sebastian Liu 
> wrote:
>
>> Hi Kurt,
>>
>> Thx a lot for your feedback. If local aggregation is more like a physical
>> operator rather than logical
>> operator, I think your suggestion should be idea #2 which handle all in
>> the physical optimization phase?
>>
>> Looking forward for the further discussion.
>>
>>
>> Kurt Young  wrote on Tue, Jan 5, 2021 at 9:52 AM:
>>
>>> Local aggregation is more like a physical operator rather than logical
>>> operator. I would suggest going with idea #1.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu 
>>> wrote:
>>>
>>> > Hi Jark, Thx a lot for your quick reply and valuable suggestions.
>>> > For (1): Agree: Since we are in the period of upgrading the new table
>>> > source api,
>>> > we really should consider the new interface for the new optimize rule.
>>> If
>>> > the new rule
>>> > doesn't use the new api, we'll have to upgrade it sooner or later. I
>>> have
>>> > change to use
>>> > the ability interface for the SupportsAggregatePushDown definition in
>>> above
>>> > proposal.
>>> >
>>> > For (2): Agree: Change to use CallExpression is a better choice, and
>>> have
>>> > resolved this
>>> > comment in the proposal.
>>> >
>>> > For (3): I suggest we first support the JDBC connector, as we don't
>>> have
>>> > Druid connector
>>> > and ES connector just has sink api at present.
>>> >
>>> > But perhaps the biggest question may be whether we should use idea 1 or
>>> > idea 2 in proposal.
>>> >
>>> > What do you think?  After we reach the agreement on the proposal, our
>>> team
>>> > can drive to
>>> > complete this feature.
>>> >
>>> > Jark Wu  wrote on Tue, Dec 29, 2020 at 2:58 PM:
>>> >
>>> > > Hi Sebastian,
>>> > >
>>> > > Thanks for the proposal. I think this is a great improvement for
>>> Flink
>>> > SQL.
>>> > > I went through the design doc and have following thoughts:
>>> > >
>>> > > 1) Flink has deprecated the legacy TableSource in 1.11 and proposed
>>> a new
>>> > >  set of DynamicTableSource interfaces. Could you update your
>>> proposal to
>>> > > use the new interfaces?
>>> > >  Follow the existing ability interfaces, e.g.
>>> > > SupportsFilterPushDown, SupportsProjectionPushDown.
>>> > >
>>> > > 2) Personally, I think CallExpression would be a better
>>> representation
>>> > than
>>> > > separate `FunctionDefinition` and args. Because, it would be easier
>>> to
>>> > know
>>> > > what's the index and type of the arguments.
>>> > >
>>> > > 3) It would be better to list which connectors will be supported in
>>> the
>>> > > plan?
>>> > >
>>> > > Best,
>>> > > Jark
>>> > >
>>> > >
>>> > > On Tue, 29 Dec 2020 at 00:49, Sebastian Liu 
>>> > wrote:
>>> > >
>>> > > > Hi all,
>>> > > >
>>> > > > I'd like to discuss a new feature for the Blink Planner.
>>> > > > Aggregate operator of Flink SQL is currently fully done at Flink
>>> layer.
>>> > > > With the developing of storage, many downstream storage of Flink
>>> SQL
>>> > has
>>> > > > the ability to deal with Aggregation operator.
>>> > > > Pushing down Aggregate to data source layer will improve
>>> performance
>>> > from
>>> > > > the perspective of the network IO and computation overhead.
>>> > > >
>>> > > > I have drafted a design doc for this new feature.
>>> > > >
>>> > > >
>>> > >
>>> >
>>> https://docs.google.com/document/d/1kGwC_h4qBNxF2eMEz6T6arByOB8yilrPLqDN0QBQXW4/edit?usp=sharing
>>> > > >
>>> > > > Any comment or discussion is welcome.
>>> > > >
>>> > > > --
>>> > > >
>>> > > > *With kind regards
>>> > > > 
>>> > > > Sebastian Liu 刘洋
>>> > > > Institute of Computing Technology, Chinese Academy of Science
>>> > > > Mobile\WeChat: +86—15201613655
>>> > > > E-mail: 

Re: [DISCUSS] FLIP-155: Introduce a few convenient operations in Table API

2021-01-04 Thread Dian Fu
Thanks a lot for your comments!

Regarding the Python Table API examples: I thought it would be straightforward 
to use these operations in the Python Table API and so had not added them. 
However, the suggestion makes sense to me and I have added some examples of 
how to use them in the Python Table API to make it clearer.

Regarding dropDuplicates vs deduplicate: +1 to use deduplicate. It's more 
consistent with the feature/concept which is already documented clearly in 
Flink.

Regarding `myTable.coalesce($("a"), 1).as("a")`: I'm still in favor of 
fillna for now. Compared to coalesce, fillna can handle multiple columns in 
one method call. For the naming convention, the names "fillna/dropna/replace" 
come from Pandas [1][2][3].

Regarding event-time/processing-time temporal join, SQL hints, and window TVF: 
Good catch! Definitely we should support them in the Table API. I will update 
the FLIP with these functionalities.

[1] https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.fillna.html 

[2] https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.dropna.html 

[3] https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.replace.html 
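
For reference, today deduplication has to go through SQL, roughly like this
(a minimal Java sketch; the table and column names are made up), which is the
feature FLIP-155 wants to expose directly as a Table API operation:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class DeduplicationTodaySketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // Assumes a table "Orders" with columns (order_id, amount, proctime) is registered.
        Table deduplicated = tEnv.sqlQuery(
                "SELECT order_id, amount FROM ("
                        + "  SELECT *, ROW_NUMBER() OVER ("
                        + "    PARTITION BY order_id ORDER BY proctime ASC) AS row_num"
                        + "  FROM Orders"
                        + ") WHERE row_num = 1");
    }
}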

> On Jan 4, 2021, at 10:59 PM, Timo Walther  wrote:
> 
> Hi Dian,
> 
> thanks for the proposed FLIP. I haven't taken a deep look at the proposal yet 
> but will do so shortly. In general, we should aim to make the Table API as 
> concise and self-explaining as possible. E.g. `dropna` does not sound obvious 
> to me.
> 
> Regarding `myTable.coalesce($("a"), 1).as("a")`: Instead of introducing more 
> top-level functions, maybe we should also consider introducing more building 
> blocks e.g. for applying an expression to every column. A more functional 
> approach (e.g. with lamba function) could solve more use cases.
> 
> Regards,
> Timo
> 
> On 04.01.21 15:35, Seth Wiesman wrote:
>> This makes sense, I have some questions about method names.
>> What do you think about renaming `dropDuplicates` to `deduplicate`? I don't
>> think that drop is the right word to use for this operation, it implies
>> records are filtered where this operator actually issues updates and
>> retractions. Also, deduplicate is already how we talk about this feature in
>> the docs so I think it would be easier for users to find.
>> For null handling, I don't know how close we want to stick with SQL
>> conventions but what about making `coalesce` a top-level method? Something
>> like:
>> myTable.coalesce($("a"), 1).as("a")
>> We can require the next method to be an `as`. There is already precedent
>> for this sort of thing, `GroupedTable#aggregate` can only be followed by
>> `select`.
>> Seth
>> On Mon, Jan 4, 2021 at 6:27 AM Wei Zhong  wrote:
>>> Hi Dian,
>>> 
>>> Big +1 for making the Table API easier to use. Java users and Python users
>>> can both benefit from it. I think it would be better if we add some Python
>>> API examples.
>>> 
>>> Best,
>>> Wei
>>> 
>>> 
On Jan 4, 2021, at 20:03, Dian Fu  wrote:
 
 Hi all,
 
 I'd like to start a discussion about introducing a few convenient
>>> operations in Table API from the perspective of ease of use.
 
 Currently some tasks are not easy to express in Table API e.g.
>>> deduplication, topn, etc, or not easy to express when there are hundreds of
>>> columns in a table, e.g. null data handling, etc.
 
 I'd like to propose to introduce a few operations in Table API with the
>>> following purposes:
 - Make Table API users to easily leverage the powerful features already
>>> in SQL, e.g. deduplication, topn, etc
 - Provide some convenient operations, e.g. introducing a series of
>>> operations for null data handling (it may become a problem when there are
>>> hundreds of columns), data sampling and splitting (which is a very common
>>> use case in ML which usually needs to split a table into multiple tables
>>> for training and validation separately).
 
 Please refer to FLIP-155 [1] for more details.
 
 Looking forward to your feedback!
 
 Regards,
 Dian
 
 [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-155%3A+Introduce+a+few+convenient+operations+in+Table+API
>>> 
>>> 
> 



Re: [DISCUSS] FLIP-155: Introduce a few convenient operations in Table API

2021-01-04 Thread Jark Wu
Thanks Dian,

+1 to `deduplicate`.

Regarding `myTable.coalesce($("a"), 1).as("a")`, I'm afraid it may conflict
with, or be confused with, the built-in expression `coalesce(f0, 0)` (which we
may introduce in the future).

Besides that, could we also align other features of Flink SQL, e.g.
event-time/processing-time temporal join, SQL Hints, window TVF (FLIP-145
[1])?

Best,
Jark

[1]:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-145%3A+Support+SQL+windowing+table-valued+function





On Mon, 4 Jan 2021 at 22:59, Timo Walther  wrote:

> Hi Dian,
>
> thanks for the proposed FLIP. I haven't taken a deep look at the
> proposal yet but will do so shortly. In general, we should aim to make
> the Table API as concise and self-explaining as possible. E.g. `dropna`
> does not sound obvious to me.
>
> Regarding `myTable.coalesce($("a"), 1).as("a")`: Instead of introducing
> more top-level functions, maybe we should also consider introducing more
> building blocks e.g. for applying an expression to every column. A more
> functional approach (e.g. with lamba function) could solve more use cases.
>
> Regards,
> Timo
>
> On 04.01.21 15:35, Seth Wiesman wrote:
> > This makes sense, I have some questions about method names.
> >
> > What do you think about renaming `dropDuplicates` to `deduplicate`? I
> don't
> > think that drop is the right word to use for this operation, it implies
> > records are filtered where this operator actually issues updates and
> > retractions. Also, deduplicate is already how we talk about this feature
> in
> > the docs so I think it would be easier for users to find.
> >
> > For null handling, I don't know how close we want to stick with SQL
> > conventions but what about making `coalesce` a top-level method?
> Something
> > like:
> >
> > myTable.coalesce($("a"), 1).as("a")
> >
> > We can require the next method to be an `as`. There is already precedent
> > for this sort of thing, `GroupedTable#aggregate` can only be followed by
> > `select`.
> >
> > Seth
> >
> > On Mon, Jan 4, 2021 at 6:27 AM Wei Zhong  wrote:
> >
> >> Hi Dian,
> >>
> >> Big +1 for making the Table API easier to use. Java users and Python
> users
> >> can both benefit from it. I think it would be better if we add some
> Python
> >> API examples.
> >>
> >> Best,
> >> Wei
> >>
> >>
> >>> On Jan 4, 2021, at 20:03, Dian Fu  wrote:
> >>>
> >>> Hi all,
> >>>
> >>> I'd like to start a discussion about introducing a few convenient
> >> operations in Table API from the perspective of ease of use.
> >>>
> >>> Currently some tasks are not easy to express in Table API e.g.
> >> deduplication, topn, etc, or not easy to express when there are
> hundreds of
> >> columns in a table, e.g. null data handling, etc.
> >>>
> >>> I'd like to propose to introduce a few operations in Table API with the
> >> following purposes:
> >>> - Make Table API users to easily leverage the powerful features already
> >> in SQL, e.g. deduplication, topn, etc
> >>> - Provide some convenient operations, e.g. introducing a series of
> >> operations for null data handling (it may become a problem when there
> are
> >> hundreds of columns), data sampling and splitting (which is a very
> common
> >> use case in ML which usually needs to split a table into multiple tables
> >> for training and validation separately).
> >>>
> >>> Please refer to FLIP-155 [1] for more details.
> >>>
> >>> Looking forward to your feedback!
> >>>
> >>> Regards,
> >>> Dian
> >>>
> >>> [1]
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-155%3A+Introduce+a+few+convenient+operations+in+Table+API
> >>
> >>
> >
>
>


Re: [DISCUSS] Allow streaming operators to use managed memory

2021-01-04 Thread Jark Wu
Hi Aljoscha,

I think we may need to divide `DATAPROC` into `OPERATOR` and
`STATE_BACKEND`, because they have different scope (slot vs. operator).
But @Xintong Song  may have more insights on it.

Best,
Jark


On Mon, 4 Jan 2021 at 20:44, Aljoscha Krettek  wrote:

> I agree, we should allow streaming operators to use managed memory for
> other use cases.
>
> Do you think we need an additional "consumer" setting or that they would
> just use `DATAPROC` and decide by themselves what to use the memory for?
>
> Best,
> Aljoscha
>
> On 2020/12/22 17:14, Jark Wu wrote:
> >Hi all,
> >
> >I found that currently the managed memory can only be used in 3 workloads
> >[1]:
> >- state backends for streaming jobs
> >- sorting, hash tables for batch jobs
> >- python UDFs
> >
> >And the configuration option `taskmanager.memory.managed.consumer-weights`
> >only allows values: PYTHON and DATAPROC (state in streaming or algorithms
> >in batch).
> >I'm confused why it doesn't allow streaming operators to use managed
> memory
> >for purposes other than state backends.
> >
> >The background is that we are planning to use some batch algorithms
> >(sorting & bytes hash table) to improve the performance of streaming SQL
> >operators, especially for the mini-batch operators.
> >Currently, the mini-batch operators are buffering input records and
> >accumulators in heap (i.e. Java HashMap) which is not efficient and there
> >are potential risks of full GC and OOM.
> >With the managed memory, we can fully use the memory to buffer more data
> >without worrying about OOM and improve the performance a lot.
> >
> >What do you think about allowing streaming operators to use managed memory
> >and exposing it in configuration.
> >
> >Best,
> >Jark
> >
> >[1]:
> >
> https://ci.apache.org/projects/flink/flink-docs-master/deployment/memory/mem_setup_tm.html#managed-memory
>


Re: Support local aggregate push down for Blink batch planner

2021-01-04 Thread Kurt Young
 Sorry for the typo -_-!
I meant idea #2.

Best,
Kurt


On Tue, Jan 5, 2021 at 10:59 AM Sebastian Liu  wrote:

> Hi Kurt,
>
> Thx a lot for your feedback. If local aggregation is more like a physical
> operator rather than logical
> operator, I think your suggestion should be idea #2 which handle all in
> the physical optimization phase?
>
> Looking forward for the further discussion.
>
>
> Kurt Young  wrote on Tue, Jan 5, 2021 at 9:52 AM:
>
>> Local aggregation is more like a physical operator rather than logical
>> operator. I would suggest going with idea #1.
>>
>> Best,
>> Kurt
>>
>>
>> On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu 
>> wrote:
>>
>> > Hi Jark, Thx a lot for your quick reply and valuable suggestions.
>> > For (1): Agree: Since we are in the period of upgrading the new table
>> > source api,
>> > we really should consider the new interface for the new optimize rule.
>> If
>> > the new rule
>> > doesn't use the new api, we'll have to upgrade it sooner or later. I
>> have
>> > change to use
>> > the ability interface for the SupportsAggregatePushDown definition in
>> above
>> > proposal.
>> >
>> > For (2): Agree: Change to use CallExpression is a better choice, and
>> have
>> > resolved this
>> > comment in the proposal.
>> >
>> > For (3): I suggest we first support the JDBC connector, as we don't have
>> > Druid connector
>> > and ES connector just has sink api at present.
>> >
>> > But perhaps the biggest question may be whether we should use idea 1 or
>> > idea 2 in proposal.
>> >
>> > What do you think?  After we reach the agreement on the proposal, our
>> team
>> > can drive to
>> > complete this feature.
>> >
>> > > Jark Wu  wrote on Tue, Dec 29, 2020 at 2:58 PM:
>> >
>> > > Hi Sebastian,
>> > >
>> > > Thanks for the proposal. I think this is a great improvement for Flink
>> > SQL.
>> > > I went through the design doc and have following thoughts:
>> > >
>> > > 1) Flink has deprecated the legacy TableSource in 1.11 and proposed a
>> new
>> > >  set of DynamicTableSource interfaces. Could you update your proposal
>> to
>> > > use the new interfaces?
>> > >  Follow the existing ability interfaces, e.g.
>> > > SupportsFilterPushDown, SupportsProjectionPushDown.
>> > >
>> > > 2) Personally, I think CallExpression would be a better representation
>> > than
>> > > separate `FunctionDefinition` and args. Because, it would be easier to
>> > know
>> > > what's the index and type of the arguments.
>> > >
>> > > 3) It would be better to list which connectors will be supported in
>> the
>> > > plan?
>> > >
>> > > Best,
>> > > Jark
>> > >
>> > >
>> > > On Tue, 29 Dec 2020 at 00:49, Sebastian Liu 
>> > wrote:
>> > >
>> > > > Hi all,
>> > > >
>> > > > I'd like to discuss a new feature for the Blink Planner.
>> > > > Aggregate operator of Flink SQL is currently fully done at Flink
>> layer.
>> > > > With the developing of storage, many downstream storage of Flink SQL
>> > has
>> > > > the ability to deal with Aggregation operator.
>> > > > Pushing down Aggregate to data source layer will improve performance
>> > from
>> > > > the perspective of the network IO and computation overhead.
>> > > >
>> > > > I have drafted a design doc for this new feature.
>> > > >
>> > > >
>> > >
>> >
>> https://docs.google.com/document/d/1kGwC_h4qBNxF2eMEz6T6arByOB8yilrPLqDN0QBQXW4/edit?usp=sharing
>> > > >
>> > > > Any comment or discussion is welcome.
>> > > >
>> > > > --
>> > > >
>> > > > *With kind regards
>> > > > 
>> > > > Sebastian Liu 刘洋
>> > > > Institute of Computing Technology, Chinese Academy of Science
>> > > > Mobile\WeChat: +86—15201613655
>> > > > E-mail: liuyang0...@gmail.com 
>> > > > QQ: 3239559*
>> > > >
>> > >
>> >
>> >
>> > --
>> >
>> > *With kind regards
>> > 
>> > Sebastian Liu 刘洋
>> > Institute of Computing Technology, Chinese Academy of Science
>> > Mobile\WeChat: +86—15201613655
>> > E-mail: liuyang0...@gmail.com 
>> > QQ: 3239559*
>> >
>>
>
>
> --
>
> *With kind regards
> 
> Sebastian Liu 刘洋
> Institute of Computing Technology, Chinese Academy of Science
> Mobile\WeChat: +86—15201613655
> E-mail: liuyang0...@gmail.com 
> QQ: 3239559*
>
>


Re: Support local aggregate push down for Blink batch planner

2021-01-04 Thread Sebastian Liu
Hi Kurt,

Thanks a lot for your feedback. If local aggregation is more like a physical
operator than a logical one, I think your suggestion should be idea #2, which
handles everything in the physical optimization phase?

Looking forward to further discussion.
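
For reference, the ability interface mentioned in the proposal is modeled on the
existing push-down interfaces (SupportsFilterPushDown / SupportsProjectionPushDown).
A rough, purely illustrative sketch of its shape could look like the following; the
method name and argument types below are only assumptions for the discussion, the
actual definition is in the design doc:

import java.util.List;

import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.types.DataType;

/**
 * Illustrative sketch only: an ability interface in the style of the existing
 * push-down interfaces. Names and argument types are assumptions, not the
 * final design.
 */
public interface SupportsAggregatePushDown {

    /**
     * Returns true if the source accepts the given local aggregation and will
     * emit pre-aggregated rows of the given type.
     */
    boolean applyAggregates(
            List<int[]> groupingSets,
            List<CallExpression> aggregateExpressions,
            DataType producedDataType);
}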


Kurt Young  于2021年1月5日周二 上午9:52写道:

> Local aggregation is more like a physical operator rather than logical
> operator. I would suggest going with idea #1.
>
> Best,
> Kurt
>
>
> On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu 
> wrote:
>
> > Hi Jark, Thx a lot for your quick reply and valuable suggestions.
> > For (1): Agree: Since we are in the period of upgrading the new table
> > source api,
> > we really should consider the new interface for the new optimize rule. If
> > the new rule
> > doesn't use the new api, we'll have to upgrade it sooner or later. I have
> > change to use
> > the ability interface for the SupportsAggregatePushDown definition in
> above
> > proposal.
> >
> > For (2): Agree: Change to use CallExpression is a better choice, and have
> > resolved this
> > comment in the proposal.
> >
> > For (3): I suggest we first support the JDBC connector, as we don't have
> > Druid connector
> > and ES connector just has sink api at present.
> >
> > But perhaps the biggest question may be whether we should use idea 1 or
> > idea 2 in proposal.
> >
> > What do you think?  After we reach the agreement on the proposal, our
> team
> > can drive to
> > complete this feature.
> >
> > Jark Wu  于2020年12月29日周二 下午2:58写道:
> >
> > > Hi Sebastian,
> > >
> > > Thanks for the proposal. I think this is a great improvement for Flink
> > SQL.
> > > I went through the design doc and have following thoughts:
> > >
> > > 1) Flink has deprecated the legacy TableSource in 1.11 and proposed a
> new
> > >  set of DynamicTableSource interfaces. Could you update your proposal
> to
> > > use the new interfaces?
> > >  Follow the existing ability interfaces, e.g.
> > > SupportsFilterPushDown, SupportsProjectionPushDown.
> > >
> > > 2) Personally, I think CallExpression would be a better representation
> > than
> > > separate `FunctionDefinition` and args. Because, it would be easier to
> > know
> > > what's the index and type of the arguments.
> > >
> > > 3) It would be better to list which connectors will be supported in the
> > > plan?
> > >
> > > Best,
> > > Jark
> > >
> > >
> > > On Tue, 29 Dec 2020 at 00:49, Sebastian Liu 
> > wrote:
> > >
> > > > Hi all,
> > > >
> > > > I'd like to discuss a new feature for the Blink Planner.
> > > > Aggregate operator of Flink SQL is currently fully done at Flink
> layer.
> > > > With the developing of storage, many downstream storage of Flink SQL
> > has
> > > > the ability to deal with Aggregation operator.
> > > > Pushing down Aggregate to data source layer will improve performance
> > from
> > > > the perspective of the network IO and computation overhead.
> > > >
> > > > I have drafted a design doc for this new feature.
> > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/1kGwC_h4qBNxF2eMEz6T6arByOB8yilrPLqDN0QBQXW4/edit?usp=sharing
> > > >
> > > > Any comment or discussion is welcome.
> > > >
> > > > --
> > > >
> > > > *With kind regards
> > > > 
> > > > Sebastian Liu 刘洋
> > > > Institute of Computing Technology, Chinese Academy of Science
> > > > Mobile\WeChat: +86—15201613655
> > > > E-mail: liuyang0...@gmail.com 
> > > > QQ: 3239559*
> > > >
> > >
> >
> >
> > --
> >
> > *With kind regards
> > 
> > Sebastian Liu 刘洋
> > Institute of Computing Technology, Chinese Academy of Science
> > Mobile\WeChat: +86—15201613655
> > E-mail: liuyang0...@gmail.com 
> > QQ: 3239559*
> >
>


-- 

*With kind regards

Sebastian Liu 刘洋
Institute of Computing Technology, Chinese Academy of Science
Mobile\WeChat: +86—15201613655
E-mail: liuyang0...@gmail.com 
QQ: 3239559*


Re: Support local aggregate push down for Blink batch planner

2021-01-04 Thread Kurt Young
Local aggregation is more like a physical operator rather than logical
operator. I would suggest going with idea #1.

Best,
Kurt


On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu  wrote:

> Hi Jark, Thx a lot for your quick reply and valuable suggestions.
> For (1): Agree: Since we are in the period of upgrading the new table
> source api,
> we really should consider the new interface for the new optimize rule. If
> the new rule
> doesn't use the new api, we'll have to upgrade it sooner or later. I have
> change to use
> the ability interface for the SupportsAggregatePushDown definition in above
> proposal.
>
> For (2): Agree: Change to use CallExpression is a better choice, and have
> resolved this
> comment in the proposal.
>
> For (3): I suggest we first support the JDBC connector, as we don't have
> Druid connector
> and ES connector just has sink api at present.
>
> But perhaps the biggest question may be whether we should use idea 1 or
> idea 2 in proposal.
>
> What do you think?  After we reach the agreement on the proposal, our team
> can drive to
> complete this feature.
>
> Jark Wu  于2020年12月29日周二 下午2:58写道:
>
> > Hi Sebastian,
> >
> > Thanks for the proposal. I think this is a great improvement for Flink
> SQL.
> > I went through the design doc and have following thoughts:
> >
> > 1) Flink has deprecated the legacy TableSource in 1.11 and proposed a new
> >  set of DynamicTableSource interfaces. Could you update your proposal to
> > use the new interfaces?
> >  Follow the existing ability interfaces, e.g.
> > SupportsFilterPushDown, SupportsProjectionPushDown.
> >
> > 2) Personally, I think CallExpression would be a better representation
> than
> > separate `FunctionDefinition` and args. Because, it would be easier to
> know
> > what's the index and type of the arguments.
> >
> > 3) It would be better to list which connectors will be supported in the
> > plan?
> >
> > Best,
> > Jark
> >
> >
> > On Tue, 29 Dec 2020 at 00:49, Sebastian Liu 
> wrote:
> >
> > > Hi all,
> > >
> > > I'd like to discuss a new feature for the Blink Planner.
> > > Aggregate operator of Flink SQL is currently fully done at Flink layer.
> > > With the developing of storage, many downstream storage of Flink SQL
> has
> > > the ability to deal with Aggregation operator.
> > > Pushing down Aggregate to data source layer will improve performance
> from
> > > the perspective of the network IO and computation overhead.
> > >
> > > I have drafted a design doc for this new feature.
> > >
> > >
> >
> https://docs.google.com/document/d/1kGwC_h4qBNxF2eMEz6T6arByOB8yilrPLqDN0QBQXW4/edit?usp=sharing
> > >
> > > Any comment or discussion is welcome.
> > >
> > > --
> > >
> > > *With kind regards
> > > 
> > > Sebastian Liu 刘洋
> > > Institute of Computing Technology, Chinese Academy of Science
> > > Mobile\WeChat: +86—15201613655
> > > E-mail: liuyang0...@gmail.com 
> > > QQ: 3239559*
> > >
> >
>
>
> --
>
> *With kind regards
> 
> Sebastian Liu 刘洋
> Institute of Computing Technology, Chinese Academy of Science
> Mobile\WeChat: +86—15201613655
> E-mail: liuyang0...@gmail.com 
> QQ: 3239559*
>


[jira] [Created] (FLINK-20847) Update CompletedCheckpointStore.shutdown() signature

2021-01-04 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-20847:
-

 Summary: Update CompletedCheckpointStore.shutdown() signature
 Key: FLINK-20847
 URL: https://issues.apache.org/jira/browse/FLINK-20847
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
 Fix For: 1.13.0


# remove unused postCleanup argument
 # add javadoc for checkpointsCleaner



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20846) Decouple checkpoint services from CheckpointCoordinator

2021-01-04 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-20846:
-

 Summary: Decouple checkpoint services from CheckpointCoordinator
 Key: FLINK-20846
 URL: https://issues.apache.org/jira/browse/FLINK-20846
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.13.0


In order to reuse checkpoints across different {{ExecutionGraph}} instances 
(w/o HA), we need to decouple the checkpoint services 
{{CompletedCheckpointStore}} and {{CheckpointIDCounter}} from the 
{{CheckpointCoordinator}}. The reason is that the {{CheckpointCoordinator}} is 
bound to the lifetime of an {{ExecutionGraph}} and shuts the services down once 
the {{ExecutionGraph}} reaches a terminal state. Once this is no longer the 
case, it would be possible to create a new {{ExecutionGraph}} which can resume 
from a previous checkpoint if the appropriate {{CompletedCheckpointStore}} is 
provided.

For future scheduler implementations which want to reuse the {{ExecutionGraph}} 
but need to create different instances this is a requirement to support state 
recovery.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-155: Introduce a few convenient operations in Table API

2021-01-04 Thread Timo Walther

Hi Dian,

thanks for the proposed FLIP. I haven't taken a deep look at the 
proposal yet but will do so shortly. In general, we should aim to make 
the Table API as concise and self-explanatory as possible. E.g. `dropna` 
does not sound obvious to me.


Regarding `myTable.coalesce($("a"), 1).as("a")`: Instead of introducing 
more top-level functions, maybe we should also consider introducing more 
building blocks e.g. for applying an expression to every column. A more 
functional approach (e.g. with a lambda function) could solve more use cases.
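
To make that concrete: applying such an expression to every column today means
building the projection by hand from the schema, roughly as in the rough,
untested sketch below (a dedicated building block would hide this boilerplate):

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.ifThenElse;
import static org.apache.flink.table.api.Expressions.lit;

import java.util.Arrays;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.expressions.Expression;

public class PerColumnExpressionSketch {

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        tEnv.executeSql(
                "CREATE TABLE T (a INT, b INT, c INT) WITH ('connector' = 'datagen')");
        Table t = tEnv.from("T");

        // Apply "replace NULL with 0" to every column by hand.
        Expression[] projection = Arrays.stream(t.getSchema().getFieldNames())
                .map(name -> ifThenElse($(name).isNull(), lit(0), $(name)).as(name))
                .toArray(Expression[]::new);

        t.select(projection).printSchema();
    }
}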


Regards,
Timo

On 04.01.21 15:35, Seth Wiesman wrote:

This makes sense, I have some questions about method names.

What do you think about renaming `dropDuplicates` to `deduplicate`? I don't
think that drop is the right word to use for this operation, it implies
records are filtered where this operator actually issues updates and
retractions. Also, deduplicate is already how we talk about this feature in
the docs so I think it would be easier for users to find.

For null handling, I don't know how close we want to stick with SQL
conventions but what about making `coalesce` a top-level method? Something
like:

myTable.coalesce($("a"), 1).as("a")

We can require the next method to be an `as`. There is already precedent
for this sort of thing, `GroupedTable#aggregate` can only be followed by
`select`.

Seth

On Mon, Jan 4, 2021 at 6:27 AM Wei Zhong  wrote:


Hi Dian,

Big +1 for making the Table API easier to use. Java users and Python users
can both benefit from it. I think it would be better if we add some Python
API examples.

Best,
Wei



在 2021年1月4日,20:03,Dian Fu  写道:

Hi all,

I'd like to start a discussion about introducing a few convenient

operations in Table API from the perspective of ease of use.


Currently some tasks are not easy to express in Table API e.g.

deduplication, topn, etc, or not easy to express when there are hundreds of
columns in a table, e.g. null data handling, etc.


I'd like to propose to introduce a few operations in Table API with the

following purposes:

- Make Table API users to easily leverage the powerful features already

in SQL, e.g. deduplication, topn, etc

- Provide some convenient operations, e.g. introducing a series of

operations for null data handling (it may become a problem when there are
hundreds of columns), data sampling and splitting (which is a very common
use case in ML which usually needs to split a table into multiple tables
for training and validation separately).


Please refer to FLIP-155 [1] for more details.

Looking forward to your feedback!

Regards,
Dian

[1]

https://cwiki.apache.org/confluence/display/FLINK/FLIP-155%3A+Introduce+a+few+convenient+operations+in+Table+API








[ANNOUNCE] Changes to generated avro files

2021-01-04 Thread Chesnay Schepler

Hello,

in FLINK-20790 we have changed where generated Avro files are 
placed. Until then they were put directly under the src/ tree, with some 
committed to git, others being ignored via .gitignore.


This has caused issues when switching branches (specifically, not being 
able to compile 1.11 after having compiled the master branch), because 
these files stayed around and were not deleted by maven. The only way to 
resolve the issue was a manual deletion of said files.


These files are now put under target/generated[-test]-sources/ 
directories, which means they are now deleted when doing a mvn clean. 
The corresponding .gitignore entries have been removed.


Because of that, after a rebase on the latest 
master/release-1.12/release-1.11 branches you might find that git tells 
you that there are some new files all of a sudden. These are remnants 
from before this change, and can be safely removed, either manually or 
via git clean -fd.


In practice this should have little effect on your work, but you may 
have to run mvn generate[-test]-sources from time to time when switching 
between versions.


Regards,

Chesnay



Re: Task manger isn’t initiating with defined values in Flink 1.11 version as part of EMR 6.1.0

2021-01-04 Thread DEEP NARAYAN Singh
Thanks Till, for the detailed explanation. I tried it and it is working fine.

Once again thanks for your quick response.

Regards,
-Deep

On Mon, 4 Jan, 2021, 2:20 PM Till Rohrmann,  wrote:

> Hi Deep,
>
> Flink has dropped support for specifying the number of TMs via -n since the
> introduction of Flip-6. Since then, Flink will automatically start TMs
> depending on the required resources. Hence, there is no need to specify the
> -n parameter anymore. Instead, you should specify the parallelism with
> which you would like to run your job via the -p option.
>
> Since Flink 1.11.0 there is the option slotmanager.number-of-slots.max to
> limit the upper limit of slots a cluster is allowed to allocate [1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-16605
>
> Cheers,
> Till
>
> On Mon, Jan 4, 2021 at 8:33 AM DEEP NARAYAN Singh 
> wrote:
>
> > Hi Guys,
> >
> > I’m struggling while initiating the task manager with flink 1.11.0 in AWS
> > EMR but with older versions it is not. Let me put the full context here.
> >
> > *When using Flink 1.9.1 and EMR 5.29.0*
> >
> > To create a long running session, we used the below command.
> >
> > *sudo flink-yarn-session -n  -s  -jm
> 
> > -tm  -d*
> >
> > and followed by below command to run the final job.
> >
> > *flink run -m yarn-cluster -yid  -yn  -ys
> >  -yjm  -ytm  -c  *
> >
> > and if “n” is 6 then it is used to create 6 task managers to start the
> job,
> > so whatever “n” is configured the result was that number of TM the job is
> > being started.
> >
> > But Now when we scaled up with the configuration (*i.e. Flink 1.11.0 and
> > EMR 6.1.0*) we are unable to achieve the desired values for TM.
> >
> > Please find the session Ids of new configuration,
> >
> > *sudo flink-yarn-session -Djobmanager.memory.process.size=
> > -Dtaskmanager.memory.process.size= -n  -s  > slot/core> -d*
> >
> > And the final Job command
> >
> > *flink run -m yarn-cluster -yid  -c   > Path>*
> >
> > I have tried a lot of combinations, but nothing worked out so far. I
> > request your help in this regard as the plan to have this configuration
> in
> > *PRODUCTION* soon.
> >
> > Thanks in advance.
> >
> >
> > Regards,
> >
> > -Deep
> >
>


Re: [DISCUSS] FLIP-155: Introduce a few convenient operations in Table API

2021-01-04 Thread Seth Wiesman
This makes sense. I have some questions about method names.

What do you think about renaming `dropDuplicates` to `deduplicate`? I don't
think that drop is the right word to use for this operation: it implies
records are filtered, whereas this operator actually issues updates and
retractions. Also, deduplicate is already how we talk about this feature in
the docs, so I think it would be easier for users to find.

For null handling, I don't know how close we want to stick with SQL
conventions but what about making `coalesce` a top-level method? Something
like:

myTable.coalesce($("a"), 1).as("a")

We can require the next method to be an `as`. There is already precedent
for this sort of thing: `GroupedTable#aggregate` can only be followed by
`select`.
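
For comparison, today the coalesce-plus-rename is easiest to reach through SQL.
A small, untested sketch of the status quo versus the proposed reading (column
names are made up, and the commented-out call does not exist yet):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class CoalesceSketch {

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        tEnv.executeSql(
                "CREATE TABLE T (a INT, b STRING) WITH ('connector' = 'datagen')");

        // Today: fall back to SQL for COALESCE with a rename.
        Table result = tEnv.sqlQuery("SELECT COALESCE(a, 1) AS a, b FROM T");
        result.printSchema();

        // Proposed (hypothetical, does not exist yet):
        //   tEnv.from("T").coalesce($("a"), 1).as("a")
    }
}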

Seth

On Mon, Jan 4, 2021 at 6:27 AM Wei Zhong  wrote:

> Hi Dian,
>
> Big +1 for making the Table API easier to use. Java users and Python users
> can both benefit from it. I think it would be better if we add some Python
> API examples.
>
> Best,
> Wei
>
>
> > 在 2021年1月4日,20:03,Dian Fu  写道:
> >
> > Hi all,
> >
> > I'd like to start a discussion about introducing a few convenient
> operations in Table API from the perspective of ease of use.
> >
> > Currently some tasks are not easy to express in Table API e.g.
> deduplication, topn, etc, or not easy to express when there are hundreds of
> columns in a table, e.g. null data handling, etc.
> >
> > I'd like to propose to introduce a few operations in Table API with the
> following purposes:
> > - Make Table API users to easily leverage the powerful features already
> in SQL, e.g. deduplication, topn, etc
> > - Provide some convenient operations, e.g. introducing a series of
> operations for null data handling (it may become a problem when there are
> hundreds of columns), data sampling and splitting (which is a very common
> use case in ML which usually needs to split a table into multiple tables
> for training and validation separately).
> >
> > Please refer to FLIP-155 [1] for more details.
> >
> > Looking forward to your feedback!
> >
> > Regards,
> > Dian
> >
> > [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-155%3A+Introduce+a+few+convenient+operations+in+Table+API
>
>


[jira] [Created] (FLINK-20845) Drop support for Scala 2.11

2021-01-04 Thread Nick Burkard (Jira)
Nick Burkard created FLINK-20845:


 Summary: Drop support for Scala 2.11
 Key: FLINK-20845
 URL: https://issues.apache.org/jira/browse/FLINK-20845
 Project: Flink
  Issue Type: Sub-task
  Components: API / Scala
Reporter: Nick Burkard


The first step to adding support for Scala 2.13 is to drop Scala 2.11. 
Community discussion can be found 
[here|https://lists.apache.org/thread.html/ra817c5b54e3de48d80e5b4e0ae67941d387ee25cf9779f5ae37d0486%40%3Cdev.flink.apache.org%3E].

* Scala 2.11 saw its last release (2.11.12) in November 2017 and is quite old 
now. Most open-source libraries no longer build for it.
* Upgrading libraries to support 2.13 will be much easier without 2.11. Many do 
not support 2.11, 2.12 and 2.13 at the same time, so this is basically required 
to get 2.13 support.

Considerations:

* The Flink Scala Shell submodule still does not support Scala 2.12. Adding that 
support isn't a strict prerequisite for dropping Scala 2.11, but it would be nice 
to have before making the cut.
* Stateful functions previously needed Scala 2.11, but it looks like it [now 
supports 2.12|https://github.com/apache/flink-statefun/pull/149].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20844) Support add jar in SQL client

2021-01-04 Thread Rui Li (Jira)
Rui Li created FLINK-20844:
--

 Summary: Support add jar in SQL client
 Key: FLINK-20844
 URL: https://issues.apache.org/jira/browse/FLINK-20844
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Client
Reporter: Rui Li






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Allow streaming operators to use managed memory

2021-01-04 Thread Aljoscha Krettek
I agree, we should allow streaming operators to use managed memory for 
other use cases.


Do you think we need an additional "consumer" setting or that they would 
just use `DATAPROC` and decide by themselves what to use the memory for?


Best,
Aljoscha

On 2020/12/22 17:14, Jark Wu wrote:

Hi all,

I found that currently the managed memory can only be used in 3 workloads
[1]:
- state backends for streaming jobs
- sorting, hash tables for batch jobs
- python UDFs

And the configuration option `taskmanager.memory.managed.consumer-weights`
only allows values: PYTHON and DATAPROC (state in streaming or algorithms
in batch).
I'm confused why it doesn't allow streaming operators to use managed memory
for purposes other than state backends.

The background is that we are planning to use some batch algorithms
(sorting & bytes hash table) to improve the performance of streaming SQL
operators, especially for the mini-batch operators.
Currently, the mini-batch operators are buffering input records and
accumulators in heap (i.e. Java HashMap) which is not efficient and there
are potential risks of full GC and OOM.
With the managed memory, we can fully use the memory to buffer more data
without worrying about OOM and improve the performance a lot.

What do you think about allowing streaming operators to use managed memory
and exposing it in configuration.

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/deployment/memory/mem_setup_tm.html#managed-memory


Re: [DISCUSS] FLIP-155: Introduce a few convenient operations in Table API

2021-01-04 Thread Wei Zhong
Hi Dian,

Big +1 for making the Table API easier to use. Java users and Python users can 
both benefit from it. I think it would be better if we add some Python API 
examples. 

Best,
Wei


> 在 2021年1月4日,20:03,Dian Fu  写道:
> 
> Hi all,
> 
> I'd like to start a discussion about introducing a few convenient operations 
> in Table API from the perspective of ease of use. 
> 
> Currently some tasks are not easy to express in Table API e.g. deduplication, 
> topn, etc, or not easy to express when there are hundreds of columns in a 
> table, e.g. null data handling, etc.
> 
> I'd like to propose to introduce a few operations in Table API with the 
> following purposes:
> - Make Table API users to easily leverage the powerful features already in 
> SQL, e.g. deduplication, topn, etc
> - Provide some convenient operations, e.g. introducing a series of operations 
> for null data handling (it may become a problem when there are hundreds of 
> columns), data sampling and splitting (which is a very common use case in ML 
> which usually needs to split a table into multiple tables for training and 
> validation separately).
> 
> Please refer to FLIP-155 [1] for more details.
> 
> Looking forward to your feedback!
> 
> Regards,
> Dian
> 
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-155%3A+Introduce+a+few+convenient+operations+in+Table+API



[DISCUSS] FLIP-155: Introduce a few convenient operations in Table API

2021-01-04 Thread Dian Fu
Hi all,

I'd like to start a discussion about introducing a few convenient operations in 
Table API from the perspective of ease of use. 

Currently some tasks are not easy to express in the Table API, e.g. deduplication, 
topn, etc., and others become cumbersome when there are hundreds of columns in a 
table, e.g. null data handling.
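
To illustrate with deduplication: today it has to be phrased through SQL along
the lines of the rough, untested sketch below (table and column names are made
up); the proposed operation would hide this behind a single Table API call:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class DeduplicationTodaySketch {

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        tEnv.executeSql(
                "CREATE TABLE Orders (user_id INT, amount INT, proc AS PROCTIME()) "
                        + "WITH ('connector' = 'datagen')");

        // Keep only the first row per user_id: the ROW_NUMBER-based pattern that
        // the planner recognizes as deduplication.
        Table deduplicated = tEnv.sqlQuery(
                "SELECT user_id, amount FROM ("
                        + "  SELECT *, ROW_NUMBER() OVER ("
                        + "    PARTITION BY user_id ORDER BY proc ASC) AS row_num"
                        + "  FROM Orders"
                        + ") WHERE row_num = 1");
        deduplicated.printSchema();
    }
}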

I'd like to propose to introduce a few operations in Table API with the 
following purposes:
- Make it easy for Table API users to leverage the powerful features already in 
SQL, e.g. deduplication, topn, etc.
- Provide some convenient operations, e.g. a series of operations for null data 
handling (which may become a problem when there are hundreds of columns), and for 
data sampling and splitting (a very common use case in ML, which usually needs to 
split a table into separate tables for training and validation).

Please refer to FLIP-155 [1] for more details.

Looking forward to your feedback!

Regards,
Dian

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-155%3A+Introduce+a+few+convenient+operations+in+Table+API

[jira] [Created] (FLINK-20843) UnalignedCheckpointITCase is unstable

2021-01-04 Thread Aljoscha Krettek (Jira)
Aljoscha Krettek created FLINK-20843:


 Summary: UnalignedCheckpointITCase is unstable
 Key: FLINK-20843
 URL: https://issues.apache.org/jira/browse/FLINK-20843
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Reporter: Aljoscha Krettek


https://dev.azure.com/aljoschakrettek/Flink/_build/results?buildId=493=logs=6e55a443-5252-5db5-c632-109baf464772=9df6efca-61d0-513a-97ad-edb76d85786a=9432



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-153: Support state access in Python DataStream API

2021-01-04 Thread Shuiqiang Chen
Hi Yu,

Thanks a lot for your suggestions.

I have addressed your inline comments in the FLIP and also added a new
section "State backend access synchronization" that explains how we make
sure there is no concurrent access to the state backend. Please have a look.

Best,
Shuiqiang


Yu Li  于2021年1月4日周一 下午4:15写道:

> Thanks for driving the discussion Shuiqiang, and sorry for chiming in late.
>
> *bq. However, all the state access will be synchronized in the Java
> operator and so there will be no concurrent access to the state backend.*
> Could you add a section to explicitly mention this in the FLIP document? I
> think single-threaded state access is an important prerequisite and it's
> important for later contributors to know about this clearly, from both the
> design doc and source codes.
>
> The other parts LGTM, added some minor inline comments in the FLIP, please
> take a look.
>
> Thanks.
>
> Best Regards,
> Yu
>
>
> On Fri, 18 Dec 2020 at 15:10, Shuiqiang Chen  wrote:
>
> > Hi wei,
> >
> > Big thanks for pointing out the mistakes! I have updated the FLIP
> > according to your suggestions.
> >
> > Best,
> > Shuiqiang
> >
> > > 在 2020年12月18日,下午2:37,Wei Zhong  写道:
> > >
> > > Hi Shuiqiang,
> > >
> > > Thanks for driving this. +1 for this feature, just a minor comment to
> > the design doc.
> > >
> > > The interface of the `AppendingState` should be:
> > >
> > > class AppendingState(State, Generic[IN, OUT]):
> > >
> > >   @abstractmethod
> > >   def get(self) -> OUT:
> > >   pass
> > >
> > >   @abstractmethod
> > >   def add(self, value: IN) -> None:
> > >   pass
> > >
> > > The output type and the input type of the `AppendingState` maybe
> > different. And the definition of the child classes should be:
> > >
> > > class MergingState(AppendingState[IN, OUT]):
> > >pass
> > >
> > >
> > > class ListState(MergingState[T, Iterable[T]]):
> > >
> > >   @abstractmethod
> > >   def update(self, values: List[T]) -> None:
> > >   pass
> > >
> > >   @abstractmethod
> > >   def add_all(self, values: List[T]) -> None:
> > >   pass
> > >
> > >   def __iter__(self) -> Iterator[T]:
> > >   return iter(self.get())
> > >
> > > Best,
> > > Wei
> > >
> > >> 在 2020年12月17日,21:06,Shuiqiang Chen  写道:
> > >>
> > >> Hi Yun,
> > >>
> > >> Highly appreciate for your questions! I have the corresponding answers
> > as bellow:
> > >>
> > >> Re 1: You are right that the state access occurs in an async thread.
> > However, all the state access will be synchrouzed in the Java operator
> and
> > so there will be no concurrent access to the state backend.
> > >>
> > >> Re 2: I think it could be handled well in Python DataStream API. In
> > this case, there will be two operators and so also two keyed state
> backend.
> > >>
> > >> Re 3: Sure, you are right. We will store the current key which may be
> > used by the timer.
> > >>
> > >> Re 4: Good point. State migration is still not covered in the current
> > FLIP. I'd like to cover it in a separate FLIP as it should be orthogonal
> to
> > this FLIP. I have updated the FLIP and added clear description for this.
> > >>
> > >> Re 5: Good point. We may need to introduce a Python querable state
> > client if we want to support Queryable state for Python operators. I'd
> like
> > to cover it in a separate FLIP. I have updated the FLIP and add it as a
> > future work.
> > >>
> > >> Best,
> > >> Shuiqiang
> > >>
> > >>> 在 2020年12月17日,下午12:08,Yun Tang  写道:
> > >>>
> > >>> Hi Shuiqiang,
> > >>>
> > >>> Thanks for driving this. I have several questions below:
> > >>>
> > >>>
> > >>> 1.  Thread safety of state write-access. As you might know, state
> > access is not thread-safe [1] in Flink, we depend on task single thread
> > access. Since you change the state access to another async thread, can we
> > still ensure this? It also includes not allow user to access state in its
> > java operator along with the bundled python operator.
> > >>> 2.  Number of keyed state backend per task. Flink would only have one
> > keyed state-backend per operator and would only have one keyed state
> > backend per operator chain (in the head operator if possible). However,
> > once we use experimental features such as reinterpretAsKeyedStream [2],
> we
> > could have two keyed state-backend in one operator chain within one task.
> > Can python datastream API could handle this well?
> > >>> 3.  Time to set current key. As we still need current key when
> > registering timer [3], we need some place to hole the current key even
> not
> > registered in keyed state backend.
> > >>> 4.  State migration. Flink supports to migrate state automatically if
> > new provided serializer is compatible with old serializer[4]. I'm afraid
> if
> > python data stream API wraps user's serializer as
> > BytePrimitiveArraySerializer, we will lose such functionality. Moreover,
> > RocksDB will migrate state automatically on java side [5] and this will
> > break if python related bytes involved.
> > >>> 5.  

[jira] [Created] (FLINK-20842) Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema

2021-01-04 Thread zhouenning (Jira)
zhouenning created FLINK-20842:
--

 Summary: Data row is smaller than a column index, internal schema 
representation is probably out of sync with real database schema
 Key: FLINK-20842
 URL: https://issues.apache.org/jira/browse/FLINK-20842
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.11.1
 Environment: FLINK VERSION 1.11.1
SCALA 2.11.1

KAFKA 2.11

MYSQL 5.6
com.alibaba.ververica:flink-connector-mysql-cdc:1.1.0
Reporter: zhouenning
 Fix For: 1.11.1
 Attachments: error.log

I tried to use Flink CDC to sink data to Kafka, and the error 'Data row is 
smaller than a column index, internal schema representation is probably out of 
sync with real database schema' was raised; it seems a table structure change 
caused the program to stop.
I tried the latest 
com.alibaba.ververica:flink-connector-mysql-cdc:1.1.0 and the error still happens.

When I switch to a different database instance that is newly created, it works fine.
How can I skip this error and let the program continue?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20841) Fix compile error due to duplicated generated files

2021-01-04 Thread Matthias (Jira)
Matthias created FLINK-20841:


 Summary: Fix compile error due to duplicated generated files
 Key: FLINK-20841
 URL: https://issues.apache.org/jira/browse/FLINK-20841
 Project: Flink
  Issue Type: Bug
Reporter: Matthias
Assignee: Matthias


FLINK-20790 caused a compile error due to duplicated generated Java files. The 
previous location for the file generation was not removed properly as the 
folder appeared in Flink's {{.gitignore}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Task manger isn’t initiating with defined values in Flink 1.11 version as part of EMR 6.1.0

2021-01-04 Thread Till Rohrmann
Hi Deep,

Flink has dropped support for specifying the number of TMs via -n since the
introduction of FLIP-6. Since then, Flink will automatically start TMs
depending on the required resources. Hence, there is no need to specify the
-n parameter anymore. Instead, you should specify the parallelism with
which you would like to run your job via the -p option.

Since Flink 1.11.0 there is the option slotmanager.number-of-slots.max to
set an upper limit on the number of slots a cluster is allowed to allocate [1].

[1] https://issues.apache.org/jira/browse/FLINK-16605

Cheers,
Till

On Mon, Jan 4, 2021 at 8:33 AM DEEP NARAYAN Singh 
wrote:

> Hi Guys,
>
> I’m struggling while initiating the task manager with flink 1.11.0 in AWS
> EMR but with older versions it is not. Let me put the full context here.
>
> *When using Flink 1.9.1 and EMR 5.29.0*
>
> To create a long running session, we used the below command.
>
> *sudo flink-yarn-session -n  -s  -jm 
> -tm  -d*
>
> and followed by below command to run the final job.
>
> *flink run -m yarn-cluster -yid  -yn  -ys
>  -yjm  -ytm  -c  *
>
> and if “n” is 6 then it is used to create 6 task managers to start the job,
> so whatever “n” is configured the result was that number of TM the job is
> being started.
>
> But Now when we scaled up with the configuration (*i.e. Flink 1.11.0 and
> EMR 6.1.0*) we are unable to achieve the desired values for TM.
>
> Please find the session Ids of new configuration,
>
> *sudo flink-yarn-session -Djobmanager.memory.process.size=
> -Dtaskmanager.memory.process.size= -n  -s  slot/core> -d*
>
> And the final Job command
>
> *flink run -m yarn-cluster -yid  -c   Path>*
>
> I have tried a lot of combinations, but nothing worked out so far. I
> request your help in this regard as the plan to have this configuration in
> *PRODUCTION* soon.
>
> Thanks in advance.
>
>
> Regards,
>
> -Deep
>


[jira] [Created] (FLINK-20840) Projection pushdown doesn't work in temporal(lookup) join

2021-01-04 Thread Leonard Xu (Jira)
Leonard Xu created FLINK-20840:
--

 Summary: Projection pushdown doesn't work in temporal(lookup) join 
 Key: FLINK-20840
 URL: https://issues.apache.org/jira/browse/FLINK-20840
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.12.0
Reporter: Leonard Xu


{code:java}
sql: 
|SELECT T.*, D.id
|FROM MyTable AS T
|JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
|ON T.a = D.id

optmized plan:
Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id])
+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], async=[false], lookup=[id=a], select=[a, b, c, proctime, 
rowtime, id, name, age])
   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20839) some mistakes in state.md and state_zh.md

2021-01-04 Thread wym_maozi (Jira)
wym_maozi created FLINK-20839:
-

 Summary: some mistakes in state.md and state_zh.md 
 Key: FLINK-20839
 URL: https://issues.apache.org/jira/browse/FLINK-20839
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.12.0
Reporter: wym_maozi
 Attachments: Snipaste_2021-01-04_16-13-05.png

When creating a ListStateDescriptor object in the documentation example, the 
generic type parameter of the descriptor is inconsistent with the state type 
information passed to it: the ListStateDescriptor is declared with one Tuple 
type, but the specified TypeInformation uses a different Tuple type. Details 
are in the attached screenshot.
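
For reference, a consistent declaration would look roughly like the following 
(the concrete Tuple2 type is only an example, not necessarily the one used in 
the documentation):

{code:java}
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;

public class ConsistentDescriptorExample {

    public static void main(String[] args) {
        // The generic parameter of the descriptor and the TypeInformation
        // passed to it must describe the same type.
        ListStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ListStateDescriptor<>(
                        "average",
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
        System.out.println(descriptor.getName());
    }
}
{code}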



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-153: Support state access in Python DataStream API

2021-01-04 Thread Yu Li
Thanks for driving the discussion Shuiqiang, and sorry for chiming in late.

*bq. However, all the state access will be synchronized in the Java
operator and so there will be no concurrent access to the state backend.*
Could you add a section to explicitly mention this in the FLIP document? I
think single-threaded state access is an important prerequisite and it's
important for later contributors to know about this clearly, from both the
design doc and the source code.

The other parts LGTM, added some minor inline comments in the FLIP, please
take a look.

Thanks.

Best Regards,
Yu


On Fri, 18 Dec 2020 at 15:10, Shuiqiang Chen  wrote:

> Hi wei,
>
> Big thanks for pointing out the mistakes! I have updated the FLIP
> according to your suggestions.
>
> Best,
> Shuiqiang
>
> > 在 2020年12月18日,下午2:37,Wei Zhong  写道:
> >
> > Hi Shuiqiang,
> >
> > Thanks for driving this. +1 for this feature, just a minor comment to
> the design doc.
> >
> > The interface of the `AppendingState` should be:
> >
> > class AppendingState(State, Generic[IN, OUT]):
> >
> >   @abstractmethod
> >   def get(self) -> OUT:
> >   pass
> >
> >   @abstractmethod
> >   def add(self, value: IN) -> None:
> >   pass
> >
> > The output type and the input type of the `AppendingState` maybe
> different. And the definition of the child classes should be:
> >
> > class MergingState(AppendingState[IN, OUT]):
> >pass
> >
> >
> > class ListState(MergingState[T, Iterable[T]]):
> >
> >   @abstractmethod
> >   def update(self, values: List[T]) -> None:
> >   pass
> >
> >   @abstractmethod
> >   def add_all(self, values: List[T]) -> None:
> >   pass
> >
> >   def __iter__(self) -> Iterator[T]:
> >   return iter(self.get())
> >
> > Best,
> > Wei
> >
> >> 在 2020年12月17日,21:06,Shuiqiang Chen  写道:
> >>
> >> Hi Yun,
> >>
> >> Highly appreciate for your questions! I have the corresponding answers
> as bellow:
> >>
> >> Re 1: You are right that the state access occurs in an async thread.
> However, all the state access will be synchrouzed in the Java operator and
> so there will be no concurrent access to the state backend.
> >>
> >> Re 2: I think it could be handled well in Python DataStream API. In
> this case, there will be two operators and so also two keyed state backend.
> >>
> >> Re 3: Sure, you are right. We will store the current key which may be
> used by the timer.
> >>
> >> Re 4: Good point. State migration is still not covered in the current
> FLIP. I'd like to cover it in a separate FLIP as it should be orthogonal to
> this FLIP. I have updated the FLIP and added clear description for this.
> >>
> >> Re 5: Good point. We may need to introduce a Python querable state
> client if we want to support Queryable state for Python operators. I'd like
> to cover it in a separate FLIP. I have updated the FLIP and add it as a
> future work.
> >>
> >> Best,
> >> Shuiqiang
> >>
> >>> 在 2020年12月17日,下午12:08,Yun Tang  写道:
> >>>
> >>> Hi Shuiqiang,
> >>>
> >>> Thanks for driving this. I have several questions below:
> >>>
> >>>
> >>> 1.  Thread safety of state write-access. As you might know, state
> access is not thread-safe [1] in Flink, we depend on task single thread
> access. Since you change the state access to another async thread, can we
> still ensure this? It also includes not allow user to access state in its
> java operator along with the bundled python operator.
> >>> 2.  Number of keyed state backend per task. Flink would only have one
> keyed state-backend per operator and would only have one keyed state
> backend per operator chain (in the head operator if possible). However,
> once we use experimental features such as reinterpretAsKeyedStream [2], we
> could have two keyed state-backend in one operator chain within one task.
> Can python datastream API could handle this well?
> >>> 3.  Time to set current key. As we still need current key when
> registering timer [3], we need some place to hole the current key even not
> registered in keyed state backend.
> >>> 4.  State migration. Flink supports to migrate state automatically if
> new provided serializer is compatible with old serializer[4]. I'm afraid if
> python data stream API wraps user's serializer as
> BytePrimitiveArraySerializer, we will lose such functionality. Moreover,
> RocksDB will migrate state automatically on java side [5] and this will
> break if python related bytes involved.
> >>> 5.  Queryable state client. Currently, we only have java-based
> queryable state client [6], and we need another python-based queryable
> state client if involved python bytes.
> >>>
> >>> [1] https://issues.apache.org/jira/browse/FLINK-13072
> >>> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
> >>> [3]
>