Re: [DISCUSS] Shall casting functions return null or throw exceptions for invalid input

2021-11-18 Thread Kurt Young
Sorry, I forgot to add the user ML. I would also like to gather some
feedback from users on this, since I haven't received any feedback on this
topic from users before.
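
To make the discussion concrete, here is a minimal sketch of the two
behaviors under discussion (TRY_CAST is the proposed new function, so treat
its syntax as tentative):

-- current default: an invalid cast silently returns NULL
SELECT CAST('not_a_number' AS INT);      -- NULL today

-- proposed default: the same statement would fail with a runtime exception

-- proposed TRY_CAST: explicitly opt in to "return NULL on failure"
SELECT TRY_CAST('not_a_number' AS INT);  -- NULL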

Best,
Kurt


On Thu, Nov 18, 2021 at 6:33 PM Kurt Young  wrote:

> (added user ML to this thread)
>
> HI all,
>
> I would like to raise a different opinion about this change. I agree with
> Ingo that we should not just break existing behavior, and even if we
> introduce an option to control the behavior, I would propose to set the
> default value to the current behavior.
>
> I want to mention one angle for assessing whether we should change it or
> not, which is "what would users gain from the change". To me, it looks like:
>
> * new users: happy about the behavior
> * existing users: suffer from the change; it either forces them to modify
> their SQL or gets them a late-night call reporting that their online job
> crashed and could not be restarted.
>
> I would like to point to another breaking change we made when we adjusted
> the time-related functions in FLIP-162 [1]. In that case, both new and
> existing users suffered from the *incorrectly* implemented time function
> behavior, and we saw a lot of feedback and complaints through various
> channels. After we fixed that, we never saw related problems again.
>
> Back to this topic: have we ever seen a user complain about the current
> CAST behavior? From my side, no.
>
> To summarize:
>
> +1 to introduce TRY_CAST to better prepare for the future.
> -1 to modify the default behavior.
> +0 to introduce a config option, but with the default value set to the
> existing behavior. It's +0 because it seems unnecessary given that I'm -1
> on changing the default behavior, and I don't see an urgent need to change it.
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-162%3A+Consistent+Flink+SQL+time+function+behavior
>
> Best,
> Kurt
>
>
> On Thu, Nov 18, 2021 at 4:26 PM Ingo Bürk  wrote:
>
>> Hi,
>>
>> first of all, thanks for the summary of both sides, and for bringing up
>> the
>> discussion on this.
>> I think it is obvious that this is not something we can just "break", so
>> the config option seems mandatory to me.
>>
>> Overall I agree with Martijn and Till that throwing errors is the more
>> expected behavior. I mostly think this is valuable default behavior
>> because
>> it allows developers to find mistakes early and diagnose them much easier
>> compare to having to "work backwards" and figure out that it is the CAST
>> that failed. It also means that pipelines using TRY_CAST are
>> self-documenting because using that can signal "we might receive broken
>> data here".
>>
>>
>> Best
>> Ingo
>>
>> On Thu, Nov 18, 2021 at 9:11 AM Till Rohrmann 
>> wrote:
>>
>> > Hi everyone,
>> >
>> > personally I would also prefer the system telling me that something is
>> > wrong instead of silently ignoring records. If there is a TRY_CAST
>> function
>> > that has the old behaviour, people can still get the old behaviour. For
>> > backwards compatibility reasons it is a good idea to introduce a switch
>> to
>> > get back the old behaviour. By default we could set it to the new
>> > behaviour, though. Of course, we should explicitly document this new
>> > behaviour so that people are aware of it before running their jobs for
>> days
>> > and then encountering an invalid input.
>> >
>> > Cheers,
>> > Till
>> >
>> > On Thu, Nov 18, 2021 at 9:02 AM Martijn Visser 
>> > wrote:
>> >
>> > > Hi Caizhi,
>> > >
>> > > Thanks for bringing this up for discussion. I think the important
>> part is
>> > > what do developers expect as the default behaviour of a CAST function
>> > when
>> > > casting fails. If I look at Postgres [1] or MSSQL [2], the default
>> > > behaviour of a CAST failing would be to return an error, which would
>> be
>> > the
>> > > new behaviour. Returning a value when a CAST fails can lead to users
>> not
>> > > understanding immediately where that value comes from. So, I would be
>> in
>> > > favor of the new behaviour by default, but including a configuration
>> flag
>> > > to maintain the old behaviour to avoid that you need to rewrite all
>> these
>> > > jobs.
>> > >
>> > > Best regards,
>> > >
>> > > Martijn
>> > >
>> > > [1] htt

Re: State migration for sql job

2021-06-08 Thread Kurt Young
What behavior do you expect after you add the "max(a)" aggregation:

a. Keep summing `a` and start calculating max(a) only from the point it was
added. In other words, max(a) won't take the historical data into account.
b. First process all the historical data to get a result for max(a), and
then start to compute sum(a) and max(a) together for the real-time data.

Best,
Kurt


On Tue, Jun 8, 2021 at 2:11 PM JING ZHANG  wrote:

> Hi aitozi,
> This is a popular demand mentioned by many users, and it has appeared on
> the user mailing list several times.
> Unfortunately, it is not supported by Flink SQL yet; it may be solved in
> the future. BTW, a few companies try to solve the problem for some specific
> use cases in their internal Flink versions [2].
> Currently, you may try the `State Processor API` [1] as a temporary solution:
> 1. Take a savepoint.
> 2. Generate an updated savepoint using the State Processor API.
> 3. Recover from the new savepoint.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/libs/state_processor_api/
> [2] https://developer.aliyun.com/article/781455
>
> Best regards,
> JING ZHANG
>
> On Tue, Jun 8, 2021 at 1:54 PM aitozi  wrote:
>
>> When using Flink SQL, we encounter a big problem dealing with SQL state
>> compatibility. Say we have a group-agg SQL like ```sql select sum(`a`)
>> from source_t group by `uid` ```. If I then want to add a new agg column,
>> i.e. ```sql select sum(`a`), max(`a`) from source_t group by `uid` ```,
>> the SQL state will not be compatible. Is there any ongoing work, or are
>> there any thoughts on how to improve this situation?
>> --
>> Sent from the Apache Flink User Mailing List archive at Nabble.com.
>>
>


Re: How to recovery from last count when using CUMULATE window after restart flink-sql job?

2021-05-07 Thread Kurt Young
Hi, please use the user mailing list only to discuss these issues.

Best,
Kurt


On Sat, May 8, 2021 at 1:05 PM 1095193...@qq.com <1095193...@qq.com> wrote:

> Hi
>    I have tried the CUMULATE window function in Flink 1.13 SQL to accumulate
> data from Kafka. When I restart a cumulate window SQL job, the last count
> state is not considered and the count accumulates from 1 again. Are there
> any solutions that can help recover the last count state when restarting a
> Flink SQL job?
> Thank you
> --
> 1095193...@qq.com
>
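
(For reference, a cumulate-window aggregation of the kind described above
looks roughly like the sketch below, using the 1.13 window TVF syntax; the
table and column names are made up. The accumulated counts live in Flink
state, so they survive a restart only if the job is resumed from a
checkpoint or savepoint rather than started fresh.)

SELECT window_start, window_end, COUNT(*) AS cnt
FROM TABLE(
  CUMULATE(TABLE kafka_source, DESCRIPTOR(event_time),
           INTERVAL '10' MINUTES, INTERVAL '1' DAY))
GROUP BY window_start, window_end;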


Re: Flink 1.13 and CSV (batch) writing

2021-04-11 Thread Kurt Young
The Flink community plans to remove the DataSet API in the future; its use
cases will be covered by both the Table and DataStream APIs. It would be
helpful to let us know what kind of functionality is missing in these two
APIs. If you have further information you want to share, please let us know.

Best,
Kurt


On Sun, Apr 11, 2021 at 9:18 PM Flavio Pompermaier 
wrote:

> Thanks for the suggestions Kurt. Actually I could use Table Api I think,
> it's just that most of our Flink code use DataSet Api.
>
> Il dom 11 apr 2021, 13:44 Kurt Young  ha scritto:
>
>> Thanks for the suggestions Flavio. Join without window & left outer join
>> already worked in Table API & SQL.
>> And for reduceGroup, you can try either user defined aggregate function
>> or use table aggregate which is
>> available in Table API now. I'm wondering whether these can meet your
>> requirement, or you have other
>> use cases only feasible with DataStream.
>>
>> Best,
>> Kurt
>>
>>
>> On Fri, Apr 9, 2021 at 7:41 PM Flavio Pompermaier 
>> wrote:
>>
>>> That's absolutely useful. IMHO also join should work without
>>> windows/triggers and left/right outer joins should be easier in order to
>>> really migrate legacy code.
>>> Also reduceGroup would help but less urgent.
>>> I hope that my feedback as Flink user could be useful.
>>>
>>> Best,
>>> Flavio
>>>
>>> On Fri, Apr 9, 2021 at 12:38 PM Kurt Young  wrote:
>>>
>>>> Converting from table to DataStream in batch mode is indeed a problem
>>>> now. But I think this will
>>>> be improved soon.
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>>
>>>> On Fri, Apr 9, 2021 at 6:14 PM Flavio Pompermaier 
>>>> wrote:
>>>>
>>>>> In my real CSV I have LONG columns that can contain null values. In
>>>>> that case I get a parse exception (and I would like to avoid to read it as
>>>>> a string).
>>>>> The ',bye' is just the way you can test that in my example (add that
>>>>> line to the input csv).
>>>>> If I use  'csv.null-literal' = '' it seems to work but, is it a
>>>>> workaround or it is the right solution?
>>>>>
>>>>> Another big problem I'm having with the new APIs is that if I use
>>>>> TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>>>> then I can't convert a table to a datastream..I need to use
>>>>> StreamTableEnvironment tableEnv =
>>>>> StreamTableEnvironment.create(streamEnv, envSettings);
>>>>> but in that case I can't use inBatchMode..
>>>>>
>>>>> On Fri, Apr 9, 2021 at 11:44 AM Kurt Young  wrote:
>>>>>
>>>>>> `format.ignore-first-line` is unfortunately a regression compared to
>>>>>> the old one.
>>>>>> I've created a ticket [1] to track this but according to current
>>>>>> design, it seems not easy to do.
>>>>>>
>>>>>> Regarding null values, I'm not sure if I understand the issue you
>>>>>> had. What do you mean by
>>>>>> using ',bye' to test null Long values?
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-22178
>>>>>>
>>>>>> Best,
>>>>>> Kurt
>>>>>>
>>>>>>
>>>>>> On Fri, Apr 9, 2021 at 4:46 PM Flavio Pompermaier <
>>>>>> pomperma...@okkam.it> wrote:
>>>>>>
>>>>>>> And another thing: in my csv I added ',bye' (to test null Long
>>>>>>> values) but I get a parse error..if I add  'csv.null-literal' = '' it 
>>>>>>> seems
>>>>>>> to work..is that the right way to solve this problem?
>>>>>>>
>>>>>>> On Fri, Apr 9, 2021 at 10:13 AM Flavio Pompermaier <
>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>
>>>>>>>> Thanks Kurt, now it works. However I can't find a way to skip the
>>>>>>>> CSV header..before there was  "format.ignore-first-line" but now I 
>>>>>>>> can't
>>>>>>>> find another way to skip it.
>>>>>>>> I could set csv.ignore-parse-errors to true but then

Re: Flink 1.13 and CSV (batch) writing

2021-04-11 Thread Kurt Young
Thanks for the suggestions, Flavio. Joins without windows and left outer
joins already work in Table API & SQL.
For reduceGroup, you can try either a user-defined aggregate function or a
table aggregate, which is available in the Table API now. I'm wondering
whether these can meet your requirements, or whether you have other use
cases that are only feasible with DataStream.
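
For example, a plain (non-windowed) left outer join is just regular SQL on
the Table & SQL side; a minimal sketch with made-up table and column names:

SELECT o.order_id, o.amount, c.customer_name
FROM orders AS o
LEFT JOIN customers AS c ON o.customer_id = c.customer_id;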

Best,
Kurt


On Fri, Apr 9, 2021 at 7:41 PM Flavio Pompermaier 
wrote:

> That's absolutely useful. IMHO also join should work without
> windows/triggers and left/right outer joins should be easier in order to
> really migrate legacy code.
> Also reduceGroup would help but less urgent.
> I hope that my feedback as Flink user could be useful.
>
> Best,
> Flavio
>
> On Fri, Apr 9, 2021 at 12:38 PM Kurt Young  wrote:
>
>> Converting from table to DataStream in batch mode is indeed a problem
>> now. But I think this will
>> be improved soon.
>>
>> Best,
>> Kurt
>>
>>
>> On Fri, Apr 9, 2021 at 6:14 PM Flavio Pompermaier 
>> wrote:
>>
>>> In my real CSV I have LONG columns that can contain null values. In that
>>> case I get a parse exception (and I would like to avoid to read it as a
>>> string).
>>> The ',bye' is just the way you can test that in my example (add that
>>> line to the input csv).
>>> If I use  'csv.null-literal' = '' it seems to work but, is it a
>>> workaround or it is the right solution?
>>>
>>> Another big problem I'm having with the new APIs is that if I use
>>> TableEnvironment tableEnv = TableEnvironment.create(envSettings);
>>> then I can't convert a table to a datastream..I need to use
>>> StreamTableEnvironment tableEnv =
>>> StreamTableEnvironment.create(streamEnv, envSettings);
>>> but in that case I can't use inBatchMode..
>>>
>>> On Fri, Apr 9, 2021 at 11:44 AM Kurt Young  wrote:
>>>
>>>> `format.ignore-first-line` is unfortunately a regression compared to
>>>> the old one.
>>>> I've created a ticket [1] to track this but according to current
>>>> design, it seems not easy to do.
>>>>
>>>> Regarding null values, I'm not sure if I understand the issue you had.
>>>> What do you mean by
>>>> using ',bye' to test null Long values?
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-22178
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>>
>>>> On Fri, Apr 9, 2021 at 4:46 PM Flavio Pompermaier 
>>>> wrote:
>>>>
>>>>> And another thing: in my csv I added ',bye' (to test null Long values)
>>>>> but I get a parse error..if I add  'csv.null-literal' = '' it seems to
>>>>> work..is that the right way to solve this problem?
>>>>>
>>>>> On Fri, Apr 9, 2021 at 10:13 AM Flavio Pompermaier <
>>>>> pomperma...@okkam.it> wrote:
>>>>>
>>>>>> Thanks Kurt, now it works. However I can't find a way to skip the CSV
>>>>>> header..before there was  "format.ignore-first-line" but now I can't find
>>>>>> another way to skip it.
>>>>>> I could set csv.ignore-parse-errors to true but then I can't detect
>>>>>> other parsing errors, otherwise I need to manually transofrm the header
>>>>>> into a comment adding the # character at the start of the line..
>>>>>> How can I solve that?
>>>>>>
>>>>>> On Fri, Apr 9, 2021 at 4:07 AM Kurt Young  wrote:
>>>>>>
>>>>>>> My DDL is:
>>>>>>>
>>>>>>> CREATE TABLE csv (
>>>>>>>id BIGINT,
>>>>>>>name STRING
>>>>>>> ) WITH (
>>>>>>>'connector' = 'filesystem',
>>>>>>>'path' = '.',
>>>>>>>'format' = 'csv'
>>>>>>> );
>>>>>>>
>>>>>>> Best,
>>>>>>> Kurt
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Apr 9, 2021 at 10:00 AM Kurt Young  wrote:
>>>>>>>
>>>>>>>> Hi Flavio,
>>>>>>>>
>>>>>>>> We would recommend you to use new table source & sink interfaces,
>>>>>>>> which have different
>>>>>>>> property keys compared to the old one

Re: Flink 1.13 and CSV (batch) writing

2021-04-09 Thread Kurt Young
Converting from table to DataStream in batch mode is indeed a problem now.
But I think this will
be improved soon.

Best,
Kurt


On Fri, Apr 9, 2021 at 6:14 PM Flavio Pompermaier 
wrote:

> In my real CSV I have LONG columns that can contain null values. In that
> case I get a parse exception (and I would like to avoid to read it as a
> string).
> The ',bye' is just the way you can test that in my example (add that line
> to the input csv).
> If I use  'csv.null-literal' = '' it seems to work but, is it a workaround
> or it is the right solution?
>
> Another big problem I'm having with the new APIs is that if I use
> TableEnvironment tableEnv = TableEnvironment.create(envSettings);
> then I can't convert a table to a datastream..I need to use
> StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(streamEnv, envSettings);
> but in that case I can't use inBatchMode..
>
> On Fri, Apr 9, 2021 at 11:44 AM Kurt Young  wrote:
>
>> `format.ignore-first-line` is unfortunately a regression compared to the
>> old one.
>> I've created a ticket [1] to track this but according to current design,
>> it seems not easy to do.
>>
>> Regarding null values, I'm not sure if I understand the issue you had.
>> What do you mean by
>> using ',bye' to test null Long values?
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-22178
>>
>> Best,
>> Kurt
>>
>>
>> On Fri, Apr 9, 2021 at 4:46 PM Flavio Pompermaier 
>> wrote:
>>
>>> And another thing: in my csv I added ',bye' (to test null Long values)
>>> but I get a parse error..if I add  'csv.null-literal' = '' it seems to
>>> work..is that the right way to solve this problem?
>>>
>>> On Fri, Apr 9, 2021 at 10:13 AM Flavio Pompermaier 
>>> wrote:
>>>
>>>> Thanks Kurt, now it works. However I can't find a way to skip the CSV
>>>> header..before there was  "format.ignore-first-line" but now I can't find
>>>> another way to skip it.
>>>> I could set csv.ignore-parse-errors to true but then I can't detect
>>>> other parsing errors, otherwise I need to manually transofrm the header
>>>> into a comment adding the # character at the start of the line..
>>>> How can I solve that?
>>>>
>>>> On Fri, Apr 9, 2021 at 4:07 AM Kurt Young  wrote:
>>>>
>>>>> My DDL is:
>>>>>
>>>>> CREATE TABLE csv (
>>>>>id BIGINT,
>>>>>name STRING
>>>>> ) WITH (
>>>>>'connector' = 'filesystem',
>>>>>'path' = '.',
>>>>>'format' = 'csv'
>>>>> );
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>>
>>>>> On Fri, Apr 9, 2021 at 10:00 AM Kurt Young  wrote:
>>>>>
>>>>>> Hi Flavio,
>>>>>>
>>>>>> We would recommend you to use new table source & sink interfaces,
>>>>>> which have different
>>>>>> property keys compared to the old ones, e.g. 'connector' v.s.
>>>>>> 'connector.type'.
>>>>>>
>>>>>> You can follow the 1.12 doc [1] to define your csv table, everything
>>>>>> should work just fine.
>>>>>>
>>>>>> Flink SQL> set table.dml-sync=true;
>>>>>> [INFO] Session property has been set.
>>>>>>
>>>>>> Flink SQL> select * from csv;
>>>>>> +----+------+
>>>>>> | id | name |
>>>>>> +----+------+
>>>>>> |  3 |    c |
>>>>>> +----+------+
>>>>>> Received a total of 1 row
>>>>>>
>>>>>> Flink SQL> insert overwrite csv values(4, 'd');
>>>>>> [INFO] Submitting SQL update statement to the cluster...
>>>>>> [INFO] Execute statement in sync mode. Please wait for the execution finish...

Re: Flink 1.13 and CSV (batch) writing

2021-04-09 Thread Kurt Young
`format.ignore-first-line` is unfortunately a regression compared to the
old connector.
I've created a ticket [1] to track this, but given the current design it
doesn't seem easy to do.

Regarding null values, I'm not sure I understand the issue you had. What do
you mean by using ',bye' to test null Long values?

[1] https://issues.apache.org/jira/browse/FLINK-22178

Best,
Kurt


On Fri, Apr 9, 2021 at 4:46 PM Flavio Pompermaier 
wrote:

> And another thing: in my csv I added ',bye' (to test null Long values) but
> I get a parse error..if I add  'csv.null-literal' = '' it seems to work..is
> that the right way to solve this problem?
>
> On Fri, Apr 9, 2021 at 10:13 AM Flavio Pompermaier 
> wrote:
>
>> Thanks Kurt, now it works. However I can't find a way to skip the CSV
>> header..before there was  "format.ignore-first-line" but now I can't find
>> another way to skip it.
>> I could set csv.ignore-parse-errors to true but then I can't detect other
>> parsing errors, otherwise I need to manually transofrm the header into a
>> comment adding the # character at the start of the line..
>> How can I solve that?
>>
>> On Fri, Apr 9, 2021 at 4:07 AM Kurt Young  wrote:
>>
>>> My DDL is:
>>>
>>> CREATE TABLE csv (
>>>id BIGINT,
>>>name STRING
>>> ) WITH (
>>>'connector' = 'filesystem',
>>>'path' = '.',
>>>'format' = 'csv'
>>> );
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Fri, Apr 9, 2021 at 10:00 AM Kurt Young  wrote:
>>>
>>>> Hi Flavio,
>>>>
>>>> We would recommend you to use new table source & sink interfaces, which
>>>> have different
>>>> property keys compared to the old ones, e.g. 'connector' v.s.
>>>> 'connector.type'.
>>>>
>>>> You can follow the 1.12 doc [1] to define your csv table, everything
>>>> should work just fine.
>>>>
>>>> Flink SQL> set table.dml-sync=true;
>>>> [INFO] Session property has been set.
>>>>
>>>> Flink SQL> select * from csv;
>>>> +----+------+
>>>> | id | name |
>>>> +----+------+
>>>> |  3 |    c |
>>>> +----+------+
>>>> Received a total of 1 row
>>>>
>>>> Flink SQL> insert overwrite csv values(4, 'd');
>>>> [INFO] Submitting SQL update statement to the cluster...
>>>> [INFO] Execute statement in sync mode. Please wait for the execution finish...
>>>> [INFO] Complete execution of the SQL update statement.
>>>>
>>>> Flink SQL> select * from csv;
>>>> +----+------+
>>>> | id | name |
>>>> +----+------+
>>>> |  4 |    d |
>>>> +----+------+
>>>> Received a total of 1 row
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>>
>>>> On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier 
>>>> wrote:
>>>>
>>>>> Hi Till,
>>>>> since I was using the same WITH-clause both for reading and writing I
>>>>> discovered that overwrite is actually supported in the Sinks, while in the
>>>>> Sources an exception is thrown (I was thinking that those properties were
>>>>> simply ignored).
>>>>> However the quote-character is not supported in the sinks: is this a
>>>>> bug or is it the intended behaviour?.
>>>>> Here is a minimal example that reproduce the problem (put in the
>>>>> /tmp/test.csv something like '1,hello' or '2,hi').
>>>>>
>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>> import or

Re: Flink 1.13 and CSV (batch) writing

2021-04-08 Thread Kurt Young
My DDL is:

CREATE TABLE csv (
   id BIGINT,
   name STRING
) WITH (
   'connector' = 'filesystem',
   'path' = '.',
   'format' = 'csv'
);

Best,
Kurt


On Fri, Apr 9, 2021 at 10:00 AM Kurt Young  wrote:

> Hi Flavio,
>
> We would recommend you to use new table source & sink interfaces, which
> have different
> property keys compared to the old ones, e.g. 'connector' v.s.
> 'connector.type'.
>
> You can follow the 1.12 doc [1] to define your csv table, everything
> should work just fine.
>
> Flink SQL> set table.dml-sync=true;
> [INFO] Session property has been set.
>
> Flink SQL> select * from csv;
> +----+------+
> | id | name |
> +----+------+
> |  3 |    c |
> +----+------+
> Received a total of 1 row
>
> Flink SQL> insert overwrite csv values(4, 'd');
> [INFO] Submitting SQL update statement to the cluster...
> [INFO] Execute statement in sync mode. Please wait for the execution finish...
> [INFO] Complete execution of the SQL update statement.
>
> Flink SQL> select * from csv;
> +----+------+
> | id | name |
> +----+------+
> |  4 |    d |
> +----+------+
> Received a total of 1 row
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html
>
> Best,
> Kurt
>
>
> On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier 
> wrote:
>
>> Hi Till,
>> since I was using the same WITH-clause both for reading and writing I
>> discovered that overwrite is actually supported in the Sinks, while in the
>> Sources an exception is thrown (I was thinking that those properties were
>> simply ignored).
>> However the quote-character is not supported in the sinks: is this a bug
>> or is it the intended behaviour?.
>> Here is a minimal example that reproduce the problem (put in the
>> /tmp/test.csv something like '1,hello' or '2,hi').
>>
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.Table;
>> import org.apache.flink.table.api.TableEnvironment;
>>
>> public class FlinkCsvTest {
>>   public static void main(String[] args) throws Exception {
>> final EnvironmentSettings envSettings =
>>
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> final TableEnvironment tableEnv =
>> TableEnvironment.create(envSettings);
>> // ExecutionEnvironment env =
>> ExecutionEnvironment.getExecutionEnvironment();
>> // BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
>> final String tableInName = "testTableIn";
>> final String createInTableDdl = getSourceDdl(tableInName,
>> "/tmp/test.csv"); //
>>
>> final String tableOutName = "testTableOut";
>> final String createOutTableDdl = getSinkDdl(tableOutName,
>> "/tmp/test-out.csv"); //
>> tableEnv.executeSql(createInTableDdl);
>> tableEnv.executeSql(createOutTableDdl);
>>
>> Table tableIn = tableEnv.from(tableInName);
>> Table tableOut = tableEnv.from(tableOutName);
>> tableIn.insertInto(tableOutName);
>> // tableEnv.toDataSet(table, Row.class).print();
>> tableEnv.execute("TEST read/write");
>>
>>   }
>>
>>   private static String getSourceDdl(String tableName, String filePath) {
>> return "CREATE TABLE " + tableName + " (\n" + //
>> " `id` BIGINT,\n" + //
>> " `name` STRING) WITH (\n" + //
>> " 'connector.type' = 'filesystem',\n" + //
>> " 'connector.property-version' = '1',\n" + //
>> " 'connector.path' = '" + filePath + "',\n" + //
>> " 'format.type' = 'csv',\n" + //
>> " 'format.field-delimiter' = ',',\n" + //
>>  //   " 'format.write-mode' = 'OVERWRITE',\n" + // NOT SUPPORTED
>> " 'format.property-version' = '1',\n" + //
>> " 'format.quote-character' = '\"',\n" + //
>> " 'format.ignore-first-line' = 'false'" + //
>>

Re: Flink 1.13 and CSV (batch) writing

2021-04-08 Thread Kurt Young
Hi Flavio,

We would recommend using the new table source & sink interfaces, which have
different property keys compared to the old ones, e.g. 'connector' vs.
'connector.type'.

You can follow the 1.12 docs [1] to define your CSV table; everything should
work just fine.

Flink SQL> set table.dml-sync=true;
[INFO] Session property has been set.

Flink SQL> select * from csv;
+----+------+
| id | name |
+----+------+
|  3 |    c |
+----+------+
Received a total of 1 row

Flink SQL> insert overwrite csv values(4, 'd');
[INFO] Submitting SQL update statement to the cluster...
[INFO] Execute statement in sync mode. Please wait for the execution finish...
[INFO] Complete execution of the SQL update statement.

Flink SQL> select * from csv;
+----+------+
| id | name |
+----+------+
|  4 |    d |
+----+------+
Received a total of 1 row

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html

Best,
Kurt


On Fri, Apr 9, 2021 at 3:20 AM Flavio Pompermaier 
wrote:

> Hi Till,
> since I was using the same WITH-clause both for reading and writing I
> discovered that overwrite is actually supported in the Sinks, while in the
> Sources an exception is thrown (I was thinking that those properties were
> simply ignored).
> However the quote-character is not supported in the sinks: is this a bug
> or is it the intended behaviour?.
> Here is a minimal example that reproduce the problem (put in the
> /tmp/test.csv something like '1,hello' or '2,hi').
>
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableEnvironment;
>
> public class FlinkCsvTest {
>   public static void main(String[] args) throws Exception {
> final EnvironmentSettings envSettings =
>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> final TableEnvironment tableEnv = TableEnvironment.create(envSettings);
> // ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> // BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
> final String tableInName = "testTableIn";
> final String createInTableDdl = getSourceDdl(tableInName,
> "/tmp/test.csv"); //
>
> final String tableOutName = "testTableOut";
> final String createOutTableDdl = getSinkDdl(tableOutName,
> "/tmp/test-out.csv"); //
> tableEnv.executeSql(createInTableDdl);
> tableEnv.executeSql(createOutTableDdl);
>
> Table tableIn = tableEnv.from(tableInName);
> Table tableOut = tableEnv.from(tableOutName);
> tableIn.insertInto(tableOutName);
> // tableEnv.toDataSet(table, Row.class).print();
> tableEnv.execute("TEST read/write");
>
>   }
>
>   private static String getSourceDdl(String tableName, String filePath) {
> return "CREATE TABLE " + tableName + " (\n" + //
> " `id` BIGINT,\n" + //
> " `name` STRING) WITH (\n" + //
> " 'connector.type' = 'filesystem',\n" + //
> " 'connector.property-version' = '1',\n" + //
> " 'connector.path' = '" + filePath + "',\n" + //
> " 'format.type' = 'csv',\n" + //
> " 'format.field-delimiter' = ',',\n" + //
>  //   " 'format.write-mode' = 'OVERWRITE',\n" + // NOT SUPPORTED
> " 'format.property-version' = '1',\n" + //
> " 'format.quote-character' = '\"',\n" + //
> " 'format.ignore-first-line' = 'false'" + //
> ")";
>   }
>
>   private static String getSinkDdl(String tableName, String filePath) {
> return "CREATE TABLE " + tableName + " (\n" + //
> " `id` BIGINT,\n" + //
> " `name` STRING) WITH (\n" + //
> " 'connector.type' = 'filesystem',\n" + //
> " 'connector.property-version' = '1',\n" + //
> " 'connector.path' = '" + filePath + "',\n" + //
> " 'format.type' = 'csv',\n" + //
> " 'format.field-delimiter' = ',',\n" + //
> " 'format.num-files' = '1',\n" + //
> " 'format.write-mode' = 'OVERWRITE',\n" + // SUPPORTED (sinks only)
> " 'format.quote-character' = '\"',\n" + // NOT SUPPORTED
> " 'format.property-version' = '1'\n" + //
> ")";
>   }
> }
>
> Thanks for the support,
> Flavio
>
>
> On Thu, Apr 8, 2021 at 7:05 PM Till Rohrmann  wrote:
>
>> Hi Flavio,
>>
>> I tried to execute the code snippet you have provided and I could not
>> reproduce the problem.
>>
>> Concretely I am running this code:
>>
>> final EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
>> .useBlinkPlanner()
>> .inStreamingMode()
>> 

Re: [DISCUSS] Feature freeze date for 1.13

2021-04-07 Thread Kurt Young
Hi Yuval,

I think you are good to go, since there are no objections from the PMC.

Best,
Kurt


On Wed, Apr 7, 2021 at 12:48 AM Yuval Itzchakov  wrote:

> Hi Guowei,
>
> Who should I speak to regarding this? I am at the final stages of the PR I
> believe (Shengkai is kindly helping me make things work) and I would like
> to push this into 1.13.
>
> On Fri, Apr 2, 2021 at 5:43 AM Guowei Ma  wrote:
>
>> Hi, Yuval
>>
>> Thanks for your contribution. I am not a SQL expert, but it seems to be
>> beneficial to users, and the amount of code is not much and only left is
>> the test. Therefore, I am open to this entry into rc1.
>> But according to the rules, you still have to see if there are other
>> PMC's objections within 48 hours.
>>
>> Best,
>> Guowei
>>
>>
>> On Thu, Apr 1, 2021 at 10:33 PM Yuval Itzchakov 
>> wrote:
>>
>>> Hi All,
>>>
>>> I would really love to merge https://github.com/apache/flink/pull/15307
>>> prior to 1.13 release cutoff, it just needs some more tests which I can
>>> hopefully get to today / tomorrow morning.
>>>
>>> This is a critical fix as now predicate pushdown won't work for any
>>> stream which generates a watermark and wants to push down predicates.
>>>
>>> On Thu, Apr 1, 2021, 10:56 Kurt Young  wrote:
>>>
>>>> Thanks Dawid, I have merged FLINK-20320.
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>>
>>>> On Thu, Apr 1, 2021 at 2:49 PM Dawid Wysakowicz 
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> @Kurt @Arvid I think it's fine to merge those two, as they are pretty
>>>>> much finished. We can wait for those two before creating the RC0.
>>>>>
>>>>> @Leonard Personally I'd be ok with 3 more days for that single PR. I
>>>>> find the request reasonable and I second that it's better to have a proper
>>>>> review rather than rush unfinished feature and try to fix it later.
>>>>> Moreover it got broader support. Unless somebody else objects, I think we
>>>>> can merge this PR later and include it in RC1.
>>>>>
>>>>> Best,
>>>>>
>>>>> Dawid
>>>>> On 01/04/2021 08:39, Arvid Heise wrote:
>>>>>
>>>>> Hi Dawid and Guowei,
>>>>>
>>>>> I'd like to merge [FLINK-13550][rest][ui] Vertex Flame Graph [1]. We
>>>>> are pretty much just waiting for AZP to turn green, it's separate from
>>>>> other components, and it's a super useful feature for Flink users.
>>>>>
>>>>> Best,
>>>>>
>>>>> Arvid
>>>>>
>>>>> [1] https://github.com/apache/flink/pull/15054
>>>>>
>>>>> On Thu, Apr 1, 2021 at 6:21 AM Kurt Young  wrote:
>>>>>
>>>>>> Hi Guowei and Dawid,
>>>>>>
>>>>>> I want to request the permission to merge this feature [1], it's a
>>>>>> useful improvement to sql client and won't affect
>>>>>> other components too much. We were plan to merge it yesterday but met
>>>>>> some tricky multi-process issue which
>>>>>> has a very high possibility hanging the tests. It took us a while to
>>>>>> find out the root cause and fix it.
>>>>>>
>>>>>> Since it's not too far away from feature freeze and RC0 also not
>>>>>> created yet, thus I would like to include this
>>>>>> in 1.13.
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-20320
>>>>>>
>>>>>> Best,
>>>>>> Kurt
>>>>>>
>>>>>>
>>>>>> On Wed, Mar 31, 2021 at 5:55 PM Guowei Ma 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi, community:
>>>>>>>
>>>>>>> Friendly reminder that today (3.31) is the last day of feature
>>>>>>> development. Under normal circumstances, you will not be able to submit 
>>>>>>> new
>>>>>>> features from tomorrow (4.1). Tomorrow we will create 1.13.0-rc0 for
>>>>>>> testing, welcome to help test together.
>>>>>>> After the test is relatively stable, we will cut the release-1.13
>>>>>>> branch.
>>>>>>>
>>>

Re: [DISCUSS] Feature freeze date for 1.13

2021-04-01 Thread Kurt Young
Thanks Dawid, I have merged FLINK-20320.

Best,
Kurt


On Thu, Apr 1, 2021 at 2:49 PM Dawid Wysakowicz 
wrote:

> Hi all,
>
> @Kurt @Arvid I think it's fine to merge those two, as they are pretty much
> finished. We can wait for those two before creating the RC0.
>
> @Leonard Personally I'd be ok with 3 more days for that single PR. I find
> the request reasonable and I second that it's better to have a proper
> review rather than rush unfinished feature and try to fix it later.
> Moreover it got broader support. Unless somebody else objects, I think we
> can merge this PR later and include it in RC1.
>
> Best,
>
> Dawid
> On 01/04/2021 08:39, Arvid Heise wrote:
>
> Hi Dawid and Guowei,
>
> I'd like to merge [FLINK-13550][rest][ui] Vertex Flame Graph [1]. We are
> pretty much just waiting for AZP to turn green, it's separate from other
> components, and it's a super useful feature for Flink users.
>
> Best,
>
> Arvid
>
> [1] https://github.com/apache/flink/pull/15054
>
> On Thu, Apr 1, 2021 at 6:21 AM Kurt Young  wrote:
>
>> Hi Guowei and Dawid,
>>
>> I want to request the permission to merge this feature [1], it's a useful
>> improvement to sql client and won't affect
>> other components too much. We were plan to merge it yesterday but met
>> some tricky multi-process issue which
>> has a very high possibility hanging the tests. It took us a while to find
>> out the root cause and fix it.
>>
>> Since it's not too far away from feature freeze and RC0 also not created
>> yet, thus I would like to include this
>> in 1.13.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-20320
>>
>> Best,
>> Kurt
>>
>>
>> On Wed, Mar 31, 2021 at 5:55 PM Guowei Ma  wrote:
>>
>>> Hi, community:
>>>
>>> Friendly reminder that today (3.31) is the last day of feature
>>> development. Under normal circumstances, you will not be able to submit new
>>> features from tomorrow (4.1). Tomorrow we will create 1.13.0-rc0 for
>>> testing, welcome to help test together.
>>> After the test is relatively stable, we will cut the release-1.13 branch.
>>>
>>> Best,
>>> Dawid & Guowei
>>>
>>>
>>> On Mon, Mar 29, 2021 at 5:17 PM Till Rohrmann 
>>> wrote:
>>>
>>>> +1 for the 31st of March for the feature freeze.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Mon, Mar 29, 2021 at 10:12 AM Robert Metzger 
>>>> wrote:
>>>>
>>>> > +1 for March 31st for the feature freeze.
>>>> >
>>>> >
>>>> >
>>>> > On Fri, Mar 26, 2021 at 3:39 PM Dawid Wysakowicz <
>>>> dwysakow...@apache.org>
>>>> > wrote:
>>>> >
>>>> > > Thank you Thomas! I'll definitely check the issue you linked.
>>>> > >
>>>> > > Best,
>>>> > >
>>>> > > Dawid
>>>> > >
>>>> > > On 23/03/2021 20:35, Thomas Weise wrote:
>>>> > > > Hi Dawid,
>>>> > > >
>>>> > > > Thanks for the heads up.
>>>> > > >
>>>> > > > Regarding the "Rebase and merge" button. I find that merge option
>>>> > useful,
>>>> > > > especially for small simple changes and for backports. The
>>>> following
>>>> > > should
>>>> > > > help to safeguard from the issue encountered previously:
>>>> > > > https://github.com/jazzband/pip-tools/issues/1085
>>>> > > >
>>>> > > > Thanks,
>>>> > > > Thomas
>>>> > > >
>>>> > > >
>>>> > > > On Tue, Mar 23, 2021 at 4:58 AM Dawid Wysakowicz <
>>>> > dwysakow...@apache.org
>>>> > > >
>>>> > > > wrote:
>>>> > > >
>>>> > > >> Hi devs, users!
>>>> > > >>
>>>> > > >> 1. *Feature freeze date*
>>>> > > >>
>>>> > > >> We are approaching the end of March which we agreed would be the
>>>> time
>>>> > > for
>>>> > > >> a Feature Freeze. From the knowledge I've gather so far it still
>>>> seems
>>>> > > to
>>>> > > >> be a viable plan. I think it is a good time to agree on a

Re: [DISCUSS] Feature freeze date for 1.13

2021-03-31 Thread Kurt Young
Hi Guowei and Dawid,

I want to request permission to merge this feature [1]. It's a useful
improvement to the SQL client and won't affect other components too much.
We were planning to merge it yesterday but hit a tricky multi-process issue
which had a very high chance of hanging the tests. It took us a while to
find the root cause and fix it.

Since it's not far from the feature freeze and RC0 has not been created
yet, I would like to include this in 1.13.

[1] https://issues.apache.org/jira/browse/FLINK-20320

Best,
Kurt


On Wed, Mar 31, 2021 at 5:55 PM Guowei Ma  wrote:

> Hi, community:
>
> Friendly reminder that today (3.31) is the last day of feature
> development. Under normal circumstances, you will not be able to submit new
> features from tomorrow (4.1). Tomorrow we will create 1.13.0-rc0 for
> testing, welcome to help test together.
> After the test is relatively stable, we will cut the release-1.13 branch.
>
> Best,
> Dawid & Guowei
>
>
> On Mon, Mar 29, 2021 at 5:17 PM Till Rohrmann 
> wrote:
>
>> +1 for the 31st of March for the feature freeze.
>>
>> Cheers,
>> Till
>>
>> On Mon, Mar 29, 2021 at 10:12 AM Robert Metzger 
>> wrote:
>>
>> > +1 for March 31st for the feature freeze.
>> >
>> >
>> >
>> > On Fri, Mar 26, 2021 at 3:39 PM Dawid Wysakowicz <
>> dwysakow...@apache.org>
>> > wrote:
>> >
>> > > Thank you Thomas! I'll definitely check the issue you linked.
>> > >
>> > > Best,
>> > >
>> > > Dawid
>> > >
>> > > On 23/03/2021 20:35, Thomas Weise wrote:
>> > > > Hi Dawid,
>> > > >
>> > > > Thanks for the heads up.
>> > > >
>> > > > Regarding the "Rebase and merge" button. I find that merge option
>> > useful,
>> > > > especially for small simple changes and for backports. The following
>> > > should
>> > > > help to safeguard from the issue encountered previously:
>> > > > https://github.com/jazzband/pip-tools/issues/1085
>> > > >
>> > > > Thanks,
>> > > > Thomas
>> > > >
>> > > >
>> > > > On Tue, Mar 23, 2021 at 4:58 AM Dawid Wysakowicz <
>> > dwysakow...@apache.org
>> > > >
>> > > > wrote:
>> > > >
>> > > >> Hi devs, users!
>> > > >>
>> > > >> 1. *Feature freeze date*
>> > > >>
>> > > >> We are approaching the end of March which we agreed would be the
>> time
>> > > for
>> > > >> a Feature Freeze. From the knowledge I've gather so far it still
>> seems
>> > > to
>> > > >> be a viable plan. I think it is a good time to agree on a
>> particular
>> > > date,
>> > > >> when it should happen. We suggest *(end of day CEST) March 31st*
>> > > >> (Wednesday next week) as the feature freeze time.
>> > > >>
>> > > >> Similarly as last time, we want to create RC0 on the day after the
>> > > feature
>> > > >> freeze, to make sure the RC creation process is running smoothly,
>> and
>> > to
>> > > >> have a common testing reference point.
>> > > >>
>> > > >> Having said that let us remind after Robert & Dian from the
>> previous
>> > > >> release what it a Feature Freeze means:
>> > > >>
>> > > >> *B) What does feature freeze mean?*After the feature freeze, no new
>> > > >> features are allowed to be merged to master. Only bug fixes and
>> > > >> documentation improvements.
>> > > >> The release managers will revert new feature commits after the
>> feature
>> > > >> freeze.
>> > > >> Rational: The goal of the feature freeze phase is to improve the
>> > system
>> > > >> stability by addressing known bugs. New features tend to introduce
>> new
>> > > >> instabilities, which would prolong the release process.
>> > > >> If you need to merge a new feature after the freeze, please open a
>> > > >> discussion on the dev@ list. If there are no objections by a PMC
>> > member
>> > > >> within 48 (workday)hours, the feature can be merged.
>> > > >>
>> > > >> 2. *Merge PRs from the command line*
>> > > >>
>> > > >> In the past releases it was quite frequent around the Feature
>> Freeze
>> > > date
>> > > >> that we ended up with a broken main branch that either did not
>> compile
>> > > or
>> > > >> there were failing tests. It was often due to concurrent merges to
>> the
>> > > main
>> > > >> branch via the "Rebase and merge" button. To overcome the problem
>> we
>> > > would
>> > > >> like to suggest only ever merging PRs from a command line. Thank
>> you
>> > > >> Stephan for the idea! The suggested workflow would look as follows:
>> > > >>
>> > > >>1. Pull the change and rebase on the current main branch
>> > > >>2. Build the project (e.g. from IDE, which should be faster than
>> > > >>building entire project from cmd) -> this should ensure the
>> project
>> > > compiles
>> > > >>3. Run the tests in the module that the change affects -> this
>> > should
>> > > >>greatly minimize the chances of failling tests
>> > > >>4. Push the change to the main branch
>> > > >>
>> > > >> Let us know what you think!
>> > > >>
>> > > >> Best,
>> > > >>
>> > > >> Guowei & Dawid
>> > > >>
>> > > >>
>> > > >>
>> > >
>> > >
>> >
>>
>


Re: Implementing a global row_number() grouped ranking in Flink SQL

2021-03-17 Thread Kurt Young
You can simply use SQL Top-N:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#top-n
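
A minimal sketch following the Top-N pattern from the doc above (the table
and column names are made up; this keeps the top 3 rows per group):

SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY uid ORDER BY score DESC) AS row_num
  FROM source_t
)
WHERE row_num <= 3;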

Best,
Kurt


On Tue, Mar 16, 2021 at 3:40 PM Tian Hengyu  wrote:

> How come nobody has replied?~~~
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Flink + Hive + Compaction + Parquet?

2021-03-15 Thread Kurt Young
Hi Theo,

Regarding your first two questions, the answer is yes: Flink supports
streaming writes to Hive, and it also supports automatically compacting
small files during streaming writes [1].
(Hive and the filesystem connector share the same compaction mechanism; we
forgot to add a dedicated document for Hive.)

You don't need Hive transactional tables for this, because Flink compacts
all the small files _before_ committing the files or partitions to Hive.
From Hive's perspective, the written files are already large.

I think this addresses most of your questions; let me know if you have
further ones.
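
A minimal sketch of what this looks like on a filesystem table (the
compaction options are the ones documented in [1] below; the table
definition itself is made up):

CREATE TABLE compacted_sink (
  id BIGINT,
  name STRING,
  dt STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs:///path/to/output',
  'format' = 'parquet',
  -- merge small files into larger ones before they are committed
  'auto-compaction' = 'true',
  'compaction.file-size' = '128MB'
);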

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html#file-compaction

Best,
Kurt


On Mon, Mar 15, 2021 at 5:05 PM Flavio Pompermaier 
wrote:

> What about using Apache Hudi o Apache Iceberg?
>
> On Thu, Mar 4, 2021 at 10:15 AM Dawid Wysakowicz 
> wrote:
>
>> Hi,
>>
>> I know Jingsong worked on Flink/Hive filesystem integration in the
>> Table/SQL API. Maybe he can shed some light on your questions.
>>
>> Best,
>>
>> Dawid
>> On 02/03/2021 21:03, Theo Diefenthal wrote:
>>
>> Hi there,
>>
>> Currently, I have a Flink 1.11 job which writes parquet files via the
>> StreamingFileSink to HDFS (simply using DataStream API). I commit like
>> every 3 minutes and thus have many small files in HDFS. Downstream, the
>> generated table is consumed from Spark Jobs and Impala queries. HDFS
>> doesn't like to have too many small files and writing to parquet fast but
>> also desiring large files is a rather common problem and solutions were
>> suggested like recently in the mailing list [1] or in flink forward talks
>> [2]. Cloudera also posted two possible scenarios in their blog posts [3],
>> [4]. Mostly, it comes down to asynchronously compact the many small files
>> into larger ones, at best non blocking and in an occasionally running batch
>> job.
>>
>> I am now about to implement something like suggested in the cloudera blog
>> [4] but from parquet to parquet. For me, it seems to be not straight
>> forward but rather involved, especially as my data is partitioned in
>> eventtime and I need the compaction to be non blocking (my users query
>> impala and expect near real time performance in each query). When starting
>> the work on that, I noticed that Hive already has a compaction mechanism
>> included and the Flink community works a lot in terms of integrating with
>> hive in the latest releases. Some of my questions are not directly related
>> to Flink, but I guess many of you have also experience with hive and
>> writing from Flink to Hive is rather common nowadays.
>>
>> I read online that Spark should integrate nicely with Hive tables, i.e.
>> instead of querying HDFS files, querying a hive table has the same
>> performance [5]. We also all know that Impala integrates nicely with Hive
>> so that overall, I can expect writing to Hive internal tables instead of
>> HDFS parquet doesn't have any disadvantages for me.
>>
>> My questions:
>> 1. Can I use Flink to "streaming write" to Hive?
>> 2. For compaction, I need "transactional tables" and according to the
>> hive docs, transactional tables must be fully managed by hive (i.e., they
>> are not external). Does Flink support writing to those out of the box? (I
>> only have Hive 2 available)
>> 3. Does Flink use the "Hive Streaming Data Ingest" APIs?
>> 4. Do you see any downsides in writing to hive compared to writing to
>> parquet directly? (Especially in my usecase only having impala and spark
>> consumers)
>> 5. Not Flink related: Have you ever experienced performance issues when
>> using hive transactional tables over writing parquet directly? I guess
>> there must be a reason why "transactional" is off by default in Hive? I
>> won't use any features except for compaction, i.e. there are only streaming
>> inserts, no updates, no deletes. (Delete only after given retention and
>> always delete entire partitions)
>>
>>
>> Best regards
>> Theo
>>
>> [1]
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Streaming-data-to-parquet-td38029.html
>> [2] https://www.youtube.com/watch?v=eOQ2073iWt4
>> [3]
>> https://blog.cloudera.com/how-to-ingest-and-query-fast-data-with-impala-without-kudu/
>> [4]
>> https://blog.cloudera.com/transparent-hierarchical-storage-management-with-apache-kudu-and-impala/
>> [5]
>> https://stackoverflow.com/questions/51190646/spark-dataset-on-hive-vs-parquet-file
>>
>>


Re: [DISCUSS] Deprecation and removal of the legacy SQL planner

2021-02-25 Thread Kurt Young


Hi Timo,

First of all I want to thank you for introducing this planner design back
in 1.9. It was a great piece of work that allowed lots of Blink features to
be merged into Flink in a reasonably short time, and it greatly accelerated
the evolution of Table & SQL.

Everything comes with a cost. As you said, right now we are facing the
overhead of maintaining two planners, which causes bugs and also increases
the imbalance between the two planners. As a developer, and for the good of
all Table & SQL users, I also think it's better for us to focus on a single
planner.

Your proposed roadmap looks good to me, +1 from my side, and thanks again
for all your efforts!

Best,
Kurt


On Thu, Feb 25, 2021 at 5:01 PM Timo Walther  wrote:

> Hi everyone,
>
> since Flink 1.9 we have supported two SQL planners. Most of the original
> plan of FLIP-32 [1] has been implemented. The Blink code merge has been
> completed and many additional features have been added exclusively to
> the new planner. The new planner is now in a much better shape than the
> legacy one.
>
> In order to avoid user confusion, reduce duplicate code, and improve
> maintainability and testing times of the Flink project as a whole we
> would like to propose the following steps to complete FLIP-32:
>
> In Flink 1.13:
> - Deprecate the `flink-table-planner` module
> - Deprecate `BatchTableEnvironment` for both Java, Scala, and Python
>
> In Flink 1.14:
> - Drop `flink-table-planner` early
> - Drop many deprecated interfaces and API on demand
> - Rename `flink-table-planner-blink` to `flink-table-planner`
> - Rename `flink-table-runtime-blink` to `flink-table-runtime`
> - Remove references of "Blink" in the code base
>
> This will have an impact on users that still use DataSet API together
> with Table API. With this change we will not support converting between
> DataSet API and Table API anymore. We hope to compensate the missing
> functionality in the new unified TableEnvironment and/or the batch mode
> in DataStream API during 1.14 and 1.15. For this, we are looking for
> further feedback which features are required in Table API/DataStream API
> to have a smooth migration path.
>
> Looking forward to your feedback.
>
> Regards,
> Timo
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions
>


Re: [DISCUSS] Correct time-related function behavior in Flink SQL

2021-01-20 Thread Kurt Young
cc'ing this to the user & user-zh mailing lists because this will affect
lots of users, and quite a lot of users have been asking questions around
this topic.

Let me try to understand this from a user's perspective.

Your proposal will affect five functions, which are:

   - PROCTIME()
   - NOW()
   - CURRENT_DATE
   - CURRENT_TIME
   - CURRENT_TIMESTAMP

Before the changes, as I am writing this reply, the local time here is
2021-01-21 12:03:35 (Beijing time, UTC+8).
And I tried these 5 functions in sql client, and got:

Flink SQL> select now(), PROCTIME(), CURRENT_TIMESTAMP, CURRENT_DATE, CURRENT_TIME;
+-------------------------+-------------------------+-------------------------+--------------+--------------+
|                  EXPR$0 |                  EXPR$1 |       CURRENT_TIMESTAMP | CURRENT_DATE | CURRENT_TIME |
+-------------------------+-------------------------+-------------------------+--------------+--------------+
| 2021-01-21T04:03:35.228 | 2021-01-21T04:03:35.228 | 2021-01-21T04:03:35.228 |   2021-01-21 | 04:03:35.228 |
+-------------------------+-------------------------+-------------------------+--------------+--------------+

After the changes, the expected behavior will change to:

Flink SQL> select now(), PROCTIME(), CURRENT_TIMESTAMP, CURRENT_DATE, CURRENT_TIME;
+-------------------------+-------------------------+-------------------------+--------------+--------------+
|                  EXPR$0 |                  EXPR$1 |       CURRENT_TIMESTAMP | CURRENT_DATE | CURRENT_TIME |
+-------------------------+-------------------------+-------------------------+--------------+--------------+
| 2021-01-21T12:03:35.228 | 2021-01-21T12:03:35.228 | 2021-01-21T12:03:35.228 |   2021-01-21 | 12:03:35.228 |
+-------------------------+-------------------------+-------------------------+--------------+--------------+

The return type of NOW(), PROCTIME() and CURRENT_TIMESTAMP would still be
TIMESTAMP.

Best,
Kurt


On Tue, Jan 19, 2021 at 6:42 PM Leonard Xu  wrote:

> I found above example format may mess up in different mail client, I post
> a picture here[1].
>
> Best,
> Leonard
>
> [1]
> https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/pictures/CURRRENT_TIMESTAMP.png
> <
> https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/pictures/CURRRENT_TIMESTAMP.png>
>
>
> > 在 2021年1月19日,16:22,Leonard Xu  写道:
> >
> > Hi, all
> >
> > I want to start the discussion about correcting time-related function
> behavior in Flink SQL, this is a tricky topic but I think it’s time to
> address it.
> >
> > Currently some temporal function behaviors are wired to users.
> > 1.  When users use a PROCTIME() in SQL, the value of PROCTIME() has a
> timezone offset with the wall-clock time in users' local time zone, users
> need to add their local time zone offset manually to get expected local
> timestamp(e.g: Users in Germany need to +1h to get expected local
> timestamp).
> >
> > 2. Users can not use CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP  to get
> wall-clock timestamp in local time zone, and thus they need write UDF in
> their SQL just for implementing a simple filter like WHERE date_col =
> CURRENT_DATE.
> >
> > 3. Another common case  is the time window  with day interval based on
> PROCTIME(), user plan to put all data from one day into the same window,
> but the window is assigned using timestamp in UTC+0 timezone rather than
> the session timezone which leads to the window starts with an offset(e.g:
> Users in China need to add -8h in their business sql start and then +8h
> when output the result, the conversion like a magic for users).
> >
> > These problems come from that lots of time-related functions like
> PROCTIME(), NOW(), CURRENT_DATE, CURRENT_TIME and CURRENT_TIMESTAMP are
> returning time values based on UTC+0 time zone.
> >
> > This topic will lead to a comparison of the three types, i.e.
> TIMESTAMP/TIMESTAMP WITHOUT TIME ZONE, TIMESTAMP WITH LOCAL TIME ZONE and
> TIMESTAMP WITH TIME ZONE. In order to better understand the three types, I
> wrote a document[1] to help understand them better. You can also know the
> tree timestamp types behavior in Hadoop ecosystem from the reference link
> int the doc.
> >
> >
> > I Invested all Flink time-related functions current behavior and
> compared with other DB vendors like Pg,Presto, Hive, Spark, Snowflake,  I
> made an excel [2] to organize them well, we can use it for the next
> discussion. Please let me know if I missed something.
> > From my investigation, I think we need to correct the behavior of
> function NOW()/PROCTIME()/CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP, to
> correct them, we can change the function return type or function return
> value or change return type and return value both. All of those way are
> valid because SQL:2011 does not specify the function return type and every
> SQL engine vendor has its own implementation. For example the
> CURRENT_TIMESTAMP 

Re: [DISCUSS] Correct time-related function behavior in Flink SQL

2021-01-20 Thread Kurt Young
cc this to user & user-zh mailing list because this will affect lots of
users, and also quite a lot of users
were asking questions around this topic.

Let me try to understand this from user's perspective.

Your proposal will affect five functions, which are:

   - PROCTIME()
   - NOW()
   - CURRENT_DATE
   - CURRENT_TIME
   - CURRENT_TIMESTAMP

Before the changes, as I am writing this reply, the local time here is
*2021-01-21
12:03:35 (Beijing time, UTC+8)*.
And I tried these 5 functions in sql client, and got:

*Flink SQL> select now(), PROCTIME(), CURRENT_TIMESTAMP, CURRENT_DATE,
CURRENT_TIME;*

*+-+-+-+--+--+*

*|  EXPR$0 |  EXPR$1 |
CURRENT_TIMESTAMP | CURRENT_DATE | CURRENT_TIME |*

*+-+-+-+--+--+*

*| 2021-01-21T04:03:35.228 | 2021-01-21T04:03:35.228 |
2021-01-21T04:03:35.228 |   2021-01-21 | 04:03:35.228 |*

*+-+-+-+--+--+*
After the changes, the expected behavior will change to:

Flink SQL> select now(), PROCTIME(), CURRENT_TIMESTAMP, CURRENT_DATE,
CURRENT_TIME;

+-------------------------+-------------------------+-------------------------+--------------+--------------+
|                  EXPR$0 |                  EXPR$1 |       CURRENT_TIMESTAMP | CURRENT_DATE | CURRENT_TIME |
+-------------------------+-------------------------+-------------------------+--------------+--------------+
| 2021-01-21T12:03:35.228 | 2021-01-21T12:03:35.228 | 2021-01-21T12:03:35.228 |   2021-01-21 | 12:03:35.228 |
+-------------------------+-------------------------+-------------------------+--------------+--------------+
The return type of NOW(), PROCTIME() and CURRENT_TIMESTAMP will still be
TIMESTAMP;
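For illustration, a minimal sketch of the session-time-zone knob this discussion builds on
(TableConfig#setLocalTimeZone already exists; whether these functions honor it is exactly
what the proposal changes, so treat the printed values as illustrative):

import java.time.ZoneId;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SessionTimeZoneSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // Declare the session's wall clock (Beijing time in the example above).
        tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
        // Before the correction these functions ignore the setting above; after it,
        // they are expected to return local wall-clock values like the second table.
        tEnv.executeSql(
                "SELECT NOW(), CURRENT_TIMESTAMP, CURRENT_DATE, CURRENT_TIME").print();
    }
}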

Best,
Kurt


On Tue, Jan 19, 2021 at 6:42 PM Leonard Xu  wrote:

> I found above example format may mess up in different mail client, I post
> a picture here[1].
>
> Best,
> Leonard
>
> [1]
> https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/pictures/CURRRENT_TIMESTAMP.png
> <
> https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/pictures/CURRRENT_TIMESTAMP.png>
>
>
> > 在 2021年1月19日,16:22,Leonard Xu  写道:
> >
> > Hi, all
> >
> > I want to start the discussion about correcting time-related function
> behavior in Flink SQL, this is a tricky topic but I think it’s time to
> address it.
> >
> > Currently, some temporal function behaviors are weird to users.
> > 1. When users use PROCTIME() in SQL, the value of PROCTIME() has a
> time-zone offset from the wall-clock time in the user's local time zone, so
> users need to add their local time-zone offset manually to get the expected
> local timestamp (e.g., users in Germany need to add +1h to get the expected
> local timestamp).
> >
> > 2. Users cannot use CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP to get
> the wall-clock timestamp in their local time zone, and thus they need to
> write a UDF in their SQL just to implement a simple filter like
> WHERE date_col = CURRENT_DATE.
> >
> > 3. Another common case is the time window with a day interval based on
> PROCTIME(): users plan to put all data from one day into the same window,
> but the window is assigned using the timestamp in the UTC+0 time zone rather
> than the session time zone, which makes the window start with an offset
> (e.g., users in China need to add -8h at the start of their business SQL and
> then +8h when outputting the result; the conversion feels like magic to users).
> >
> > These problems come from that lots of time-related functions like
> PROCTIME(), NOW(), CURRENT_DATE, CURRENT_TIME and CURRENT_TIMESTAMP are
> returning time values based on UTC+0 time zone.
> >
> > This topic will lead to a comparison of the three types, i.e.
> TIMESTAMP/TIMESTAMP WITHOUT TIME ZONE, TIMESTAMP WITH LOCAL TIME ZONE and
> TIMESTAMP WITH TIME ZONE. In order to better understand the three types, I
> wrote a document [1]. You can also learn the behavior of the three timestamp
> types in the Hadoop ecosystem from the reference links in the doc.
> >
> >
> > I investigated the current behavior of all Flink time-related functions and
> compared them with other DB vendors like Pg, Presto, Hive, Spark, and
> Snowflake. I made an Excel sheet [2] to organize them well, which we can use
> for the next discussion. Please let me know if I missed something.
> > From my investigation, I think we need to correct the behavior of the
> functions NOW()/PROCTIME()/CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP. To
> correct them, we can change the function return type, the function return
> value, or both. All of those ways are valid because SQL:2011 does not specify
> the function return type and every
> SQL engine vendor has its own implementation. For example the
> CURRENT_TIMESTAMP 

Re: In 1.11.2/flinkSql/batch, tableEnv.getConfig.setNullCheck(false) seems to break group by-s

2020-10-16 Thread Kurt Young
Yes, I think this is a bug, feel free to open a jira and a pull request.
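For anyone trying to reproduce it, a minimal hedged sketch of the toggle in question (the
linked gist has the full program; everything here besides the setNullCheck call is illustrative):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class NullCheckRepro {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
        // The call reported to break GROUP BY queries on 1.11.2 batch jobs.
        tableEnv.getConfig().setNullCheck(false);
        // ... register the sources and run the GROUP BY query from the gist ...
    }
}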

Best,
Kurt


On Fri, Oct 16, 2020 at 4:13 PM Jon Alberdi  wrote:

> Hello to all,
>
> on flink-1.11.2 the program  written at
> https://gist.github.com/yetanotherion/d007fa113d97411226eaea4f20cd4c2d
>
> creates unexpected stack traces when the line “// triggerBug…”
>
> is uncommented (some lines of the stack trace are written in the gist).
>
> (It correctly outputs
> “
>
> +-+
>
> |   c |
>
> +-+
>
> |   1 |
>
> |   2 |
>
> +-+
>
> “ else)
>
> Is that behavior expected?
>
> If not, do you think a jira should be created to handle that? (I’d be glad
> to do so)
>
>
> Regards,
>
> Ion
>
>
>


[Announcement] Flink Forward Asia 2020 talk submission deadline extended

2020-10-09 Thread Kurt Young
Hi everyone,

I hope you all had a wonderful and fulfilling National Day holiday. Because of the long
holiday, we have also decided to extend the talk submission deadline for the Flink Forward
Asia 2020 summit to *October 22, 2020*. Submission link: https://sourl.cn/ZEXM2Y

We look forward to your submissions and participation! If you have any questions, feel free
to contact me.

Thanks,
Kurt


[Announcement] Flink Forward Asia 2020 call for presentations

2020-09-27 Thread Kurt Young
Hi everyone,

Since the Flink Forward conference first came to Asia in 2018, the Flink community has
successfully held two editions on an unprecedented scale. Whether measured by participating
companies, number of attendees, or the depth and breadth of the talks, it has become one of
the largest and most influential data-processing conferences in China.

Given the special circumstances of 2020, Flink Forward Asia will switch to a completely free
online format. Compared with previous years, the main highlights this year are:
1. *Live online interaction with faster audience feedback*: the conference will collect audience feedback online, understand questions in real time, and interact quickly to form a healthy communication loop.
2. *Multi-channel distribution with a wider reach*: besides the live-streamed talks, the content will also be packaged as videos, articles, e-books, and topic collections to amplify the impact of your content.
3. *Richer topic tracks*: in addition to the popular topics of past years, this year adds hot areas such as machine learning and cloud native, covering a broader range of technologies.

The conference is now scheduled for *December 26, 2020*, and the call for presentations is
open. You can submit your talk before *October 12, 2020* via the following link:
https://sourl.cn/ZEXM2Y

We look forward to your submissions and participation! If you have any questions, feel free
to contact me.

Thanks,
Kurt


Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Kurt Young
Congratulations Dian!

Best,
Kurt


On Thu, Aug 27, 2020 at 7:28 PM Rui Li  wrote:

> Congratulations Dian!
>
> On Thu, Aug 27, 2020 at 5:39 PM Yuan Mei  wrote:
>
>> Congrats!
>>
>> On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang  wrote:
>>
>>> Congratulations Dian!
>>>
>>> Best,
>>> Xingbo
>>>
>>> jincheng sun  于2020年8月27日周四 下午5:24写道:
>>>
 Hi all,

 On behalf of the Flink PMC, I'm happy to announce that Dian Fu is now
 part of the Apache Flink Project Management Committee (PMC).

 Dian Fu has been very active on PyFlink component, working on various
 important features, such as the Python UDF and Pandas integration, and
 keeps checking and voting for our releases, and also has successfully
 produced two releases(1.9.3&1.11.1) as RM, currently working as RM to push
 forward the release of Flink 1.12.

 Please join me in congratulating Dian Fu for becoming a Flink PMC
 Member!

 Best,
 Jincheng(on behalf of the Flink PMC)

>>>
>
> --
> Best regards!
> Rui Li
>


Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-08-16 Thread Kurt Young
Hi Kostas,

Thanks for starting this discussion. The first part of this FLIP: "Batch vs
Streaming Scheduling" looks reasonable to me.
However, there is another dimension I think we should also take into
consideration, which is whether checkpointing is enabled.

This option is orthogonal (though not fully) to the boundedness and
persistence of the input. For example, consider an arbitrary operator
that uses state: we can enable checkpointing to achieve better failure
recovery if the input is bounded and pipelined. And if the input
is bounded and persistent, we can still use checkpointing, but we might
need to checkpoint the offset of the intermediate result set of
the operator. This would require much more work and we can defer it to
the future.

Beyond this dimension, there is another question to be asked. If the
topology mixes bounded and unbounded inputs, what
would be the behavior? E.g. a join operator with one of its inputs bounded
and the other unbounded. Can we still use BATCH or
STREAMING to define the scheduling policy? What kind of failure-recovery
guarantee can Flink provide to users?

I don't have a clear answer for now, but just want to raise these questions
to seek some discussion.

Best,
Kurt


On Wed, Aug 12, 2020 at 11:22 PM Kostas Kloudas  wrote:

> Hi all,
>
> As described in FLIP-131 [1], we are aiming at deprecating the DataSet
> API in favour of the DataStream API and the Table API. After this work
> is done, the user will be able to write a program using the DataStream
> API and this will execute efficiently on both bounded and unbounded
> data. But before we reach this point, it is worth discussing and
> agreeing on the semantics of some operations as we transition from the
> streaming world to the batch one.
>
> This thread and the associated FLIP [2] aim at discussing these issues
> as these topics are pretty important to users and can lead to
> unpleasant surprises if we do not pay attention.
>
> Let's have a healthy discussion here and I will be updating the FLIP
> accordingly.
>
> Cheers,
> Kostas
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
> [2]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158871522
>
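A minimal sketch of the kind of bounded-vs-streaming execution switch being discussed here;
it uses the RuntimeExecutionMode API from a later Flink release, so treat it purely as an
illustration of the idea rather than as part of this proposal:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BoundedExecutionSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // BATCH requests batch-style scheduling for a bounded program;
        // AUTOMATIC derives the mode from the boundedness of the sources.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.fromElements(1, 2, 3, 4).print();   // bounded input, placeholder pipeline
        env.execute("bounded-execution-sketch");
    }
}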


Re: [DISCUSS] FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API)

2020-07-30 Thread Kurt Young
+1, looking forward to the follow up FLIPs.

Best,
Kurt


On Thu, Jul 30, 2020 at 6:40 PM Arvid Heise  wrote:

> +1 of getting rid of the DataSet API. Is DataStream#iterate already
> superseding DataSet iterations or would that also need to be accounted for?
>
> In general, all surviving APIs should also offer a smooth experience for
> switching back and forth.
>
> On Thu, Jul 30, 2020 at 9:39 AM Márton Balassi 
> wrote:
>
> > Hi All,
> >
> > Thanks for the write up and starting the discussion. I am in favor of
> > unifying the APIs the way described in the FLIP and deprecating the
> DataSet
> > API. I am looking forward to the detailed discussion of the changes
> > necessary.
> >
> > Best,
> > Marton
> >
> > On Wed, Jul 29, 2020 at 12:46 PM Aljoscha Krettek 
> > wrote:
> >
> >> Hi Everyone,
> >>
> >> my colleagues (in cc) and I would like to propose this FLIP for
> >> discussion. In short, we want to reduce the number of APIs that we have
> >> by deprecating the DataSet API. This is a big step for Flink, that's why
> >> I'm also cross-posting this to the User Mailing List.
> >>
> >> FLIP-131: http://s.apache.org/FLIP-131
> >>
> >> I'm posting the introduction of the FLIP below but please refer to the
> >> document linked above for the full details:
> >>
> >> --
> >> Flink provides three main SDKs/APIs for writing Dataflow Programs: Table
> >> API/SQL, the DataStream API, and the DataSet API. We believe that this
> >> is one API too many and propose to deprecate the DataSet API in favor of
> >> the Table API/SQL and the DataStream API. Of course, this is easier said
> >> than done, so in the following, we will outline why we think that having
> >> too many APIs is detrimental to the project and community. We will then
> >> describe how we can enhance the Table API/SQL and the DataStream API to
> >> subsume the DataSet API's functionality.
> >>
> >> In this FLIP, we will not describe all the technical details of how the
> >> Table API/SQL and DataStream will be enhanced. The goal is to achieve
> >> consensus on the idea of deprecating the DataSet API. There will have to
> >> be follow-up FLIPs that describe the necessary changes for the APIs that
> >> we maintain.
> >> --
> >>
> >> Please let us know if you have any concerns or comments. Also, please
> >> keep discussion to this ML thread instead of commenting in the Wiki so
> >> that we can have a consistent view of the discussion.
> >>
> >> Best,
> >> Aljoscha
> >>
> >
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: DDL problem when integrating Hive 1.2.1 with Flink 1.11

2020-07-19 Thread Kurt Young
Flink 1.11 switched the default planner to blink, so you need to add the blink planner dependency.
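For illustration, a minimal sketch of selecting the blink planner explicitly in 1.11
(this assumes the flink-table-planner-blink_2.11 artifact for your Flink version is on the
classpath; `env` is the StreamExecutionEnvironment from the code quoted below):

// org.apache.flink.table.api.EnvironmentSettings
// org.apache.flink.table.api.bridge.java.StreamTableEnvironment
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);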

Best,
Kurt


On Mon, Jul 20, 2020 at 11:39 AM Rui Li  wrote:

> From the stack trace, the error occurs when creating the blink planner. Please check whether the blink planner dependency version is correct.
>
> On Fri, Jul 17, 2020 at 7:29 PM kcz <573693...@qq.com> wrote:
>
> > Local test in IDEA
> > Hive-related pom dependencies:
> > hive-exec flink-connector-hive_2.11
> > The code is as follows:
> >  StreamExecutionEnvironment env =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> > env.setParallelism(1);
> > env.enableCheckpointing(60*1000, CheckpointingMode.EXACTLY_ONCE);
> > // allow only one checkpoint at a time
> > env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> >
> > env.setStateBackend(new FsStateBackend(path));
> >
> > StreamTableEnvironment tableEnv =
> > StreamTableEnvironment.create(env);
> >
> > String name= "myhive";
> > String defaultDatabase = "situation";
> > String hiveConfDir = "/load/data/hive/hive-conf"; // a local
> > path
> > String version = "1.2.1";
> >
> > HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
> > hiveConfDir, version);
> > tableEnv.registerCatalog("myhive", hive);
> >
> > // set the HiveCatalog as the current catalog of the session
> > tableEnv.useCatalog("myhive");
> > tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS stream_tmp");
> > tableEnv.executeSql("DROP TABLE IF EXISTS
> > stream_tmp.source_table");
> >
> >
> > The error is as follows:
> > 
> > Exception in thread "main" java.lang.IncompatibleClassChangeError:
> > Implementing class
> > at java.lang.ClassLoader.defineClass1(Native Method)
> > at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
> > at
> > java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> > at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
> > at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
> > at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
> > at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> > at
> >
> org.apache.flink.table.planner.delegation.PlannerBase. > at
> >
> org.apache.flink.table.planner.delegation.StreamPlanner. > at
> >
> org.apache.flink.table.planner.delegation.BlinkPlannerFactory.create(BlinkPlannerFactory.java:50)
> > at
> >
> org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:130)
> > at
> >
> org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:111)
> > at com.hive.HiveTest.main(HiveTest.java:33)
>
>
>
> --
> Best regards!
> Rui Li
>


Re: Print table content in Flink 1.11

2020-07-15 Thread Kurt Young
Hi Flavio,

In 1.11 we have provided an easier way to print table content, after you
got the `table` object,
all you need to do is call `table.execute().print();`
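A slightly fuller, hedged sketch (the table name is made up):

// Flink 1.11 Table API: print a table's content to stdout.
TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());
// ... register or create `my_table` ...
Table table = tEnv.sqlQuery("SELECT * FROM my_table");
table.execute().print();   // collects the result and prints it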

Best,
Kurt


On Thu, Jul 16, 2020 at 9:35 AM Leonard Xu  wrote:

> Hi, Flavio
>
>
> 在 2020年7月16日,00:19,Flavio Pompermaier  写道:
>
> final JobExecutionResult jobRes = tableEnv.execute("test-job");
>
>
> In Flink 1.11, once a Table has been transformed into a DataStream, only
> StreamExecutionEnvironment can execute the DataStream program, so please use
> env.execute(“test-job”) in this case. You can get more information from [1].
>
>
> Best,
> Leonard Xu
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#convert-a-table-into-a-datastream-or-dataset
>
>


Re: Syntax-check exception when a Flink SQL DDL contains keywords and uses the TO_TIMESTAMP / TO_DATE functions

2020-06-16 Thread Kurt Young
It should be this one: https://issues.apache.org/jira/browse/FLINK-16068

Best,
Kurt


On Tue, Jun 16, 2020 at 5:09 PM zilong xiao  wrote:

> I checked the 1.10.1 release notes; the issue you mentioned should be this one:
> https://issues.apache.org/jira/browse/FLINK-16345
> , but the problem described in that issue seems a bit different from mine. My problem is that when I use the TO_TIMESTAMP / TO_DATE
> functions and the DDL contains keyword fields, the syntax check reports an error. I wonder whether this problem is related to that issue?
>
> Benchao Li wrote on Tue, Jun 16, 2020 at 5:00 PM:
>
> > Are you using version 1.10.0? That version has some problems handling computed columns; I suggest trying 1.10.1.
> >
> > zilong xiao wrote on Tue, Jun 16, 2020 at 4:56 PM:
> >
> >> As the subject says: if the SQL
> >> DDL contains keyword fields (already wrapped in backticks) and the DDL
> >> also uses the TO_TIMESTAMP / TO_DATE functions, the syntax check reports
> >> a keyword-conflict exception. If I remove the ```itime as
> >> TO_TIMESTAMP(time2)``` shown in the picture, the syntax check passes.
> >> Could anyone please take a look?
> >> The code is shown in the picture below:
> >> [image: image.png]
> >> Exception stack trace:
> >>
> >
>


Re: flink 1.11 small question (improving DDL usability (dynamic table options))

2020-06-15 Thread Kurt Young
The table hint syntax goes right where you access a table in the query, so I don't think there is any ambiguity about which table the dynamic option applies to, is there?
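For illustration, a hedged sketch of the syntax (the table name and option key are made up;
in 1.11 the feature may also need to be switched on via 'table.dynamic-table-options.enabled'):

// Override one WITH option of a specific table at query time via a table hint.
tableEnv.executeSql(
    "SELECT id, name "
  + "FROM kafka_table /*+ OPTIONS('scan.startup.mode'='earliest-offset') */");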

Best,
Kurt


On Tue, Jun 16, 2020 at 10:02 AM Yichao Yang <1048262...@qq.com> wrote:

> Hi
>
>
> The 1.2 release will have support for the LIKE clause, see [1], but that also works by defining a table rather than by specifying options directly inside the query.
>
> My personal understanding is that specifying options inside the query involves many factors: suppose multiple tables are involved and they share the same option key, which table does the option specified in the query apply to? That is actually rather ambiguous.
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
>
>
> Best,
> Yichao Yang
>
>
>
>
> -- Original message --
> From: "Kurt Young"  Sent: Tuesday, Jun 16, 2020, 9:53 AM
> To: "user-zh"
> Subject: Re: flink 1.11 small question (improving DDL usability (dynamic table options))
>
>
>
> It refers to the WITH options of the table you define in DDL; sometimes an individual option is written incorrectly or needs adjusting, and you can override it directly in the query instead of defining a whole new table.
>
> Best,
> Kurt
>
>
> On Tue, Jun 16, 2020 at 9:49 AM kcz <573693...@qq.com wrote:
>
>  What do dynamic table options refer to? Could you give an example?


Re: flink 1.11 small question (improving DDL usability (dynamic table options))

2020-06-15 Thread Kurt Young
It refers to the WITH options of the table you define in DDL; sometimes an individual option is written incorrectly or needs adjusting, and you can override it directly in the query instead of defining a whole new table.

Best,
Kurt


On Tue, Jun 16, 2020 at 9:49 AM kcz <573693...@qq.com> wrote:

> What do dynamic table options refer to? Could you give an example?


Re: Flink/Spark Streaming performance testing (throughput/latency)

2020-06-11 Thread Kurt Young
We recently did a performance comparison based on Beam Nexmark [1]; you can use it as a reference.
Unlike the Beam benchmark, we rewrote the scenarios described by the test cases using each
engine's own API, instead of writing all test cases with the Beam API and translating them onto multiple runners as done there.

[1] https://beam.apache.org/documentation/sdks/java/testing/nexmark/

Best,
Kurt


On Fri, Jun 12, 2020 at 10:49 AM Zhonghan Tang <13122260...@163.com> wrote:

> Hi,
> Recently I need to do a throughput/latency performance analysis of Flink/Spark Streaming. I found very little material about this online; only Meituan (2017) and Yahoo (2015)
> did a similar analysis. My questions are:
> 1. For a simple read-from-Kafka/write-to-Kafka job, how do I record the time when data enters/leaves Flink? If I attach timestamps, how should I do it, and will it affect performance?
> 2. The scenarios I have in mind are: simple insert ETL, basic filtering, and windows. Are there any other qualitative/quantitative ways to measure framework performance?
>
>
> Meituan link:
>
> https://tech.meituan.com/2017/11/17/flink-benchmark.html?spm=a2c6h.13066369.0.0.5e3c1455V4UrXH
> yahoo:
>
> https://yahooeng.tumblr.com/post/135321837876/benchmarking-streaming-computation-engines-at
>
>
> Zhonghan Tang
> 13122260...@163.com
> (signature generated by NetEase Mail Master)
>
>


Re: What's the best practice to determine whether a job has finished or not?

2020-05-08 Thread Kurt Young
+dev 

Best,
Kurt


On Fri, May 8, 2020 at 3:35 PM Caizhi Weng  wrote:

> Hi Jeff,
>
> Thanks for the response. However I'm using executeAsync so that I can run
> the job asynchronously and get a JobClient to monitor the job. JobListener
> only works for synchronous execute method. Is there other way to achieve
> this?
>
> Jeff Zhang  于2020年5月8日周五 下午3:29写道:
>
>> I use JobListener#onJobExecuted to be notified that the flink job is
>> done.
>> It is pretty reliable for me, the only exception is the client process is
>> down.
>>
>> BTW, the reason you see ApplicationNotFound exception is that yarn app
>> is terminated which means the flink cluster is shutdown. While for
>> standalone mode, the flink cluster is always up.
>>
>>
>> Caizhi Weng  于2020年5月8日周五 下午2:47写道:
>>
>>> Hi dear Flink community,
>>>
>>> I would like to determine whether a job has finished (no matter
>>> successfully or exceptionally) in my code.
>>>
>>> I used to think that JobClient#getJobStatus is a good idea, but I found
>>> that it behaves quite differently under different executing environments.
>>> For example, under a standalone session cluster it will return the FINISHED
>>> status for a finished job, while under a yarn per job cluster it will throw
>>> a ApplicationNotFound exception. I'm afraid that there might be other
>>> behaviors for other environments.
>>>
>>> So what's the best practice to determine whether a job has finished or
>>> not? Note that I'm not waiting for the job to finish. If the job hasn't
>>> finished I would like to know it and do something else.
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
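A hedged sketch of the executeAsync-plus-polling approach discussed above (class and package
names are as of Flink 1.11 and may differ slightly in older versions; the trivial pipeline is
only a placeholder):

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobStatusPolling {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).print();               // placeholder pipeline

        JobClient client = env.executeAsync("poll-status-demo");
        JobStatus status;
        do {
            Thread.sleep(1_000L);                        // poll once per second
            status = client.getJobStatus().get();        // CompletableFuture<JobStatus>
        } while (!status.isGloballyTerminalState());
        // Note: as discussed above, getJobStatus() can behave differently per deployment
        // (e.g. fail on a YARN per-job cluster once the application is gone).
        System.out.println("Job reached terminal state: " + status);
    }
}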


Re: table.show() in Flink

2020-05-05 Thread Kurt Young
A more straightforward way after FLIP-84 would be:
TableResult result = tEnv.executeSql("select xxx ...");
result.print();

And if you are using 1.10 now, you can use TableUtils#collectToList(table)
to collect the
result to a list, and then print rows by yourself.
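A hedged sketch of that 1.10 approach (TableUtils is an internal utility of the blink
planner, so its exact location may vary):

// Collect the table into a local list and print it (Flink 1.10, blink planner).
Table table = tEnv.sqlQuery("SELECT * FROM my_table");
List<Row> rows = TableUtils.collectToList(table);
rows.forEach(System.out::println);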

Best,
Kurt


On Tue, May 5, 2020 at 8:44 PM Jark Wu  wrote:

> Hi Matyas,
>
> AFAIK, currently, this is the recommended way to print result of table.
> In FLIP-84 [1] , which is targeted to 1.11, we will introduce some new
> APIs to do the fluent printing like this.
>
> Table table2 = tEnv.sqlQuery("select yy ...");
> TableResult result2 = table2.execute();
> result2.print();
>
> cc @Godfrey, please correct if I misunderstand the above API.
>
> Best,
> Jark
>
> [1]:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
>
> On Tue, 5 May 2020 at 20:19, Őrhidi Mátyás 
> wrote:
>
>> Dear Flink Community,
>>
>> I'm missing Spark's table.show() method in Flink. I'm using the following
>> alternative at the moment:
>>
>> Table results = tableEnv.sqlQuery("SELECT * FROM my_table");
>> tableEnv.toAppendStream(results, Row.class).print();
>>
>> Is it the recommended way to print the content of a table?
>>
>>
>> Thanks,
>>
>> Matyas
>>
>>
>>
>>


Re: [DISCUSS] Hierarchies in ConfigOption

2020-04-29 Thread Kurt Young
IIUC FLIP-122 already delegates the responsibility for designing and parsing
connector properties to connector developers.
So frankly speaking, no matter which style we choose, there is no strong
guarantee for either of these. So it's also possible
that developers can choose a totally different way to express properties,
such as:

'format' = 'csv',
'csv.allow-comments' = 'true',
'csv.ignore-parse-errors' = 'true'

which also seems quite straightforward and easy to use. So my opinion on
this would be: since there is no guarantee that developers
choose "format" as the common prefix of all format-related properties, there
is not much value in extending 'format' to 'format.kind'.
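For concreteness, a hedged sketch of what the two candidate keys would look like as
ConfigOption definitions (descriptions are made up):

// org.apache.flink.configuration.ConfigOption / ConfigOptions
// Variant A: plain 'format' as the discriminator key.
ConfigOption<String> FORMAT = ConfigOptions.key("format")
        .stringType()
        .noDefaultValue()
        .withDescription("Identifier of the format, e.g. 'json' or 'csv'.");

// Variant B: 'format.kind', which keeps the key set representable as nested YAML.
ConfigOption<String> FORMAT_KIND = ConfigOptions.key("format.kind")
        .stringType()
        .noDefaultValue()
        .withDescription("Identifier of the format, e.g. 'json' or 'csv'.");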


Best,
Kurt


On Thu, Apr 30, 2020 at 10:17 AM Jingsong Li  wrote:

> Thanks Timo for starting the discussion.
>
> I am +1 for "format: 'json'".
> Take a look at Dawid's yaml case:
>
> connector: 'filesystem'
> path: '...'
> format: 'json'
> format:
>   option1: '...'
>   option2: '...'
>   option3: '...'
>
> Does this work?
> According to my understanding, the 'format' key is an attribute of the connector,
> which can be configured separately outside. In the 'format' block, the entries are
> attributes of the format.
> So this json-style block can only contain the properties excluding the format
> itself.
>
> Best,
> Jingsong Lee
>
> On Thu, Apr 30, 2020 at 9:58 AM Benchao Li  wrote:
>
>> Thanks Timo for starting the discussion.
>>
>> Generally I like the idea to keep the config align with a standard like
>> json/yaml.
>>
>> From the user's perspective, I don't use table configs from a config file
>> like yaml or json for now,
>> And it's ok to change it to yaml like style. Actually we didn't know that
>> this could be a yaml like
>> configuration hierarchy. If it has a hierarchy, we maybe consider that in
>> the future to load the
>> config from a yaml/json file.
>>
>> Regarding the name,
>> 'format.kind' looks fine to me. However there is another name from the
>> top of my head:
>> 'format.name', WDYT?
>>
>> Dawid Wysakowicz  于2020年4月29日周三 下午11:56写道:
>>
>>> Hi all,
>>>
>>> I also wanted to share my opinion.
>>>
>>> When talking about a ConfigOption hierarchy we use for configuring Flink
>>> cluster I would be a strong advocate for keeping a yaml/hocon/json/...
>>> compatible style. Those options are primarily read from a file and thus
>>> should at least try to follow common practices for nested formats if we
>>> ever decide to switch to one.
>>>
>>> Here the question is about the properties we use in SQL statements. The
>>> origin/destination of these usually will be external catalog, usually in a
>>> flattened(key/value) representation so I agree it is not as important as in
>>> the aforementioned case. Nevertheless having a yaml based catalog or being
>>> able to have e.g. yaml based snapshots of a catalog in my opinion is
>>> appealing. At the same time cost of being able to have a nice
>>> yaml/hocon/json representation is just adding a single suffix to a
>>> single(at most 2 key + value) property. The question is between `format` =
>>> `json` vs `format.kind` = `json`. That said I'd be slightly in favor of
>>> doing it.
>>>
>>> Just to have a full picture. Both cases can be represented in yaml, but
>>> the difference is significant:
>>> format: 'json'
>>> format.option: 'value'
>>>
>>> vs
>>> format:
>>>   kind: 'json'
>>>   option: 'value'
>>>
>>> Best,
>>> Dawid
>>>
>>> On 29/04/2020 17:13, Flavio Pompermaier wrote:
>>>
>>> Personally I don't have any preference here. Compliance with a standard
>>> YAML parser is probably more important.
>>>
>>> On Wed, Apr 29, 2020 at 5:10 PM Jark Wu  wrote:
>>>
 From a user's perspective, I prefer the shorter one "format=json",
 because
 it's more concise and straightforward. The "kind" is redundant for
 users.
 Is there a real case requires to represent the configuration in JSON
 style?
 As far as I can see, I don't see such requirement, and everything works
 fine by now.

 So I'm in favor of "format=json". But if the community insist to follow
 code style on this, I'm also fine with the longer one.

 Btw, I also CC user mailing list to listen more user's feedback.
 Because I
 think this is relative to usability.

 Best,
 Jark

 On Wed, 29 Apr 2020 at 22:09, Chesnay Schepler 
 wrote:

 >  > Therefore, should we advocate instead:
 >  >
 >  > 'format.kind' = 'json',
 >  > 'format.fail-on-missing-field' = 'false'
 >
 > Yes. That's pretty much it.
 >
 > This is reasonable important to nail down as with such violations I
 > believe we could not actually switch to a standard YAML parser.
 >
 > On 29/04/2020 16:05, Timo Walther wrote:
 > > Hi everyone,
 > >
 > > discussions around ConfigOption seem to be very popular recently.
 So I
 > > would also like to get some opinions on a different topic.
 > >
 > > How do we represent hierarchies in ConfigOption? In 

Re: Re: SQL row-to-column transformation

2020-04-26 Thread Kurt Young
From your original SQL, my guess is that you want to apply a filter condition when doing the aggregation? I don't see anything in your original SQL that indicates a "row-to-column" (pivot) requirement; could you explain in more detail?
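(If the goal is per-ip success/fail totals, here is a hedged sketch of the
conditional-aggregation form, moving CASE inside the aggregate, using the table and columns
from the SQL quoted below:)

tableEnv.executeSql(
    "SELECT ip, "
  + "       SUM(CASE WHEN status = 'success' THEN t ELSE 0 END) AS successct, "
  + "       SUM(CASE WHEN status = 'fail'    THEN t ELSE 0 END) AS failct "
  + "FROM view1 "
  + "GROUP BY ip");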

Best,
Kurt


On Sun, Apr 26, 2020 at 6:20 PM Benchao Li  wrote:

> Do you mean transforming multiple rows into multiple rows? If so, what you need should be a Table Aggregate Function [1], but that is only usable in the Table API;
> there is no such semantics directly available in SQL.
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html#table-aggregation-functions
>
> 王双利 wrote on Sun, Apr 26, 2020 at 6:14 PM:
> > The examples I have seen all transpose a single row; are there any related examples for this kind of multi-row data?
> >看到的例子,都是 对单行数据进行转行的,这种多行数据的,有相关例子吗?
> >
> > From: Benchao Li
> > Sent: 2020-04-26 17:31
> > To: user-zh
> > Subject: Re: SQL row-to-column transformation
> > Hi 双利,
> >
> > In Flink, row-to-column transformation is done with a Table Function; you can refer to the "Join with Table Function
> > (UDTF)" section in [1].
> >
> > [1]
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins
> >
> > 王双利 wrote on Sun, Apr 26, 2020 at 5:19 PM:
> >
> > > select ip,
> > > case status when 'success' THEN sum(t) ELSE 0 end successct,
> > > case status when 'fail' THEN sum(t) ELSE 0 end failct
> > > from view1
> > > group by ip   This does not achieve the row-to-column transformation; is there a solution?
> > >
> > >
> > >
> >
> > --
> >
> > Benchao Li
> > School of Electronics Engineering and Computer Science, Peking University
> > Tel:+86-15650713730
> > Email: libenc...@gmail.com; libenc...@pku.edu.cn
> >
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


Re: batch range sort support

2020-04-23 Thread Kurt Young
Hi Benchao, you can create a jira issue to track this.

Best,
Kurt


On Thu, Apr 23, 2020 at 2:27 PM Benchao Li  wrote:

> Hi Jingsong,
>
> Thanks for your quick response. I've CC'ed Chongchen who understands the
> scenario much better.
>
>
> Jingsong Li  于2020年4月23日周四 下午12:34写道:
>
>> Hi, Benchao,
>>
>> Glad to see your requirement about range partition.
>> I have a branch to support range partition: [1]
>>
>> Can you describe your scene in more detail? What sink did you use for
>> your jobs? A simple and complete business scenario? This can help the
>> community judge the importance of the range partition.
>>
>> [1]https://github.com/JingsongLi/flink/commits/range
>>
>> Best,
>> Jingsong Lee
>>
>> On Thu, Apr 23, 2020 at 12:15 PM Benchao Li  wrote:
>>
>>> Hi,
>>>
>>> Currently the sort operator in the blink planner is global, which becomes a
>>> bottleneck if we sort a lot of data.
>>>
>>> And I found 'table.exec.range-sort.enabled' config in BatchExecSortRule,
>>> which makes me very exciting.
>>> After enabling this config, I found that it's not implemented completely
>>> now. This config changes the distribution
>>>  from SINGLETON to range for sort operator, however in BatchExecExchange
>>> we do not deal with range
>>> distribution, and will throw UnsupportedOperationException.
>>>
>>> My question is,
>>> 1. Is this config just a mistake when we merge blink into flink, and we
>>> actually didn't plan to implement this?
>>> 2. If this is in the plan, then which version may we expect it to be
>>> ready?
>>>
>>>
>>> --
>>>
>>> Benchao Li
>>> School of Electronics Engineering and Computer Science, Peking University
>>> Tel:+86-15650713730
>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>>
>>>
>>
>> --
>> Best, Jingsong Lee
>>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>


Re: Blink SQL java.lang.ArrayIndexOutOfBoundsException

2020-04-20 Thread Kurt Young
Thanks, once you can reproduce this issue locally, please open a jira with
your testing program.

Best,
Kurt


On Tue, Apr 21, 2020 at 8:48 AM 刘建刚  wrote:

> Thank you. It is an online job and my input is huge. I checked the trace and
> found that the array is resized when it is not large enough. The code is as
> below:
>
> public void add (int value) {
>int[] items = this.items;
>if (size == items.length) items = resize(Math.max(8, (int)(size * 1.75f)));
>items[size++] = value;
> }
>
>
> Only the blink planner has this error. Could it be a thread-safety problem or
> something else? I will try to reproduce it locally.
>
> On Apr 21, 2020, at 12:20 AM, Jark Wu-3 [via Apache Flink User Mailing List archive.]
>  wrote:
>
> Hi,
>
> Are you using versions < 1.9.2? From the exception stack, it looks like
> caused by FLINK-13702, which is already fixed in 1.9.2 and 1.10.0.
> Could you try it using 1.9.2?
>
> Best,
> Jark
>
> On Mon, 20 Apr 2020 at 21:00, Kurt Young <[hidden email]> wrote:
>
>> Can you reproduce this in a local program with mini-cluster?
>>
>> Best,
>> Kurt
>>
>>
>> On Mon, Apr 20, 2020 at 8:07 PM Zahid Rahman <[hidden email]> wrote:
>>
>>> You can read this for this type error.
>>>
>>>
>>> https://stackoverflow.com/questions/28189446/i-always-get-this-error-exception-in-thread-main-java-lang-arrayindexoutofbou#comment44747327_28189446
>>>
>>> I would suggest you set break points  in your code. Step through the
>>> code, this  method should show you which array variable is being passed a
>>> null argument when the array variable is not null able.
>>>
>>>
>>>
>>>
>>> On Mon, 20 Apr 2020, 10:07 刘建刚, <[hidden email]> wrote:
>>>
>>>>I am using Roaring64NavigableMap to compute uv. It is ok to use the
>>>> flink planner and not ok with the blink planner. The SQL is as follows:
>>>>
>>>> SELECT toLong(TUMBLE_START(eventTime, interval '1' minute)) as 
>>>> curTimestamp, A, B, C, D,
>>>> E, uv(bitmap(id)) as bmp
>>>> FROM person
>>>> GROUP BY TUMBLE(eventTime, interval '1' minute), A, B, C, D, E
>>>>
>>>>
>>>>   The udf is as following:
>>>>
>>>> public static class Bitmap extends 
>>>> AggregateFunction<Roaring64NavigableMap, Roaring64NavigableMap> {
>>>>@Override
>>>>public Roaring64NavigableMap createAccumulator() {
>>>>   return new Roaring64NavigableMap();
>>>>}
>>>>
>>>>@Override
>>>>public Roaring64NavigableMap getValue(Roaring64NavigableMap 
>>>> accumulator) {
>>>>   return accumulator;
>>>>}
>>>>
>>>>public void accumulate(Roaring64NavigableMap bitmap, long id) {
>>>>   bitmap.add(id);
>>>>}
>>>> }
>>>>
>>>> public static class UV extends ScalarFunction {
>>>>public long eval(Roaring64NavigableMap bitmap) {
>>>>   return bitmap.getLongCardinality();
>>>>}
>>>> }
>>>>
>>>>   The error is as following:
>>>>
>>>> 2020-04-20 16:37:13,868 INFO
>>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>>  [flink-akka.actor.default-dispatcher-40]  -
>>>> GroupWindowAggregate(groupBy=[brand, platform, channel, versionName,
>>>> appMajorVersion], window=[TumblingGroupWindow('w$, eventTime, 6)],
>>>> properties=[w$start, w$end, w$rowtime, w$proctime], select=[brand,
>>>> platform, channel, versionName, appMajorVersion, bitmap(id) AS $f5,
>>>> start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime,
>>>> proctime('w$) AS w$proctime]) -> Calc(select=[toLong(w$start) AS
>>>> curTimestamp, brand, platform, channel, versionName, appMajorVersion,
>>>> uv($f5) AS bmp]) -> SinkConversionToTuple2 -> (Flat Map, Flat Map -> Sink:
>>>> Unnamed) (321/480) (8eb918b493ea26e2bb60f8307347dc1a) switched from RUNNING
>>>> to FAILED.
>>>> java.lang.ArrayIndexOutOfBoundsException: -1
>>>>   at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
>>>>   at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
>>>>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
>>>>   at
>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSe

Re: Joining table with row attribute against an enrichment table

2020-04-20 Thread Kurt Young
According to the current implementation, yes, you are right that the hive
table source will always be bounded.
But conceptually, we can't make this assumption. For example, we
might further improve the hive table source
to also support unbounded cases, e.g. monitoring hive tables and always
reading newly appeared data.
So right now, Flink relies on the "global flag" to distinguish whether the
table should be treated as static
or dynamically changing.

The "global flag" is whether you are using `BatchTableEnvironment` or
`StreamTableEnvironment` (old versions),
or EnvironmentSettings's batchMode or streamingMode (newer versions).

But we should admit that Flink hasn't finished the unification work. Your
case will also be considered in the
future when we want to further unify and simplify these concepts and
usages.
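For concreteness, a hedged sketch of that "global flag" in the newer API (names as of
Flink 1.10/1.11):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PlannerModeSketch {
    public static void main(String[] args) {
        // Streaming mode: every table is treated as potentially changing over time.
        EnvironmentSettings streamingSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner().inStreamingMode().build();
        // Batch mode: inputs are treated as static, bounded snapshots.
        EnvironmentSettings batchSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner().inBatchMode().build();

        TableEnvironment streamingEnv = TableEnvironment.create(streamingSettings);
        TableEnvironment batchEnv = TableEnvironment.create(batchSettings);
    }
}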

Best,
Kurt


On Mon, Apr 20, 2020 at 10:09 PM Gyula Fóra  wrote:

> The HiveTableSource (and many others) return isBounded() -> true.
> In this case it is not even possible for it to change over time, so I am a
> bit confused.
>
> To me it sounds like you should always be able to join a stream against a
> bounded table, temporal or not it is pretty well defined.
> Maybe there is some fundamental concept that I dont understand, I don't
> have much experience with this to be fair.
>
> Gyula
>
> On Mon, Apr 20, 2020 at 4:03 PM Kurt Young  wrote:
>
>> The reason here is Flink doesn't know the hive table is static. After you
>> create these two tables and
>> trying to join them, Flink will assume both table will be changing with
>> time.
>>
>> Best,
>> Kurt
>>
>>
>> On Mon, Apr 20, 2020 at 9:48 PM Gyula Fóra  wrote:
>>
>>> Hi!
>>>
>>> The problem here is that I dont have a temporal table.
>>>
>>> I have a regular stream from kafka (with even time attribute) and a
>>> static table in hive.
>>> The Hive table is static, it doesn't change. It doesn't have any time
>>> attribute, it's not temporal.
>>>
>>> Gyula
>>>
>>> On Mon, Apr 20, 2020 at 3:43 PM godfrey he  wrote:
>>>
>>>> Hi Gyual,
>>>>
>>>> Can you convert the regular join to lookup join (temporal join) [1],
>>>> and then you can use window aggregate.
>>>>
>>>> >  I understand that the problem is that we cannot join with the Hive
>>>> table and still maintain the watermark/even time column. But why is this?
>>>> Regular join can't maintain the time attribute as increasing trend (one
>>>> record may be joined with a very old record),
>>>> that means the watermark does not also been guaranteed to increase.
>>>>
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#join-with-a-temporal-table
>>>>
>>>> Best,
>>>> Godfrey
>>>>
>>>> Gyula Fóra  于2020年4月20日周一 下午4:46写道:
>>>>
>>>>> Hi All!
>>>>>
>>>>> We hit the following problem with SQL and are trying to understand if
>>>>> there is a valid workaround.
>>>>>
>>>>> We have 2 tables:
>>>>>
>>>>> *Kafka*
>>>>> timestamp (ROWTIME)
>>>>> item
>>>>> quantity
>>>>>
>>>>> *Hive*
>>>>> item
>>>>> price
>>>>>
>>>>> So we basically have incoming (ts, id, quantity) and we want to join
>>>>> it with the hive table to get the total price (price * quantity) for the
>>>>> current item.
>>>>>
>>>>> After this we want to create window aggregate on quantity*price
>>>>> windowed on timestamp (event time attribute).
>>>>>
>>>>> In any way we formulate this query we hit the following error:
>>>>> org.apache.flink.table.api.TableException: Rowtime attributes must not
>>>>> be in the input rows of a regular join. As a workaround you can cast the
>>>>> time attributes of input tables to TIMESTAMP before.
>>>>>
>>>>>  I understand that the problem is that we cannot join with the Hive
>>>>> table and still maintain the watermark/even time column. But why is this?
>>>>>
>>>>> In datastream world I would just simply assign Max watermark to my
>>>>> enrichment input and join outputs will get the ts of the input record. Can
>>>>> I achieve something similar in SQL/Table api?
>>>>>
>>>>> Thank you!
>>>>> Gyula
>>>>>
>>>>>


Re: Joining table with row attribute against an enrichment table

2020-04-20 Thread Kurt Young
The reason here is that Flink doesn't know the hive table is static. After
you create these two tables and
try to join them, Flink will assume both tables will be changing over
time.

Best,
Kurt


On Mon, Apr 20, 2020 at 9:48 PM Gyula Fóra  wrote:

> Hi!
>
> The problem here is that I dont have a temporal table.
>
> I have a regular stream from kafka (with even time attribute) and a static
> table in hive.
> The Hive table is static, it doesn't change. It doesn't have any time
> attribute, it's not temporal.
>
> Gyula
>
> On Mon, Apr 20, 2020 at 3:43 PM godfrey he  wrote:
>
>> Hi Gyual,
>>
>> Can you convert the regular join to lookup join (temporal join) [1],
>> and then you can use window aggregate.
>>
>> >  I understand that the problem is that we cannot join with the Hive
>> table and still maintain the watermark/even time column. But why is this?
>> Regular join can't maintain the time attribute as increasing trend (one
>> record may be joined with a very old record),
>> that means the watermark does not also been guaranteed to increase.
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#join-with-a-temporal-table
>>
>> Best,
>> Godfrey
>>
>> Gyula Fóra  于2020年4月20日周一 下午4:46写道:
>>
>>> Hi All!
>>>
>>> We hit the following problem with SQL and are trying to understand if
>>> there is a valid workaround.
>>>
>>> We have 2 tables:
>>>
>>> *Kafka*
>>> timestamp (ROWTIME)
>>> item
>>> quantity
>>>
>>> *Hive*
>>> item
>>> price
>>>
>>> So we basically have incoming (ts, id, quantity) and we want to join it
>>> with the hive table to get the total price (price * quantity) for the
>>> current item.
>>>
>>> After this we want to create window aggregate on quantity*price windowed
>>> on timestamp (event time attribute).
>>>
>>> In any way we formulate this query we hit the following error:
>>> org.apache.flink.table.api.TableException: Rowtime attributes must not
>>> be in the input rows of a regular join. As a workaround you can cast the
>>> time attributes of input tables to TIMESTAMP before.
>>>
>>>  I understand that the problem is that we cannot join with the Hive
>>> table and still maintain the watermark/even time column. But why is this?
>>>
>>> In datastream world I would just simply assign Max watermark to my
>>> enrichment input and join outputs will get the ts of the input record. Can
>>> I achieve something similar in SQL/Table api?
>>>
>>> Thank you!
>>> Gyula
>>>
>>>
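For illustration, a hedged sketch of the lookup-join direction suggested above (it assumes a
processing-time attribute `proctime` on the Kafka table and that the Hive table can serve as
a lookup/temporal table; all names are illustrative):

tEnv.executeSql(
    "SELECT k.`timestamp`, k.item, k.quantity * h.price AS total_price "
  + "FROM kafka_table AS k "
  + "JOIN hive_table FOR SYSTEM_TIME AS OF k.proctime AS h "
  + "ON k.item = h.item");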


Re: Blink SQL java.lang.ArrayIndexOutOfBoundsException

2020-04-20 Thread Kurt Young
Can you reproduce this in a local program with mini-cluster?

Best,
Kurt


On Mon, Apr 20, 2020 at 8:07 PM Zahid Rahman  wrote:

> You can read this for this type error.
>
>
> https://stackoverflow.com/questions/28189446/i-always-get-this-error-exception-in-thread-main-java-lang-arrayindexoutofbou#comment44747327_28189446
>
> I would suggest you set break points  in your code. Step through the code,
> this  method should show you which array variable is being passed a null
> argument when the array variable is not null able.
>
>
>
>
> On Mon, 20 Apr 2020, 10:07 刘建刚,  wrote:
>
>>I am using Roaring64NavigableMap to compute uv. It is ok to use the
>> flink planner and not ok with the blink planner. The SQL is as follows:
>>
>> SELECT toLong(TUMBLE_START(eventTime, interval '1' minute)) as curTimestamp, 
>> A, B, C, D,
>> E, uv(bitmap(id)) as bmp
>> FROM person
>> GROUP BY TUMBLE(eventTime, interval '1' minute), A, B, C, D, E
>>
>>
>>   The udf is as following:
>>
>> public static class Bitmap extends AggregateFunction<Roaring64NavigableMap, Roaring64NavigableMap> {
>>@Override
>>public Roaring64NavigableMap createAccumulator() {
>>   return new Roaring64NavigableMap();
>>}
>>
>>@Override
>>public Roaring64NavigableMap getValue(Roaring64NavigableMap accumulator) {
>>   return accumulator;
>>}
>>
>>public void accumulate(Roaring64NavigableMap bitmap, long id) {
>>   bitmap.add(id);
>>}
>> }
>>
>> public static class UV extends ScalarFunction {
>>public long eval(Roaring64NavigableMap bitmap) {
>>   return bitmap.getLongCardinality();
>>}
>> }
>>
>>   The error is as following:
>>
>> 2020-04-20 16:37:13,868 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>  [flink-akka.actor.default-dispatcher-40]  -
>> GroupWindowAggregate(groupBy=[brand, platform, channel, versionName,
>> appMajorVersion], window=[TumblingGroupWindow('w$, eventTime, 6)],
>> properties=[w$start, w$end, w$rowtime, w$proctime], select=[brand,
>> platform, channel, versionName, appMajorVersion, bitmap(id) AS $f5,
>> start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime,
>> proctime('w$) AS w$proctime]) -> Calc(select=[toLong(w$start) AS
>> curTimestamp, brand, platform, channel, versionName, appMajorVersion,
>> uv($f5) AS bmp]) -> SinkConversionToTuple2 -> (Flat Map, Flat Map -> Sink:
>> Unnamed) (321/480) (8eb918b493ea26e2bb60f8307347dc1a) switched from RUNNING
>> to FAILED.
>> java.lang.ArrayIndexOutOfBoundsException: -1
>>   at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
>>   at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
>>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
>>   at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
>>   at
>> org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:62)
>>   at
>> org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:37)
>>   at
>> org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copyBaseRow(BaseRowSerializer.java:150)
>>   at
>> org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:117)
>>   at
>> org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:50)
>>   at
>> org.apache.flink.runtime.state.heap.CopyOnWriteStateMap.get(CopyOnWriteStateMap.java:297)
>>   at
>> org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:244)
>>   at
>> org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:138)
>>   at
>> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:73)
>>   at
>> org.apache.flink.table.runtime.operators.window.WindowOperator.processElement(WindowOperator.java:337)
>>   at
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.processRecord(OneInputStreamTask.java:204)
>>   at
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:196)
>>   at
>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
>>   at
>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
>>   at
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>>   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
>>   at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>>   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>>   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>>   at 

Re: how to send back result via job manager to client

2020-04-19 Thread Kurt Young
You can take a look at this JIRA: https://issues.apache.org/jira/browse/FLINK-14807

Best,
Kurt


On Mon, Apr 20, 2020 at 7:07 AM Eleanore Jin  wrote:

> Hi,
> I just read an article about a use case of Flink for OLAP (
> https://ververica.cn/developers/olap-engine-performance-optimization-and-application-cases/),
> and one point it mentions is:
> [image: image.png]
> This optimization comes from a characteristic of OLAP: the final computation result is sent to the client, forwarded to the Client via the JobManager.
>
> I would like to ask how this is implemented: forwarding the result to the Client through the JobManager.
>
> Thanks!
> Eleanore
>


Re: Flink Weekly | Weekly Community Update - 2020/04/18

2020-04-18 Thread Kurt Young
Thanks for putting this together!

Best,
Kurt


On Sat, Apr 18, 2020 at 9:43 PM 王雷  wrote:

> Hi everyone, this is the 13th issue of Flink Weekly, compiled by 王雷 (Wang Lei). It mainly covers recent community development progress, mailing-list Q&A, and the latest Flink
> community news and recommended technical articles.
>
>
>
>
> Flink development progress
>
> ■ [Releases] Tzu-Li (Gordon) Tai released Apache Flink Stateful Functions 2.0.0.
>
> [1]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ANNOUNCE-Apache-Flink-Stateful-Functions-2-0-0-released-td34121.html
>
>
>
>
> ■ [Releases] Yu Li started a discussion about releasing Flink 1.10.1; the upcoming 1.10.1 release still has 1
> blocker. A release candidate is expected next week.
>
> [2]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Releasing-Flink-1-10-1-td38689.html
>
>
>
>
> ■ [Releases] All blocker issues for the 1.9.3 release have been resolved; Dian Fu is preparing the first release candidate.
>
> [3]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Releasing-Flink-1-9-3-td40086.html
>
>
>
>
> ■ [SQL] The FLIP-84 proposal to refactor TableEnvironment and Table missed the scenario of submitting DQL
> jobs; godfreyhe restarted the FLIP-84 vote, which has passed.
>
> [4]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-84-Improve-amp-Refactor-API-of-TableEnvironment-amp-Table-td39543.html
>
>
>
>
> ■ [SQL] The FLIP-122 vote on the new connector property keys for the new TableFactory has passed.
>
> [5]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/RESULT-VOTE-FLIP-122-New-Connector-Property-Keys-for-New-Factory-td39935.html
>
>
>
>
> ■ [SQL] The FLIP-110 vote on supporting the LIKE clause in CREATE TABLE statements has passed.
>
> [6]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-110-Support-LIKE-clause-in-CREATE-TABLE-td39554.html
>
>
>
>
> ■ [Python] The FLIP-121 vote on supporting Cython to optimize Python UDFs has passed.
>
> [7]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-121-Support-Cython-Optimizing-Python-User-Defined-Function-td39577.html
>
>
>
>
> ■ [Runtime] The FLIP-119 vote on optimizing the scheduling strategy has passed; the optimization mainly focuses on avoiding resource deadlocks when running batch jobs.
>
> [8]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-119-Pipelined-Region-Scheduling-td39585.html
>
>
>
>
> Mailing-list Q&A
>
> ■ guanyq asked how to look up the exception logs of a Flink job. Tian Zhisheng recommended using a log agent (such as Filebeat)
> to collect job logs centrally and ship them to Elasticsearch for viewing.
>
> [9]http://apache-flink.147419.n8.nabble.com/flink-td2378.html
>
>
>
>
> ■ chanamper hoped to use the LocalGlobal aggregation optimization in the Java API. Congxian Qiu
> replied that the DataStream API does not have local aggregation yet; a similar effect can be achieved by appending a prefix or suffix to the key.
>
> [10]http://apache-flink.147419.n8.nabble.com/Flink-keyby-td2309.html
>
>
>
>
> ■ 111 found that Calcite in Flink depends on Guava 16+, while the hbase-connector module depends on Guava
> 12.0. After shading, the job runs fine, but unit tests conflict when run in IDEA. For now the workaround is to run the unit tests via mvn test,
> or to change the unit tests to connect to a remote HBase.
>
> [11]
> http://apache-flink.147419.n8.nabble.com/Flink1-10-0-flink-hbase-guava-td2385.html
>
>
>
>
> ■ KristoffSC asked which performs better when RocksDB is used as the state backend: a small number of MapStates where each state
> is very large, or many ValueStates where each state is very small. Congxian Qiu
> believed there is no difference between the two and gave a detailed explanation.
>
> [12]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/State-size-Vs-keys-number-perfromance-td34135.html
>
>
>
>
> ■ 111 ran into the error ”This calc has no
> useful projection and no filter. It should be removed by CalcRemoveRule”
> when using a TopN statement; Jark believed it is caused by a codegen bug.
>
> [13]
> http://apache-flink.147419.n8.nabble.com/Flink-SQL-1-10-ROW-NUMBER-td2355.html
>
>
>
>
> ■ Dongwon Kim asked about an NPE thrown from NullAwareMapIterator; Jark Wu believed it is caused by
> a bug in the HeapMapState iterator that makes it return a null iterator.
>
> [14]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/NPE-from-NullAwareMapIterator-in-flink-table-runtime-blink-td34083.html
>
>
>
>
> ■ Krzysztof Zarzycki would like to use Flink SQL to dynamically modify a job's
> topology, in order to dynamically add and remove business processing branches. This is not supported yet, and the community discussed how it could be implemented.
>
> [15]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamic-Flink-SQL-td33853.html
>
>
>
>
> ■ Salva Alcántara cleared keyed state in the snapshotState method. After the job
> starts, if no data has entered the input streams yet, triggering a checkpoint throws an NPE. Yun Tang replied, explained the difference between
> keyed state and operator state, and, based on Salva Alcántara's business logic, recommended using operator
> state.
>
> [16]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Using-MapState-clear-put-methods-in-snapshotState-within-KeyedCoProcessFunction-valid-or-not-td31353.html
>
>
>
>
> ■ Aaron Levin has a use case that stores a ListState with millions of elements in RocksDB
> and is worried about running into problems in this scenario. Seth Wiesman replied that RocksDB's JNI bridge does not support a
> ListState larger than 2 GB. Aljoscha Krettek offered another idea: split the data across multiple keys and process it in a distributed way.
>
> [17]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ListState-with-millions-of-elements-td34153.html
>
>
>
>
> ■ Robin Cassan hit a snowball effect where, after a checkpoint fails due to timeout, the following checkpoints keep failing. Congxian
> Qiu answered that unaligned checkpoints could solve this problem, but that solution is not implemented yet.
>
> [18]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Making-job-fail-on-Checkpoint-Expired-td34051.html
>
>
>
>
> ■ Gyula Fóra has a source table with a nullable field; after filtering out the null data and writing into a sink table whose corresponding field type is STRING NOT
> NULL, he got a type-incompatibility error. Timo Walther said the type system is still being improved and this is a known issue.
>
> [19]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Inserting-nullable-data-into-NOT-NULL-columns-td34198.html
>
>
>
>
> ■ forideal submitted three SQL statements in one job, all using the same source table. Flink 

Re: [Feedback wanted] Making the blink planner the default planner in 1.11

2020-04-03 Thread Kurt Young
Hi, this is expected. Under the new type system, we use LocalDateTime as the default conversion class for the TIMESTAMP type.
We also disabled the direct conversion between long and TIMESTAMP objects.
For the specific reasons and details, see:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-37%3A+Rework+of+the+Table+API+Type+System

Best,
Kurt


On Fri, Apr 3, 2020 at 4:58 PM 1193216154 <1193216...@qq.com> wrote:

>
> Hi, after switching to the blink planner recently I found two problems. Namely, the proctime generated by the two planners has a different time type: one is Timestamp and the other is LocalDateTime.
>
>
> The toInternalImpl method of TimestampConverter in org.apache.flink.table.dataformat.DataFormatConverters
>
> only accepts a Timestamp argument, while in my case a long is passed in, causing a class cast exception. If toInternalImpl could be overloaded with a long variant, that might solve my problem.
> -- Original message --
> From: "Kurt Young"  Sent: Wednesday, Apr 1, 2020, 9:22 AM
> To: "user-zh"
>
> Subject: [Feedback wanted] Making the blink planner the default planner in 1.11
>
>
> Hi everyone,
>
> As you all know, the Blink planner is a brand-new Table API and SQL translator and optimizer
> introduced in Flink 1.9, and it already became the default planner of the SQL CLI in 1.10. Since the community decided long ago not to
> add any new features to the old optimizer, in terms of both features and performance the old flink planner is already missing many
> of the community's latest SQL capabilities, such as the new type system, broader DDL support, and the new
> TableSource and TableSink interfaces coming in 1.11 along with the ability to interpret binlog-style changelogs.
>
> We therefore plan to promote the blink planner to the default planner in the upcoming 1.11 release. Before
> that, we would like to hear your feedback, especially your experience using it, and anything you observed or ran into that the blink planner
> could not do and that made you fall back to the old flink planner, whether it is a feature gap or a new bug in the blink planner
> that the old planner does not have. There is still roughly a month until the 1.11 feature freeze, and after receiving
> your feedback we will have enough time to fix and polish things.
>
> We look forward to your valuable feedback. Thank you.
>
> Best,
> Kurt


Re: flink 1.10 createTemporaryTable loses proctime

2020-04-02 Thread Kurt Young
It looks like you have hit this bug: https://issues.apache.org/jira/browse/FLINK-16160
Until that bug is fixed, please keep using the old API for now.
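
A rough sketch of what staying on the old API looks like (the connector, format and
field names below are placeholders/assumptions, not from this thread):

tEnv.connect(new Kafka().version("universal").topic("orders")
        .property("bootstrap.servers", "localhost:9092"))
    .withFormat(new Json().deriveSchema())
    .withSchema(new Schema()
        .field("user_id", DataTypes.BIGINT())
        .field("proc", DataTypes.TIMESTAMP(3)).proctime())  // proctime attribute still works here
    .inAppendMode()
    .registerTableSource("foo");  // deprecated, but keeps .proctime() working for now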

Best,
Kurt


On Thu, Apr 2, 2020 at 10:34 AM deadwind4  wrote:

> registerTableSource is marked @Deprecated in flink
> 1.10. In my case, should I keep using the deprecated API (registerTableSource)?
>
>
>  Original message
> From: deadwind4
> To: user-zh
> Sent: Thursday, April 2, 2020, 10:30
> Subject: Re: flink 1.10 createTemporaryTable loses proctime
>
>
> Before the change:
> tEnv.connect().withFormat().withSchema(
> xxx.proctime()
> ).registerTableSource(“foo”);
>
>
> After the change:
> tEnv.connect().withFormat().withSchema(
> xxx.proctime()
> ).createTemporaryTable(“foo”);
>
>
> After the change, .proctime() no longer takes effect, so I can no longer use proctime windows either.
>
>
>  Original message
> From: deadwind4
> To: user-zh
> Sent: Thursday, April 2, 2020, 10:22
> Subject: Re: flink 1.10 createTemporaryTable loses proctime
>
>
> tEnv.connect().withFormat().withSchema().registerTableSource(“foo”);
> tEnv.connect().withFormat().withSchema().createTemporaryTable(“foo”);
>
>
>  Original message
> From: Jark Wu
> To: user-zh
> Sent: Thursday, April 2, 2020, 10:18
> Subject: Re: flink 1.10 createTemporaryTable loses proctime
>
>
> Hi, could you describe your code before and after the change? As far as I know, TableEnvironment has no createTemporaryTable
> method, only createTemporaryView, and the parameters of registerTableSource and createTemporaryView
> are different. Best, Jark > On Apr 1, 2020, at 23:13, deadwind4 
> wrote: > > What I actually want is a processing time window, but after I replaced the deprecated API
> registerTableSource with createTemporaryTable, proctime stopped working. How should I use it in this case? Thank you, and sorry for the trouble.
> > > Original message > From: Jark Wu > To: user-zh<
> user-zh@flink.apache.org> > Sent: Wednesday, April 1, 2020, 21:37 > Subject: Re: flink
> 1.10 createTemporaryTable loses proctime > > > Hi, proctime means machine time; it is not equivalent to the
> now() or current_timestamp() functions, and the field is only materialized (i.e. System.currentTimeMillis is read)
> when it is actually used. Could you describe what you want to do with createTemporaryTable,
> and what currently does not meet your needs? Best, Jark On Wed, 1 Apr 2020 at 18:56, deadwind4 <
> deadwi...@outlook.com> wrote: > >
> With createTemporaryTable in 1.10 I found that the proctime field is always null, but switching to the deprecated registerTableSource works.
> > What should I do if I want to use createTemporaryTable? >
> Also, I debugged the createTemporaryTable source code and did not find any handling of proctime.


[Collecting feedback] Make the blink planner the default planner in 1.11

2020-03-31 Thread Kurt Young
Hi everyone,

As you all know, the Blink planner is a brand-new Table API and SQL translator and optimizer
introduced in Flink 1.9, and it already became the default planner of the SQL CLI in 1.10. Since the community decided long ago not to
add any new features to the old optimizer, in terms of both features and performance the old flink planner is already missing many
of the community's latest SQL capabilities, such as the new type system, broader DDL support, and the new
TableSource and TableSink interfaces coming in 1.11 along with the ability to interpret binlog-style changelogs.

We therefore plan to promote the blink planner to the default planner in the upcoming 1.11 release. Before
that, we would like to hear your feedback, especially your experience using it, and anything you observed or ran into that the blink planner
could not do and that made you fall back to the old flink planner, whether it is a feature gap or a new bug in the blink planner
that the old planner does not have. There is still roughly a month until the 1.11 feature freeze, and after receiving
your feedback we will have enough time to fix and polish things.

We look forward to your valuable feedback. Thank you.

Best,
Kurt


[DISCUSS] Change default planner to blink planner in 1.11

2020-03-31 Thread Kurt Young
Hi Dev and User,

Blink planner for Table API & SQL is introduced in Flink 1.9 and already be
the default planner for
SQL client in Flink 1.10. And since we already decided not introducing any
new features to the
original Flink planner, it already lacks many of the great features that
the community has been working on, such as brand new type system, more DDL
support and more planner capabilities.
During this time, we've also received lots of great feedback from users who
were trying to use blink
planner, both positive and negative (like bugs). This is a good sign, it at
least shows more and more
users are starting to try out.

So I want to start this discussion more formally to talk about
replacing the default planner to blink.
Specifically, I want to gather feedback from dev and user about whether
the blink planner already
covers the original planner's capabilities, and what kind of issues you've run
into when trying out the blink
planner that made you fall back to the original one. Since there is still a
month to go when feature
freeze, there's still enough time for community to further enhance blink
planner for this purpose.

Let me know what you think, especially if you want to report or complain
about something. Thanks
in advance.

Best,
Kurt


Re: [External] Re: From Kafka Stream to Flink

2020-03-28 Thread Kurt Young
I think this requirement can be satisfied by temporal table function [1],
am I missing anything?

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html#temporal-table-function
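
A minimal sketch of the temporal table function approach (table and column names are
assumptions, and tEnv is assumed to be a StreamTableEnvironment):

// register a temporal table function over the versioned/history table
Table rates = tEnv.sqlQuery("SELECT currency, rate, update_time FROM RatesHistory");
TemporalTableFunction ratesFn =
        rates.createTemporalTableFunction("update_time", "currency");
tEnv.registerFunction("Rates", ratesFn);

// join each order against the rate that was valid at the order's time
Table result = tEnv.sqlQuery(
    "SELECT o.amount * r.rate " +
    "FROM Orders AS o, LATERAL TABLE (Rates(o.order_time)) AS r " +
    "WHERE o.currency = r.currency");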

Best,
Kurt


On Sat, Mar 28, 2020 at 2:47 PM Maatary Okouya 
wrote:

> Hi all,
>
> Just wondering what is the status at this point?
>
> On Thu, Sep 19, 2019 at 4:38 PM Hequn Cheng  wrote:
>
>> Hi,
>>
>> Fabian is totally right. Big thanks to the detailed answers and nice
>> examples above.
>>
>> As for the PR, very sorry about the delay. It is mainly because of the
>> merge of blink and my work switching to Flink Python recently.
>> However, I think the later version of blink would cover this feature
>> natively with further merges.
>>
>> Before that, I think we can use the solution Fabian provided above.
>>
>> There are some examples here[1][2] which may be helpful to you
>> @Casado @Maatary.
>> In [1], the test case quite matches your scenario(perform join after
>> groupby+last_value). It also provides the udaf what you want and shows how
>> to register it.
>> In [2], the test shows how to use the built-in last_value in SQL. Note
>> that the built-in last_value UDAF is only supported in blink-planner from
>> flink-1.9.0. If you are using the flink-planner(or version before that),
>> you can register the last_value UDAF with the TableEnvironment like it is
>> showed in [1].
>>
>> Feel free to ask if there are other problems.
>>
>> Best, Hequn
>> [1]
>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.scala#L207
>> [2]
>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala#L228
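>>
>> As a compact illustration of the "group by + last_value before the join" pattern from
>> [1] (the column names are assumptions; the built-in LAST_VALUE needs the blink planner,
>> flink-1.9.0 or later):
>>
>> Table latestT1 = tEnv.sqlQuery(
>>     "SELECT id, LAST_VALUE(f1) AS f1, LAST_VALUE(f2) AS f2 FROM t1 GROUP BY id");
>> // join latestT1 with the other deduplicated table(s) afterwards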
>>
>> On Thu, Sep 19, 2019 at 9:40 PM Casado Tejedor, Rubén <
>> ruben.casado.teje...@accenture.com> wrote:
>>
>>> Thanks Fabian. @Hequn Cheng  Could you share the
>>> status? Thanks for your amazing work!
>>>
>>>
>>>
>>> *De: *Fabian Hueske 
>>> *Fecha: *viernes, 16 de agosto de 2019, 9:30
>>> *Para: *"Casado Tejedor, Rubén" 
>>> *CC: *Maatary Okouya , miki haiat <
>>> miko5...@gmail.com>, user , Hequn Cheng <
>>> chenghe...@gmail.com>
>>> *Asunto: *Re: [External] Re: From Kafka Stream to Flink
>>>
>>>
>>>
>>> Hi Ruben,
>>>
>>>
>>>
>>> Work on this feature has already started [1], but stalled a bit
>>> (probably due to the effort of merging the new Blink query processor).
>>>
>>> Hequn (in CC) is the guy working on upsert table ingestion, maybe he can
>>> share what the status of this feature is.
>>>
>>>
>>>
>>> Best, Fabian
>>>
>>>
>>>
>>> [1] https://github.com/apache/flink/pull/6787
>>> 
>>>
>>>
>>>
>>> Am Di., 13. Aug. 2019 um 11:05 Uhr schrieb Casado Tejedor, Rubén <
>>> ruben.casado.teje...@accenture.com>:
>>>
>>> Hi
>>>
>>>
>>>
>>> Do you have an expected version of Flink to include the capability to
>>> ingest an upsert stream as a dynamic table? We have such need in our
>>> current project. What we have done is to emulate such behavior working at
>>> low level with states (e.g. update existing value if key exists, create a
>>> new value if key does not exist). But we cannot use SQL that would help as
>>> to do it faster.
>>>
>>>
>>>
>>> Our use case is many small flink jobs that have to something like:
>>>
>>>
>>>
>>> SELECT *some fields*
>>>
>>> FROM *t1* INNER JOIN *t2* ON t1.id = t2.id *(maybe join +3 tables)*
>>>
>>> WHERE *some conditions on fields*;
>>>
>>>
>>>
>>> We need the result of that queries taking into account only the last
>>> values of each row. The result is inserted/updated in a in-memory K-V
>>> database for fast access.
>>>
>>>
>>>
>>> Thanks in advance!
>>>
>>>
>>>
>>> Best
>>>
>>>
>>>
>>> *De: *Fabian Hueske 
>>> *Fecha: *miércoles, 7 de agosto de 2019, 11:08
>>> *Para: *Maatary Okouya 
>>> *CC: *miki haiat , user 
>>> *Asunto: *[External] Re: From Kafka Stream to Flink
>>>
>>>
>>>
>>> This message is from an EXTERNAL SENDER - be CAUTIOUS, particularly with
>>> links and attachments.
>>> --
>>>
>>>
>>>

Re: How are the jars in the flink distribution built

2020-03-26 Thread Kurt Young
flink-table-uber-blink should be the module; it is responsible for producing the fat (uber) jar of the blink planner.

Best,
Kurt


On Thu, Mar 26, 2020 at 5:54 PM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

>
> I downloaded and extracted the standalone tgz; the lib directory contains several jars, e.g. flink-table-blink_2.12-1.10.0.jar.
> Which module is this jar built from?
>
> I cloned the source code from github and ran mvn clean package.
> I assumed that the flink-table-planner-blink_2.11-1.10.0.jar built under
> flink-table/flink-table-planner-blink corresponds to the
>  flink-table-blink_2.12-1.10.0.jar shipped in the released tgz.
> I replaced it directly in the installation directory; start-cluster.sh still works, but ./bin/sql-client.sh embedded fails.
>
> Thanks,
> 王磊
>
>
>
> wangl...@geekplus.com.cn
>
>


Re: Thoughts on time zone handling for SQL DATE_FORMAT

2020-03-25 Thread Kurt Young
Let's first change it to timestamp with local zone. If the type of this field never changes throughout the whole query,
the effect is pretty much the same as with time zone.
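
For the formatting use case itself, the CONVERT_TZ workaround suggested further down the
thread would look roughly like this (the target zone and the format string are examples):

Table t = tEnv.sqlQuery(
    "SELECT CONVERT_TZ(" +
    "  DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss'), " +
    "  'UTC', 'Asia/Shanghai') AS local_ts " +
    "FROM src");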

Best,
Kurt


On Wed, Mar 25, 2020 at 8:43 PM Zhenghua Gao  wrote:

> Hi Jark,
>
> Indeed there is a problem here.
> The current issue is that Calcite itself does not support TIMESTAMP WITH TIME ZONE.
>
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Tue, Mar 24, 2020 at 11:00 PM Jark Wu  wrote:
>
> > Thanks for reporting this Weike.
> >
> > First of all, I think Flink currently returning TIMESTAMP WITHOUT TIME ZONE is indeed problematic,
> > because the SQL standard (SQL:2011 Part 2 Section 6.32) defines the return type as WITH TIME ZONE.
> > The Calcite documentation [1] also says the return type is TIMESTAMP WITH TIME ZONE (although that seems inconsistent with the implementation).
> > Some other databases behave similarly: mysql [2], oracle [3]
> >
> > Best,
> > Jark
> >
> > [1]: https://calcite.apache.org/docs/reference.html#datetime-functions
> > [2]:
> >
> >
> https://dev.mysql.com/doc/refman/8.0/en/date-and-time-functions.html#function_current-timestamp
> > [3]:
> >
> >
> https://docs.oracle.com/cd/B28359_01/server.111/b28286/functions038.htm#SQLRF00629
> >
> >
> >
> > On Tue, 24 Mar 2020 at 17:00, DONG, Weike 
> wrote:
> >
> > > Hi Zhenghua,
> > >
> > > 感谢您的回复。感觉既然 Flink 已经提供了时区的设定,这方面也许可以进一步增强一些。CONVERT_TZ
> > > 用户很容易忘记或者漏掉,这里还是有不少完善的空间。
> > >
> > > Best,
> > > Weike
> > >
> > > On Tue, Mar 24, 2020 at 4:20 PM Zhenghua Gao  wrote:
> > >
> > > > The return type of CURRENT_TIMESTAMP is TIMESTAMP (WITHOUT TIME ZONE);
> > > > its semantics are comparable to java.time.LocalDateTime.
> > > > Its string representation does not change with the time zone (as you saw, it matches UTC+0).
> > > >
> > > > Your requirement can be met with CONVERT_TZ(timestamp_string, time_zone_from_string,
> > > > time_zone_to_string)
> > > >
> > > > *Best Regards,*
> > > > *Zhenghua Gao*
> > > >
> > > >
> > > > On Mon, Mar 23, 2020 at 10:12 PM DONG, Weike <
> kyled...@connect.hku.hk>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > 近期发现 Flink 的 Blink Planner 在 DATE_FORMAT 对 CURRENT_TIMESTAMP
> > > > > 做时间格式化为字符串时,默认以 UTC+0 为准。
> > > > >
> > > > > 长期以来,TableConfig 类里面有一个 setLocalTimeZone
> 方法;将其设置为东八区以后,发现格式化后的字符串仍然是
> > > > UTC+0
> > > > > 的。而深入来看,Flink 的时间格式化时的代码生成逻辑(time.scala)并未考虑时区的设置。
> > > > >
> > > > > 由于大多数用户的时区均不是 UTC+0(GMT、UTC),如果时间格式化、显示等都可以考虑到 TableConfig
> 中的时区设置,那么
> > > > Flink
> > > > > 是否会更用户友好一些呢?当然这个会涉及到不兼容的变更,需要谨慎一些。
> > > > >
> > > > > 也许提供一个 DATE_FORMAT_WITH_TIMEZONE 的内置函数,社区是否会更容易接受一些呢?
> > > > >
> > > > > 仅仅是个人一点想法,感谢 :)
> > > > >
> > > >
> > >
> >
>


Re: A question about automatic source parallelism inference in flink sql 1.10

2020-03-24 Thread Kurt Young
How much data do you have? One possible reason is that by the time the other source subtasks were scheduled, the data had already been fully read by the subtasks that were scheduled earlier.

Best,
Kurt


On Tue, Mar 24, 2020 at 10:39 PM Chief  wrote:

> hi all:
> I was querying hive data with flink sql; there are 150 hive data files, and the parallelism set in the sql
> client config file is 10. The source parallelism was auto-inferred to 150, but in the web
> ui only the first ten subtasks read any data, while the others show no data read. Is there something wrong with my setup?


Re: How to optimize a large-table join in Flink SQL 1.10?

2020-03-21 Thread Kurt Young
In your plan, every node except the source runs with a parallelism of 1, which is not enough for joining two tables of 10+ million rows each. You can try increasing the parallelism.
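
For example, with the blink planner the default operator parallelism can be raised via
TableConfig (the value below is just an example):

tEnv.getConfig().getConfiguration().setInteger(
        "table.exec.resource.default-parallelism", 32);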

Best,
Kurt


On Sat, Mar 21, 2020 at 1:30 PM 111  wrote:

> Hi:
> I read the source code and learned a bit about the Hybrid hash join; I roughly understand the bottleneck now:
> The Hybrid hash
> join maps the build table (my right table) into a hash map and stores it partitioned by some rule (part in memory, the overflow on disk).
> Right now the part of the join that hits disk looks like the bottleneck of the whole job.
> I'm still exploring concrete tuning options... maybe there is some configuration that controls how much of the build table is kept in memory.
> On Mar 21, 2020, at 11:01, 111 wrote:
> Hi, wu:
> OK, I will keep an eye on the GC on my side.
> By the way, my SQL does have a join condition; it's just that the first table has about 14 million rows and the second about 10 million.
> | select
>
>
>   wte.external_user_id,
>
>   wte.union_id,
>
>   mr.fk_member_id as member_id
>
> from a wte
>
> left join b mr
>
>  on wte.union_id = mr.relation_code
>
> where wte.ods_date = '${today}'
>
> limit 10;
>
> |
> In the UI I can see the job is running normally, but the input is only about 700 records/s and the output about 1700/s, which is very slow compared to the total volume.
>
>
> I'm not sure where the performance bottleneck is or in which direction to optimize:
> 1 Network transfer too slow, so the two tables cannot be joined in time? I don't know how to investigate this; Metrics has some netty-related indicators but they don't tell me much. Apart from the hashjoin
> in and out counters changing slowly, nothing else changes.
> 2 Parallelism too low, so a single slot has to join two tables with tens of millions of rows each? Can the parallelism of the probe table be changed dynamically or configured?
> 3 A JVM memory problem? See the attachment for details; memory still looks plentiful, though garbage collection seems a bit frequent. Is it necessary to change the JVM settings?
> 4 I don't quite understand the taskmanager log... it just stops at the build phase. Is the logging stuck, or is the network transfer for the build happening at that moment?
> |
> 2020-03-21 09:23:14,732 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 4 ms for 32768 segments
> 2020-03-21 09:23:14,738 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 4 ms for 32768 segments
> 2020-03-21 09:23:14,744 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 4 ms for 32768 segments
> 2020-03-21 09:23:14,750 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 4 ms for 32768 segments
> 2020-03-21 09:23:14,756 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 4 ms for 32768 segments
> 2020-03-21 09:23:14,762 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 4 ms for 32768 segments
> 2020-03-21 09:23:14,772 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 4 ms for 32768 segments
> 2020-03-21 09:23:14,779 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 4 ms for 32768 segments
> 2020-03-21 09:23:16,357 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 14 ms for 65536 segments
> 2020-03-21 09:23:16,453 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 10 ms for 65536 segments
> 2020-03-21 09:23:16,478 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 9 ms for 65536 segments
> 2020-03-21 09:23:16,494 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 9 ms for 65536 segments
> 2020-03-21 09:23:16,509 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 10 ms for 65536 segments
> 2020-03-21 09:23:16,522 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 9 ms for 65536 segments
> 2020-03-21 09:23:16,539 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 9 ms for 65536 segments
> 2020-03-21 09:23:16,554 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 10 ms for 65536 segments
> 2020-03-21 09:23:16,574 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 9 ms for 65536 segments
> 2020-03-21 09:23:16,598 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 9 ms for 65536 segments
> 2020-03-21 09:23:16,611 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 10 ms for 65536 segments
> 2020-03-21 09:23:20,157 INFO
> org.apache.flink.table.runtime.hashtable.BinaryHashBucketArea  - The rehash
> take 213 ms for 131072 segments
> 2020-03-21 09:23:21,579 INFO
> org.apache.flink.table.runtime.operators.join.HashJoinOperator  - Finish
> build phase.
> |
>
>
>
>
> On Mar 21, 2020, at 10:31, Jark Wu wrote:
> Hi,
>
> It looks like your join has no equality condition, so it can only run with a single parallelism. You can check the GC of this join node and see whether full GC is what makes it slow.
> As for batch joins, Jingsong knows the tuning knobs better than I do; maybe he can offer some ideas. cc @Jingsong Li
> 
>
> Best,
> Jark
>
> On Fri, 20 Mar 2020 at 17:56, 111  wrote:
>
>
>
> 图片好像挂了:
>
>
>
>
> https://picabstract-preview-ftn.weiyun.com/ftn_pic_abs_v3/93a8ac1299f8edd31aa93d69bd591dcc5b768e2c6f2d7a32ff3ac244040b1cac3e8afffd0daf92c4703c276fa1202361?pictype=scale=30113=3.3.3.3=23603357=F74D73D5-810B-4AE7-888C-E65BF787E490.png=750
>
>
> 在2020年03月20日 17:52,111 写道:
> 您好:
> 我有两张表数据量都是1000多万条,需要针对两张表做join。
> 提交任务后,发现join十分缓慢,请问有什么调优的思路?
> 需要调整managed memory吗?
>
> 目前每个TaskManager申请的总内存是2g,每个taskManager上面有4个slot。taskmanager的metrics如下:
> | {
> "id":"container_e40_1555496777286_675191_01_000107",
> "path":"akka.tcp://flink@hnode9:33156/user/taskmanager_0",
> "dataPort":39423,
> "timeSinceLastHeartbeat":1584697728127,
> "slotsNumber":4,
> "freeSlots":3,
> "hardware":{
> "cpuCores":32,
> "physicalMemory":135355260928,
> "freeMemory":749731840,
> 

Re: Re: flink sql-client read hive orc table no result

2020-03-18 Thread Kurt Young
also try to remove "transactional"='true'?

Best,
Kurt


On Wed, Mar 18, 2020 at 5:54 PM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

>
> Tried again. Even if I remove the "clustered by (robot_id) into 3 buckets"
>  statement, there is still no result from the flink sql-client.
>
> Thanks,
> Lei
>
> --
> wangl...@geekplus.com.cn
>
> *From:* Kurt Young 
> *Date:* 2020-03-18 17:41
> *To:* wangl...@geekplus.com.cn; lirui 
> *CC:* user 
> *Subject:* Re: flink sql-client read hive orc table no result
> My guess is we haven't support hive bucket table yet.
> cc Rui Li for confirmation.
>
> Best,
> Kurt
>
>
> On Wed, Mar 18, 2020 at 5:19 PM wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
>> Hive table store as orc format:
>>  CREATE  TABLE `robot_tr`(`robot_id` int,  `robot_time` bigint,
>> `linear_velocity` double, `track_side_error` int)
>>  partitioned by (warehouseid STRING) clustered by (robot_id) into 3
>> buckets
>>  stored as orc tblproperties("transactional"='true');
>>
>> Under hive client,  insert into one record and then select there will be
>> the result to the console.
>>
>> But under flink sql-client, when select * from  robot_tr, there's no
>> result?
>>
>> Any insight on this?
>>
>> Thanks,
>> Lei
>>
>> --
>> wangl...@geekplus.com.cn
>>
>>


Re: flink sql-client read hive orc table no result

2020-03-18 Thread Kurt Young
My guess is we haven't support hive bucket table yet.
cc Rui Li for confirmation.

Best,
Kurt


On Wed, Mar 18, 2020 at 5:19 PM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

> Hive table store as orc format:
>  CREATE  TABLE `robot_tr`(`robot_id` int,  `robot_time` bigint,
> `linear_velocity` double, `track_side_error` int)
>  partitioned by (warehouseid STRING) clustered by (robot_id) into 3
> buckets
>  stored as orc tblproperties("transactional"='true');
>
> Under hive client,  insert into one record and then select there will be
> the result to the console.
>
> But under flink sql-client, when select * from  robot_tr, there's no
> result?
>
> Any insight on this?
>
> Thanks,
> Lei
>
> --
> wangl...@geekplus.com.cn
>
>


Re: SQL keyword issue

2020-03-18 Thread Kurt Young
It seems there already is one; it should be this jira:
https://issues.apache.org/jira/browse/FLINK-16526

Best,
Kurt


On Wed, Mar 18, 2020 at 4:19 PM Jingsong Li  wrote:

> Hi lucas,
>
> Nice, very professional analysis. It does look like a Flink bug; you can open a Jira to track it.
> CC: @Yuzhao Chen 
>
> Best,
> Jingsong Lee
>
> On Wed, Mar 18, 2020 at 4:15 PM lucas.wu  wrote:
>
> > I have found a preliminary cause.
> > It turns out my CREATE TABLE statement uses a computed_column_expression.
> > Internally, when Flink uses it, it actually turns it into a SELECT statement:
> > ...
> > if (columnExprs.nonEmpty) {
> >  val fieldExprs = fieldNames
> >  .map { name =
> >  if (columnExprs.contains(name)) {
> >  columnExprs(name)
> >  } else {
> >  name
> >  }
> >  }.toArray
> >  val rexNodes =
> > toRexFactory.create(newRelTable.getRowType).convertToRexNodes(fieldExprs)
> > …..
> >
> >
> > Now let's look at the convertToRexNodes method:
> >
> >
> > public RexNode[] convertToRexNodes(String[] exprs) {
> > ….
> >  String query = String.format(QUERY_FORMAT, String.join(",", exprs));
> >  SqlNode parsed = planner.parser().parse(query);
> > }
> >
> >
> > The key is this QUERY_FORMAT:
> > private static final String QUERY_FORMAT = "SELECT %s FROM " +
> > TEMPORARY_TABLE_NAME;
> >
> >
> > Writing it this way is problematic: when one of my fields needs backticks, they get stripped, so the later validation fails.
> >
> >
> > So does this count as a Flink bug?
> > Original message
> > From: lucas.wulucas...@xiaoying.com
> > To: user-zhuser...@flink.apache.org
> > Sent: Wednesday, March 18, 2020, 15:36
> > Subject: SQL keyword issue
> >
> >
> > create table `source_table`( `SeqNo` varchar, `Type` varchar, `Table`
> > varchar, `ServerId` varchar, `Database` varchar, `OldData` varchar,
> `GTID`
> > varchar, `Offset` varchar, `event_ts` as
> > to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'-MM-ddHH:mm:ss'),
> > WATERMARK FOR event_ts AS event_ts - interval '60' second ) with(…)  The query statement is:
> > Select * from source_table;  These are my CREATE TABLE and query statements. Even with backticks, the query still complains that Table is a keyword:
> > SQL parse failed. Encountered "Table" at line 1, column 19.  But as soon as I remove the two lines `event_ts` as
> > to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'-MM-ddHH:mm:ss'),
> > WATERMARK FOR event_ts AS event_ts - interval '60' second,
> > everything works again. Am I using it incorrectly?
>
>
>
> --
> Best, Jingsong Lee
>


Re: Issues with Watermark generation after join

2020-03-16 Thread Kurt Young
Hi, could you share the SQL you written for your original purpose, not the
one you attached ProcessFunction for debugging?

Best,
Kurt


On Tue, Mar 17, 2020 at 3:08 AM Dominik Wosiński  wrote:

> Actually, I just put this process function there for debugging purposes.
> My main goal is to join the E & C using the Temporal Table function, but I
> have observed exactly the same behavior i.e. when the parallelism was > 1
> there was no output and when I was setting it to 1 then the output was
> generated. So, I have switched to process function to see whether the
> watermarks are reaching this stage.
>
> Best Regards,
> Dom.
>
> pon., 16 mar 2020 o 19:46 Theo Diefenthal <
> theo.diefent...@scoop-software.de> napisał(a):
>
>> Hi Dominik,
>>
>> I had the same once with a custom processfunction. My processfunction
>> buffered the data for a while and then output it again. As the process
>> function can do anything with the data (transforming, buffering,
>> aggregating...), I think it's just not safe for flink to reason about the
>> watermark of the output.
>>
>> I solved all my issues by calling `assignTimestampsAndWatermarks`
>> directly post to the (co-)process function.
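>>
>> A minimal sketch of that workaround (the element type, timestamp field and the
>> 5 second out-of-orderness bound are assumptions):
>>
>> DataStream<Result> processed = ...; // output of the (co-)process function
>> DataStream<Result> withWatermarks = processed.assignTimestampsAndWatermarks(
>>         new BoundedOutOfOrdernessTimestampExtractor<Result>(Time.seconds(5)) {
>>             @Override
>>             public long extractTimestamp(Result record) {
>>                 return record.eventTime; // re-derive event time from the emitted records
>>             }
>>         });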
>>
>> Best regards
>> Theo
>>
>> --
>> *Von: *"Dominik Wosiński" 
>> *An: *"user" 
>> *Gesendet: *Montag, 16. März 2020 16:55:18
>> *Betreff: *Issues with Watermark generation after join
>>
>> Hey,
>> I have noticed a weird behavior with a job that I am currently working
>> on. I have 4 different streams from Kafka, lets call them A, B, C and D.
>> Now the idea is that first I do SQL Join of A & B based on some field, then
>> I create append stream from Joined A, let's call it E. Then I need to
>> assign timestamps to E since it is a result of joining and Flink can't
>> figure out the timestamps.
>>
>> Next, I union E & C, to create some F stream. Then finally I connect E &
>> C using `keyBy` and CoProcessFunction. Now the issue I am facing is that if
>> I try to, it works fine if I enforce the parallelism of E to be 1 by
>> invoking *setParallelism*. But if parallelism is higher than 1, for the
>> same data - the watermark is not progressing correctly. I can see that 
>> *CoProcessFunction
>> *methods are invoked and that data is produced, but the Watermark is
>> never progressing for this function. What I can see is that watermark is
>> always equal to (0 - allowedOutOfOrderness). I can see that timestamps are
>> correctly extracted and when I add debug prints I can actually see that
>> Watermarks are generated for all streams, but for some reason, if the
>> parallelism is > 1 they will never progress up to connect function. Is
>> there anything that needs to be done after SQL joins that I don't know of
>> ??
>>
>> Best Regards,
>> Dom.
>>
>


Re: Re: Can a flink sql join keep its state and recover from it?

2020-03-11 Thread Kurt Young
Take a look at https://github.com/ververica/flink-sql-gateway

Best,
Kurt


On Wed, Mar 11, 2020 at 9:26 PM zhisheng  wrote:

> hi, Kurt Young
>
> Besides using the sql-client to run pure SQL, is there any other way to execute it? Usually we are not allowed to connect from a local machine directly to the production environment, nor to run the
> sql-client directly on the production machines.
>
> Kurt Young  于2020年3月11日周三 下午7:59写道:
>
> > 那有可能是可以的,你可以试试看
> >
> > Best,
> > Kurt
> >
> >
> > On Wed, Mar 11, 2020 at 6:57 PM wangl...@geekplus.com.cn <
> > wangl...@geekplus.com.cn> wrote:
> >
> > > Hi Kurt,
> > >
> > > 如果我不用 sql-client, 直接把表注册、sql join 之类的操作写在 java 代码里打成 jar 包就能实现从 state
> > > 中恢复的功能吗?
> > > 代码里没有任何跟 state 声明、TTL 定义之类的操作。任务 cancel -s 之前已经存在的表信息会在 state
> > > 存储并且再次提交任务可以被访问到直接用吗?
> > >
> > > 谢谢,
> > > 王磊
> > >
> > > --
> > > wangl...@geekplus.com.cn
> > >
> > >
> > > *Sender:* Kurt Young 
> > > *Send Time:* 2020-03-11 12:54
> > > *Receiver:* wangl...@geekplus.com.cn
> > > *cc:* user-zh 
> > > *Subject:* Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
> > > sql client 目前还不支持这个功能。
> > >
> > > Best,
> > > Kurt
> > >
> > >
> > > On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn <
> > > wangl...@geekplus.com.cn> wrote:
> > >
> > >> Hi Kurt,
> > >> 确实是可以 直接 flink  cancel -s 保存状态。
> > >> 但我是用 flink-sql-client 直接写 sql 提交的 job,再提交的时候怎样可以指定状态目录让这个任务从状态恢复呢?
> > >>
> > >> 谢谢,
> > >> 王磊
> > >>
> > >>
> > >> *Sender:* Kurt Young 
> > >> *Send Time:* 2020-03-11 10:38
> > >> *Receiver:* user-zh 
> > >> *Subject:* Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
> > >> 理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。
> > >> 应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。
> > >>
> > >> Best,
> > >> Kurt
> > >>
> > >>
> > >> On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn <
> > >> wangl...@geekplus.com.cn> wrote:
> > >>
> > >> > 有两个表:
> > >> > tableA: key  valueA
> > >> > tableB: key  valueB
> > >> >
> > >> > 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到
> valueA
> > >> > 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。
> > >> > flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢?
> > >> >
> > >> > 谢谢,
> > >> > 王磊
> > >> >
> > >>
> > >>
> >
>


Re: Re: Can a flink sql join keep its state and recover from it?

2020-03-11 Thread Kurt Young
Then it may well work; you can give it a try.

Best,
Kurt


On Wed, Mar 11, 2020 at 6:57 PM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

> Hi Kurt,
>
> If I don't use the sql-client and instead write the table registration, SQL joins and so on in Java code packaged as a jar, can I get recovery from state
> that way?
> The code has no state declarations, TTL definitions or anything like that. Will the table data that existed before `flink cancel -s` be stored in the state,
> and will it be accessible and directly usable when the job is submitted again?
>
> 谢谢,
> 王磊
>
> --
> wangl...@geekplus.com.cn
>
>
> *Sender:* Kurt Young 
> *Send Time:* 2020-03-11 12:54
> *Receiver:* wangl...@geekplus.com.cn
> *cc:* user-zh 
> *Subject:* Re: Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
> sql client 目前还不支持这个功能。
>
> Best,
> Kurt
>
>
> On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
>> Hi Kurt,
>> 确实是可以 直接 flink  cancel -s 保存状态。
>> 但我是用 flink-sql-client 直接写 sql 提交的 job,再提交的时候怎样可以指定状态目录让这个任务从状态恢复呢?
>>
>> 谢谢,
>> 王磊
>>
>>
>> *Sender:* Kurt Young 
>> *Send Time:* 2020-03-11 10:38
>> *Receiver:* user-zh 
>> *Subject:* Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
>> 理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。
>> 应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。
>>
>> Best,
>> Kurt
>>
>>
>> On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn <
>> wangl...@geekplus.com.cn> wrote:
>>
>> > 有两个表:
>> > tableA: key  valueA
>> > tableB: key  valueB
>> >
>> > 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到 valueA
>> > 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。
>> > flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢?
>> >
>> > 谢谢,
>> > 王磊
>> >
>>
>>


Re: Use flink to calculate sum of the inventory under certain conditions

2020-03-11 Thread Kurt Young
> The second reason is this query need to scan the whole table. I think we
can do better :-)

Not necessarily, you said all the changes will trigger a DDB stream, you
can use Flink to consume such
stream incrementally.

For the 1st problem, I think you can use DataStream API and register a
timer on every inventory which
got inbound. If the inventory got updated before timeout, you can delete
the timer, otherwise the timer
will trigger the calculation after timeout and you can get the total count
and emit that whenever an inventory
times out.
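>
> A rough sketch of that timer-based approach (the event type, the keying and the
> field names are assumptions, not something from this thread):
>
> // imports from org.apache.flink.api.common.state.*, org.apache.flink.configuration.Configuration,
> // org.apache.flink.streaming.api.functions.KeyedProcessFunction, org.apache.flink.util.Collector
> // and org.apache.flink.streaming.api.windowing.time.Time omitted for brevity
> public class AgeOutFunction
>         extends KeyedProcessFunction<String, InventoryEvent, InventoryEvent> {
>
>     private transient ValueState<Long> timerState;
>     private transient ValueState<InventoryEvent> lastEvent;
>
>     @Override
>     public void open(Configuration parameters) {
>         timerState = getRuntimeContext().getState(
>                 new ValueStateDescriptor<>("timeout-timer", Long.class));
>         lastEvent = getRuntimeContext().getState(
>                 new ValueStateDescriptor<>("last-event", InventoryEvent.class));
>     }
>
>     @Override
>     public void processElement(InventoryEvent event, Context ctx,
>                                Collector<InventoryEvent> out) throws Exception {
>         Long oldTimer = timerState.value();
>         if (oldTimer != null) {
>             // updated before the timeout, so drop the previously registered timer
>             ctx.timerService().deleteProcessingTimeTimer(oldTimer);
>         }
>         long fireAt = event.inboundTime + Time.days(15).toMilliseconds();
>         ctx.timerService().registerProcessingTimeTimer(fireAt);
>         timerState.update(fireAt);
>         lastEvent.update(event);
>     }
>
>     @Override
>     public void onTimer(long timestamp, OnTimerContext ctx,
>                         Collector<InventoryEvent> out) throws Exception {
>         // this inventory is now older than 15 days: emit it so a downstream
>         // aggregation can add its units to the per-vendor total
>         out.collect(lastEvent.value());
>     }
> }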

Best,
Kurt


On Wed, Mar 11, 2020 at 4:53 PM Arvid Heise  wrote:

> About the problem, we have 2 choices. The first one is using Flink as
>> described in this email thread. The second one is using AWS Lambda
>> triggered by CDC stream and compute the latest 15 days record, which is a
>> walk-around solution and looks not as elegant as Flink to me.
>>
>>
> Currently we decided to choose AWS Lambda because we are familiar with it,
>> and the most important, it lead to nearly no operational burden. But we are
>> actively looking for the comparison between Lambda and Flink and want to
>> know in which situation we prefer Flink over Lambda. Several teams in our
>> company are already in a hot debate about the comparison, and the biggest
>> concern is the non-function requirements about Flink, such as fault
>> tolerance, recovery, etc.
>>
>> I also searched the internet but found there are nearly no comparisons
>> between Lambda and Flink except for their market share :-( I'm wondering
>> what do you think of this? Or any comments from flink community is
>> appreciated.
>>
>
> You pretty much described the biggest difference already. Doing any more
> complex operation with Lambda will turn into a mess quickly.
>
> Lambdas currently shine for two use cases because of the ease of operation
> and unlimited scalability:
> - Simple transformations: input -> transform -> output
> - Simple database updates (together with Dynamo): input -> lookup by key
> (db), update by key (db) -> output
>
> As soon as you exceed point queries (time windows, joins) or have state,
> Lambdas actually get harder to manage imho. You need a zoo of supporting
> technologies or sacrifice lots of performance.
>
> In Flink, you have a higher barrier to entry, but as soon as your
> streaming application grows, it pays off quickly. Data is relocated with
> processing, such that you don't need to program access patterns yourself.
>
> So I'd decide it on a case by case basis for each application. If it's one
> of the two above mentioned use cases, just go lambda. You will not gain
> much with Flink, especially if you already have the experience.
> If you know your application will grow out of these use cases or is more
> complex to begin with, consider Flink.
>
> There is also one relatively new technology based on Flink called stateful
> functions [1]. It tries to combine the advanced state processing of Flink
> with the benefits of Lambdas (albeit scalability is not unlimited). You
> might want to check that out, as it may solve your use cases.
>
> [1] https://statefun.io/
>
> On Wed, Mar 11, 2020 at 3:06 AM Jiawei Wu 
> wrote:
>
>> Hi Robert,
>>
>> Your answer really helps.
>>
>> About the problem, we have 2 choices. The first one is using Flink as
>> described in this email thread. The second one is using AWS Lambda
>> triggered by CDC stream and compute the latest 15 days record, which is a
>> walk-around solution and looks not as elegant as Flink to me.
>>
>> Currently we decided to choose AWS Lambda because we are familiar with
>> it, and the most important, it lead to nearly no operational burden. But we
>> are actively looking for the comparison between Lambda and Flink and want
>> to know in which situation we prefer Flink over Lambda. Several teams in
>> our company are already in a hot debate about the comparison, and the
>> biggest concern is the non-function requirements about Flink, such as fault
>> tolerance, recovery, etc.
>>
>> I also searched the internet but found there are nearly no comparisons
>> between Lambda and Flink except for their market share :-( I'm wondering
>> what do you think of this? Or any comments from flink community is
>> appreciated.
>>
>> Thanks,
>> J
>>
>>
>> On Mon, Mar 9, 2020 at 8:22 PM Robert Metzger 
>> wrote:
>>
>>> Hey Jiawei,
>>>
>>> I'm sorry that you haven't received an answer yet.
>>>
>>> So you basically have a stream of dynamodb table updates (let's call id
>>> CDC stream), and you would like to maintain the inventory of the last 15
>>> days for each vendor.
>>> Whenever there's an update in the inventory data (a new event arrives in
>>> the CDC stream), you want to produce a new event with the inventory count.
>>>
>>> If I'm not mistaken, you will need to keep all the inventory in Flink's
>>> state to have an accurate count and to drop old records when they are
>>> expired.
>>> There are two options for maintaining the state:
>>> - in memory (using the 

Re: Re: Can a flink sql join keep its state and recover from it?

2020-03-10 Thread Kurt Young
I've opened an issue in the community: https://issues.apache.org/jira/browse/FLINK-16534
You can follow it for further updates.

Best,
Kurt


On Wed, Mar 11, 2020 at 12:54 PM Kurt Young  wrote:

> sql client 目前还不支持这个功能。
>
> Best,
> Kurt
>
>
> On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
>> Hi Kurt,
>> 确实是可以 直接 flink  cancel -s 保存状态。
>> 但我是用 flink-sql-client 直接写 sql 提交的 job,再提交的时候怎样可以指定状态目录让这个任务从状态恢复呢?
>>
>> 谢谢,
>> 王磊
>>
>>
>> *Sender:* Kurt Young 
>> *Send Time:* 2020-03-11 10:38
>> *Receiver:* user-zh 
>> *Subject:* Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
>> 理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。
>> 应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。
>>
>> Best,
>> Kurt
>>
>>
>> On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn <
>> wangl...@geekplus.com.cn> wrote:
>>
>> > 有两个表:
>> > tableA: key  valueA
>> > tableB: key  valueB
>> >
>> > 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到 valueA
>> > 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。
>> > flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢?
>> >
>> > 谢谢,
>> > 王磊
>> >
>>
>>


Re: Re: Can a flink sql join keep its state and recover from it?

2020-03-10 Thread Kurt Young
The sql client does not support this feature yet.

Best,
Kurt


On Wed, Mar 11, 2020 at 11:35 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

> Hi Kurt,
> Indeed, `flink cancel -s` does save the state directly.
> But my job was submitted by writing SQL directly in the flink-sql-client. When resubmitting, how can I point the job at the saved state directory so it recovers from that state?
>
> 谢谢,
> 王磊
>
>
> *Sender:* Kurt Young 
> *Send Time:* 2020-03-11 10:38
> *Receiver:* user-zh 
> *Subject:* Re: flink sql join 可以有 state 存储并从 state 恢复数据吗?
> 理论上来说,flink SQL的作业在编译完生成JobGraph并提交到集群上后,和Datastream的作业就没有什么本质的不同了。
> 应该也可以支持flink cancel -s 的功能,你可以先试下,如果碰到什么问题再看看。
>
> Best,
> Kurt
>
>
> On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
> > 有两个表:
> > tableA: key  valueA
> > tableB: key  valueB
> >
> > 我之前用 flink state 的方式存储 tableA,tableB 的消息过来以后去 query 这个 state 得到 valueA
> > 直接 写 flinkSQL 也可以实现这种功能,但这两个表有时间差,任务停止后重新提交会丢失部分 join 的结果。
> > flinkSQL 有没有类似 flink cancel -s 保存 state 的功能呢?
> >
> > 谢谢,
> > 王磊
> >
>
>


Re: When will flink HiveTableSink support writing in streaming mode

2020-03-10 Thread Kurt Young
It is expected to be ready in 1.11.

Best,
Kurt


On Wed, Mar 11, 2020 at 10:44 AM chenkaibit  wrote:

> Hi:
> I see that https://issues.apache.org/jira/browse/FLINK-14255 introduced a
> FileSystemStreamingSink, which seems to be preparation for HiveTableSink supporting streaming
> writes. In which future release is this feature expected to officially ship?
>
>


Re: Use flink to calculate sum of the inventory under certain conditions

2020-03-10 Thread Kurt Young
Hi Jiawei,

Sorry I still didn't fully get your question. What's wrong with your
proposed SQL?

> select vendorId, sum(inventory units)
> from dynamodb
> where today's time - inbound time > 15
> group by vendorId

My guess is that such query would only trigger calculations by new event.
So if a very old
inventory like inbounded 17 days ago, and there is no new events coming
about that inventory,
then the calculation would not be triggered and you can't sum it, right?

Best,
Kurt


On Wed, Mar 11, 2020 at 10:06 AM Jiawei Wu 
wrote:

> Hi Robert,
>
> Your answer really helps.
>
> About the problem, we have 2 choices. The first one is using Flink as
> described in this email thread. The second one is using AWS Lambda
> triggered by CDC stream and compute the latest 15 days record, which is a
> walk-around solution and looks not as elegant as Flink to me.
>
> Currently we decided to choose AWS Lambda because we are familiar with it,
> and the most important, it lead to nearly no operational burden. But we are
> actively looking for the comparison between Lambda and Flink and want to
> know in which situation we prefer Flink over Lambda. Several teams in our
> company are already in a hot debate about the comparison, and the biggest
> concern is the non-function requirements about Flink, such as fault
> tolerance, recovery, etc.
>
> I also searched the internet but found there are nearly no comparisons
> between Lambda and Flink except for their market share :-( I'm wondering
> what do you think of this? Or any comments from flink community is
> appreciated.
>
> Thanks,
> J
>
>
> On Mon, Mar 9, 2020 at 8:22 PM Robert Metzger  wrote:
>
>> Hey Jiawei,
>>
>> I'm sorry that you haven't received an answer yet.
>>
>> So you basically have a stream of dynamodb table updates (let's call id
>> CDC stream), and you would like to maintain the inventory of the last 15
>> days for each vendor.
>> Whenever there's an update in the inventory data (a new event arrives in
>> the CDC stream), you want to produce a new event with the inventory count.
>>
>> If I'm not mistaken, you will need to keep all the inventory in Flink's
>> state to have an accurate count and to drop old records when they are
>> expired.
>> There are two options for maintaining the state:
>> - in memory (using the FsStateBackend)
>> - on disk (using the embedded RocksDBStatebackend)
>>
>> I would recommend starting with the RocksDBStateBackend. It will work as
>> long as your state fits on all your machines hard disks (we'll probably not
>> have an issue there :) )
>> If you run into performance issues, you can consider switching to a
>> memory based backend (by then, you should have some knowledge about your
>> state size)
>>
>> For tracking the events, I would recommend you to look into Flink's
>> windowing API:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html
>>  / https://flink.apache.org/news/2015/12/04/Introducing-windows.html
>> Or alternatively doing an implementation with ProcessFunction:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html
>> I personally would give it a try with ProcessFunction first.
>>
>> For reading the data from DynamoDB, there's an undocumented feature for
>> it in Flink. This is an example for reading from a DynamoDB stream:
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromDynamoDBStreams.java
>> Here's also some info: https://issues.apache.org/jira/browse/FLINK-4582
>>
>> For writing to DynamoDB there is currently no official sink in Flink. It
>> should be fairly straightforward to implement a Sink using the SinkFunction
>> interface of Flink.
>>
>> I hope this answers your question.
>>
>> Best,
>> Robert
>>
>>
>>
>>
>> On Tue, Mar 3, 2020 at 3:25 AM Jiawei Wu 
>> wrote:
>>
>>> Hi flink users,
>>>
>>> We have a problem and think flink may be a good solution for that. But
>>> I'm new to flink and hope can get some insights from flink community :)
>>>
>>> Here is the problem. Suppose we have a DynamoDB table which store the
>>> inventory data, the schema is like:
>>>
>>> * vendorId (primary key)
>>> * inventory name
>>> * inventory units
>>> * inbound time
>>> ...
>>>
>>> This DDB table keeps changing, since we have inventory coming and
>>> removal. *Every change will trigger a DynamoDB stream. *
>>> We need to calculate *all the inventory units that > 15 days for a
>>> specific vendor* like this:
>>> > select vendorId, sum(inventory units)
>>> > from dynamodb
>>> > where today's time - inbound time > 15
>>> > group by vendorId
>>> We don't want to schedule a daily batch job, so we are trying to work on
>>> a micro-batch solution in Flink, and publish this data to another DynamoDB
>>> table.
>>>
>>> A draft idea is to use the total units minus <15 days units, since both
>>> of then have event trigger. But no 

Re: Can a flink sql join keep its state and recover from it?

2020-03-10 Thread Kurt Young
In theory, once a Flink SQL job has been compiled into a JobGraph and submitted to the cluster, there is no fundamental difference from a DataStream job.
It should also support flink cancel -s. Give it a try first, and let's look into whatever problems you run into.

Best,
Kurt


On Wed, Mar 11, 2020 at 10:24 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

> There are two tables:
> tableA: key  valueA
> tableB: key  valueB
>
> Previously I stored tableA in Flink state, and when a tableB message arrived I queried that state to get valueA.
> Writing it directly as Flink SQL can also achieve this, but there is a time gap between the two tables, and after stopping and resubmitting the job some of the join results are lost.
> Does Flink SQL have something like flink cancel -s for saving the state?
>
> Thanks,
> 王磊
>


Re: flink reports an error after running for a long time

2020-03-09 Thread Kurt Young
I've cc'ed zhuzhu, who is more familiar with the runtime, to help you.

Best,
Kurt


On Mon, Mar 9, 2020 at 6:44 PM lucas.wu  wrote:

> Probably nobody replied because nobody has hit this before, so I read the flink code this afternoon and finally have some clues.
> Cause analysis:
> The reason for this exception is that after a task fails, it needs to call the updateTaskExecutionState(TaskExecutionState
> taskExecutionState) RPC to notify the flink jobmanager
>
> to change the state of the corresponding task and restart it. However, the taskExecutionState parameter contains an error attribute, and when my task produces too large an error stack trace, the serialized message exceeds
> the maximum size the RPC allows (i.e. the maximum akka framesize), so the call to the updateTaskExecutionState
> RPC fails and the jobmanager never learns that this task has already failed
>
> and cannot restart it. This sets off a chain reaction; one consequence is that my checkpoints keep failing, because the task has in fact already been released but the jobmanager is not aware of it.
>
> Conclusion:
> Does this count as a flink bug? The task has already failed, but the jobmanager cannot be notified, so the task can never be restarted.
> Original message
> From: lucas.wulucas...@xiaoying.com
> To: user-zhuser...@flink.apache.org
> Sent: Monday, March 9, 2020, 11:06
> Subject: flink reports an error after running for a long time
>
>
> Hi all: My flink program mainly consumes data from kafka, does some simple processing, and writes to mysql via the jdbc
> outputformat. After running for a long time it reports the error below. What causes this, and can I fix it just by increasing some parameter? 2020-03-08
> 06:10:30,480 WARN org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler -
> Could not create remote rpc invocation message. Failing rpc invocation
> because... java.io.IOException: The rpc invocation size 34500577 exceeds
> the maximum akka framesize. at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:271)
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:200)
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:129)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:78)
> at com.sun.proxy.$Proxy12.updateTaskExecutionState(Unknown Source) at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1428)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1458)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2200(TaskExecutor.java:154)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1757)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at
> akka.actor.Actor$class.aroundReceive(Actor.scala:517) at
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:130) at
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:527) at
> akka.actor.ActorCell.invoke(ActorCell.scala:496) at
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at
> akka.dispatch.Mailbox.run(Mailbox.scala:224) at
> akka.dispatch.Mailbox.exec(Mailbox.scala:234) at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2020-03-08 06:10:30,480 ERROR
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor - Caught exception while
> executing runnable in main thread.
> java.lang.reflect.UndeclaredThrowableException at
> com.sun.proxy.$Proxy12.updateTaskExecutionState(Unknown Source) at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1428)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1458)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2200(TaskExecutor.java:154)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1757)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at
> 

Re: Writing retract streams to Kafka

2020-03-06 Thread Kurt Young
@Gyula Fóra  I think your query is right, we should
produce insert only results if you have event time and watermark defined.
I've created https://issues.apache.org/jira/browse/FLINK-16466 to track this
issue.

Best,
Kurt


On Fri, Mar 6, 2020 at 12:14 PM Kurt Young  wrote:

> Actually this use case lead me to start thinking about one question:
> If watermark is enabled, could we also support GROUP BY event_time instead
> of forcing
> user defining a window based on the event_time.
>
> GROUP BY a standalone event_time can also be treated as a special window,
> which has
> both start_time and end_time equals to event_time. And when watermark
> surpass the event_time,
> we can still get the complete data of such group and do required
> aggregation and then emit
> insert only results.
>
> That would ease user's burden for not having to define a window when they
> already have event
> time and watermark defined.
>
> Best,
> Kurt
>
>
> On Fri, Mar 6, 2020 at 10:26 AM Jark Wu  wrote:
>
>> Hi Gyula,
>>
>> Does tumbling 5 seconds for aggregation meet your need? For example:
>>
>> INSERT INTO QueryResult
>> SELECT q.queryId, t.itemId, TUMBLE_START(q.event_time, INTERVAL '5'
>> SECOND), sum(t.quantity) AS quantity
>> FROM
>>   ItemTransactions AS t,
>>   Queries AS q
>> WHERE
>>   t.itemId = q.itemId AND
>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
>> GROUP BY
>>   t.itemId, q.queryId, TUMBLE(q.event_time, INTERVAL '5' SECOND);
>>
>> Best,
>> Jark
>>
>> On Thu, 5 Mar 2020 at 23:05, Gyula Fóra  wrote:
>>
>>> I see, maybe I just dont understand how to properly express what I am
>>> trying to compute.
>>>
>>> Basically I want to aggregate the quantities of the transactions that
>>> happened in the 5 seconds before the query.
>>> Every query.id belongs to a single query (event_time, itemid) but still
>>> I have to group :/
>>>
>>> Gyula
>>>
>>> On Thu, Mar 5, 2020 at 3:45 PM Kurt Young  wrote:
>>>
>>>> I think the issue is not caused by event time interval join, but the
>>>> aggregation after the join:
>>>> GROUP BY t.itemId, q.event_time, q.queryId;
>>>>
>>>> In this case, there is still no chance for Flink to determine whether
>>>> the groups like (itemId, eventtime, queryId) have complete data or not.
>>>> As a comparison, if you change the grouping key to a window which based
>>>> only on q.event_time, then the query would emit insert only results.
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>>
>>>> On Thu, Mar 5, 2020 at 10:29 PM Gyula Fóra 
>>>> wrote:
>>>>
>>>>> That's exactly the kind of behaviour I am looking for Kurt ("ignore
>>>>> all delete messages").
>>>>>
>>>>> As for the data completion, in my above example it is basically an
>>>>> event time interval join.
>>>>> With watermarks defined Flink should be able to compute results once
>>>>> in exactly the same way as for the tumbling window.
>>>>>
>>>>> Gyula
>>>>>
>>>>> On Thu, Mar 5, 2020 at 3:26 PM Kurt Young  wrote:
>>>>>
>>>>>> Back to this case, I assume you are expecting something like "ignore
>>>>>> all delete messages" flag? With this
>>>>>> flag turned on, Flink will only send insert messages which
>>>>>> corresponding current correct results to kafka and
>>>>>> drop all retractions and deletes on the fly.
>>>>>>
>>>>>> Best,
>>>>>> Kurt
>>>>>>
>>>>>>
>>>>>> On Thu, Mar 5, 2020 at 10:24 PM Kurt Young  wrote:
>>>>>>
>>>>>>> > I also don't completely understand at this point why I can write
>>>>>>> the result of a group, tumble window aggregate to Kafka and not this 
>>>>>>> window
>>>>>>> join / aggregate.
>>>>>>>
>>>>>>> If you are doing a tumble window aggregate with watermark enabled,
>>>>>>> Flink will only fire a final result for
>>>>>>> each window at once, no modification or retractions will happen
>>>>>>> after a window is calculated and fired.
>>>>>>> But with some other arbitrary aggregations, ther

Re: Writing retract streams to Kafka

2020-03-05 Thread Kurt Young
Actually this use case lead me to start thinking about one question:
If watermark is enabled, could we also support GROUP BY event_time instead
of forcing
user defining a window based on the event_time.

GROUP BY a standalone event_time can also be treated as a special window,
which has
both start_time and end_time equals to event_time. And when watermark
surpass the event_time,
we can still get the complete data of such group and do required
aggregation and then emit
insert only results.

That would ease user's burden for not having to define a window when they
already have event
time and watermark defined.

Best,
Kurt


On Fri, Mar 6, 2020 at 10:26 AM Jark Wu  wrote:

> Hi Gyula,
>
> Does tumbling 5 seconds for aggregation meet your need? For example:
>
> INSERT INTO QueryResult
> SELECT q.queryId, t.itemId, TUMBLE_START(q.event_time, INTERVAL '5'
> SECOND), sum(t.quantity) AS quantity
> FROM
>   ItemTransactions AS t,
>   Queries AS q
> WHERE
>   t.itemId = q.itemId AND
>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
> GROUP BY
>   t.itemId, q.queryId, TUMBLE(q.event_time, INTERVAL '5' SECOND);
>
> Best,
> Jark
>
> On Thu, 5 Mar 2020 at 23:05, Gyula Fóra  wrote:
>
>> I see, maybe I just dont understand how to properly express what I am
>> trying to compute.
>>
>> Basically I want to aggregate the quantities of the transactions that
>> happened in the 5 seconds before the query.
>> Every query.id belongs to a single query (event_time, itemid) but still
>> I have to group :/
>>
>> Gyula
>>
>> On Thu, Mar 5, 2020 at 3:45 PM Kurt Young  wrote:
>>
>>> I think the issue is not caused by event time interval join, but the
>>> aggregation after the join:
>>> GROUP BY t.itemId, q.event_time, q.queryId;
>>>
>>> In this case, there is still no chance for Flink to determine whether
>>> the groups like (itemId, eventtime, queryId) have complete data or not.
>>> As a comparison, if you change the grouping key to a window which based
>>> only on q.event_time, then the query would emit insert only results.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Thu, Mar 5, 2020 at 10:29 PM Gyula Fóra  wrote:
>>>
>>>> That's exactly the kind of behaviour I am looking for Kurt ("ignore all
>>>> delete messages").
>>>>
>>>> As for the data completion, in my above example it is basically an
>>>> event time interval join.
>>>> With watermarks defined Flink should be able to compute results once in
>>>> exactly the same way as for the tumbling window.
>>>>
>>>> Gyula
>>>>
>>>> On Thu, Mar 5, 2020 at 3:26 PM Kurt Young  wrote:
>>>>
>>>>> Back to this case, I assume you are expecting something like "ignore
>>>>> all delete messages" flag? With this
>>>>> flag turned on, Flink will only send insert messages which
>>>>> corresponding current correct results to kafka and
>>>>> drop all retractions and deletes on the fly.
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>>
>>>>> On Thu, Mar 5, 2020 at 10:24 PM Kurt Young  wrote:
>>>>>
>>>>>> > I also don't completely understand at this point why I can write
>>>>>> the result of a group, tumble window aggregate to Kafka and not this 
>>>>>> window
>>>>>> join / aggregate.
>>>>>>
>>>>>> If you are doing a tumble window aggregate with watermark enabled,
>>>>>> Flink will only fire a final result for
>>>>>> each window at once, no modification or retractions will happen after
>>>>>> a window is calculated and fired.
>>>>>> But with some other arbitrary aggregations, there is not enough
>>>>>> information for Flink to determine whether
>>>>>> the data is complete or not, so the framework will keep calculating
>>>>>> results when receiving new records and
>>>>>> retract earlier results by firing retraction/deletion messages.
>>>>>>
>>>>>> Best,
>>>>>> Kurt
>>>>>>
>>>>>>
>>>>>> On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra 
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks Benoît!
>>>>>>>
>>>>>>> I can see now how I can implement 

Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Kurt Young
If you already have a running Flink cluster and you want to submit another
job to this cluster, then none of the configurations
related to process parameters, like TM memory or slot count, can be
modified anymore.
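
Job-scoped Table/SQL options, on the other hand, can still be set programmatically via
TableConfig#getConfiguration(), as Jark mentions below (the option keys and values here
are just examples):

tEnv.getConfig().getConfiguration()
        .setString("table.exec.mini-batch.enabled", "true");
tEnv.getConfig().getConfiguration()
        .setString("table.exec.mini-batch.allow-latency", "5 s");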

Best,
Kurt


On Thu, Mar 5, 2020 at 11:08 PM Gyula Fóra  wrote:

> Kurt can you please explain which conf parameters do you mean?
>
> In regular executions (Yarn for instance) we  have dynamic config
> parameters overriding any flink-conf argument.
> So it is not about setting them in the user code but it should happen
> before the ClusterDescriptors are created (ie in the together with the
> CustomCommandLine logic)
>
> Gyula
>
> On Thu, Mar 5, 2020 at 3:49 PM Kurt Young  wrote:
>
>> IIRC the tricky thing here is not all the config options belong to
>> flink-conf.yaml can be adjust dynamically in user's program.
>> So it will end up like some of the configurations can be overridden but
>> some are not. The experience is not quite good for users.
>>
>> Best,
>> Kurt
>>
>>
>> On Thu, Mar 5, 2020 at 10:15 PM Jeff Zhang  wrote:
>>
>>> Hi Gyula,
>>>
>>> I am doing integration Flink with Zeppelin. One feature in Zeppelin is
>>> that user could override any features in flink-conf.yaml. (Actually any
>>> features here
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html).
>>> Of course you can run flink sql in Zeppelin, and could also leverage other
>>> features of Zeppelin, like visualization.
>>>
>>> If you are interested, you could try the master branch of Zeppelin +
>>> this improvement PR
>>>
>>> https://github.com/apache/zeppelin
>>> https://github.com/apache/zeppelin/pull/3676
>>> https://github.com/apache/zeppelin/blob/master/docs/interpreter/flink.md
>>>
>>>
>>>
>>>
>>>
>>>
>>> Gyula Fóra  于2020年3月5日周四 下午6:51写道:
>>>
>>>> I could basically list a few things I want to set (execution.target for
>>>> example), but it's fair to assume that I would like to be able to set
>>>> anything :)
>>>>
>>>> Gyula
>>>>
>>>> On Thu, Mar 5, 2020 at 11:35 AM Jingsong Li 
>>>> wrote:
>>>>
>>>>> Hi Gyula,
>>>>>
>>>>> Maybe Blink planner has invoked
>>>>> "StreamExecutionEnvironment.configure", which planner do you use?
>>>>>
>>>>> But "StreamExecutionEnvironment.configure" is only for partial
>>>>> configuration, can not for all configuration in flink-conf.yaml.
>>>>> So what's the config do you want to set? I know some config like
>>>>> "taskmanager.network.blocking-shuffle.compression.enabled" can not set
>>>>>
>>>>> Best,
>>>>> Jingsong Lee
>>>>>
>>>>> On Thu, Mar 5, 2020 at 6:17 PM Jark Wu  wrote:
>>>>>
>>>>>> Hi Gyula,
>>>>>>
>>>>>> Flink configurations can be overrided via
>>>>>> `TableConfig#getConfiguration()`, however, SQL CLI only allows to set 
>>>>>> Table
>>>>>> specific configs.
>>>>>> I will think it as a bug/improvement of SQL CLI which should be fixed
>>>>>> in 1.10.1.
>>>>>>
>>>>>> Best,
>>>>>> Jark
>>>>>>
>>>>>> On Thu, 5 Mar 2020 at 18:12, Gyula Fóra  wrote:
>>>>>>
>>>>>>> Thanks Caizhi,
>>>>>>>
>>>>>>> This seems like a pretty big shortcoming for any
>>>>>>> multi-user/multi-app environment. I will open a jira for this.
>>>>>>>
>>>>>>> Gyula
>>>>>>>
>>>>>>> On Thu, Mar 5, 2020 at 10:58 AM Caizhi Weng 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Gyula.
>>>>>>>>
>>>>>>>> I'm afraid there is no way to override all Flink configurations
>>>>>>>> currently. SQL client yaml file can only override some of the Flink
>>>>>>>> configurations.
>>>>>>>>
>>>>>>>> Configuration entries indeed can only set Table specific configs,
>>>>>>>> while deployment entires are used to set the result fetching address 
>>>>>>>> and
>>>>>>>> port. There is currently no way to change the execution target from 
>>>>>>>> the SQL
>>>>>>>> client.
>>>>>>>>
>>>>>>>> Gyula Fóra  于2020年3月5日周四 下午4:31写道:
>>>>>>>>
>>>>>>>>> Hi All!
>>>>>>>>>
>>>>>>>>> I am trying to understand if there is any way to override flink
>>>>>>>>> configuration parameters when starting the SQL Client.
>>>>>>>>>
>>>>>>>>> It seems that the only way to pass any parameters is through the
>>>>>>>>> environment yaml.
>>>>>>>>>
>>>>>>>>> There I found 2 possible routes:
>>>>>>>>>
>>>>>>>>> configuration: this doesn't work as it only sets Table specific
>>>>>>>>> configs apparently, but maybe I am wrong.
>>>>>>>>>
>>>>>>>>> deployment: I tried using dynamic properties options here but
>>>>>>>>> unfortunately we normalize (lowercase) the YAML keys so it is 
>>>>>>>>> impossible to
>>>>>>>>> pass options like -yD or -D.
>>>>>>>>>
>>>>>>>>> Does anyone have any suggestions?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Gyula
>>>>>>>>>
>>>>>>>>
>>>>>
>>>>> --
>>>>> Best, Jingsong Lee
>>>>>
>>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>


Re: How to use self defined json format when create table from kafka stream?

2020-03-05 Thread Kurt Young
User-defined formats also sound like an interesting extension.

Best,
Kurt


On Thu, Mar 5, 2020 at 3:06 PM Jark Wu  wrote:

> Hi Lei,
>
> Currently, Flink SQL doesn't support to register a binlog format (i.e.
> just define "order_id" and "order_no", but the json schema has other binlog
> fields).
> This is exactly what we want to support in FLIP-105 [1] and FLIP-95.
>
> For now, if you want to consume such json data, you have to define the
> full schema, e.g. "type", "timestmap", and so on...
>
> Btw, what Change Data Capture (CDC) tool are you using?
>
> Best,
> Jark
>
> [1]:
> https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#
>
>
> On Thu, 5 Mar 2020 at 11:40, wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
>>
>> I want to rigister a table from mysql binlog like this:
>>
>> tEnv.sqlUpdate("CREATE TABLE order(\n"
>> + "order_id BIGINT,\n"
>> + "order_no VARCHAR,\n"
>> + ") WITH (\n"
>> + "'connector.type' = 'kafka',\n"
>> ...
>> + "'update-mode' = 'append',\n"
>> + "'format.type' = 'json',\n"
>> + "'format.derive-schema' = 'true'\n"
>> + ")");
>>
>> using the following log format:
>>
>> {
>>   "type" : "update",
>>   "timestamp" : 1583373066000,
>>   "binlog_filename" : "mysql-bin.000453",
>>   "binlog_position" : 923020943,
>>   "database" : "wms",
>>   "table_name" : "t_pick_order",
>>   "table_id" : 131936,
>>   "columns" : [ {
>> "id" : 1,
>> "name" : "order_id",
>> "column_type" : -5,
>> "last_value" : 4606458,
>> "value" : 4606458
>>   }, {
>> "id" : 2,
>> "name" : "order_no",
>> "column_type" : 12,
>> "last_value" : "EDBMFSJ1S2003050006628",
>> "value" : "EDBMFSJ1S2003050006628"
>>   }]
>> }
>>
>>
>> Surely the format.type' = 'json',\n" will not parse the result as I
>> expected.
>> Is there any method I can implement this? For example, using a self
>> defined format class.
>>
>> Thanks,
>> Lei
>>
>> --
>> wangl...@geekplus.com.cn
>>
>>
>>


Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Kurt Young
IIRC, the tricky thing here is that not all the config options belonging to
flink-conf.yaml can be adjusted dynamically in the user's program.
So it would end up with some of the configurations being overridable but others
not, which is not a great experience for users.
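
For illustration, a minimal sketch of the per-job side (assuming Flink 1.9+ with the
blink planner; the option names are only examples, not a definitive list of what is
or isn't overridable): table/execution options can be set through the TableConfig,
while process-level options stay fixed once the cluster is running.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PerJobConfigSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Execution/optimizer options like these can be adjusted per job:
        tableEnv.getConfig().getConfiguration()
                .setString("table.exec.mini-batch.enabled", "true");
        tableEnv.getConfig().getConfiguration()
                .setString("table.exec.mini-batch.allow-latency", "5 s");

        // Process-level options (e.g. taskmanager.heap.size, taskmanager.numberOfTaskSlots)
        // are read when the TaskManager process starts, so setting them here has no effect
        // on an already running cluster.
    }
}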

Best,
Kurt


On Thu, Mar 5, 2020 at 10:15 PM Jeff Zhang  wrote:

> Hi Gyula,
>
> I am doing integration Flink with Zeppelin. One feature in Zeppelin is
> that user could override any features in flink-conf.yaml. (Actually any
> features here
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html).
> Of course you can run flink sql in Zeppelin, and could also leverage other
> features of Zeppelin, like visualization.
>
> If you are interested, you could try the master branch of Zeppelin + this
> improvement PR
>
> https://github.com/apache/zeppelin
> https://github.com/apache/zeppelin/pull/3676
> https://github.com/apache/zeppelin/blob/master/docs/interpreter/flink.md
>
>
>
>
>
>
> Gyula Fóra  于2020年3月5日周四 下午6:51写道:
>
>> I could basically list a few things I want to set (execution.target for
>> example), but it's fair to assume that I would like to be able to set
>> anything :)
>>
>> Gyula
>>
>> On Thu, Mar 5, 2020 at 11:35 AM Jingsong Li 
>> wrote:
>>
>>> Hi Gyula,
>>>
>>> Maybe Blink planner has invoked "StreamExecutionEnvironment.configure",
>>> which planner do you use?
>>>
>>> But "StreamExecutionEnvironment.configure" is only for partial
>>> configuration, can not for all configuration in flink-conf.yaml.
>>> So what's the config do you want to set? I know some config like
>>> "taskmanager.network.blocking-shuffle.compression.enabled" can not set
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Thu, Mar 5, 2020 at 6:17 PM Jark Wu  wrote:
>>>
 Hi Gyula,

 Flink configurations can be overrided via
 `TableConfig#getConfiguration()`, however, SQL CLI only allows to set Table
 specific configs.
 I will think it as a bug/improvement of SQL CLI which should be fixed
 in 1.10.1.

 Best,
 Jark

 On Thu, 5 Mar 2020 at 18:12, Gyula Fóra  wrote:

> Thanks Caizhi,
>
> This seems like a pretty big shortcoming for any multi-user/multi-app
> environment. I will open a jira for this.
>
> Gyula
>
> On Thu, Mar 5, 2020 at 10:58 AM Caizhi Weng 
> wrote:
>
>> Hi Gyula.
>>
>> I'm afraid there is no way to override all Flink configurations
>> currently. SQL client yaml file can only override some of the Flink
>> configurations.
>>
>> Configuration entries indeed can only set Table specific configs,
>> while deployment entires are used to set the result fetching address and
>> port. There is currently no way to change the execution target from the 
>> SQL
>> client.
>>
>> Gyula Fóra  于2020年3月5日周四 下午4:31写道:
>>
>>> Hi All!
>>>
>>> I am trying to understand if there is any way to override flink
>>> configuration parameters when starting the SQL Client.
>>>
>>> It seems that the only way to pass any parameters is through the
>>> environment yaml.
>>>
>>> There I found 2 possible routes:
>>>
>>> configuration: this doesn't work as it only sets Table specific
>>> configs apparently, but maybe I am wrong.
>>>
>>> deployment: I tried using dynamic properties options here but
>>> unfortunately we normalize (lowercase) the YAML keys so it is 
>>> impossible to
>>> pass options like -yD or -D.
>>>
>>> Does anyone have any suggestions?
>>>
>>> Thanks
>>> Gyula
>>>
>>
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Writing retract streams to Kafka

2020-03-05 Thread Kurt Young
I think the issue is not caused by the event-time interval join, but by the
aggregation after the join:
GROUP BY t.itemId, q.event_time, q.queryId;

In this case, there is still no way for Flink to determine whether groups like
(itemId, event_time, queryId) have complete data or not.
As a comparison, if you change the grouping key to a window based only
on q.event_time, then the query would emit insert-only results.
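
As a rough sketch of that change (assuming the same ItemTransactions / Queries /
QueryResult tables from this thread and the 1.10 blink planner; the 5-second window
size is only an example, and whether the time attribute is still usable for windowing
after the join depends on the exact query):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class WindowedGroupingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // ... register ItemTransactions, Queries and QueryResult exactly as in the original job ...

        // Grouping on a TUMBLE window over q.event_time (instead of the raw timestamp) lets
        // Flink finalize each group when the watermark passes the window end, so the result
        // becomes insert-only and fits an append-only Kafka sink.
        tableEnv.sqlUpdate(
            "INSERT INTO QueryResult " +
            "SELECT q.queryId, t.itemId, SUM(t.quantity) AS quantity " +
            "FROM ItemTransactions AS t, Queries AS q " +
            "WHERE t.itemId = q.itemId " +
            "  AND t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time " +
            "GROUP BY TUMBLE(q.event_time, INTERVAL '5' SECOND), t.itemId, q.queryId");

        tableEnv.execute("windowed grouping sketch");
    }
}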

Best,
Kurt


On Thu, Mar 5, 2020 at 10:29 PM Gyula Fóra  wrote:

> That's exactly the kind of behaviour I am looking for Kurt ("ignore all
> delete messages").
>
> As for the data completion, in my above example it is basically an event
> time interval join.
> With watermarks defined Flink should be able to compute results once in
> exactly the same way as for the tumbling window.
>
> Gyula
>
> On Thu, Mar 5, 2020 at 3:26 PM Kurt Young  wrote:
>
>> Back to this case, I assume you are expecting something like "ignore all
>> delete messages" flag? With this
>> flag turned on, Flink will only send insert messages which corresponding
>> current correct results to kafka and
>> drop all retractions and deletes on the fly.
>>
>> Best,
>> Kurt
>>
>>
>> On Thu, Mar 5, 2020 at 10:24 PM Kurt Young  wrote:
>>
>>> > I also don't completely understand at this point why I can write the
>>> result of a group, tumble window aggregate to Kafka and not this window
>>> join / aggregate.
>>>
>>> If you are doing a tumble window aggregate with watermark enabled, Flink
>>> will only fire a final result for
>>> each window at once, no modification or retractions will happen after a
>>> window is calculated and fired.
>>> But with some other arbitrary aggregations, there is not enough
>>> information for Flink to determine whether
>>> the data is complete or not, so the framework will keep calculating
>>> results when receiving new records and
>>> retract earlier results by firing retraction/deletion messages.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra  wrote:
>>>
>>>> Thanks Benoît!
>>>>
>>>> I can see now how I can implement this myself through the provided sink
>>>> interfaces but I was trying to avoid having to write code for this :D
>>>> My initial motivation was to see whether we are able to write out any
>>>> kind of table to Kafka as a simple stream of "upserts".
>>>>
>>>> I also don't completely understand at this point why I can write the
>>>> result of a group, tumble window aggregate to Kafka and not this window
>>>> join / aggregate.
>>>>
>>>> Cheers,
>>>> Gyula
>>>>
>>>> On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
>>>> benoit.pa...@centraliens-lille.org> wrote:
>>>>
>>>>> Hi Gyula,
>>>>>
>>>>> I'm afraid conversion to see the retractions vs inserts can't be done
>>>>> in pure SQL (though I'd love that feature).
>>>>>
>>>>> You might want to go lower level and implement a
>>>>> RetractStreamTableSink [1][2] that you would wrap around a KafkaTableSink
>>>>> [3]. This will give you a emitDataStream(DataStream>
>>>>> dataStream);, in which the Boolean flag will give you an 'accumulate' or
>>>>> 'retract' signal.
>>>>> You can then filter the DataStream accordingly before passing to the
>>>>> KafkaTableSink.
>>>>>
>>>>> Hope this helps.
>>>>>
>>>>> Best regards
>>>>> Benoît
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
>>>>> [2]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
>>>>> [3]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
>>>>>
>>>>> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra 
>>>>> wrote:
>>>>>
>>>>>> Hi Roman,
>>>>>>
>>>>>> This is the core logic:
>>>>>>
>>>>>> CREATE TABLE QueryResult (
>>

Re: Writing retract streams to Kafka

2020-03-05 Thread Kurt Young
Back to this case, I assume you are expecting something like an "ignore all
delete messages" flag? With this flag turned on, Flink would only send the insert
messages corresponding to the currently correct results to Kafka and drop all
retractions and deletes on the fly.
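
Until such a flag exists, a rough user-code sketch of the same idea (Flink 1.10 Table
API assumed; the tiny in-memory table and the print() sink are stand-ins for the real
query and Kafka producer):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class DropDeletesSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        tableEnv.registerDataStream("Transactions",
                env.fromElements(Tuple2.of("item-1", 2), Tuple2.of("item-1", 3), Tuple2.of("item-2", 1)),
                "itemId, quantity");

        // An updating (retracting) query, similar in nature to the join/aggregate in this thread.
        Table result = tableEnv.sqlQuery(
                "SELECT itemId, SUM(quantity) AS total FROM Transactions GROUP BY itemId");

        // toRetractStream tags every change: f0 == true for insert/accumulate messages,
        // f0 == false for retract/delete messages. Keeping only the true ones approximates
        // the "ignore all delete messages" behavior before handing the stream to a Kafka sink.
        DataStream<Tuple2<Boolean, Row>> changes = tableEnv.toRetractStream(result, Row.class);
        changes.filter(change -> change.f0)
               .print();  // replace with a Kafka producer in a real job

        env.execute("drop deletes sketch");
    }
}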

Best,
Kurt


On Thu, Mar 5, 2020 at 10:24 PM Kurt Young  wrote:

> > I also don't completely understand at this point why I can write the
> result of a group, tumble window aggregate to Kafka and not this window
> join / aggregate.
>
> If you are doing a tumble window aggregate with watermark enabled, Flink
> will only fire a final result for
> each window at once, no modification or retractions will happen after a
> window is calculated and fired.
> But with some other arbitrary aggregations, there is not enough
> information for Flink to determine whether
> the data is complete or not, so the framework will keep calculating
> results when receiving new records and
> retract earlier results by firing retraction/deletion messages.
>
> Best,
> Kurt
>
>
> On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra  wrote:
>
>> Thanks Benoît!
>>
>> I can see now how I can implement this myself through the provided sink
>> interfaces but I was trying to avoid having to write code for this :D
>> My initial motivation was to see whether we are able to write out any
>> kind of table to Kafka as a simple stream of "upserts".
>>
>> I also don't completely understand at this point why I can write the
>> result of a group, tumble window aggregate to Kafka and not this window
>> join / aggregate.
>>
>> Cheers,
>> Gyula
>>
>> On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
>> benoit.pa...@centraliens-lille.org> wrote:
>>
>>> Hi Gyula,
>>>
>>> I'm afraid conversion to see the retractions vs inserts can't be done in
>>> pure SQL (though I'd love that feature).
>>>
>>> You might want to go lower level and implement a RetractStreamTableSink
>>> [1][2] that you would wrap around a KafkaTableSink [3]. This will give you
>>> a emitDataStream(DataStream> dataStream);, in which the
>>> Boolean flag will give you an 'accumulate' or 'retract' signal.
>>> You can then filter the DataStream accordingly before passing to the
>>> KafkaTableSink.
>>>
>>> Hope this helps.
>>>
>>> Best regards
>>> Benoît
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
>>> [3]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
>>>
>>> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra  wrote:
>>>
>>>> Hi Roman,
>>>>
>>>> This is the core logic:
>>>>
>>>> CREATE TABLE QueryResult (
>>>> queryIdBIGINT,
>>>>   itemIdSTRING,
>>>>   quantity INT
>>>> ) WITH (
>>>> 'connector.type' = 'kafka',
>>>> 'connector.version' = 'universal',
>>>> 'connector.topic'   = 'query.output.log.1',
>>>> 'connector.properties.bootstrap.servers' = '',
>>>> 'format.type' = 'json'
>>>> );
>>>>
>>>> INSERT INTO QueryResult
>>>> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
>>>> FROM
>>>>   ItemTransactions AS t,
>>>>   Queries AS q
>>>> WHERE
>>>>   t.itemId = q.itemId AND
>>>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND
>>>> q.event_time
>>>> GROUP BY
>>>>   t.itemId, q.event_time, q.queryId;
>>>>
>>>> And the error I get is:
>>>> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid
>>>> SQL update statement.
>>>> at
>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
>>>> at
>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>>>> at
>>>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>>>> at
>>>> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
>>>> at
>>>> org.apache.flink.table.client.cli.CliClient.callCommand(CliCl

Re: Writing retract streams to Kafka

2020-03-05 Thread Kurt Young
> I also don't completely understand at this point why I can write the
result of a group, tumble window aggregate to Kafka and not this window
join / aggregate.

If you are doing a tumble window aggregate with watermarks enabled, Flink will fire
only one final result per window; no modifications or retractions will happen after
a window has been calculated and fired.
But with some other arbitrary aggregations, there is not enough information for
Flink to determine whether the data is complete or not, so the framework will keep
recalculating results when receiving new records and will retract earlier results
by firing retraction/deletion messages.

Best,
Kurt


On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra  wrote:

> Thanks Benoît!
>
> I can see now how I can implement this myself through the provided sink
> interfaces but I was trying to avoid having to write code for this :D
> My initial motivation was to see whether we are able to write out any kind
> of table to Kafka as a simple stream of "upserts".
>
> I also don't completely understand at this point why I can write the
> result of a group, tumble window aggregate to Kafka and not this window
> join / aggregate.
>
> Cheers,
> Gyula
>
> On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
> benoit.pa...@centraliens-lille.org> wrote:
>
>> Hi Gyula,
>>
>> I'm afraid conversion to see the retractions vs inserts can't be done in
>> pure SQL (though I'd love that feature).
>>
>> You might want to go lower level and implement a RetractStreamTableSink
>> [1][2] that you would wrap around a KafkaTableSink [3]. This will give you
>> a emitDataStream(DataStream> dataStream);, in which the
>> Boolean flag will give you an 'accumulate' or 'retract' signal.
>> You can then filter the DataStream accordingly before passing to the
>> KafkaTableSink.
>>
>> Hope this helps.
>>
>> Best regards
>> Benoît
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
>>
>> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra  wrote:
>>
>>> Hi Roman,
>>>
>>> This is the core logic:
>>>
>>> CREATE TABLE QueryResult (
>>> queryIdBIGINT,
>>>   itemIdSTRING,
>>>   quantity INT
>>> ) WITH (
>>> 'connector.type' = 'kafka',
>>> 'connector.version' = 'universal',
>>> 'connector.topic'   = 'query.output.log.1',
>>> 'connector.properties.bootstrap.servers' = '',
>>> 'format.type' = 'json'
>>> );
>>>
>>> INSERT INTO QueryResult
>>> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
>>> FROM
>>>   ItemTransactions AS t,
>>>   Queries AS q
>>> WHERE
>>>   t.itemId = q.itemId AND
>>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND
>>> q.event_time
>>> GROUP BY
>>>   t.itemId, q.event_time, q.queryId;
>>>
>>> And the error I get is:
>>> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL
>>> update statement.
>>> at
>>> org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
>>> at
>>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>>> at
>>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>>> at
>>> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
>>> at
>>> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
>>> at java.util.Optional.ifPresent(Optional.java:159)
>>> at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
>>> at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>>> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>>> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
>>> Caused by: org.apache.flink.table.api.TableException:
>>> AppendStreamTableSink requires that Table has only insert changes.
>>> at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>> at
>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>
>>> I am wondering what could I do to just simply pump the result updates to
>>> Kafka here.
>>>
>>> Gyula
>>>
>>> On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <
>>> khachatryan.ro...@gmail.com> wrote:
>>>
 Hi Gyula,

 Could you provide the code of your Flink program, the error with
 stacktrace and the Flink version?

 Thanks.,
 Roman


 On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra  wrote:

> Hi All!
>
> Excuse my stupid question, I 

Re: Hive Source with Kerberos authentication issue

2020-02-27 Thread Kurt Young
cc @li...@apache.org 

Best,
Kurt


On Thu, Feb 13, 2020 at 10:22 AM 叶贤勋  wrote:

> Hi everyone:
> I ran into an exception when reading a Hive 2.1.1 source with Kerberos
> authentication and would like to ask for your advice.
> Flink version: 1.9
> Hive version: 2.1.1, with HiveShimV211 implemented.
> Code:
> public class HiveCatalogTest {
>private static final Logger LOG =
> LoggerFactory.getLogger(HiveCatalogTest.class);
>private String hiveConfDir = "/Users/yexianxun/dev/env/test-hive"; // a
> local path
>private TableEnvironment tableEnv;
>private HiveCatalog hive;
>private String hiveName;
>private String hiveDB;
>private String version;
>
>
>@Before
>public void before() {
>   EnvironmentSettings settings =
>  EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inBatchMode()
> .build();
>   tableEnv = TableEnvironment.create(settings);
>   hiveName = "myhive";
>   hiveDB = "sloth";
>   version = "2.1.1";
>}
>
>
>@Test
>public void testCatalogQuerySink() throws Exception {
>   hive = new HiveCatalog(hiveName, hiveDB, hiveConfDir, version);
>   System.setProperty("java.security.krb5.conf", hiveConfDir +
> "/krb5.conf");
>   tableEnv.getConfig().getConfiguration().setString("stream_mode",
> "false");
>   tableEnv.registerCatalog(hiveName, hive);
>   tableEnv.useCatalog(hiveName);
>   String query = "select * from " + hiveName + "." + hiveDB +
> ".testtbl2 where id = 20200202";
>   Table table = tableEnv.sqlQuery(query);
>   String newTableName = "testtbl2_1";
>   table.insertInto(hiveName, hiveDB, newTableName);
>   tableEnv.execute("test");
>}
> }
>
>
> HiveMetastoreClientFactory:
>public static HiveMetastoreClientWrapper create(HiveConf hiveConf,
> String hiveVersion) {
>   Preconditions.checkNotNull(hiveVersion, "Hive version cannot be
> null");
>   if (System.getProperty("java.security.krb5.conf") != null) {
>  if (System.getProperty("had_set_kerberos") == null) {
> String principal = "sloth/d...@bdms.163.com";
> String keytab =
> "/Users/yexianxun/dev/env/mammut-test-hive/sloth.keytab";
> try {
>sun.security.krb5.Config.refresh();
>UserGroupInformation.setConfiguration(hiveConf);
>UserGroupInformation.loginUserFromKeytab(principal, keytab);
>System.setProperty("had_set_kerberos", "true");
> } catch (Exception e) {
>LOG.error("", e);
> }
>  }
>   }
>   return new HiveMetastoreClientWrapper(hiveConf, hiveVersion);
>}
>
>
> HiveCatalog:
>private static HiveConf createHiveConf(@Nullable String hiveConfDir) {
>   LOG.info("Setting hive conf dir as {}", hiveConfDir);
>   try {
>  HiveConf.setHiveSiteLocation(
> hiveConfDir == null ?
>null : Paths.get(hiveConfDir,
> "hive-site.xml").toUri().toURL());
>   } catch (MalformedURLException e) {
>  throw new CatalogException(
> String.format("Failed to get hive-site.xml from %s",
> hiveConfDir), e);
>   }
>
>
>   // create HiveConf from hadoop configuration
>   HiveConf hiveConf = new
> HiveConf(HadoopUtils.getHadoopConfiguration(new
> org.apache.flink.configuration.Configuration()),
>  HiveConf.class);
>   try {
>  hiveConf.addResource(Paths.get(hiveConfDir,
> "hdfs-site.xml").toUri().toURL());
>  hiveConf.addResource(Paths.get(hiveConfDir,
> "core-site.xml").toUri().toURL());
>   } catch (MalformedURLException e) {
>  throw new CatalogException(String.format("Failed to get
> hdfs|core-site.xml from %s", hiveConfDir), e);
>   }
>   return hiveConf;
>}
>
>
> Running the testCatalogQuerySink method reports the following error:
> org.apache.flink.runtime.client.JobExecutionException: Could not retrieve
> JobResult.
>
>
> at
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:622)
> at
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:117)
> at
> org.apache.flink.table.planner.delegation.BatchExecutor.execute(BatchExecutor.java:55)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:410)
> at api.HiveCatalogTest.testCatalogQuerySink(HiveCatalogMumTest.java:234)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at
> 

[ANNOUNCE] Jingsong Lee becomes a Flink committer

2020-02-20 Thread Kurt Young
Hi everyone,

I'm very happy to announce that Jingsong Lee accepted the offer of the
Flink PMC to
become a committer of the Flink project.

Jingsong Lee has been an active community member for more than a year now.
He mainly focuses on Flink SQL, played an essential role during the blink planner
merge, drove FLIP-63, helped implement the expression rework design, and also
implemented and fixed lots of features and bugs in Flink SQL. Moreover, he is very
active on both the dev and user mailing lists, helping to discuss designs, answer
user questions, and verify various releases.

Congratulations Jingsong!

Best, Kurt
(on behalf of the Flink PMC)


Re: [ANNOUNCE] Apache Flink 1.10.0 released

2020-02-12 Thread Kurt Young
Congratulations to everyone involved!
Great thanks to Yu & Gary for being the release managers!

Best,
Kurt


On Thu, Feb 13, 2020 at 10:06 AM Hequn Cheng  wrote:

> Great thanks to Yu & Gary for being the release manager!
> Also thanks to everyone who made this release possible!
>
> Best, Hequn
>
> On Thu, Feb 13, 2020 at 9:54 AM Rong Rong  wrote:
>
>> Congratulations, a big thanks to the release managers for all the hard
>> works!!
>>
>> --
>> Rong
>>
>> On Wed, Feb 12, 2020 at 5:52 PM Yang Wang  wrote:
>>
>>> Excellent work. Thanks Gary & Yu for being the release manager.
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> Jeff Zhang  于2020年2月13日周四 上午9:36写道:
>>>
 Congratulations! Really appreciated your hard work.

 Yangze Guo  于2020年2月13日周四 上午9:29写道:

> Thanks, Gary & Yu. Congrats to everyone involved!
>
> Best,
> Yangze Guo
>
> On Thu, Feb 13, 2020 at 9:23 AM Jingsong Li 
> wrote:
> >
> > Congratulations! Great work.
> >
> > Best,
> > Jingsong Lee
> >
> > On Wed, Feb 12, 2020 at 11:05 PM Leonard Xu 
> wrote:
> >>
> >> Great news!
> >> Thanks everyone involved !
> >> Thanks Gary and Yu for being the release manager !
> >>
> >> Best,
> >> Leonard Xu
> >>
> >> 在 2020年2月12日,23:02,Stephan Ewen  写道:
> >>
> >> Congrats to us all.
> >>
> >> A big piece of work, nicely done.
> >>
> >> Let's hope that this helps our users make their existing use cases
> easier and also opens up new use cases.
> >>
> >> On Wed, Feb 12, 2020 at 3:31 PM 张光辉  wrote:
> >>>
> >>> Greet work.
> >>>
> >>> Congxian Qiu  于2020年2月12日周三 下午10:11写道:
> 
>  Great work.
>  Thanks everyone involved.
>  Thanks Gary and Yu for being the release manager
> 
> 
>  Best,
>  Congxian
> 
> 
>  Jark Wu  于2020年2月12日周三 下午9:46写道:
> >
> > Congratulations to everyone involved!
> > Great thanks to Yu & Gary for being the release manager!
> >
> > Best,
> > Jark
> >
> > On Wed, 12 Feb 2020 at 21:42, Zhu Zhu  wrote:
> >>
> >> Cheers!
> >> Thanks Gary and Yu for the great job as release managers.
> >> And thanks to everyone whose contribution makes the release
> possible!
> >>
> >> Thanks,
> >> Zhu Zhu
> >>
> >> Wyatt Chun  于2020年2月12日周三 下午9:36写道:
> >>>
> >>> Sounds great. Congrats & Thanks!
> >>>
> >>> On Wed, Feb 12, 2020 at 9:31 PM Yu Li 
> wrote:
> 
>  The Apache Flink community is very happy to announce the
> release of Apache Flink 1.10.0, which is the latest major release.
> 
>  Apache Flink® is an open-source stream processing framework
> for distributed, high-performing, always-available, and accurate data
> streaming applications.
> 
>  The release is available for download at:
>  https://flink.apache.org/downloads.html
> 
>  Please check out the release blog post for an overview of the
> improvements for this new major release:
>  https://flink.apache.org/news/2020/02/11/release-1.10.0.html
> 
>  The full release notes are available in Jira:
> 
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12345845
> 
>  We would like to thank all contributors of the Apache Flink
> community who made this release possible!
> 
>  Cheers,
>  Gary & Yu
> >>
> >>
> >
> >
> > --
> > Best, Jingsong Lee
>


 --
 Best Regards

 Jeff Zhang

>>>


[DISCUSS] Remove registration of TableSource/TableSink in Table Env and ConnectTableDescriptor

2020-02-04 Thread Kurt Young
Hi all,

I'd like to bring up a discussion about removing registration of
TableSource and
TableSink in TableEnvironment as well as in ConnectTableDescriptor. The
affected
methods would be:

TableEnvironment::registerTableSource
TableEnvironment::fromTableSource
TableEnvironment::registerTableSink
ConnectTableDescriptor::registerTableSource
ConnectTableDescriptor::registerTableSink
ConnectTableDescriptor::registerTableSourceAndSink

(Most of them are already deprecated, except for TableEnvironment::fromTableSource,
which was intended to be deprecated but was missed by accident.)

FLIP-64 [1] already explained why we want to deprecate TableSource & TableSink in
the user-facing interface. In short, these interfaces should only read & write the
physical representation of the table, and they no longer fit well now that we have
introduced logical table fields such as computed columns and watermarks.
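
For illustration, a rough sketch of the intended direction (assuming 1.10-style APIs;
the table name, schema and connector properties below are made up): instead of handing
a TableSource that carries its own schema to the environment, the table and its schema
are declared through DDL and owned by the catalog.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DdlInsteadOfRegisterSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Deprecated path (to be removed): the TableSource instance is the schema authority.
        // tableEnv.registerTableSource("orders", new MyOrdersTableSource());

        // Preferred path: schema and connector metadata are declared via DDL, so the
        // framework / catalog is the single source of truth for the table's schema.
        tableEnv.sqlUpdate(
            "CREATE TABLE orders (" +
            "  order_id BIGINT," +
            "  order_no VARCHAR" +
            ") WITH (" +
            "  'connector.type' = 'kafka'," +
            "  'connector.version' = 'universal'," +
            "  'connector.topic' = 'orders'," +
            "  'connector.properties.bootstrap.servers' = 'localhost:9092'," +
            "  'update-mode' = 'append'," +
            "  'format.type' = 'json'" +
            ")");
    }
}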

Another reason is that exposing registerTableSource in the Table Env turns the whole
SQL contract around. A TableSource should be used as a reader of a table; it should
rely on metadata held by the framework, which ultimately comes from DDL or a
ConnectDescriptor. But if we register a TableSource with the Table Env, we have no
choice but to rely on TableSource::getTableSchema. That makes the design obscure:
sometimes the TableSource should trust the information coming from the framework,
but sometimes it should also generate its own schema information.

Furthermore, if the ownership of schema information is not clear, it will make things
much more complicated when we try to improve Table API usability, for example by
introducing automatic schema inference in the near future.

Since this is an API-breaking change, I've also included the user mailing list to
gather more feedback.

Best,
Kurt

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-64%3A+Support+for+Temporary+Objects+in+Table+module


Re: Please suggest helpful tools

2020-01-13 Thread Kurt Young
First, could you check whether the added filter conditions are executed before the
join operators? If they are already pushed down and executed before the join, then
it should be some real join keys that are generating the data skew.
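
One quick way to check that is to print the optimized plan. Here is a self-contained
sketch (the Listings/Agents tables and their columns are made-up stand-ins for the
tables in this thread):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class ExplainFilterPushdownSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        tableEnv.registerDataStream("Listings",
                env.fromElements(Tuple2.of("l1", "a1")), "listingId, agentKey");
        tableEnv.registerDataStream("Agents",
                env.fromElements(Tuple2.of("a1", "Alice")), "agentKey, name");

        Table joined = tableEnv.sqlQuery(
            "SELECT l.listingId, a.name FROM Listings l " +
            "LEFT JOIN Agents a ON l.agentKey = a.agentKey AND l.agentKey IS NOT NULL");

        // The printed plan shows whether the IS NOT NULL condition ends up in a Calc node
        // below the join or stays inside the join condition.
        System.out.println(tableEnv.explain(joined));
    }
}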

Best,
Kurt


On Tue, Jan 14, 2020 at 5:09 AM Eva Eva 
wrote:

> Hi Kurt,
>
> Assuming I'm joining two tables, "latestListings" and "latestAgents" like
> below:
>
> "SELECT * FROM latestListings l " +
> "LEFT JOIN latestAgents aa ON l.listAgentKeyL = aa.ucPKA " +
> "LEFT JOIN latestAgents ab ON l.buyerAgentKeyL = ab.ucPKA " +
> "LEFT JOIN latestAgents ac ON l.coListAgentKeyL = ac.ucPKA " +
>
>
> In order to avoid joining on NULL keys, are you suggesting that I change
> the query as below:
>
> "SELECT * FROM latestListings l " +
> "LEFT JOIN latestAgents aa ON l.listAgentKeyL = aa.ucPKA AND 
> l.listAgentKeyL IS NOT NULL " +
>
> "LEFT JOIN latestAgents ab ON l.buyerAgentKeyL = ab.ucPKA AND 
> l.buyerAgentKeyL IS NOT NULL " +
>
> "LEFT JOIN latestAgents ac ON l.coListAgentKeyL = ac.ucPKA AND 
> l.coListAgentKeyL IS NOT NULL" +
>
>
> I tried this but noticed that it didn't work as the data skew (and heavy load 
> on one task) continued. Could you please let me know if I missed anything?
>
>
> Thanks,
>
> Eva
>
>
> On Sun, Jan 12, 2020 at 8:44 PM Kurt Young  wrote:
>
>> Hi,
>>
>> You can try to filter NULL values with an explicit condition like "
>> is not NULL".
>>
>> Best,
>> Kurt
>>
>>
>> On Sat, Jan 11, 2020 at 4:10 AM Eva Eva 
>> wrote:
>>
>>> Thank you both for the suggestions.
>>> I did a bit more analysis using UI and identified at least one
>>> problem that's occurring with the job rn. Going to fix it first and then
>>> take it from there.
>>>
>>> *Problem that I identified:*
>>> I'm running with 26 parallelism. For the checkpoints that are expiring,
>>> one of a JOIN operation is finishing at 25/26 (96%) progress with
>>> corresponding SubTask:21 has "n/a" value. For the same operation I also
>>> noticed that the load is distributed poorly with heavy load being fed to
>>> SubTask:21.
>>> My guess is bunch of null values are happening for this JOIN operation
>>> and being put into the same task.
>>> Currently I'm using SQL query which gives me limited control on handling
>>> null values so I'll try to programmatically JOIN and see if I can avoid
>>> JOIN operation whenever the joining value is null. This should help with
>>> better load distribution across subtasks. And may also fix expiring
>>> checkpointing issue.
>>>
>>> Thanks for the guidance.
>>> Eva.
>>>
>>> On Fri, Jan 10, 2020 at 7:44 AM Congxian Qiu 
>>> wrote:
>>>
>>>> Hi
>>>>
>>>> For expired checkpoint, you can find something like " Checkpoint xxx of
>>>> job xx expired before completing" in jobmanager.log, then you can go to the
>>>> checkpoint UI to find which tasks did not ack, and go to these tasks to see
>>>> what happened.
>>>>
>>>> If checkpoint was been declined, you can find something like "Decline
>>>> checkpoint xxx by task xxx of job xxx at xxx." in jobmanager.log, in this
>>>> case, you can go to the task directly to find out why the checkpoint 
>>>> failed.
>>>>
>>>> Best,
>>>> Congxian
>>>>
>>>>
>>>> Yun Tang  于2020年1月10日周五 下午7:31写道:
>>>>
>>>>> Hi Eva
>>>>>
>>>>> If checkpoint failed, please view the web UI or jobmanager log to see
>>>>> why checkpoint failed, might be declined by some specific task.
>>>>>
>>>>> If checkpoint expired, you can also access the web UI to see which
>>>>> tasks did not respond in time, some hot task might not be able to respond
>>>>> in time. Generally speaking, checkpoint expired is mostly caused by back
>>>>> pressure which led the checkpoint barrier did not arrive in time. Resolve
>>>>> the back pressure could help the checkpoint finished before timeout.
>>>>>
>>>>> I think the doc of monitoring web UI for checkpoint [1] and back
>>>>> pressure [2] could help you.
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitor

Re: Vectorized Parquet reading in Flink

2020-01-13 Thread Kurt Young
As far as I know, this is already planned and, barring surprises, should be released in version 1.11:
https://issues.apache.org/jira/browse/FLINK-11899

Best,
Kurt


On Mon, Jan 13, 2020 at 7:50 PM faaron zheng  wrote:

>
> Flink uses the ParquetFileReader from Hadoop, which apparently does not support vectorized reading, while Hive and Spark both support it. May I ask what plans Flink has for this?
>


Re: Please suggest helpful tools

2020-01-12 Thread Kurt Young
Hi,

You can try to filter out NULL values with an explicit condition like
"<join key> IS NOT NULL".

Best,
Kurt


On Sat, Jan 11, 2020 at 4:10 AM Eva Eva 
wrote:

> Thank you both for the suggestions.
> I did a bit more analysis using UI and identified at least one
> problem that's occurring with the job rn. Going to fix it first and then
> take it from there.
>
> *Problem that I identified:*
> I'm running with 26 parallelism. For the checkpoints that are expiring,
> one of a JOIN operation is finishing at 25/26 (96%) progress with
> corresponding SubTask:21 has "n/a" value. For the same operation I also
> noticed that the load is distributed poorly with heavy load being fed to
> SubTask:21.
> My guess is bunch of null values are happening for this JOIN operation and
> being put into the same task.
> Currently I'm using SQL query which gives me limited control on handling
> null values so I'll try to programmatically JOIN and see if I can avoid
> JOIN operation whenever the joining value is null. This should help with
> better load distribution across subtasks. And may also fix expiring
> checkpointing issue.
>
> Thanks for the guidance.
> Eva.
>
> On Fri, Jan 10, 2020 at 7:44 AM Congxian Qiu 
> wrote:
>
>> Hi
>>
>> For expired checkpoint, you can find something like " Checkpoint xxx of
>> job xx expired before completing" in jobmanager.log, then you can go to the
>> checkpoint UI to find which tasks did not ack, and go to these tasks to see
>> what happened.
>>
>> If checkpoint was been declined, you can find something like "Decline
>> checkpoint xxx by task xxx of job xxx at xxx." in jobmanager.log, in this
>> case, you can go to the task directly to find out why the checkpoint failed.
>>
>> Best,
>> Congxian
>>
>>
>> Yun Tang  于2020年1月10日周五 下午7:31写道:
>>
>>> Hi Eva
>>>
>>> If checkpoint failed, please view the web UI or jobmanager log to see
>>> why checkpoint failed, might be declined by some specific task.
>>>
>>> If checkpoint expired, you can also access the web UI to see which tasks
>>> did not respond in time, some hot task might not be able to respond in
>>> time. Generally speaking, checkpoint expired is mostly caused by back
>>> pressure which led the checkpoint barrier did not arrive in time. Resolve
>>> the back pressure could help the checkpoint finished before timeout.
>>>
>>> I think the doc of monitoring web UI for checkpoint [1] and back
>>> pressure [2] could help you.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/checkpoint_monitoring.html
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/back_pressure.html
>>>
>>> Best
>>> Yun Tang
>>> --
>>> *From:* Eva Eva 
>>> *Sent:* Friday, January 10, 2020 10:29
>>> *To:* user 
>>> *Subject:* Please suggest helpful tools
>>>
>>> Hi,
>>>
>>> I'm running Flink job on 1.9 version with blink planner.
>>>
>>> My checkpoints are timing out intermittently, but as state grows they
>>> are timing out more and more often eventually killing the job.
>>>
>>> Size of the state is large with Minimum=10.2MB and Maximum=49GB (this
>>> one is accumulated due to prior failed ones), Average=8.44GB.
>>>
>>> Although size is huge, I have enough space on EC2 instance in which I'm
>>> running job. I'm using RocksDB for checkpointing.
>>>
>>> *Logs does not have any useful information to understand why checkpoints
>>> are expiring/failing, can someone please point me to tools that can be used
>>> to investigate and understand why checkpoints are failing.*
>>>
>>> Also any other related suggestions are welcome.
>>>
>>>
>>> Thanks,
>>> Reva.
>>>
>>


Re: Suspected ParquetTableSource filter pushdown bug

2020-01-08 Thread Kurt Young
If the optimizer keeps getting stuck and never exits, it is almost certainly a bug. Please open an issue and attach this information; we will investigate what is causing it.

Best,
Kurt


On Wed, Jan 8, 2020 at 5:12 PM jun su  wrote:

> Adding the code as text:
>
> def main(args: Array[String]): Unit = {
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> val tableEnv = StreamTableEnvironment.create(env)
>
> val schema = 
> "{\"type\":\"record\",\"name\":\"root\",\"fields\":[{\"name\":\"log_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"city\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"log_from\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ip\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"type\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"data_source\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"is_scan\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"result\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timelong\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"is_sec\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"event_name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"time_string\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"device\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timestamp_string\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"occur_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},{\"name\":\"row_time\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null}]}"
> val parquetTableSource: ParquetTableSource = ParquetTableSource
> .builder
> .forParquetSchema(new 
> org.apache.parquet.avro.AvroSchemaConverter().convert(
> org.apache.avro.Schema.parse(schema, true)))
> .path("/Users/sujun/Documents/tmp/login_data")
> .build
>
> tableEnv.registerTableSource("source",parquetTableSource)
>
>
> val t1 = tableEnv.sqlQuery("select log_id,city from source where city = 
> '274' ")
> tableEnv.registerTable("t1",t1)
>
> val t4 = tableEnv.sqlQuery("select * from t1 where 
> log_id='5927070661978133'")
> t1.toAppendStream[Row].print()
>
> env.execute()
>
> }
>
>
jun su  wrote on Wed, Jan 8, 2020 at 4:59 PM:
>
>> Hi:
>> While using ParquetTableSource I ran into some problems that look like a bug in
>> the ParquetTableSource filter pushdown. The code and description are below:
>>
>> [image: 1578473593933.jpg]
>>
>> Debugging shows the code is stuck in the
>> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp method; the while(true)
>> loop never exits, until the whole program eventually OOMs.
>>
>> [image: 1.jpg]
>>
>> After removing the ParquetTableSource filter pushdown code, the main program runs fine.
>> I suspect the Calcite optimizer cannot exit while iterating to find the lowest-cost plan.
>>
>


Re: Flink SQL Count Distinct performance optimization

2020-01-07 Thread Kurt Young
Hi,

Could you try to find out what the bottleneck of your current job is? That would
lead to different optimizations: for example, whether it is CPU-bound, or whether
the local state is so large that the job is stuck on too many slow I/Os.
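
If the bottleneck turns out to be the skewed/hot distinct key, here is a small sketch
of turning on the split-distinct optimization that godfreyhe mentions below (requires
the blink planner, 1.9+; whether it helps depends on the data):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SplitDistinctSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Splits COUNT(DISTINCT ...) into a two-level aggregation so that a skewed
        // distinct key is spread across more tasks; the query itself stays unchanged.
        tableEnv.getConfig().getConfiguration()
                .setString("table.optimizer.distinct-agg.split.enabled", "true");
    }
}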

Best,
Kurt


On Wed, Jan 8, 2020 at 3:53 PM 贺小令  wrote:

> hi sunfulin,
> you can try with blink planner (since 1.9 +), which optimizes distinct
> aggregation. you can also try to enable
> *table.optimizer.distinct-agg.split.enabled* if the data is skew.
>
> best,
> godfreyhe
>
> sunfulin  于2020年1月8日周三 下午3:39写道:
>
>> Hi, community,
>> I'm using Apache Flink SQL to build some of my realtime streaming apps.
>> With one scenario I'm trying to count(distinct deviceID) over about 100GB
>> data set in realtime, and aggregate results with sink to ElasticSearch
>> index. I met a severe performance issue when running my flink job. Wanner
>> get some help from community.
>>
>>
>> Flink version : 1.8.2
>> Running on yarn with 4 yarn slots per task manager. My flink task
>> parallelism is set to be 10, which is equal to my kafka source partitions.
>> After running the job, I can observe high backpressure from the flink
>> dashboard. Any suggestions and kind of help is highly appreciated.
>>
>>
>> running sql is like the following:
>>
>>
>> INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)
>>
>> select aggId, pageId, statkey as ts, sum(cnt) as expoCnt, count(cnt) as
>> clkCnt  from
>>
>> (
>>
>> SELECT
>>
>>  aggId,
>>
>>  pageId,
>>
>>  statkey,
>>
>>  COUNT(DISTINCT deviceId) as cnt
>>
>>  FROM
>>
>>  (
>>
>>  SELECT
>>
>>  'ZL_005' as aggId,
>>
>>  'ZL_UV_PER_MINUTE' as pageId,
>>
>>  deviceId,
>>
>>  ts2Date(recvTime) as statkey
>>
>>  from
>>
>>  kafka_zl_etrack_event_stream
>>
>>  )
>>
>>  GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)
>>
>> ) as t1
>>
>> group by aggId, pageId, statkey
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> Best
>
>


Re: Cannot change the catalog when registering a table

2020-01-07 Thread Kurt Young
Temporary tables can only live in the catalog they were registered under; registering a temporary table into another catalog, such as a Hive catalog, is not recommended.
Temporary tables are in most cases not serializable, and the code would fail with errors in that case.

Best,
Kurt


On Tue, Jan 7, 2020 at 9:20 PM 贺小令  wrote:

> hi,
>
> streamTableEnvironment.registerDataStream(tableName, dataStream, fields);
> Tables registered this way are all temporary tables.
>
> You can use:
> catalog = new InMemoryExternalCatalog(catalogName);
> streamTableEnvironment.registerCatalog(catalogName, catalog);
> catalog.createTable()
>
> or
> streamTableEnvironment.getCatalog().get().createTable()
>
> to register the table into the specified catalog.
>
>
xiyu...@163.com  wrote on Tue, Jan 7, 2020 at 3:20 PM:
>
> > Hi everyone:
> > During development, when I register a table with
> > streamTableEnvironment.registerDataStream(tableName, dataStream, fields);
> > the catalog used by default is EnvironmentSettings.DEFAULT_BUILTIN_CATALOG.
> > I tried to solve it as follows, but the registered table still ends up in
> > EnvironmentSettings.DEFAULT_BUILTIN_CATALOG:
> >   streamTableEnvironment.registerCatalog(catalogName, new
> > InMemoryExternalCatalog(catalogName));
> >   streamTableEnvironment.useCatalog(catalogName);
> > How can I register a table into a specific catalog?
> >
> >
> > xiyu...@163.com
> >
>


Re: Controlling the Materialization of JOIN updates

2020-01-05 Thread Kurt Young
Good to hear that the patch resolved your issue. Looking forward to hearing
more feedback from you!

Best,
Kurt


On Mon, Jan 6, 2020 at 5:56 AM Benoît Paris <
benoit.pa...@centraliens-lille.org> wrote:

> Hi Kurt,
>
> Thank you for your answer.
>
> Yes both fact tables and dimension tables are changing over time; it was
> to illustrate that they could change at the same time but that we could
> still make a JOIN basically ignore updates from one specified side. The SQL
> is not the actual one I'm using, and as you have said later on, I indeed
> don't deal with a time attribute and just want what's in the table at
> processing time.
>
> At the moment my problem seems to be in good way of being resolved, and it
> is going to be Option 4: "LATERAL TABLE table_function" on the Blink
> planner; as Jark Wu seems to be -elegantly- providing a patch for the
> FLINK-14200 NPE bug:
> https://github.com/apache/flink/pull/10763
> It was indeed about shenanigans on finding the proper RelOptSchema;  Ah,
> I wish I had dived sooner in the source code, and I could have had the
> pleasure opportunity to contribute to the Flink codebase.
>
> Anyway, shout out to Jark for resolving the bug and providing a patch! I
> believe this will be a real enabler for CQRS architectures on Flink (we had
> subscriptions with regular joins, and this patch enables querying the same
> thing with very minor SQL modifications)
>
> Kind regards
> Benoît
>
>
> On Sat, Jan 4, 2020 at 4:22 AM Kurt Young  wrote:
>
>> Hi Benoît,
>>
>> Before discussing all the options you listed, I'd like understand more
>> about your requirements.
>>
>> The part I don't fully understand is, both your fact (Event) and
>> dimension (DimensionAtJoinTimeX) tables are
>> coming from the same table, Event or EventRawInput in your case. So it
>> will result that both your fact and
>> dimension tables are changing with time.
>>
>> My understanding is, when your DimensionAtJoinTimeX table emit the
>> results, you don't want to change the
>> result again. You want the fact table only join whatever data currently
>> the dimension table have? I'm asking
>> because your dimension table was calculated with a window aggregation,
>> but your join logic seems doesn't
>> care about the time attribute (LEFT JOIN DimensionAtJoinTime1 d1 ON e.uid
>> = d1.uid). It's possible that
>> when a record with uid=x comes from Event table, but the dimension table
>> doesn't have any data around
>> uid=x yet due to the window aggregation. In this case, you don't want
>> them to join?
>>
>> Best,
>> Kurt
>>
>>
>> On Fri, Jan 3, 2020 at 1:11 AM Benoît Paris <
>> benoit.pa...@centraliens-lille.org> wrote:
>>
>>> Hello all!
>>>
>>> I'm trying to design a stream pipeline, and have trouble controlling
>>> when a JOIN is triggering an update:
>>>
>>> Setup:
>>>
>>>- The Event table; "probe side", "query side", the result of earlier
>>>stream processing
>>>- The DimensionAtJoinTimeX tables; of updating nature, "build side",
>>>the results of earlier stream processing
>>>
>>> Joining them:
>>>
>>> SELECT*
>>> FROM  Event e
>>> LEFT JOIN DimensionAtJoinTime1 d1
>>>   ON  e.uid = d1.uid
>>> LEFT JOIN DimensionAtJoinTime2 d2
>>>   ON  e.uid = d2.uid
>>>
>>> The DimensionAtJoinTimeX Tables being the result of earlier stream
>>> processing, possibly from the same Event table:
>>>
>>> SELECT   uid,
>>>  hop_start(...),
>>>  sum(...)
>>> FROM Event e
>>> GROUP BY uid,
>>>  hop(...)
>>>
>>> The Event Table being:
>>>
>>> SELECT ...
>>> FROM   EventRawInput i
>>> WHERE  i.some_field = 'some_value'
>>>
>>> Requirements:
>>>
>>>- I need the JOINs to only be executed once, only when a new line is
>>>appended to the probe / query / Event table.
>>>- I also need the full pipeline to be defined in SQL.
>>>- I very strongly prefer the Blink planner (mainly for
>>>Deduplication, TopN and LAST_VALUE features).
>>>
>>> Problem exploration so far:
>>>
>>>- Option 1, "FOR SYSTEM_TIME AS OF" [1]: I need to have the solution
>>>in SQL: it doesn't work out. But I might explore the following: insert
>>>DimensionAtJoinTimeX into a special Sink, and use it in a
&g

Re: Duplicate tasks for the same query

2020-01-05 Thread Kurt Young
Another common skew case we've seen is NULL handling, where the value of the join
key is NULL. We shuffle the NULL values into one task even though the join
condition can never hold for them by definition.
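
A sketch of the usual workaround for an inner join (the table and column names are
made up, and a tableEnv with Orders/Users registered is assumed): NULL keys can never
match an equality condition anyway, so filtering them out is semantically a no-op but
keeps them from all being shuffled into one subtask. For a LEFT join, the NULL-key rows
would instead have to be split off and UNION ALL-ed back to preserve the semantics.

// Sketch only: 'tableEnv', 'Orders' and 'Users' are assumed to exist already.
Table joined = tableEnv.sqlQuery(
    "SELECT o.orderId, u.name " +
    "FROM Orders o JOIN Users u ON o.userId = u.userId " +
    "WHERE o.userId IS NOT NULL");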

For the deduplication, I just want to make sure this behavior meets your requirement,
because for some other usages users are only interested in the earliest record: any
later update for the same key is purely redundant, e.g. caused by an upstream failure
that processes the same data again. In that case, each key will have at most one
record and you won't face any join-key skew issue.
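
For reference, a self-contained sketch of the two deduplication variants (blink planner
syntax; the Users table and its columns are made up): ordering by proctime ascending
keeps the earliest row per key and stays insert-only, while ordering descending keeps
the latest row and produces an updating result with retractions.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class DeduplicationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        tableEnv.registerDataStream("Users",
                env.fromElements(Tuple2.of(1L, "v1"), Tuple2.of(1L, "v2"), Tuple2.of(2L, "a")),
                "userId, name, proctime.proctime");

        // ORDER BY proctime ASC  -> keep the earliest row per userId (insert-only result).
        // ORDER BY proctime DESC -> keep the latest row per userId (updating result that
        //                           emits a retraction each time the key changes).
        Table latest = tableEnv.sqlQuery(
            "SELECT userId, name FROM (" +
            "  SELECT *, ROW_NUMBER() OVER (PARTITION BY userId ORDER BY proctime DESC) AS rn" +
            "  FROM Users) WHERE rn = 1");

        tableEnv.toRetractStream(latest, Row.class).print();
        env.execute("deduplication sketch");
    }
}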

Best,
Kurt


On Mon, Jan 6, 2020 at 6:55 AM RKandoji  wrote:

> Hi Kurt,
>
> I understand what you mean, some userIds may appear more frequently than
> the others but this distribution doesn't look in proportionate with the
> data skew. Do you think of any other possible reasons or anything I can try
> out to investigate this more?
>
> For DeDuplication, I query for the latest record. Sorry I didn't follow
> above sentence, do you mean that for each update to user table the
> record(s) that were updated will be sent via retract stream.I think that's
> expected as I need to process latest records, as long as it is sending only
> the record(s) that's been updated.
>
> Thanks,
> RKandoji
>
> On Fri, Jan 3, 2020 at 9:57 PM Kurt Young  wrote:
>
>> Hi RKandoji,
>>
>> It looks like you have a data skew issue with your input data. Some or
>> maybe only one "userId" appears more frequent than others. For join
>> operator to work correctly, Flink will apply "shuffle by join key" before
>> the
>> operator, so same "userId" will go to the same sub-task to perform join
>> operation. In this case, I'm afraid there is nothing much you can do for
>> now.
>>
>> BTW, for the DeDuplicate, do you keep the latest record or the earliest?
>> If
>> you keep the latest version, Flink will tigger retraction and then send
>> the latest
>> record again every time when your user table changes.
>>
>> Best,
>> Kurt
>>
>>
>> On Sat, Jan 4, 2020 at 5:09 AM RKandoji  wrote:
>>
>>> Hi,
>>>
>>> Thanks a ton for the help with earlier questions, I updated code to
>>> version 1.9 and started using Blink Planner (DeDuplication). This is
>>> working as expected!
>>>
>>> I have a new question, but thought of asking in the same email chain as
>>> this has more context about my use case etc.
>>>
>>> Workflow:
>>> Currently I'm reading from a couple of Kafka topics, DeDuplicating the
>>> input data, performing JOINs and writing the joined data to another Kafka
>>> topic.
>>>
>>> Issue:
>>> I set Parallelism to 8 and on analyzing the subtasks found that the data
>>> is not distributed well among 8 parallel tasks for the last Join query. One
>>> of a subtask is taking huge load, whereas others taking pretty low load.
>>>
>>> Tried a couple of things below, but no use. Not sure if they are
>>> actually related to the problem as I couldn't yet understand what's the
>>> issue here.
>>> 1. increasing the number of partitions of output Kafka topic.
>>> 2. tried adding keys to output so key partitioning happens at Kafka end.
>>>
>>> Below is a snapshot for reference:
>>> [image: image.png]
>>>
>>> Below are the config changes I made:
>>>
>>> taskmanager.numberOfTaskSlots: 8
>>> parallelism.default: 8
>>> jobmanager.heap.size: 5000m
>>> taskmanager.heap.size: 5000m
>>> state.backend: rocksdb
>>> state.checkpoints.dir: file:///Users/username/flink-1.9.1/checkpoints
>>> state.backend.incremental: true
>>>
>>> I don't see any errors and job seems to be running smoothly (and
>>> slowly). I need to make it distribute the load well for faster processing,
>>> any pointers on what could be wrong and how to fix it would be very helpful.
>>>
>>> Thanks,
>>> RKandoji
>>>
>>>
>>> On Fri, Jan 3, 2020 at 1:06 PM RKandoji  wrote:
>>>
>>>> Thanks!
>>>>
>>>> On Thu, Jan 2, 2020 at 9:45 PM Jingsong Li 
>>>> wrote:
>>>>
>>>>> Yes,
>>>>>
>>>>> 1.9.2 or Coming soon 1.10
>>>>>
>>>>> Best,
>>>>> Jingsong Lee
>>>>>
>>>>> On Fri, Jan 3, 2020 at 12:43 AM RKandoji  wrote:
>>>>>
>>>>>> Ok thanks, does it mean version 1.9.2 is what I need to use?
>>

Re: Controlling the Materialization of JOIN updates

2020-01-03 Thread Kurt Young
Hi Benoît,

Before discussing all the options you listed, I'd like understand more
about your requirements.

The part I don't fully understand is that both your fact (Event) and dimension
(DimensionAtJoinTimeX) tables are derived from the same table, Event or EventRawInput
in your case. As a result, both your fact and dimension tables change over time.

My understanding is that once your DimensionAtJoinTimeX table emits its results, you
don't want those results to change again. You want the fact table to join only against
whatever data the dimension table currently has? I'm asking because your dimension
table is calculated with a window aggregation, but your join logic doesn't seem to
care about the time attribute (LEFT JOIN DimensionAtJoinTime1 d1 ON e.uid = d1.uid).
It's possible that a record with uid=x arrives from the Event table while the dimension
table doesn't have any data for uid=x yet, due to the window aggregation. In that case,
you don't want them to join?

Best,
Kurt


On Fri, Jan 3, 2020 at 1:11 AM Benoît Paris <
benoit.pa...@centraliens-lille.org> wrote:

> Hello all!
>
> I'm trying to design a stream pipeline, and have trouble controlling when
> a JOIN is triggering an update:
>
> Setup:
>
>- The Event table; "probe side", "query side", the result of earlier
>stream processing
>- The DimensionAtJoinTimeX tables; of updating nature, "build side",
>the results of earlier stream processing
>
> Joining them:
>
> SELECT*
> FROM  Event e
> LEFT JOIN DimensionAtJoinTime1 d1
>   ON  e.uid = d1.uid
> LEFT JOIN DimensionAtJoinTime2 d2
>   ON  e.uid = d2.uid
>
> The DimensionAtJoinTimeX Tables being the result of earlier stream
> processing, possibly from the same Event table:
>
> SELECT   uid,
>  hop_start(...),
>  sum(...)
> FROM Event e
> GROUP BY uid,
>  hop(...)
>
> The Event Table being:
>
> SELECT ...
> FROM   EventRawInput i
> WHERE  i.some_field = 'some_value'
>
> Requirements:
>
>- I need the JOINs to only be executed once, only when a new line is
>appended to the probe / query / Event table.
>- I also need the full pipeline to be defined in SQL.
>- I very strongly prefer the Blink planner (mainly for Deduplication,
>TopN and LAST_VALUE features).
>
> Problem exploration so far:
>
>- Option 1, "FOR SYSTEM_TIME AS OF" [1]: I need to have the solution
>in SQL: it doesn't work out. But I might explore the following: insert
>DimensionAtJoinTimeX into a special Sink, and use it in a
>LookupableTableSource (I'm at a loss on how to do that, though. Do I need
>an external kv store?).
>- Option 2, "FOR SYSTEM_TIME AS OF" [1], used in SQL: Is there a
>version of "FOR SYSTEM_TIME AS OF" readily usable in SQL? I might have
>missed something in the documentation.
>- Option 3, "LATERAL TABLE table_function" [2], on the Legacy planner:
>It does not work with two tables [3], and I don't get to have the Blink
>planner features.
>- Option 4, "LATERAL TABLE table_function" [2], on the Blink planner:
>It does not work with the "probe side" being the results of earlier stream
>processing [4].
>- Option 5, let a regular JOIN materialize the updates, and somehow
>find how to filter the ones coming from the build sides (I'm at a loss on
>how to do that).
>- Option 6, "TVR": I read this paper [5], which mentions "Time-Varying
>Relation"s; Speculating here: could there be a way, to say that the build
>side is not a TVR. Aka declare the stream as being somehow "static", while
>still being updated (but I guess we're back to "FOR SYSTEM_TIME AS OF").
>- Option 7: Is there some features being developed, or hints, or
>workarounds to control the JOIN updates that I have not considered so far?
>- Remark 1: I believe that FLINK-15112 and FLINK-14200 are of the same
>bug nature, even though they occur in different situations on different
>planners (same Exception Stack Trace on files that have the same historical
>parent before the Blink fork). FLINK-15112 has a workaround, but
>FLINK-14200 does not. The existence of that workaround IMHO signals that
>there is a simple fix for both bugs. I have tried to find it in Flink for a
>few days, but no success so far. If you guys have pointers helping me
>provide a fix, I'll gladly listen. So far I have progressed to: It revolves
>around Calcite-based Flink streaming rules transforming a temporal table
>function correlate into a Join on 2*Scan, and crashes when it encounters
>something that is not a table that can be readily scanned. Also, there are
>shenanigans on trying to find the right schema in the Catalog. But I am
>blocked now, and not accustomed to the Flink internal code (would like to
>though, if Alibaba/Ververica are recruiting remote workers, wink wink,
>nudge nudge).
>
> All opinions very much welcomed on all Options and Remarks!
>
> Cheers, and a 

Re: Flink group with time-windowed join

2020-01-03 Thread Kurt Young
Looks like a bug to me. Could you file an issue for this?

Best,
Kurt


On Thu, Jan 2, 2020 at 9:06 PM jeremyji <18868129...@163.com> wrote:

> Two stream as table1, table2. We know that group with regular join won't
> work
> so we have to use time-windowed join. So here is my flink sql looks like:
>
> *SELECT
> a.account account,
> SUM(a.value) + SUM(b.value),
> UNIX_TIMESTAMP(TUMBLE_START(a.producer_timestamp, INTERVAL '3'
> MINUTE))
> FROM
> (SELECT
> account,
> value,
> producer_timestamp
> FROM
> table1) a,
> (SELECT
> account,
> value,
> producer_timestamp
> FROM
> table2) b
> WHERE
> a.account = b.account AND
> a.producer_timestamp BETWEEN b.producer_timestamp - INTERVAL '3'
> MINUTE AND b.producer_timestamp)
> group by
> a.account,
> TUMBLE(a.producer_timestamp, INTERVAL '3' MINUTE)*
> But i still got error from flink:
>
> /Rowtime attributes must not be in the input rows of a regular join. As a
> workaround you can cast the time attributes of input tables to TIMESTAMP
> before.
> Please check the documentation for the set of currently supported SQL
> features.
> at
>
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:450)
> at
>
> org.apache.flink.table.api.TableEnvironment.optimizePhysicalPlan(TableEnvironment.scala:369)
> at
>
> org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:814)
> at
>
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
> at
>
> org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:344)
> at
>
> org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:1048)
> at
>
> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:962)
> at
>
> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:922)
> /
> I think i use time-windowed join just like this doc
> says:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#time-windowed-joins
> .
> But flink told me its a regular join. Is there anything wrong i haven't
> notice?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Duplicate tasks for the same query

2020-01-03 Thread Kurt Young
Hi RKandoji,

It looks like you have a data skew issue in your input data. Some, or
maybe only one, "userId" appears more frequently than the others. For the join
operator to work correctly, Flink applies a "shuffle by join key" before the
operator, so the same "userId" always goes to the same sub-task to perform the
join operation. In this case, I'm afraid there is not much you can do for now.

BTW, for the DeDuplication, do you keep the latest record or the earliest? If
you keep the latest version, Flink will trigger a retraction and then send the
latest record again every time your user table changes.
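
For illustration, here is a minimal sketch of the "keep the earliest" variant, which
stays append-only and therefore never triggers retractions downstream (the names
Users, userId, userName and proctime are placeholders, not your actual schema):

SELECT userId, userName
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY userId
                              ORDER BY proctime ASC) AS row_num
    FROM Users
)
WHERE row_num = 1

Ordering by proctime ASC keeps the first row per userId; ordering DESC keeps the
latest row, at the cost of a retraction plus a new record every time the key is
updated.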

Best,
Kurt


On Sat, Jan 4, 2020 at 5:09 AM RKandoji  wrote:

> Hi,
>
> Thanks a ton for the help with earlier questions, I updated code to
> version 1.9 and started using Blink Planner (DeDuplication). This is
> working as expected!
>
> I have a new question, but thought of asking in the same email chain as
> this has more context about my use case etc.
>
> Workflow:
> Currently I'm reading from a couple of Kafka topics, DeDuplicating the
> input data, performing JOINs and writing the joined data to another Kafka
> topic.
>
> Issue:
> I set Parallelism to 8 and on analyzing the subtasks found that the data
> is not distributed well among 8 parallel tasks for the last Join query. One
> of a subtask is taking huge load, whereas others taking pretty low load.
>
> Tried a couple of things below, but no use. Not sure if they are actually
> related to the problem as I couldn't yet understand what's the issue here.
> 1. increasing the number of partitions of output Kafka topic.
> 2. tried adding keys to output so key partitioning happens at Kafka end.
>
> Below is a snapshot for reference:
> [image: image.png]
>
> Below are the config changes I made:
>
> taskmanager.numberOfTaskSlots: 8
> parallelism.default: 8
> jobmanager.heap.size: 5000m
> taskmanager.heap.size: 5000m
> state.backend: rocksdb
> state.checkpoints.dir: file:///Users/username/flink-1.9.1/checkpoints
> state.backend.incremental: true
>
> I don't see any errors and job seems to be running smoothly (and slowly).
> I need to make it distribute the load well for faster processing, any
> pointers on what could be wrong and how to fix it would be very helpful.
>
> Thanks,
> RKandoji
>
>
> On Fri, Jan 3, 2020 at 1:06 PM RKandoji  wrote:
>
>> Thanks!
>>
>> On Thu, Jan 2, 2020 at 9:45 PM Jingsong Li 
>> wrote:
>>
>>> Yes,
>>>
>>> 1.9.2 or Coming soon 1.10
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Fri, Jan 3, 2020 at 12:43 AM RKandoji  wrote:
>>>
>>>> Ok thanks, does it mean version 1.9.2 is what I need to use?
>>>>
>>>> On Wed, Jan 1, 2020 at 10:25 PM Jingsong Li 
>>>> wrote:
>>>>
>>>>> Blink planner was introduced in 1.9. We recommend use blink planner
>>>>> after 1.9.
>>>>> After some bug fix, I think the latest version of 1.9 is OK. The
>>>>> production environment has also been set up in some places.
>>>>>
>>>>> Best,
>>>>> Jingsong Lee
>>>>>
>>>>> On Wed, Jan 1, 2020 at 3:24 AM RKandoji  wrote:
>>>>>
>>>>>> Thanks Jingsong and Kurt for more details.
>>>>>>
>>>>>> Yes, I'm planning to try out DeDuplication when I'm done upgrading to
>>>>>> version 1.9. Hopefully deduplication is done by only one task and reused
>>>>>> everywhere else.
>>>>>>
>>>>>> One more follow-up question, I see "For production use cases, we
>>>>>> recommend the old planner that was present before Flink 1.9 for now." 
>>>>>> warning
>>>>>> here
>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/
>>>>>> This is actually the reason why started with version 1.8, could you
>>>>>> please let me know your opinion about this? and do you think there is any
>>>>>> production code running on version 1.9
>>>>>>
>>>>>> Thanks,
>>>>>> Reva
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Dec 30, 2019 at 9:02 PM Kurt Young  wrote:
>>>>>>
>>>>>>> BTW, you could also have a more efficient version of deduplicating
>>>>>>> user table by using the topn feature [1].
>>>>>>>
>>>>>>> Best,
>>>>>>>

Re: Duplicate tasks for the same query

2019-12-30 Thread Kurt Young
BTW, you could also build a more efficient version of the user table
deduplication by using the Top-N feature [1].
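
As a rough sketch only (reusing the Users / userId / userTimestamp names from the
original query quoted below, so adapt it to your actual schema), the IN +
MAX(userTimestamp) deduplication could be expressed as a Top-N with N = 1:

SELECT userId, userTimestamp
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY userId
                              ORDER BY userTimestamp DESC) AS row_num
    FROM Users
)
WHERE row_num = 1

This keeps only the row with the largest userTimestamp per userId and should let the
blink planner use its specialized deduplication/Top-N operator instead of a general
aggregation followed by a join.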

Best,
Kurt

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#top-n


On Tue, Dec 31, 2019 at 9:24 AM Jingsong Li  wrote:

> Hi RKandoji,
>
> In theory, you don't need to do anything.
> First, the optimizer will try to eliminate duplicate nodes during optimization.
> Second, after SQL optimization, if the optimized plan still has duplicate
> nodes, the planner will automatically reuse them.
> There are config options to control whether the plan should be reused; their
> default value is true, so you don't need to modify them.
> - table.optimizer.reuse-sub-plan-enabled
> - table.optimizer.reuse-source-enabled
>
> Best,
> Jingsong Lee
>
> On Tue, Dec 31, 2019 at 6:29 AM RKandoji  wrote:
>
>> Thanks Terry and Jingsong,
>>
>> Currently I'm on 1.8 version using Flink planner for stream proessing,
>> I'll switch to 1.9 version to try out blink planner.
>>
>> Could you please point me to any examples (Java preferred) using
>> SubplanReuser?
>>
>> Thanks,
>> RK
>>
>> On Sun, Dec 29, 2019 at 11:32 PM Jingsong Li 
>> wrote:
>>
>>> Hi RKandoji,
>>>
>>> FYI: Blink-planner subplan reusing: [1] 1.9 available.
>>>
>>>Join  Join
>>>  /  \  /  \
>>>  Filter1  Filter2  Filter1  Filter2
>>> ||=>   \ /
>>>  Project1 Project2Project1
>>> ||   |
>>>   Scan1Scan2   Scan1
>>>
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Mon, Dec 30, 2019 at 12:28 PM Terry Wang  wrote:
>>>
 Hi RKandoji~

 Could you provide more info about your poc environment?
 Stream or batch? Flink planner or blink planner?
 AFAIK, blink planner has done some optimization to deal such duplicate
 task for one same query. You can have a try with blink planner :
 https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment

 Best,
 Terry Wang



 2019年12月30日 03:07,RKandoji  写道:

 Hi Team,

 I'm doing a POC with flink to understand if it's a good fit for my use
 case.

 As part of the process, I need to filter duplicate items and created
 below query to get only the latest records based on timestamp. For
 instance, I have "Users" table which may contain multiple messages for the
 same "userId". So I wrote below query to get only the latest message for a
 given "userId"

 Table uniqueUsers = tEnv.sqlQuery("SELECT * FROM Users WHERE (userId,
 userTimestamp) IN (SELECT userId, MAX(userTimestamp) FROM Users GROUP BY
 userId)");

 The above query works as expected and contains only the latest users
 based on timestamp.

 The issue is when I use "uniqueUsers" table multiple times in a JOIN
 operation, I see multiple tasks in the flink dashboard for the same query
 that is creating "uniqueUsers" table. It is simply creating as many tasks
 as many times I'm using the table.

 Below is the JOIN query.
 tEnv.registerTable("uniqueUsersTbl", uniqueUsers);
 Table joinOut = tEnv.sqlQuery("SELECT * FROM Car c
LEFT JOIN uniqueUsersTbl aa ON
 c.userId = aa.userId
LEFT JOIN uniqueUsersTbl ab
 ON c.ownerId = ab.userId
LEFT JOIN uniqueUsersTbl ac ON
 c.sellerId = ac.userId
LEFT JOIN uniqueUsersTbl ad
 ON c.buyerId = ad.userId");

 Could someone please help me understand how I can avoid these duplicate
 tasks?


 Thanks,
 R Kandoji



>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>
>
> --
> Best, Jingsong Lee
>


Re: Flink SQL + savepoint

2019-12-30 Thread Kurt Young
I created an issue to track this feature:
https://issues.apache.org/jira/browse/FLINK-15440

Best,
Kurt

On Tue, Dec 31, 2019 at 8:00 AM Fanbin Bu  wrote:

> Kurt,
>
> Is there any update on this or roadmap that supports savepoints with Flink
> SQL?
>
> On Sun, Nov 3, 2019 at 11:25 PM Kurt Young  wrote:
>
>> It's not yet possible for SQL and Table API jobs to work with savepoints,
>> but I think this is a popular requirement and we should definitely discuss
>> solutions for it in the following versions.
>>
>> Best,
>> Kurt
>>
>> On Sat, Nov 2, 2019 at 7:24 AM Fanbin Bu  wrote:
>>
>>> Kurt,
>>>
>>> What do you recommend for Flink SQL to use savepoints?
>>>
>>>
>>>
>>> On Thu, Oct 31, 2019 at 12:03 AM Yun Tang  wrote:
>>>
>>>> Hi Fanbin
>>>>
>>>>
>>>>
>>>> If you do not change the parallelism or add and remove operators, you
>>>> could still use savepoint to resume your jobs with Flink SQL.
>>>>
>>>>
>>>>
>>>> However, as far as I know, Flink SQL might not configure the uid
>>>> currently and I’m pretty sure blink branch contains this part of setting
>>>> uid to stream node. [1]
>>>>
>>>>
>>>>
>>>> Already CC Kurt as he could provide more detail information of this.
>>>>
>>>>
>>>>
>>>> [1]
>>>> https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/util/resource/StreamNodeUtil.java#L44
>>>>
>>>>
>>>>
>>>> Best
>>>>
>>>> Yun Tang
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> *From: *Fanbin Bu 
>>>> *Date: *Thursday, October 31, 2019 at 1:17 PM
>>>> *To: *user 
>>>> *Subject: *Flink SQL + savepoint
>>>>
>>>>
>>>>
>>>> Hi,
>>>>
>>>>
>>>>
>>>> it is highly recommended that we assign the uid to the operator for the
>>>> sake of savepoint. How do we do this for Flink SQL? According to
>>>> https://stackoverflow.com/questions/55464955/how-to-add-uid-to-operator-in-flink-table-api,
>>>> it is not possible.
>>>>
>>>>
>>>>
>>>> Does that mean, I can't use savepoint to restart my program if I use
>>>> Flink SQL?
>>>>
>>>>
>>>>
>>>> Thanks,
>>>>
>>>>
>>>>
>>>> Fanbin
>>>>
>>>


Re: Impact of -yn and -ys on Flink 1.9 batch jobs

2019-12-25 Thread Kurt Young
You could also try the latest 1.10 release. In that version the SQL operators no
longer request a fixed, hard-coded amount of memory; instead they adapt to however
much managed memory the slot can provide.

Best,
Kurt


On Thu, Dec 26, 2019 at 1:36 PM Xintong Song  wrote:

> How much memory a slot needs depends on the specific job, and it can differ a lot
> between jobs.
>
> The resource requirement of a slot is the sum of the resource requirements of all
> its operators. If you know which operators your job uses, you can derive it from
> the operators' requirements. The default operator resource requirements are listed
> in [1]; there are five "table.exec.resource.*" options there, and you can also
> adjust them to change how much memory the operators use.
>
> If you are not that familiar with the operators your job uses, the simpler way is
> to just submit the job and search the logs for "Request slot with profile", which
> shows the slot resource requests.
>
> Thank you~
>
> Xintong Song
>
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/config.html#execution-options
>
> On Thu, Dec 26, 2019 at 11:36 AM faaron zheng 
> wrote:
>
> > Thanks for the reply. I checked: with -ys set to 10, the hash join requests 256m
> > of slot memory, while my TM managed memory is 2g, i.e. roughly 200m per slot on
> > average, so the job could not be scheduled.
> > I still have one question though: for a batch job, how can I determine, before
> > submitting it, how much memory a single slot should get? Is there a general
> > method or rule of thumb? faaron zheng  Email: faaronzh...@gmail.com  (signature
> > from NetEase Mail Master)  On 2019-12-25 11:30, Xintong Song wrote: Hi faaron,
> > in Flink 1.9 the -yn parameter should have no effect; it has been removed in
> > later versions. With your parameters, each TM still has 30G of memory, but
> > raising the number of slots per TM (-ys) from 5 to 10 means each slot gets on
> > average only half the memory it had before. The SQL batch operators in Flink 1.9
> > have fixed requirements on Flink managed memory, so this change very likely left
> > a single slot's managed memory unable to satisfy the operators' resource
> > requirements. Thank you~ Xintong Song  On Wed, Dec 25, 2019 at 11:09 AM faaron
> > zheng  wrote:
> > > Running TPC-DS query1: "flink run -m yarn-cluster -d -p 100 -yn 20 -ys 5 -yjm
> > > 60g -ytm 30g" runs fine, while "flink run -m yarn-cluster -d -p 100 -yn 10
> > > -ys 10 -yjm 60g -ytm 30g" fails during the hash join with "No pooled slot
> > > available and request to ResourceManager for new slot failed". I don't
> > > understand how these are related; any guidance is appreciated. faaron zheng
> > > Email: faaronzh...@gmail.com  (signature from NetEase Mail Master)
>


Re: testing - syncing data timeline

2019-12-25 Thread Kurt Young
Let's say you keep your #1, which does the hourly counting, and emit its result
to the merged new #2. The new #2 would first keep all hourly results in state,
and also keep track of whether it has already received all 24 results belonging
to the same day. Once you have received all 24 counts for the same day, you can
start your logic. You can also decide what kind of data you want to keep in
state after that.

Best,
Kurt


On Thu, Dec 26, 2019 at 1:14 PM Avi Levi  wrote:

> not sure that I can see how it is simpler. #2 is time window per day it
> emits the highest hour for that day. #4 is not a time window it keeps
> history of several days . if I want to put the logic of #2 I will need to
> manage it with timers, correct ?
>
> On Thu, Dec 26, 2019 at 6:28 AM Kurt Young  wrote:
>
>> *This Message originated outside your organization.*
>> --
>> Hi,
>>
>> You can merge the logic of #2 into #4, it will be much simpler.
>>
>> Best,
>> Kurt
>>
>>
>> On Wed, Dec 25, 2019 at 7:36 PM Avi Levi  wrote:
>>
>>>  Hi ,
>>>
>>> I have the following pipeline :
>>> 1. single hour window that counts the number of records
>>> 2. single day window that accepts the aggregated data from #1 and emits
>>> the highest hour count of that day
>>> 3. union #1 + #2
>>> 4. Logic operator that accepts the data from #3 and keep a listState of
>>> #2 and apply some logic on #1 based on that state (e.g comparing a single
>>> hour the history of the max hours at the last X days ) and emits the result
>>>
>>> the timestamsAndWaterMarks is
>>> using BoundedOutOfOrdernessTimestampExtractor (event-time)  and I allow
>>> lateness of 3 hours
>>>
>>>  the problem is that when I try to do unit tests of all the pipeline,
>>> the data from #1 rich #4 before the latter accepts the data from #3 hence
>>> it doesn't have any state yet (state is always empty when the stream from
>>> #1 arrives ).
>>> My source in the tests is a collection that represents the records.
>>>  is there anyway I can solve this ?
>>> [image: Screen Shot 2019-12-25 at 13.04.17.png]
>>> I appreciate any help you can provide
>>> Cheers
>>> Avi
>>>
>>>
>>>


Re: testing - syncing data timeline

2019-12-25 Thread Kurt Young
Hi,

You can merge the logic of #2 into #4; it will be much simpler.

Best,
Kurt


On Wed, Dec 25, 2019 at 7:36 PM Avi Levi  wrote:

>  Hi ,
>
> I have the following pipeline :
> 1. single hour window that counts the number of records
> 2. single day window that accepts the aggregated data from #1 and emits
> the highest hour count of that day
> 3. union #1 + #2
> 4. Logic operator that accepts the data from #3 and keep a listState of #2
> and apply some logic on #1 based on that state (e.g comparing a single hour
> the history of the max hours at the last X days ) and emits the result
>
> the timestamsAndWaterMarks is
> using BoundedOutOfOrdernessTimestampExtractor (event-time)  and I allow
> lateness of 3 hours
>
>  the problem is that when I try to do unit tests of all the pipeline, the
> data from #1 rich #4 before the latter accepts the data from #3 hence it
> doesn't have any state yet (state is always empty when the stream from #1
> arrives ).
> My source in the tests is a collection that represents the records.
>  is there anyway I can solve this ?
> [image: Screen Shot 2019-12-25 at 13.04.17.png]
> I appreciate any help you can provide
> Cheers
> Avi
>
>
>


Re: Need guidance on a use case

2019-12-19 Thread Kurt Young
Hi Eva,

Correct me if I'm wrong: you have an unbounded Task stream and you want to
enrich the task events with the User info. Meanwhile, the User table is also
changing over time, so you basically want that when a task event comes in, it
joins against the latest data of the User table and emits the result. Even if
the User table changes again later, you don't want to re-trigger join
operations that already happened and whose results were already emitted, right?

Best,
Kurt


On Fri, Dec 20, 2019 at 12:33 AM Timo Walther  wrote:

> Hi Eva,
>
> I'm not 100% sure if your use case can be solved with SQL. JOIN in SQL
> always joins an incoming record with all previous arrived records. Maybe
> Jark in CC has some idea?
>
> It might make sense to use the DataStream API instead with a connect()
> and CoProcessFunction where you can simply put the latest row into state
> and perform the joining and emission of a new row when required.
>
> Regards,
> Timo
>
>
> On 18.12.19 23:44, Eva Eva wrote:
> > Hi Team,
> >
> > I'm trying Flink for the first time and encountered an issue that I
> > would like to discuss and understand if there is a way to achieve my use
> > case with Flink.
> >
> > *Use case:* I need to perform unbounded stream joins on multiple data
> > streams by listening to different Kafka topics. I have a scenario to
> > join a column in a table with multiple columns in another table by
> > avoiding duplicate joins. The main concern is that I'm not able to avoid
> > duplicate joins.
> >
> > *Issue: *Given the nature of data, it is possible to have updates over
> > time, sent as new messages since Kafka is immutable. For a given key I
> > would like to perform join only on the latest message, whereas currently
> > Flink performs join against all messages with the key (this is what I'm
> > calling as duplicate joins issue).
> > Example: Say I have two Kafka streams "User" and "Task". And I want to
> > join "User" with multiple columns in "Task".
> > Join "UserID" in "User" with "PrimaryAssignee", "SecondaryAssignee" and
> > "Manager" in "Task".
> >
> > Assuming I created and registered DataStreams.
> > Below is my query:
> >
> >SELECT * FROM Task t
> > LEFT JOIN User ua ON t.PrimaryAssignee = ua.UserID
> > LEFT JOIN User ub ON t.SecondaryAssignee = ub.UserID
> > LEFT JOIN User uc ON t.Manager = uc.UserID
> >
> > Say I have 5 different messages in Kafka with UserID=1000, I don't want
> > to perform 5 joins instead I want to perform join with the only latest
> > message with UserID=1000. Is there any way to achieve this without using
> > Temporal Table Functions?
> >
> > *I cannot use Temporal Table Functions because of below reasons:*
> > 1. I need to trigger JOIN operation for every new message in Kafka.
> > Whereas new messages in Temporal Table don't trigger JOIN operation.
> > 2. I need to perform LEFT OUTER JOINS, whereas Temporal Table can only
> > be used for INNER JOINS
> > 3. From what I understand, JOIN in Temporal Table can only be performed
> > using Primary key, so I won't be able to Join more than one key.
> >
> >
> > Could someone please help me with this? Please let me know if any of the
> > information is not clear or need more details.
> >
> >   If this is not the correct email id, could you please point me to the
> > correct one.
> >
> >
> > Thanks in advance!
>
>


Re: How to convert retract stream to dynamic table?

2019-12-18 Thread Kurt Young
Hi James,

If I understand correctly, you can use `TableEnvironment#sqlQuery` to achieve
what you want. You can pass the whole SQL statement in and get a `Table` back
from the method. I believe this is the table you want; it is semantically
equivalent to the stream you mentioned.

For example, you can further operate on the `Table` with other SQL operations,
like a `GROUP BY cnt` on the returned table. You can think of it this way:
Flink attaches another aggregation operator to the original plan, and this
operator consumes the retraction stream which the original SQL statement
produced and generates correct results from it.
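
As a concrete example, sticking with the clicks table from the documentation (so
clicks, user and url are the docs' names rather than yours), counting how many users
fall into each click count would look roughly like this:

SELECT cnt, COUNT(*) AS frequency
FROM (
    SELECT user, COUNT(url) AS cnt
    FROM clicks
    GROUP BY user
)
GROUP BY cnt

The inner query is the one from your mail; the outer GROUP BY consumes the retraction
stream it produces and keeps the per-cnt frequencies up to date as new clicks arrive.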

Best,
Kurt


On Thu, Dec 19, 2019 at 1:25 AM James Baker  wrote:

> Hi!
> I've been looking at Flink for the last few days and have very much
> appreciated the concept of Dynamic Tables, it solves a lot of my needs and
> handles a lot of the complex state tracking that is otherwise painful. I
> have a question about the composability of the system which the docs don't
> answer.
>
> The docs use the example of 'SELECT user, COUNT(url) as cnt FROM clicks
> GROUP BY user', where clicks is a stream coming in of user and the url
> they've clicked.
>
> From such a Table, I can then get a retract stream written into an
> external system, perhaps outputting (true, User1, 1), ..., (true, User1, 2)
> indicating that User1's clicked on something.
>
> Is there an idiomatic way to convert a retract stream into a semantically
> equivalent table?
>
> Thanks,
> James
>


Re: Join a datastream with tables stored in Hive

2019-12-16 Thread Kurt Young
Great, looking forward to hearing from you again.

Best,
Kurt


On Mon, Dec 16, 2019 at 10:22 PM Krzysztof Zarzycki 
wrote:

> Thanks Kurt for your answers.
>
> Summing up, I feel like the option 1 (i.e. join with temporal table
> function) requires some coding around a source, that needs to pull data
> once a day. But otherwise, bring the following benefits:
> * I don't have to put dicts in another store like Hbase. All stays in
> Hive + Flink.
> * I'll be able to make a true temporal join - event-time based.
> * I believe I will be able to build a history reprocessing program based
> on the same logic (i.e. same SQL). At least for a particular day -
> processing multiple days would be tricky, because I will need to pull
> multiple versions of the dictionary.
> Plus, looking up dict values will be much faster and resource optimal when
> dict is stored in a state instead of uncached Hbase. It's especially
> important in a case when we want to reprocess historical, archived stream
> with a speed of millions of events/sec.
>
> I understand that option 2 is easier to implement. I may do a PoC of it as
> well.
> OK, I believe I know enough to get my hands dirty with the code. I can
> share later on what I was able to accomplish. And probably more questions
> will show up when I finally start the implementation.
>
> Thanks
> Krzysztof
>
> pon., 16 gru 2019 o 03:14 Kurt Young  napisał(a):
>
>> Hi Krzysztof, thanks for the discussion, you raised lots of good
>> questions, I will try to reply them
>> one by one.
>>
>> Re option 1:
>>
>> > Question 1: do I need to write that Hive source or can I use something
>> ready, like Hive catalog integration? Or maybe reuse e.g. HiveTableSource
>> class?
>>
>> I'm not sure if you can reuse the logic of `HiveTableSource`. Currently
>> `HiveTableSource` works in batch mode: it reads all the data at once and stops.
>> But what you need is to wait until the next day after it finishes. What you can
>> try is to reuse the logic of `HiveTableInputFormat` and wrap the "monitoring"
>> logic around it.
>>
>> > Question/worry 2:  the state would grow inifinitely if I had infinite
>> number of keys, but not only infinite number of versions of all keys.
>>
>> The temporal table function doesn't support watermark-based state cleanup yet,
>> but what you can try is idle state retention [1]. So even if you have an
>> infinite number of keys, say for example different join keys every day, the old
>> keys will not be touched on the following days, will become idle, and will be
>> deleted by the framework.
>>
>> > Question 3: Do you imagine that I could use the same logic for both
>> stream processing and reprocessing just by replacing sources and sinks?
>>
>> Generally speaking, yes, I think so. With an event-time based join, we should
>> be able to reuse the same logic for normal stream processing and for
>> reprocessing historical data, although there will definitely be some details to
>> address, like event time and watermarks.
>>
>> Re option 2:
>>
>> > maybe implement Hive/JDBC-based LookupableTableSource that  pulls the
>> whole dictionary to memory
>>
>> You can do this manually, but I would recommend going with the first choice,
>> which loads the hive table into HBase periodically. It's much easier and more
>> efficient. The approach you mentioned also seems to overlap quite a bit with the
>> temporal table function solution.
>>
>> > this option is available only with Blink engine and also only with use
>> of Flink SQL, no Table API?
>>
>> I'm afraid yes, you can only use it with SQL for now.
>>
>> > do you think it would be possible to use the same logic / SQL for
>> reprocessing?
>>
>> Given that this solution is based on processing time, I don't think it can cover
>> the reprocessing use case, unless you can accept always joining with the latest
>> day even during backfilling. But we are also aiming to resolve this shortcoming,
>> maybe within 1 or 2 releases.
>>
>> Best,
>> Kurt
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time
>>
>>
>> On Sat, Dec 14, 2019 at 3:41 AM Krzysztof Zarzycki 
>> wrote:
>>
>>> Very interesting, Kurt! Yes, I also imagined it's rather a very common
>>> case. In my company we currently have 3 clients wanting this functionality.
>>> I also just realized this slight difference between Temporal Join and
>>

Re: Join a datastream with tables stored in Hive

2019-12-15 Thread Kurt Young
d to join data stream and
> (versioned/snapshotted) dictionaries stored on HDFS. Do you imagine that I
> could use the same logic for both stream processing and reprocessing just
> by replacing sources and sinks? Maybe after some slight modifications?
>
>
> Regarding option 2:
> Here I understand the current limitation (which will stay for some time )
> is that the join can happen only on processing time, which means join only
> with the latest version of dictionaries.
> Accepting that, I understand I would need to do:
> a) load Hive table to e.g. HBase and then use HBaseTableSource on it., OR
> b) maybe implement Hive/JDBC-based LookupableTableSource that  pulls the
> whole dictionary to memory (or even to Flink state, if it is possible to
> use it from TableFunction).
> Then use this table and my Kafka stream table in temporal join expressed
> with Flink SQL.
> What do you think, is that feasible?
> Do I understand correctly, that this option is available only with Blink
> engine and also only with use of Flink SQL, no Table API?
>
> Same question comes up regarding reprocessing: do you think it would be
> possible to use the same logic / SQL for reprocessing?
>
> Thank you for continuing discussion with me. I believe we're here on a
> subject of a really important design for the community.
> Krzysztof
>
> pt., 13 gru 2019 o 09:39 Kurt Young  napisał(a):
>
>> Sorry I forgot to paste the reference url.
>>
>> Best,
>> Kurt
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table-function
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>>
>> On Fri, Dec 13, 2019 at 4:37 PM Kurt Young  wrote:
>>
>>> Hi Krzysztof,
>>>
>>> What you raised also interested us a lot to achieve in Flink.
>>> Unfortunately, there
>>> is no in place solution in Table/SQL API yet, but you have 2 options
>>> which are both
>>> close to this thus need some modifications.
>>>
>>> 1. The first one is use temporal table function [1]. It needs you to
>>> write the logic of
>>> reading hive tables and do the daily update inside the table function.
>>> 2. The second choice is to use temporal table join [2], which only works
>>> with processing
>>> time now (just like the simple solution you mentioned), and need the
>>> table source has
>>> look up capability (like hbase). Currently, hive connector doesn't
>>> support look up, so to
>>> make this work, you need to sync the content to other storages which
>>> support look up,
>>> like HBase.
>>>
>>> Both solutions are not ideal now, and we also aims to improve this maybe
>>> in the following
>>> release.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Fri, Dec 13, 2019 at 1:44 AM Krzysztof Zarzycki 
>>> wrote:
>>>
>>>> Hello dear Flinkers,
>>>> If this kind of question was asked on the groups, I'm sorry for a
>>>> duplicate. Feel free to just point me to the thread.
>>>> I have to solve a probably pretty common case of joining a datastream
>>>> to a dataset.
>>>> Let's say I have the following setup:
>>>> * I have a high pace stream of events coming in Kafka.
>>>> * I have some dimension tables stored in Hive. These tables are changed
>>>> daily. I can keep a snapshot for each day.
>>>>
>>>> Now conceptually, I would like to join the stream of incoming events to
>>>> the dimension tables (simple hash join). we can consider two cases:
>>>> 1) simpler, where I join the stream with the most recent version of the
>>>> dictionaries. (So the result is accepted to be nondeterministic if the job
>>>> is retried).
>>>> 2) more advanced, where I would like to do temporal join of the stream
>>>> with dictionaries snapshots that were valid at the time of the event. (This
>>>> result should be deterministic).
>>>>
>>>> The end goal is to do aggregation of that joined stream, store results
>>>> in Hive or more real-time analytical store (Druid).
>>>>
>>>> Now, could you please help me understand is any of these cases
>>>> implementable with declarative Table/SQL API? With use of temporal joins,
>>>> catalogs, Hive integration, JDBC connectors, or whatever beta features
>>>> there are now. (I've read quite a lot of Flink docs about each of those,
>>>> but I have a problem to compile this information in the final design.)
>>>> Could you please help me understand how these components should
>>>> cooperate?
>>>> If that is impossible with Table API, can we come up with the easiest
>>>> implementation using Datastream API ?
>>>>
>>>> Thanks a lot for any help!
>>>> Krzysztof
>>>>
>>>


Re: Backward-incompatible SQL behavior in Flink 1.9.1

2019-12-13 Thread Kurt Young
Hi,

I would suggest translating it into English and then opening an issue in JIRA.

Best,
Kurt


On Thu, Dec 12, 2019 at 11:39 PM 李佟  wrote:

> Recently, as part of a Flink upgrade, I migrated our program from the old cluster
> (where it ran fine on 1.8.0) to a new cluster (1.9.1). When deploying it, I found
> that the Flink SQL program which used to run correctly can no longer be executed
> on the 1.9.1 cluster. The exception is:
>
>
> org.apache.flink.table.api.ValidationException: *Window can only be
> defined over a time attribute column.*
> at
> org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85)
>
> at
> org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:90)
>
>
>
> I traced into the decompiled Scala file and set a breakpoint; the part highlighted
> by the red box in the attached screenshot is never executed and is skipped directly.
>
>
> The functionality is very simple: it just sums a value over a time window. The
> test case is as follows:
>
>
> package org.flowmatrix.isp.traffic.accounting.test;
>
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.scala.typeutils.Types;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import
> org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableSchema;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.sinks.CsvTableSink;
> import org.apache.flink.table.sinks.TableSink;
> import org.apache.flink.table.sources.DefinedRowtimeAttributes;
> import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
> import org.apache.flink.table.sources.StreamTableSource;
> import org.apache.flink.table.sources.tsextractors.ExistingField;
> import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
> import org.apache.flink.types.Row;
> import org.junit.Test;
>
> import javax.annotation.Nullable;
> import java.sql.Timestamp;
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.List;
>
> public class TestSql {
> @Test
> public void testAccountingSql() {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.setParallelism(1);
>
> try {
> StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(env);
>
> SimpleTableSource source = new SimpleTableSource();
> Table t = tableEnv.fromTableSource(source);
>
> String interval = "5"; //5 second
> System.out.println("source schema is " +
> source.getTableSchema());
>
> Table sqlResult = tableEnv.sqlQuery("SELECT " +
> " TUMBLE_START(UserActionTime, INTERVAL '" + interval
> + "' SECOND) as rowTime, " +
> " Username," +
> " SUM(Data) as Data " +
> " FROM  " + t +
> " GROUP BY TUMBLE(UserActionTime, INTERVAL '" +
> interval + "' SECOND),Username");
>
>
> String[] fieldNames = {
> "rowTime",
> "Username", "Data"};
> TypeInformation[] fieldTypes = {
> TypeInformation.of(Timestamp.class),
> TypeInformation.of(String.class),
> TypeInformation.of(Long.class)};
>
> TableSink sink1 = new CsvTableSink("/tmp/data.log", ",");
> sink1 = sink1.configure(fieldNames, fieldTypes);
> tableEnv.registerTableSink("EsSinkTable", sink1);
> System.out.println("sql result schema is " +
> sqlResult.getSchema());
>
> tableEnv.sqlUpdate("insert into EsSinkTable select  " +
> "rowTime,Username,Data from " + sqlResult + "");
>
> env.execute("test");
> } catch (Exception e) {
> e.printStackTrace();
> System.err.println("start program error. FlowMatrix
> --zookeeper  --config " +
> " --name  --interval 
> --indexName ");
> System.err.println(e.toString());
> return;
> }
> }
>
> public static class SimpleTableSource implements
> StreamTableSource<Row>, DefinedRowtimeAttributes {
> @Override
> public DataStream<Row> getDataStream(StreamExecutionEnvironment
> env) {
> return
> env.fromCollection(genertateData()).assignTimestampsAndWatermarks(new
> AssignerWithPunctuatedWatermarks<Row>() {
> private long lastWaterMarkMillSecond = -1;
> private long waterMarkPeriodMillSecond = 1000;
> @Nullable
> @Override
> public Watermark checkAndGetNextWatermark(Row lastElement,
> long extractedTimestamp) {
> if(extractedTimestamp - lastWaterMarkMillSecond >=
> waterMarkPeriodMillSecond){
> lastWaterMarkMillSecond = 

Re: Join a datastream with tables stored in Hive

2019-12-13 Thread Kurt Young
Sorry I forgot to paste the reference url.

Best,
Kurt

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table-function
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
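
To make the two flavors a bit more concrete, here are hedged sketches of what the
join statements could look like. Every table, function and column name below
(Events, DimHistory, LatestDim, e_uid, e_time, dim_uid, dim_value, proctime) is a
placeholder, not something from Krzysztof's actual pipeline:

-- Option 1: event-time join via a temporal table function. DimHistory would be
-- registered from the Hive-backed dimension table, e.g. with
-- Table#createTemporalTableFunction(timeAttribute, primaryKey).
SELECT e_uid, e_time, dim_value
FROM Events,
     LATERAL TABLE (DimHistory(e_time))
WHERE e_uid = dim_uid

-- Option 2: processing-time temporal table join against a lookup-capable table
-- (e.g. an HBase-backed LatestDim), so each event joins the current dimension
-- version.
SELECT e.e_uid, d.dim_value
FROM Events AS e
  JOIN LatestDim FOR SYSTEM_TIME AS OF e.proctime AS d
    ON e.e_uid = d.dim_uid

Both follow the syntax described in [1] and [2]; the second one needs the blink
planner and a LookupableTableSource, which is why the Hive data would first have
to be mirrored into something like HBase.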

On Fri, Dec 13, 2019 at 4:37 PM Kurt Young  wrote:

> Hi Krzysztof,
>
> What you raised is also something we are very interested in achieving in Flink.
> Unfortunately, there is no in-place solution in the Table/SQL API yet, but you
> have two options which are both close to it and only need some modifications.
>
> 1. The first one is to use a temporal table function [1]. It requires you to
> write the logic of reading the hive tables and doing the daily update inside the
> table function.
> 2. The second choice is to use a temporal table join [2], which only works with
> processing time for now (just like the simple solution you mentioned) and needs
> the table source to have lookup capability (like HBase). Currently, the hive
> connector doesn't support lookup, so to make this work you need to sync the
> content to another storage which supports lookup, like HBase.
>
> Both solutions are not ideal right now, and we are also aiming to improve this,
> maybe in a following release.
>
> Best,
> Kurt
>
>
> On Fri, Dec 13, 2019 at 1:44 AM Krzysztof Zarzycki 
> wrote:
>
>> Hello dear Flinkers,
>> If this kind of question was asked on the groups, I'm sorry for a
>> duplicate. Feel free to just point me to the thread.
>> I have to solve a probably pretty common case of joining a datastream to
>> a dataset.
>> Let's say I have the following setup:
>> * I have a high pace stream of events coming in Kafka.
>> * I have some dimension tables stored in Hive. These tables are changed
>> daily. I can keep a snapshot for each day.
>>
>> Now conceptually, I would like to join the stream of incoming events to
>> the dimension tables (simple hash join). we can consider two cases:
>> 1) simpler, where I join the stream with the most recent version of the
>> dictionaries. (So the result is accepted to be nondeterministic if the job
>> is retried).
>> 2) more advanced, where I would like to do temporal join of the stream
>> with dictionaries snapshots that were valid at the time of the event. (This
>> result should be deterministic).
>>
>> The end goal is to do aggregation of that joined stream, store results in
>> Hive or more real-time analytical store (Druid).
>>
>> Now, could you please help me understand is any of these cases
>> implementable with declarative Table/SQL API? With use of temporal joins,
>> catalogs, Hive integration, JDBC connectors, or whatever beta features
>> there are now. (I've read quite a lot of Flink docs about each of those,
>> but I have a problem to compile this information in the final design.)
>> Could you please help me understand how these components should
>> cooperate?
>> If that is impossible with Table API, can we come up with the easiest
>> implementation using Datastream API ?
>>
>> Thanks a lot for any help!
>> Krzysztof
>>
>

