Re: Yarn Kerberos issue

2020-01-05 Thread Juan Gentile
Hello Rong, Chesnay,

Thank you for your answer. We launch the job through a scheduler (similar to 
Oozie): we have a keytab for the scheduler user, and with that keytab we obtain 
delegation tokens impersonating the right user (the owner of the job). However, 
the only way I was able to make this work was by getting a ticket first (through 
kinit).
As a comparison, if I launch a Spark job with only the delegation tokens (without 
running kinit), it works fine, so I suspect Spark does something extra.
This is as far as I could get; at this point I'm not sure whether this is simply 
not supported by Flink or whether I'm doing something wrong.

Thank you,
Juan

From: Rong Rong 
Date: Saturday, January 4, 2020 at 6:06 PM
To: Chesnay Schepler 
Cc: Juan Gentile , "user@flink.apache.org" 
, Oleksandr Nitavskyi 
Subject: Re: Yarn Kerberos issue

Hi Juan,

Chesnay is right. If you are using the CLI to launch your session cluster as 
described in the documentation [1], following the instructions and running kinit 
[2] first is one valid way to go.
Another approach is to set up the Kerberos settings in the flink-conf.yaml file 
[3]. FlinkYarnSessionCli will then pick up your keytab file and run the CLI 
securely.
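
For illustration, a minimal flink-conf.yaml sketch of that keytab-based setup 
(the paths, principal and contexts below are placeholders, not values from this 
thread):

security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /path/to/scheduler-user.keytab
security.kerberos.login.principal: scheduler-user@EXAMPLE.COM
security.kerberos.login.contexts: Client,KafkaClient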

As far as I know, the option `security.kerberos.login.use-ticket-cache` doesn't 
actually change the behavior of the authentication process; it is more of a hint 
about whether to use the ticket cache populated by `kinit`. If you disable the 
ticket cache, you will have to use the keytab/principal approach - this doc [4] 
explains it in more detail.

Thanks,
Rong


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#start-flink-session
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-kerberos.html#using-kinit-yarn-only
[3] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-kerberos.html#yarnmesos-mode
[4] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#enabling-kerberos-authentication-for-versions-09-and-above-only

On Fri, Jan 3, 2020 at 7:20 AM Chesnay Schepler  wrote:
From what I understand from the documentation, if you want to use delegation 
tokens you always first have to issue a ticket using kinit; so you did 
everything correctly?

On 02/01/2020 13:00, Juan Gentile wrote:
Hello,

I'm trying to submit a job (a batch WordCount) to a YARN cluster. I'm trying to use 
delegation tokens and I'm getting the following error:

org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy 
Yarn session cluster
   at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:423)
   at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:262)
   at 
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216)
   at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1053)
   at 
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1129)
   at java.security.AccessController.doPrivileged(Native Method)
   at javax.security.auth.Subject.doAs(Subject.java:422)
   at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
   at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
  

Re: Checkpoints issue and job failing

2020-01-05 Thread vino yang
Hi Navneeth,

Since the file still exists, this exception is very strange.

Does it happen occasionally or frequently?

Another concern is that 1.4 is a very old version, so maintenance and responses
for it are not as timely as for recent versions. I personally recommend
upgrading as soon as possible.

I will ping @Piotr Nowojski to see if he can explain the cause of this problem.

Best,
Vino

Navneeth Krishnan wrote on Sat, Jan 4, 2020 at 1:03 AM:

> Thanks Congxian & Vino.
>
> Yes, the file does exist and I don't see any problem accessing it.
>
> Regarding Flink 1.9, we haven't migrated yet but we are planning to.
> Since we have to test it, it might take some time.
>
> Thanks
>
> On Fri, Jan 3, 2020 at 2:14 AM Congxian Qiu 
> wrote:
>
>> Hi
>>
>> Have you ever checked whether this problem also exists on Flink 1.9?
>>
>> Best,
>> Congxian
>>
>>
>> vino yang  于2020年1月3日周五 下午3:54写道:
>>
>>> Hi Navneeth,
>>>
>>> Did you check whether the path contained in the exception really cannot be
>>> found?
>>>
>>> Best,
>>> Vino
>>>
>>> Navneeth Krishnan  于2020年1月3日周五 上午8:23写道:
>>>
 Hi All,

 We are running into checkpoint timeout issues more frequently in
 production and we also see the below exception. We are running Flink 1.4.0
 and the checkpoints are saved on NFS. Can someone suggest how to overcome
 this?

 [image: image.png]

 java.lang.IllegalStateException: Could not initialize operator state 
 backend.
at 
 org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:302)
at 
 org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:249)
at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
 Caused by: java.io.FileNotFoundException: 
 /mnt/checkpoints/02c4f8d5c11921f363b98c5959cc4f06/chk-101/e71d8eaf-ff4a-4783-92bd-77e3d8978e01
  (No such file or directory)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.(FileInputStream.java:138)
at 
 org.apache.flink.core.fs.local.LocalDataInputStream.(LocalDataInputStream.java:50)


 Thanks




Re: How long Flink state default TTL,if I don't config the state ttl config?

2020-01-05 Thread LakeShen
Ok, got it, thank you.

Zhu Zhu wrote on Mon, Jan 6, 2020 at 10:30 AM:

> Yes. State TTL is by default disabled.
>
> Thanks,
> Zhu Zhu
>
> LakeShen  于2020年1月6日周一 上午10:09写道:
>
>> I looked at the Flink source code and found that Flink state by default
>> never expires; is that right?
>>
>> LakeShen wrote on Mon, Jan 6, 2020 at 9:58 AM:
>>
>>> Hi community, I have a question about Flink state TTL. If I don't configure
>>> the state TTL,
>>> how long is the Flink state retained? Is it retained forever in HDFS?
>>> Thanks for your reply.
>>>
>>



Re: Flink logging issue with logback

2020-01-05 Thread vino yang
Hi Bajaj,

>> Logs from the main method (outside of the job graph) do not show up in
>> jobmanager logs.

IMO, this is normal behavior.

Beyond that, please check the JVM options mentioned by Yang.

Best,
Vino


Yang Wang wrote on Mon, Jan 6, 2020 at 11:18 AM:

> Hi Bajaj, Abhinav,
>
> Could you share the start command of the jobmanager and taskmanager? If they
> are started correctly, we will have the following JVM options:
>
> -Dlog.file=/path/of/taskmanager.log
> -Dlogback.configurationFile=file:///path/of/logback.xml
>
>
>
> Best,
> Yang
>
> Bajaj, Abhinav  于2020年1月4日周六 上午7:23写道:
>
>> Hi,
>>
>>
>>
>> I am investigating a logging issue with Flink.
>>
>>
>>
>> *Setup*
>>
>>- Using Flink 1.7.1 with logback as suggested in the Flink documentation.
>>- Submitting the Flink job from the Flink dashboard.
>>
>>
>>
>> *Observations*
>>
>>- Logs from the main method (outside of the job graph) do not show up in
>>jobmanager logs.
>>- Logs from the operators like map or custom operators do show up in
>>the taskmanager logs.
>>- Logs from main method do show up in jobmanager logs when using
>>log4j in place of logback.
>>
>>
>>
>> Has anyone else noticed similar behavior or is this a known issue with
>> logback integration in Flink?
>>
>> Any suggestions on potential workaround or fix?
>>
>>
>>
>> Appreciate your time and help.
>>
>>
>>
>> ~ Abhinav Bajaj
>>
>>
>>
>


Re: [DISCUSS] Set default planner for SQL Client to Blink planner in 1.10 release

2020-01-05 Thread 贺小令
+1 for making the Blink planner the default planner; the Blink planner has
become much more stable as of 1.10.
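
For anyone who wants this behavior before the default changes, it amounts to one
entry in the SQL Client environment file (a sketch based on the 1.9
conf/sql-client-defaults.yaml; treat the exact file layout as an assumption for
your setup):

execution:
  planner: blink
  type: streaming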

Dian Fu wrote on Mon, Jan 6, 2020 at 11:51 AM:

> +1 to set blink planner as the default planner for SQL client considering
> that so many features added since 1.10 are only available in the blink
> planner.
>
> On Jan 6, 2020 at 11:04 AM, Rui Li wrote:
>
> +1. I think it improves user experience.
>
> On Mon, Jan 6, 2020 at 10:18 AM Zhenghua Gao  wrote:
>
>> +1 for making blink planner as the default planner for SQL Client since
>> we have made a huge improvement in 1.10.
>>
>> *Best Regards,*
>> *Zhenghua Gao*
>>
>>
>> On Sun, Jan 5, 2020 at 2:42 PM Benchao Li  wrote:
>>
>>> +1
>>>
>>> We have used blink planner since 1.9.0 release in our production
>>> environment, and it behaves really impressive.
>>>
>>> Hequn Cheng  于2020年1月5日周日 下午1:58写道:
>>>
 +1 to make blink planner as the default planner for SQL Client, hence
 we can give the blink planner a bit more exposure.

 Best, Hequn

 On Fri, Jan 3, 2020 at 6:32 PM Jark Wu  wrote:

> Hi Benoît,
>
> Thanks for the reminder. I will look into the issue and hopefully we
> can target it into 1.9.2 and 1.10.
>
> Cheers,
> Jark
>
> On Fri, 3 Jan 2020 at 18:21, Benoît Paris <
> benoit.pa...@centraliens-lille.org> wrote:
>
>> >  If anyone finds that blink planner has any significant defects
>> and has a larger regression than the old planner, please let us know.
>>
>> Overall, the Blink-exclusive features are must (TopN, deduplicate,
>> LAST_VALUE, plan reuse, etc)! But all use cases of the Legacy planner in
>> production are not covered:
>> An edge case of Temporal Table Functions does not allow computed
>> Tables (as opposed to TableSources) to be used on the query side in 
>> Blink (
>> https://issues.apache.org/jira/browse/FLINK-14200)
>>
>> Cheers
>> Ben
>>
>>
>> On Fri, Jan 3, 2020 at 10:00 AM Jeff Zhang  wrote:
>>
>>> +1, I have already made blink as the default planner of flink
>>> interpreter in Zeppelin
>>>
>>>
>>> Jingsong Li  于2020年1月3日周五 下午4:37写道:
>>>
 Hi Jark,

 +1 for default blink planner in SQL-CLI.
 I believe this new planner can be put into practice in production.
 We've worked hard for nearly a year, but the old planner didn't
 move on.

 And I'd like to cc to user@flink.apache.org.
 If anyone finds that blink planner has any significant defects and
 has a larger regression than the old planner, please let us know. We 
 will
 be very grateful.

 Best,
 Jingsong Lee

 On Fri, Jan 3, 2020 at 4:14 PM Leonard Xu 
 wrote:

> +1 for this.
> We bring many SQL/API features and enhance stability in 1.10
> release, and almost all of them happens in Blink planner.
> SQL CLI is the most convenient entrypoint for me, I believe many
> users will have a better experience If we set Blink planner as default
> planner.
>
> Best,
> Leonard
>
> > 在 2020年1月3日,15:16,Terry Wang  写道:
> >
> > Since what blink planner can do is a superset of flink planner,
> big +1 for changing the default planner to Blink planner from my side.
> >
> > Best,
> > Terry Wang
> >
> >
> >
> >> 2020年1月3日 15:00,Jark Wu  写道:
> >>
> >> Hi everyone,
> >>
> >> In 1.10 release, Flink SQL supports many awesome features and
> improvements,
> >> including:
> >> - support watermark statement and computed column in DDL
> >> - fully support all data types in Hive
> >> - Batch SQL performance improvements (TPC-DS 7x than Hive MR)
> >> - support INSERT OVERWRITE and INSERT PARTITION
> >>
> >> However, all the features and improvements are only available in
> Blink
> >> planner, not in Old planner.
> >> There are also some other features are limited in Blink
> planner, e.g.
> >> Dimension Table Join [1],
> >> TopN [2], Deduplicate [3], streaming aggregates optimization
> [4], and so on.
> >>
> >> But the Old planner is still the default planner in Table API & SQL. It is
> >> frustrating for users to have to switch to the Blink planner manually every
> >> time they start a SQL CLI. And it's surprising to see an unsupported
> >> exception when they try out the new features without switching the planner.
> >>
> >> SQL CLI is a very important entrypoint for trying out new
> features and
> >> prototyping for users.
> >> In order to give new planner more exposures, I would like to
> suggest to set
> >> 

Re: Need guidance on a use case

2020-01-05 Thread Jark Wu
Hi Reva,

I'm glad to see it can help you.

Quick answers for your questions:
1) Yes, it works. You can deduplicate the Task table in the same way using
ROW_NUMBER() (see the sketch below).
2) Yes. It is a stream-stream join which will be triggered for new messages
from both sides.
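
A sketch of that Task deduplication, mirroring the LatestUser view quoted further
down in this thread (the key column TaskID is an assumption; use whatever column
identifies a task in your schema):

SELECT *
FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY TaskID ORDER BY PROCTIME() DESC) AS rn
  FROM Task
) WHERE rn = 1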

Best,
Jark


On Sat, 28 Dec 2019 at 01:02, Eva Eva  wrote:

> Thanks everyone for the replies.
>
> @Jark,
>
> This is helpful, my code is currently in 1.8 version and I'll upgrade the
> code to 1.9 and give it a try.
>
> Couple of follow-up questions:
> 1. I need to perform Deduplication on Task table as well. Would above
> query work well on two Deduplicated tables, "LatestUser" and "LatestTask"?
> 2. Would JOIN operation be triggered for all new messages on both "User"
> and "Task" table? Same question in different words, will JOIN operation be
> triggered for all upsert messages on "LatestUser" and "LatestTask"
>
> Thanks,
> Reva
>
>
> On Thu, Dec 19, 2019 at 9:50 PM Jark Wu  wrote:
>
>> Hi Eva,
>>
>> If I understand correctly,
>> 1) the user stream is a changelog stream which every record is a upsert
>> with a primary key, and you only want to join the latest one
>> 2) if the user record is updated, you want to re-trigger the join
>> (retract previous joined result)
>>
>> If this is your requirement, fortunately, this use case can be solved in
>> Flink SQL v1.9 with *blink planner*.
>> First, you can use Deduplicate[1] to convert the append stream to an
>> updating stream which keeps the last row.
>> And then, join Task stream with the deduplicated view. Below is the
>> example:
>>
>>
>> Register the following query as "LatestUser" view:
>>
>> SELECT *
>> FROM (
>>   SELECT *, ROW_NUMBER() OVER (PARTITION BY UserID ORDER BY PROCTIME()
>> DESC) AS rn
>>   FROM User
>> ) WHERE rn = 1
>>
>> Join on the "LatestUser":
>>
>>  SELECT * FROM Task t
>>LEFT JOIN LatestUser ua ON t.PrimaryAssignee = ua.UserID
>>LEFT JOIN LatestUser ub ON t.SecondaryAssignee = ub.UserID
>>LEFT JOIN LatestUser uc ON t.Manager = uc.UserID
>>
>>
>> Best,
>> Jark
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#deduplication
>>
>>
>> On Fri, 20 Dec 2019 at 09:53, Kurt Young  wrote:
>>
>>> Hi Eva,
>>>
>>> Correct me If i'm wrong. You have an unbounded Task stream and you
>>> want to enrich the User info to the task event. Meanwhile, the User table
>>> is also changing by the time, so you basically want that when task event
>>> comes, join the latest data of User table and emit the results. Even if
>>> the
>>> User table changes again, you don't want to re-trigger the join
>>> operation
>>> which happened before and already emitted, right?
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Fri, Dec 20, 2019 at 12:33 AM Timo Walther 
>>> wrote:
>>>
 Hi Eva,

 I'm not 100% sure if your use case can be solved with SQL. JOIN in SQL
 always joins an incoming record with all previously arrived records.
 Maybe Jark in CC has some idea?

 It might make sense to use the DataStream API instead with a connect()
 and CoProcessFunction where you can simply put the latest row into
 state
 and perform the joining and emission of a new row when required.
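
 A rough Java sketch of that connect() + CoProcessFunction idea (the Task, User
 and Joined types and the userId field are assumptions, not code from this
 thread):

 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
 import org.apache.flink.util.Collector;

 tasks.keyBy(t -> t.userId)
      .connect(users.keyBy(u -> u.userId))
      .process(new CoProcessFunction<Task, User, Joined>() {

          private transient ValueState<User> latestUser;

          @Override
          public void open(Configuration parameters) {
              latestUser = getRuntimeContext().getState(
                  new ValueStateDescriptor<>("latest-user", User.class));
          }

          @Override
          public void processElement1(Task task, Context ctx, Collector<Joined> out)
                  throws Exception {
              // Join the incoming task against the latest known user row (may be null).
              out.collect(new Joined(task, latestUser.value()));
          }

          @Override
          public void processElement2(User user, Context ctx, Collector<Joined> out)
                  throws Exception {
              // Only remember the latest row; do not re-emit previously joined results.
              latestUser.update(user);
          }
      });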

 Regards,
 Timo


 On 18.12.19 23:44, Eva Eva wrote:
 > Hi Team,
 >
 > I'm trying Flink for the first time and encountered an issue that I
 > would like to discuss and understand if there is a way to achieve my
 use
 > case with Flink.
 >
 > *Use case:* I need to perform unbounded stream joins on multiple data
 > streams by listening to different Kafka topics. I have a scenario to
 > join a column in a table with multiple columns in another table by
 > avoiding duplicate joins. The main concern is that I'm not able to
 avoid
 > duplicate joins.
 >
 > *Issue: *Given the nature of data, it is possible to have updates
 over
 > time, sent as new messages since Kafka is immutable. For a given key
 I
 > would like to perform join only on the latest message, whereas
 currently
 > Flink performs join against all messages with the key (this is what
 I'm
 > calling as duplicate joins issue).
 > Example: Say I have two Kafka streams "User" and "Task". And I want
 to
 > join "User" with multiple columns in "Task".
 > Join "UserID" in "User" with "PrimaryAssignee", "SecondaryAssignee"
 and
 > "Manager" in "Task".
 >
 > Assuming I created and registered DataStreams.
 > Below is my query:
 >
 >SELECT * FROM Task t
 > LEFT JOIN User ua ON t.PrimaryAssignee = ua.UserID
 > LEFT JOIN User ub ON t.SecondaryAssignee = ub.UserID
 > LEFT JOIN User uc ON t.Manager = uc.UserID
 >
 > Say I have 5 different messages in Kafka with UserID=1000, I don't
 want
 > to perform 5 joins instead I want to perform join with the only
 latest
 > message with UserID=1000. Is there any way to 

Re: [DISCUSS] Set default planner for SQL Client to Blink planner in 1.10 release

2020-01-05 Thread Dian Fu
+1 to set blink planner as the default planner for SQL client considering that 
so many features added since 1.10 are only available in the blink planner.

> On Jan 6, 2020 at 11:04 AM, Rui Li wrote:
> 
> +1. I think it improves user experience.
> 
> On Mon, Jan 6, 2020 at 10:18 AM Zhenghua Gao  > wrote:
> +1 for making blink planner as the default planner for SQL Client since we 
> have made a huge improvement in 1.10.
> 
> Best Regards,
> Zhenghua Gao
> 
> 
> On Sun, Jan 5, 2020 at 2:42 PM Benchao Li  > wrote:
> +1 
> 
> We have used blink planner since 1.9.0 release in our production environment, 
> and it behaves really impressive.
> 
> Hequn Cheng mailto:chenghe...@gmail.com>> 于2020年1月5日周日 
> 下午1:58写道:
> +1 to make blink planner as the default planner for SQL Client, hence we can 
> give the blink planner a bit more exposure. 
> 
> Best, Hequn
> 
> On Fri, Jan 3, 2020 at 6:32 PM Jark Wu  > wrote:
> Hi Benoît,
> 
> Thanks for the reminder. I will look into the issue and hopefully we can 
> target it into 1.9.2 and 1.10. 
> 
> Cheers,
> Jark
> 
> On Fri, 3 Jan 2020 at 18:21, Benoît Paris  > wrote:
> >  If anyone finds that blink planner has any significant defects and has a 
> > larger regression than the old planner, please let us know.
> 
> Overall, the Blink-exclusive features are must (TopN, deduplicate, 
> LAST_VALUE, plan reuse, etc)! But all use cases of the Legacy planner in 
> production are not covered:
> An edge case of Temporal Table Functions does not allow computed Tables (as 
> opposed to TableSources) to be used on the query side in Blink 
> (https://issues.apache.org/jira/browse/FLINK-14200 
> )
> 
> Cheers
> Ben
> 
> 
> On Fri, Jan 3, 2020 at 10:00 AM Jeff Zhang  > wrote:
> +1, I have already made blink as the default planner of flink interpreter in 
> Zeppelin
> 
> 
> Jingsong Li mailto:jingsongl...@gmail.com>> 
> 于2020年1月3日周五 下午4:37写道:
> Hi Jark,
> 
> +1 for default blink planner in SQL-CLI.
> I believe this new planner can be put into practice in production.
> We've worked hard for nearly a year, but the old planner didn't move on.
> 
> And I'd like to cc to user@flink.apache.org .
> If anyone finds that blink planner has any significant defects and has a 
> larger regression than the old planner, please let us know. We will be very 
> grateful.
> 
> Best,
> Jingsong Lee
> 
> On Fri, Jan 3, 2020 at 4:14 PM Leonard Xu  > wrote:
> +1 for this. 
> We bring many SQL/API features and enhance stability in 1.10 release, and 
> almost all of them happens in Blink planner.
> SQL CLI is the most convenient entrypoint for me, I believe many users will 
> have a better experience If we set Blink planner as default planner.
> 
> Best,
> Leonard
> 
> > 在 2020年1月3日,15:16,Terry Wang  > > 写道:
> > 
> > Since what blink planner can do is a superset of flink planner, big +1 for 
> > changing the default planner to Blink planner from my side.
> > 
> > Best,
> > Terry Wang
> > 
> > 
> > 
> >> 2020年1月3日 15:00,Jark Wu mailto:imj...@gmail.com>> 写道:
> >> 
> >> Hi everyone,
> >> 
> >> In 1.10 release, Flink SQL supports many awesome features and improvements,
> >> including:
> >> - support watermark statement and computed column in DDL
> >> - fully support all data types in Hive
> >> - Batch SQL performance improvements (TPC-DS 7x than Hive MR)
> >> - support INSERT OVERWRITE and INSERT PARTITION
> >> 
> >> However, all the features and improvements are only available in Blink
> >> planner, not in Old planner.
> >> There are also some other features are limited in Blink planner, e.g.
> >> Dimension Table Join [1],
> >> TopN [2], Deduplicate [3], streaming aggregates optimization [4], and so 
> >> on.
> >> 
> >> But the Old planner is still the default planner in Table API & SQL. It is
> >> frustrating for users to have to switch to the Blink planner manually every
> >> time they start a SQL CLI. And it's surprising to see an unsupported
> >> exception when they try out the new features without switching the planner.
> >> 
> >> SQL CLI is a very important entrypoint for trying out new features and
> >> prototyping for users.
> >> In order to give new planner more exposures, I would like to suggest to set
> >> default planner
> >> for SQL Client to Blink planner before 1.10 release.
> >> 
> >> The approach is just changing the default SQL CLI yaml configuration[5]. In
> >> this way, the existing
> >> environment is still compatible and unaffected.
> >> 
> >> Changing the default planner for the whole Table API & SQL is another topic
> >> and is out of scope of this discussion.
> >> 
> >> What do you think?
> >> 
> >> Best,
> >> Jark
> >> 
> >> [1]:
> >> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
> 

Re: Flink logging issue with logback

2020-01-05 Thread Yang Wang
Hi Bajaj, Abhinav,

Could you share the start command of the jobmanager and taskmanager? If they are
started correctly, we will have the following JVM options:

-Dlog.file=/path/of/taskmanager.log
-Dlogback.configurationFile=file:///path/of/logback.xml
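
As a point of reference, a minimal logback.xml sketch that writes to the file
passed via -Dlog.file (an illustrative assumption of a typical setup, not
necessarily the configuration shipped with your distribution):

<configuration>
  <appender name="file" class="ch.qos.logback.core.FileAppender">
    <file>${log.file}</file>
    <append>false</append>
    <encoder>
      <pattern>%d{yyyy-MM-dd HH:mm:ss,SSS} %-5level %logger{60} - %msg%n</pattern>
    </encoder>
  </appender>
  <root level="INFO">
    <appender-ref ref="file"/>
  </root>
</configuration>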



Best,
Yang

Bajaj, Abhinav wrote on Sat, Jan 4, 2020 at 7:23 AM:

> Hi,
>
>
>
> I am investigating a logging issue with Flink.
>
>
>
> *Setup*
>
>- Using Flink 1.7.1 with logback as suggested in the Flink documentation.
>- Submitting the Flink job from the Flink dashboard.
>
>
>
> *Observations*
>
>- Logs from the main method (outside of the job graph) do not show up in
>jobmanager logs.
>- Logs from the operators like map or custom operators do show up in
>the taskmanager logs.
>- Logs from main method do show up in jobmanager logs when using log4j
>in place of logback.
>
>
>
> Has anyone else noticed similar behavior or is this a known issue with
> logback integration in Flink?
>
> Any suggestions on potential workaround or fix?
>
>
>
> Appreciate your time and help.
>
>
>
> ~ Abhinav Bajaj
>
>
>


Re: [DISCUSS] Set default planner for SQL Client to Blink planner in 1.10 release

2020-01-05 Thread Rui Li
+1. I think it improves user experience.

On Mon, Jan 6, 2020 at 10:18 AM Zhenghua Gao  wrote:

> +1 for making blink planner as the default planner for SQL Client since we
> have made a huge improvement in 1.10.
>
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Sun, Jan 5, 2020 at 2:42 PM Benchao Li  wrote:
>
>> +1
>>
>> We have used blink planner since 1.9.0 release in our production
>> environment, and it behaves really impressive.
>>
>> Hequn Cheng  于2020年1月5日周日 下午1:58写道:
>>
>>> +1 to make blink planner as the default planner for SQL Client, hence we
>>> can give the blink planner a bit more exposure.
>>>
>>> Best, Hequn
>>>
>>> On Fri, Jan 3, 2020 at 6:32 PM Jark Wu  wrote:
>>>
 Hi Benoît,

 Thanks for the reminder. I will look into the issue and hopefully we
 can target it into 1.9.2 and 1.10.

 Cheers,
 Jark

 On Fri, 3 Jan 2020 at 18:21, Benoît Paris <
 benoit.pa...@centraliens-lille.org> wrote:

> >  If anyone finds that blink planner has any significant defects and
> has a larger regression than the old planner, please let us know.
>
> Overall, the Blink-exclusive features are must (TopN, deduplicate,
> LAST_VALUE, plan reuse, etc)! But all use cases of the Legacy planner in
> production are not covered:
> An edge case of Temporal Table Functions does not allow computed
> Tables (as opposed to TableSources) to be used on the query side in Blink 
> (
> https://issues.apache.org/jira/browse/FLINK-14200)
>
> Cheers
> Ben
>
>
> On Fri, Jan 3, 2020 at 10:00 AM Jeff Zhang  wrote:
>
>> +1, I have already made blink as the default planner of flink
>> interpreter in Zeppelin
>>
>>
>> Jingsong Li  于2020年1月3日周五 下午4:37写道:
>>
>>> Hi Jark,
>>>
>>> +1 for default blink planner in SQL-CLI.
>>> I believe this new planner can be put into practice in production.
>>> We've worked hard for nearly a year, but the old planner didn't move
>>> on.
>>>
>>> And I'd like to cc to user@flink.apache.org.
>>> If anyone finds that blink planner has any significant defects and
>>> has a larger regression than the old planner, please let us know. We 
>>> will
>>> be very grateful.
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Fri, Jan 3, 2020 at 4:14 PM Leonard Xu  wrote:
>>>
 +1 for this.
 We bring many SQL/API features and enhance stability in 1.10
 release, and almost all of them happens in Blink planner.
 SQL CLI is the most convenient entrypoint for me, I believe many
 users will have a better experience If we set Blink planner as default
 planner.

 Best,
 Leonard

 > 在 2020年1月3日,15:16,Terry Wang  写道:
 >
 > Since what blink planner can do is a superset of flink planner,
 big +1 for changing the default planner to Blink planner from my side.
 >
 > Best,
 > Terry Wang
 >
 >
 >
 >> 2020年1月3日 15:00,Jark Wu  写道:
 >>
 >> Hi everyone,
 >>
 >> In 1.10 release, Flink SQL supports many awesome features and
 improvements,
 >> including:
 >> - support watermark statement and computed column in DDL
 >> - fully support all data types in Hive
 >> - Batch SQL performance improvements (TPC-DS 7x than Hive MR)
 >> - support INSERT OVERWRITE and INSERT PARTITION
 >>
 >> However, all the features and improvements are only available in
 Blink
 >> planner, not in Old planner.
 >> There are also some other features are limited in Blink planner,
 e.g.
 >> Dimension Table Join [1],
 >> TopN [2], Deduplicate [3], streaming aggregates optimization
 [4], and so on.
 >>
 >> But the Old planner is still the default planner in Table API & SQL. It is
 >> frustrating for users to have to switch to the Blink planner manually every
 >> time they start a SQL CLI. And it's surprising to see an unsupported
 >> exception when they try out the new features without switching the planner.
 >>
 >> SQL CLI is a very important entrypoint for trying out new
 features and
 >> prototyping for users.
 >> In order to give new planner more exposures, I would like to
 suggest to set
 >> default planner
 >> for SQL Client to Blink planner before 1.10 release.
 >>
 >> The approach is just changing the default SQL CLI yaml
 configuration[5]. In
 >> this way, the existing
 >> environment is still compatible and unaffected.
 >>
 >> Changing the default planner for the whole Table API & SQL is
 another topic
 >> and is out of scope of this discussion.
 >>
 >> What do you 

Re: How long Flink state default TTL,if I don't config the state ttl config?

2020-01-05 Thread Zhu Zhu
Yes. State TTL is by default disabled.
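
If you do want state to expire, a minimal Java sketch of enabling TTL on a state
descriptor (the 7-day duration and the update/visibility settings below are
illustrative assumptions, not something from this thread):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7))                                    // time-to-live
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)   // refresh TTL on every write
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

ValueStateDescriptor<String> descriptor =
    new ValueStateDescriptor<>("last-value", String.class);
descriptor.enableTimeToLive(ttlConfig);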

Thanks,
Zhu Zhu

LakeShen wrote on Mon, Jan 6, 2020 at 10:09 AM:

> I looked at the Flink source code and found that Flink state by default
> never expires; is that right?
>
> LakeShen wrote on Mon, Jan 6, 2020 at 9:58 AM:
>
>> Hi community, I have a question about Flink state TTL. If I don't configure
>> the state TTL,
>> how long is the Flink state retained? Is it retained forever in HDFS?
>> Thanks for your reply.
>>
>



Re: [DISCUSS] Set default planner for SQL Client to Blink planner in 1.10 release

2020-01-05 Thread Zhenghua Gao
+1 for making the Blink planner the default planner for the SQL Client, since we
have made huge improvements in 1.10.

*Best Regards,*
*Zhenghua Gao*


On Sun, Jan 5, 2020 at 2:42 PM Benchao Li  wrote:

> +1
>
> We have used blink planner since 1.9.0 release in our production
> environment, and it behaves really impressive.
>
> Hequn Cheng  于2020年1月5日周日 下午1:58写道:
>
>> +1 to make blink planner as the default planner for SQL Client, hence we
>> can give the blink planner a bit more exposure.
>>
>> Best, Hequn
>>
>> On Fri, Jan 3, 2020 at 6:32 PM Jark Wu  wrote:
>>
>>> Hi Benoît,
>>>
>>> Thanks for the reminder. I will look into the issue and hopefully we can
>>> target it into 1.9.2 and 1.10.
>>>
>>> Cheers,
>>> Jark
>>>
>>> On Fri, 3 Jan 2020 at 18:21, Benoît Paris <
>>> benoit.pa...@centraliens-lille.org> wrote:
>>>
 >  If anyone finds that blink planner has any significant defects and
 has a larger regression than the old planner, please let us know.

 Overall, the Blink-exclusive features are must (TopN, deduplicate,
 LAST_VALUE, plan reuse, etc)! But all use cases of the Legacy planner in
 production are not covered:
 An edge case of Temporal Table Functions does not allow computed Tables
 (as opposed to TableSources) to be used on the query side in Blink (
 https://issues.apache.org/jira/browse/FLINK-14200)

 Cheers
 Ben


 On Fri, Jan 3, 2020 at 10:00 AM Jeff Zhang  wrote:

> +1, I have already made blink as the default planner of flink
> interpreter in Zeppelin
>
>
> Jingsong Li  于2020年1月3日周五 下午4:37写道:
>
>> Hi Jark,
>>
>> +1 for default blink planner in SQL-CLI.
>> I believe this new planner can be put into practice in production.
>> We've worked hard for nearly a year, but the old planner didn't move
>> on.
>>
>> And I'd like to cc to user@flink.apache.org.
>> If anyone finds that blink planner has any significant defects and
>> has a larger regression than the old planner, please let us know. We will
>> be very grateful.
>>
>> Best,
>> Jingsong Lee
>>
>> On Fri, Jan 3, 2020 at 4:14 PM Leonard Xu  wrote:
>>
>>> +1 for this.
>>> We bring many SQL/API features and enhance stability in 1.10
>>> release, and almost all of them happens in Blink planner.
>>> SQL CLI is the most convenient entrypoint for me, I believe many
>>> users will have a better experience If we set Blink planner as default
>>> planner.
>>>
>>> Best,
>>> Leonard
>>>
>>> > 在 2020年1月3日,15:16,Terry Wang  写道:
>>> >
>>> > Since what blink planner can do is a superset of flink planner,
>>> big +1 for changing the default planner to Blink planner from my side.
>>> >
>>> > Best,
>>> > Terry Wang
>>> >
>>> >
>>> >
>>> >> 2020年1月3日 15:00,Jark Wu  写道:
>>> >>
>>> >> Hi everyone,
>>> >>
>>> >> In 1.10 release, Flink SQL supports many awesome features and
>>> improvements,
>>> >> including:
>>> >> - support watermark statement and computed column in DDL
>>> >> - fully support all data types in Hive
>>> >> - Batch SQL performance improvements (TPC-DS 7x than Hive MR)
>>> >> - support INSERT OVERWRITE and INSERT PARTITION
>>> >>
>>> >> However, all the features and improvements are only available in
>>> Blink
>>> >> planner, not in Old planner.
>>> >> There are also some other features are limited in Blink planner,
>>> e.g.
>>> >> Dimension Table Join [1],
>>> >> TopN [2], Deduplicate [3], streaming aggregates optimization [4],
>>> and so on.
>>> >>
>>> >> But the Old planner is still the default planner in Table API & SQL. It is
>>> >> frustrating for users to have to switch to the Blink planner manually every
>>> >> time they start a SQL CLI. And it's surprising to see an unsupported
>>> >> exception when they try out the new features without switching the planner.
>>> >>
>>> >> SQL CLI is a very important entrypoint for trying out new
>>> features and
>>> >> prototyping for users.
>>> >> In order to give new planner more exposures, I would like to
>>> suggest to set
>>> >> default planner
>>> >> for SQL Client to Blink planner before 1.10 release.
>>> >>
>>> >> The approach is just changing the default SQL CLI yaml
>>> configuration[5]. In
>>> >> this way, the existing
>>> >> environment is still compatible and unaffected.
>>> >>
>>> >> Changing the default planner for the whole Table API & SQL is
>>> another topic
>>> >> and is out of scope of this discussion.
>>> >>
>>> >> What do you think?
>>> >>
>>> >> Best,
>>> >> Jark
>>> >>
>>> >> [1]:
>>> >>
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>>> >> [2]:
>>> >>
>>> 

Re: How long Flink state default TTL,if I don't config the state ttl config?

2020-01-05 Thread LakeShen
I looked at the Flink source code and found that Flink state by default
never expires; is that right?

LakeShen wrote on Mon, Jan 6, 2020 at 9:58 AM:

> Hi community, I have a question about Flink state TTL. If I don't configure the
> state TTL,
> how long is the Flink state retained? Is it retained forever in HDFS?
> Thanks for your reply.
>



How long Flink state default TTL,if I don't config the state ttl config?

2020-01-05 Thread LakeShen
Hi community, I have a question about Flink state TTL. If I don't configure the
state TTL,
how long is the Flink state retained? Is it retained forever in HDFS?
Thanks for your reply.



Re: Controlling the Materialization of JOIN updates

2020-01-05 Thread Kurt Young
Good to hear that the patch resolved your issue, looking forward to hearing
more feedback from you!

Best,
Kurt


On Mon, Jan 6, 2020 at 5:56 AM Benoît Paris <
benoit.pa...@centraliens-lille.org> wrote:

> Hi Kurt,
>
> Thank you for your answer.
>
> Yes both fact tables and dimension tables are changing over time; it was
> to illustrate that they could change at the same time but that we could
> still make a JOIN basically ignore updates from one specified side. The SQL
> is not the actual one I'm using, and as you have said later on, I indeed
> don't deal with a time attribute and just want what's in the table at
> processing time.
>
> At the moment my problem seems to be in good way of being resolved, and it
> is going to be Option 4: "LATERAL TABLE table_function" on the Blink
> planner; as Jark Wu seems to be -elegantly- providing a patch for the
> FLINK-14200 NPE bug:
> https://github.com/apache/flink/pull/10763
> It was indeed about shenanigans on finding the proper RelOptSchema;  Ah,
> I wish I had dived sooner in the source code, and I could have had the
> pleasure opportunity to contribute to the Flink codebase.
>
> Anyway, shout out to Jark for resolving the bug and providing a patch! I
> believe this will be a real enabler for CQRS architectures on Flink (we had
> subscriptions with regular joins, and this patch enables querying the same
> thing with very minor SQL modifications)
>
> Kind regards
> Benoît
>
>
> On Sat, Jan 4, 2020 at 4:22 AM Kurt Young  wrote:
>
>> Hi Benoît,
>>
>> Before discussing all the options you listed, I'd like understand more
>> about your requirements.
>>
>> The part I don't fully understand is, both your fact (Event) and
>> dimension (DimensionAtJoinTimeX) tables are
>> coming from the same table, Event or EventRawInput in your case. So it
>> will result that both your fact and
>> dimension tables are changing with time.
>>
>> My understanding is, when your DimensionAtJoinTimeX table emit the
>> results, you don't want to change the
>> result again. You want the fact table only join whatever data currently
>> the dimension table have? I'm asking
>> because your dimension table was calculated with a window aggregation,
>> but your join logic seems doesn't
>> care about the time attribute (LEFT JOIN DimensionAtJoinTime1 d1 ON e.uid
>> = d1.uid). It's possible that
>> when a record with uid=x comes from Event table, but the dimension table
>> doesn't have any data around
>> uid=x yet due to the window aggregation. In this case, you don't want
>> them to join?
>>
>> Best,
>> Kurt
>>
>>
>> On Fri, Jan 3, 2020 at 1:11 AM Benoît Paris <
>> benoit.pa...@centraliens-lille.org> wrote:
>>
>>> Hello all!
>>>
>>> I'm trying to design a stream pipeline, and have trouble controlling
>>> when a JOIN is triggering an update:
>>>
>>> Setup:
>>>
>>>- The Event table; "probe side", "query side", the result of earlier
>>>stream processing
>>>- The DimensionAtJoinTimeX tables; of updating nature, "build side",
>>>the results of earlier stream processing
>>>
>>> Joining them:
>>>
>>> SELECT*
>>> FROM  Event e
>>> LEFT JOIN DimensionAtJoinTime1 d1
>>>   ON  e.uid = d1.uid
>>> LEFT JOIN DimensionAtJoinTime2 d2
>>>   ON  e.uid = d2.uid
>>>
>>> The DimensionAtJoinTimeX Tables being the result of earlier stream
>>> processing, possibly from the same Event table:
>>>
>>> SELECT   uid,
>>>  hop_start(...),
>>>  sum(...)
>>> FROM Event e
>>> GROUP BY uid,
>>>  hop(...)
>>>
>>> The Event Table being:
>>>
>>> SELECT ...
>>> FROM   EventRawInput i
>>> WHERE  i.some_field = 'some_value'
>>>
>>> Requirements:
>>>
>>>- I need the JOINs to only be executed once, only when a new line is
>>>appended to the probe / query / Event table.
>>>- I also need the full pipeline to be defined in SQL.
>>>- I very strongly prefer the Blink planner (mainly for
>>>Deduplication, TopN and LAST_VALUE features).
>>>
>>> Problem exploration so far:
>>>
>>>- Option 1, "FOR SYSTEM_TIME AS OF" [1]: I need to have the solution
>>>in SQL: it doesn't work out. But I might explore the following: insert
>>>DimensionAtJoinTimeX into a special Sink, and use it in a
>>>LookupableTableSource (I'm at a loss on how to do that, though. Do I need
>>>an external kv store?).
>>>- Option 2, "FOR SYSTEM_TIME AS OF" [1], used in SQL: Is there a
>>>version of "FOR SYSTEM_TIME AS OF" readily usable in SQL? I might have
>>>missed something in the documentation.
>>>- Option 3, "LATERAL TABLE table_function" [2], on the Legacy
>>>planner: It does not work with two tables [3], and I don't get to have 
>>> the
>>>Blink planner features.
>>>- Option 4, "LATERAL TABLE table_function" [2], on the Blink
>>>planner: It does not work with the "probe side" being the results of
>>>earlier stream processing [4].
>>>- Option 5, let a regular JOIN materialize the updates, and somehow
>>>find how to filter the ones coming 

Re: Duplicate tasks for the same query

2020-01-05 Thread Kurt Young
Another common skew case we've seen is NULL handling: the value of the join key
is NULL. We will shuffle all the NULL values into one task even though, by
definition, the join condition can never hold for them.
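
If NULL keys turn out to be the cause, one common workaround is to route them
around the join, since a NULL key can never match anyway. A SQL sketch (the
table and column names Task, LatestUser, userId and name are assumptions, not
your actual schema):

SELECT t.*, u.name
FROM Task t
LEFT JOIN LatestUser u ON t.userId = u.userId
WHERE t.userId IS NOT NULL

UNION ALL

SELECT t.*, CAST(NULL AS VARCHAR) AS name
FROM Task t
WHERE t.userId IS NULL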

For deduplication, I just want to make sure this behavior meets your
requirement. For some other usages, users are only interested in the earliest
record, because later updates for the same key are purely redundant, e.g. caused
by an upstream failure reprocessing the same data. In that case, each key will
have at most one record and you won't face any join-key skew issue.

Best,
Kurt


On Mon, Jan 6, 2020 at 6:55 AM RKandoji  wrote:

> Hi Kurt,
>
> I understand what you mean, some userIds may appear more frequently than
> the others but this distribution doesn't look in proportionate with the
> data skew. Do you think of any other possible reasons or anything I can try
> out to investigate this more?
>
> For DeDuplication, I query for the latest record. Sorry I didn't follow
> above sentence, do you mean that for each update to user table the
> record(s) that were updated will be sent via retract stream.I think that's
> expected as I need to process latest records, as long as it is sending only
> the record(s) that's been updated.
>
> Thanks,
> RKandoji
>
> On Fri, Jan 3, 2020 at 9:57 PM Kurt Young  wrote:
>
>> Hi RKandoji,
>>
>> It looks like you have a data skew issue with your input data. Some or
>> maybe only one "userId" appears more frequent than others. For join
>> operator to work correctly, Flink will apply "shuffle by join key" before
>> the
>> operator, so same "userId" will go to the same sub-task to perform join
>> operation. In this case, I'm afraid there is nothing much you can do for
>> now.
>>
>> BTW, for the DeDuplicate, do you keep the latest record or the earliest?
>> If
>> you keep the latest version, Flink will trigger retraction and then send
>> the latest
>> record again every time when your user table changes.
>>
>> Best,
>> Kurt
>>
>>
>> On Sat, Jan 4, 2020 at 5:09 AM RKandoji  wrote:
>>
>>> Hi,
>>>
>>> Thanks a ton for the help with earlier questions, I updated code to
>>> version 1.9 and started using Blink Planner (DeDuplication). This is
>>> working as expected!
>>>
>>> I have a new question, but thought of asking in the same email chain as
>>> this has more context about my use case etc.
>>>
>>> Workflow:
>>> Currently I'm reading from a couple of Kafka topics, DeDuplicating the
>>> input data, performing JOINs and writing the joined data to another Kafka
>>> topic.
>>>
>>> Issue:
>>> I set Parallelism to 8 and on analyzing the subtasks found that the data
>>> is not distributed well among 8 parallel tasks for the last Join query. One
>>> of a subtask is taking huge load, whereas others taking pretty low load.
>>>
>>> Tried a couple of things below, but no use. Not sure if they are
>>> actually related to the problem as I couldn't yet understand what's the
>>> issue here.
>>> 1. increasing the number of partitions of output Kafka topic.
>>> 2. tried adding keys to output so key partitioning happens at Kafka end.
>>>
>>> Below is a snapshot for reference:
>>> [image: image.png]
>>>
>>> Below are the config changes I made:
>>>
>>> taskmanager.numberOfTaskSlots: 8
>>> parallelism.default: 8
>>> jobmanager.heap.size: 5000m
>>> taskmanager.heap.size: 5000m
>>> state.backend: rocksdb
>>> state.checkpoints.dir: file:///Users/username/flink-1.9.1/checkpoints
>>> state.backend.incremental: true
>>>
>>> I don't see any errors and job seems to be running smoothly (and
>>> slowly). I need to make it distribute the load well for faster processing,
>>> any pointers on what could be wrong and how to fix it would be very helpful.
>>>
>>> Thanks,
>>> RKandoji
>>>
>>>
>>> On Fri, Jan 3, 2020 at 1:06 PM RKandoji  wrote:
>>>
 Thanks!

 On Thu, Jan 2, 2020 at 9:45 PM Jingsong Li 
 wrote:

> Yes,
>
> 1.9.2 or Coming soon 1.10
>
> Best,
> Jingsong Lee
>
> On Fri, Jan 3, 2020 at 12:43 AM RKandoji  wrote:
>
>> Ok thanks, does it mean version 1.9.2 is what I need to use?
>>
>> On Wed, Jan 1, 2020 at 10:25 PM Jingsong Li 
>> wrote:
>>
>>> Blink planner was introduced in 1.9. We recommend use blink planner
>>> after 1.9.
>>> After some bug fix, I think the latest version of 1.9 is OK. The
>>> production environment has also been set up in some places.
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Wed, Jan 1, 2020 at 3:24 AM RKandoji  wrote:
>>>
 Thanks Jingsong and Kurt for more details.

 Yes, I'm planning to try out DeDuplication when I'm done upgrading
 to version 1.9. Hopefully deduplication is done by only one task and 
 reused
 everywhere else.

 One more follow-up question, I see "For production use cases, we
 recommend the old planner that was present before Flink 1.9 for now." 
 warning
 

Re: Duplicate tasks for the same query

2020-01-05 Thread RKandoji
Hi Kurt,

I understand what you mean: some userIds may appear more frequently than
others, but this distribution doesn't look proportionate to the data skew. Can
you think of any other possible reasons, or anything I can try in order to
investigate this further?

For deduplication, I query for the latest record. Sorry, I didn't follow the
sentence above; do you mean that for each update to the user table, the updated
record(s) will be sent via a retract stream? I think that's expected, since I
need to process the latest records, as long as only the updated record(s) are
sent.

Thanks,
RKandoji

On Fri, Jan 3, 2020 at 9:57 PM Kurt Young  wrote:

> Hi RKandoji,
>
> It looks like you have a data skew issue with your input data. Some or
> maybe only one "userId" appears more frequent than others. For join
> operator to work correctly, Flink will apply "shuffle by join key" before
> the
> operator, so same "userId" will go to the same sub-task to perform join
> operation. In this case, I'm afraid there is nothing much you can do for
> now.
>
> BTW, for the DeDuplicate, do you keep the latest record or the earliest? If
> you keep the latest version, Flink will trigger retraction and then send
> the latest
> record again every time when your user table changes.
>
> Best,
> Kurt
>
>
> On Sat, Jan 4, 2020 at 5:09 AM RKandoji  wrote:
>
>> Hi,
>>
>> Thanks a ton for the help with earlier questions, I updated code to
>> version 1.9 and started using Blink Planner (DeDuplication). This is
>> working as expected!
>>
>> I have a new question, but thought of asking in the same email chain as
>> this has more context about my use case etc.
>>
>> Workflow:
>> Currently I'm reading from a couple of Kafka topics, DeDuplicating the
>> input data, performing JOINs and writing the joined data to another Kafka
>> topic.
>>
>> Issue:
>> I set Parallelism to 8 and on analyzing the subtasks found that the data
>> is not distributed well among 8 parallel tasks for the last Join query. One
>> of a subtask is taking huge load, whereas others taking pretty low load.
>>
>> Tried a couple of things below, but no use. Not sure if they are actually
>> related to the problem as I couldn't yet understand what's the issue here.
>> 1. increasing the number of partitions of output Kafka topic.
>> 2. tried adding keys to output so key partitioning happens at Kafka end.
>>
>> Below is a snapshot for reference:
>> [image: image.png]
>>
>> Below are the config changes I made:
>>
>> taskmanager.numberOfTaskSlots: 8
>> parallelism.default: 8
>> jobmanager.heap.size: 5000m
>> taskmanager.heap.size: 5000m
>> state.backend: rocksdb
>> state.checkpoints.dir: file:///Users/username/flink-1.9.1/checkpoints
>> state.backend.incremental: true
>>
>> I don't see any errors and job seems to be running smoothly (and slowly).
>> I need to make it distribute the load well for faster processing, any
>> pointers on what could be wrong and how to fix it would be very helpful.
>>
>> Thanks,
>> RKandoji
>>
>>
>> On Fri, Jan 3, 2020 at 1:06 PM RKandoji  wrote:
>>
>>> Thanks!
>>>
>>> On Thu, Jan 2, 2020 at 9:45 PM Jingsong Li 
>>> wrote:
>>>
 Yes,

 1.9.2 or Coming soon 1.10

 Best,
 Jingsong Lee

 On Fri, Jan 3, 2020 at 12:43 AM RKandoji  wrote:

> Ok thanks, does it mean version 1.9.2 is what I need to use?
>
> On Wed, Jan 1, 2020 at 10:25 PM Jingsong Li 
> wrote:
>
>> Blink planner was introduced in 1.9. We recommend use blink planner
>> after 1.9.
>> After some bug fix, I think the latest version of 1.9 is OK. The
>> production environment has also been set up in some places.
>>
>> Best,
>> Jingsong Lee
>>
>> On Wed, Jan 1, 2020 at 3:24 AM RKandoji  wrote:
>>
>>> Thanks Jingsong and Kurt for more details.
>>>
>>> Yes, I'm planning to try out DeDuplication when I'm done upgrading
>>> to version 1.9. Hopefully deduplication is done by only one task and 
>>> reused
>>> everywhere else.
>>>
>>> One more follow-up question, I see "For production use cases, we
>>> recommend the old planner that was present before Flink 1.9 for now." 
>>> warning
>>> here
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/
>>> This is actually the reason why started with version 1.8, could you
>>> please let me know your opinion about this? and do you think there is 
>>> any
>>> production code running on version 1.9
>>>
>>> Thanks,
>>> Reva
>>>
>>>
>>>
>>>
>>> On Mon, Dec 30, 2019 at 9:02 PM Kurt Young  wrote:
>>>
 BTW, you could also have a more efficient version of deduplicating
 user table by using the topn feature [1].

 Best,
 Kurt

 [1]
 https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#top-n


 On Tue, Dec 31, 2019 at 9:24 AM Jingsong Li 
 wrote:

Re: Controlling the Materialization of JOIN updates

2020-01-05 Thread Benoît Paris
Hi Kurt,

Thank you for your answer.

Yes, both the fact tables and the dimension tables are changing over time; the
point was to illustrate that they could change at the same time but that we
could still make a JOIN basically ignore updates from one specified side. The
SQL is not the actual one I'm using and, as you said later on, I indeed don't
deal with a time attribute; I just want what's in the table at processing time.

At the moment my problem seems to be well on its way to being resolved, and the
solution is going to be Option 4: "LATERAL TABLE table_function" on the Blink
planner, as Jark Wu is -elegantly- providing a patch for the FLINK-14200 NPE
bug:
https://github.com/apache/flink/pull/10763
It was indeed about shenanigans in finding the proper RelOptSchema. Ah, I wish I
had dived into the source code sooner; I could have had the pleasure of
contributing to the Flink codebase.
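
For readers following along, the kind of query this unlocks looks roughly like
the following sketch (it assumes DimensionAtJoinTime1 has been registered as a
temporal table function keyed on uid over a processing-time attribute proctime;
some_value is a placeholder column):

SELECT e.*, d1.some_value
FROM Event e,
     LATERAL TABLE (DimensionAtJoinTime1(e.proctime)) AS d1
WHERE e.uid = d1.uid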

Anyway, shout out to Jark for resolving the bug and providing a patch! I
believe this will be a real enabler for CQRS architectures on Flink (we had
subscriptions with regular joins, and this patch enables querying the same
thing with very minor SQL modifications)

Kind regards
Benoît


On Sat, Jan 4, 2020 at 4:22 AM Kurt Young  wrote:

> Hi Benoît,
>
> Before discussing all the options you listed, I'd like understand more
> about your requirements.
>
> The part I don't fully understand is, both your fact (Event) and dimension
> (DimensionAtJoinTimeX) tables are
> coming from the same table, Event or EventRawInput in your case. So it
> will result that both your fact and
> dimension tables are changing with time.
>
> My understanding is: when your DimensionAtJoinTimeX table emits the
> results, you don't want to change the
> result again. You want the fact table to only join whatever data the
> dimension table currently has? I'm asking
> because your dimension table was calculated with a window aggregation, but
> your join logic seems doesn't
> care about the time attribute (LEFT JOIN DimensionAtJoinTime1 d1 ON e.uid
> = d1.uid). It's possible that
> when a record with uid=x comes from Event table, but the dimension table
> doesn't have any data around
> uid=x yet due to the window aggregation. In this case, you don't want them
> to join?
>
> Best,
> Kurt
>
>
> On Fri, Jan 3, 2020 at 1:11 AM Benoît Paris <
> benoit.pa...@centraliens-lille.org> wrote:
>
>> Hello all!
>>
>> I'm trying to design a stream pipeline, and have trouble controlling when
>> a JOIN is triggering an update:
>>
>> Setup:
>>
>>- The Event table; "probe side", "query side", the result of earlier
>>stream processing
>>- The DimensionAtJoinTimeX tables; of updating nature, "build side",
>>the results of earlier stream processing
>>
>> Joining them:
>>
>> SELECT*
>> FROM  Event e
>> LEFT JOIN DimensionAtJoinTime1 d1
>>   ON  e.uid = d1.uid
>> LEFT JOIN DimensionAtJoinTime2 d2
>>   ON  e.uid = d2.uid
>>
>> The DimensionAtJoinTimeX Tables being the result of earlier stream
>> processing, possibly from the same Event table:
>>
>> SELECT   uid,
>>  hop_start(...),
>>  sum(...)
>> FROM Event e
>> GROUP BY uid,
>>  hop(...)
>>
>> The Event Table being:
>>
>> SELECT ...
>> FROM   EventRawInput i
>> WHERE  i.some_field = 'some_value'
>>
>> Requirements:
>>
>>- I need the JOINs to only be executed once, only when a new line is
>>appended to the probe / query / Event table.
>>- I also need the full pipeline to be defined in SQL.
>>- I very strongly prefer the Blink planner (mainly for Deduplication,
>>TopN and LAST_VALUE features).
>>
>> Problem exploration so far:
>>
>>- Option 1, "FOR SYSTEM_TIME AS OF" [1]: I need to have the solution
>>in SQL: it doesn't work out. But I might explore the following: insert
>>DimensionAtJoinTimeX into a special Sink, and use it in a
>>LookupableTableSource (I'm at a loss on how to do that, though. Do I need
>>an external kv store?).
>>- Option 2, "FOR SYSTEM_TIME AS OF" [1], used in SQL: Is there a
>>version of "FOR SYSTEM_TIME AS OF" readily usable in SQL? I might have
>>missed something in the documentation.
>>- Option 3, "LATERAL TABLE table_function" [2], on the Legacy
>>planner: It does not work with two tables [3], and I don't get to have the
>>Blink planner features.
>>- Option 4, "LATERAL TABLE table_function" [2], on the Blink planner:
>>It does not work with the "probe side" being the results of earlier stream
>>processing [4].
>>- Option 5, let a regular JOIN materialize the updates, and somehow
>>find how to filter the ones coming from the build sides (I'm at a loss on
>>how to do that).
>>- Option 6, "TVR": I read this paper [5], which mentions
>>"Time-Varying Relation"s; Speculating here: could there be a way, to say
>>that the build side is not a TVR. Aka declare the stream as being somehow
>>"static", while still being updated (but I guess we're back to 

Re: Reply: Using InfluxDB as a Flink metrics reporter

2020-01-05 Thread Yun Tang
Hi Zhang Jiang,

The "invalid boolean" error is usually related to stray spaces between tags and
fields, which breaks InfluxDB's parsing when it matches the line. What is your
original error message? Please don't redact your operator name and task name.
Also, is the space after "task_id=" an artifact of your copy-paste, or is it
really there?

Finally, these are only warnings; they will not prevent your other metrics from
being written and do not affect overall usage.

Best,
Yun Tang


From: Zhang Jiang
Sent: Saturday, January 4, 2020 19:14
To: user-zh ; myas...@live.com 
Subject: Reply: Using InfluxDB as a Flink metrics reporter


Hello,


The error I'm seeing here is "invalid boolean", not one caused by a NaN/infinity
value, so I don't know what the reason is.


Also, I'm using Flink 1.9.1 and InfluxDB 1.7.9.


Best,


Zhang Jiang
zjkingdom2...@163.com

On Jan 4, 2020 at 00:56, Yun Tang wrote:
Hi Zhang Jiang,


 *   The retention policy needs to be created on the InfluxDB side first; the
InfluxDBReporter will not create a retention policy that does not exist (see the
sketch after [1] below).
 *   Some Kafka metrics cause cast exceptions when used with the InfluxDB
reporter; see [1]. On Flink 1.9 these exceptions can be ignored.

[1] https://issues.apache.org/jira/browse/FLINK-12147
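
For reference, creating that retention policy on the InfluxDB side would look
roughly like the following InfluxQL (the database name "flink" comes from your
configuration; the 1h duration and the replication factor are assumptions based
on the policy name):

CREATE RETENTION POLICY "one_hour" ON "flink" DURATION 1h REPLICATION 1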

Best,
Yun Tang

From: Zhang Jiang
Sent: Friday, January 3, 2020 21:22
To: user-zh@flink.apache.org 
Subject: Using InfluxDB as a Flink metrics reporter

Hi all,


Following the metrics reporter setup described in the official documentation, I
chose InfluxDB and configured it as follows:
metrics.reporter.influxdb.class: org.apache.flink.metrics.influxdb.InfluxdbReporter
metrics.reporter.influxdb.host: localhost
metrics.reporter.influxdb.port: 8086
metrics.reporter.influxdb.db: flink
metrics.reporter.influxdb.username: flink-metrics
metrics.reporter.influxdb.password: qwerty
metrics.reporter.influxdb.retentionPolicy: one_hour
However, after starting the Flink job (YARN per-job mode) and InfluxDB, I keep getting the following error:
error  [500] - "retention policy not found: one_hour" {"log_id": "OK6nejJI000", 
"service": "httpd"} [httpd] 10.90.*.* - flinkuser [03/Jan/2020:19:35:58 +0800] 
"POST /write? db=flink=one_hour=n=one HTTP/1.1" 500 49 
"-" "okhttp/3.11.0" 3637af63-2e1d-11ea-802a-000c2947e206 165


I'm using Flink 1.9.1, and the InfluxDB version is 1.7.9.


Moreover, when I don't set a retentionPolicy, I still get an error:
org.apache.flink.metrics.influxdb.shaded.org.influxdb.InfluxDBException$UnableToParseException:
 partial write: unable to parse 
"taskmanager_job_task_operator_sync-time-avg,host=master,job_id=03136f4c1a78e9930262b455ef0657e2,job_name=Flink-app,operator_id=cbc357ccb763df2852fee8c4fc7d55f2,operator_name=XXX,task_attempt_num=0,task_id=
 
cbc357ccb763df2852fee8c4fc7d55f2,task_name=XX,tm_id=container_1577507646998_0054_01_02
 value=? 157805124760500": invalid boolean


Could anyone advise how to solve these problems?
Thanks


Best,





Re: Stateful functions and modules

2020-01-05 Thread Dan Pettersson
Ok, good. Thanks for your response.

/Dan

On Sun, Jan 5, 2020 at 11:52, Igal Shilman wrote:

> Hi Dan,
>
> Having a class that defines only the function types indeed makes sense,
> this would lower the coupling between the (maven) module that contains the
> function implementation and the (maven) module that uses it.
> You can peek here for example:
>
>
> https://github.com/ververica/stateful-functions/blob/master/stateful-functions-examples/stateful-functions-shopping-cart-example/src/main/java/com/ververica/statefun/examples/shoppingcart/Identifiers.java#L28
>
> Igal.
>
> On Saturday, January 4, 2020, Dan Pettersson 
> wrote:
>
>> Hi,
>>
>> What is the preferred way to expose functions (FunctionTypes) between
>> modules?
>>
>> For example lets say i have the following maven modules:
>>
>> -> common
>> -> statistics
>> -> persister
>> -> tradematching
>>
>> and I have some FunctionTypes in common and persister module that should
>> be reach
>> from all modules.
>>
>> common module is included everywhere so I thought about creating
>> a helper class, CallableFunctions, and there specify the FunctionTypes
>> that can
>> be called from all modules. Is this the right approach or is there a
>> better way?
>>
>> Regards
>> Dan
>>
>


Re: Stateful functions and modules

2020-01-05 Thread Igal Shilman
Hi Dan,

Having a class that defines only the function types indeed makes sense;
it would lower the coupling between the (maven) module that contains the
function implementation and the (maven) module that uses it.
You can peek here for example:

https://github.com/ververica/stateful-functions/blob/master/stateful-functions-examples/stateful-functions-shopping-cart-example/src/main/java/com/ververica/statefun/examples/shoppingcart/Identifiers.java#L28
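
In the spirit of that example, a minimal Java sketch of such a shared identifiers
class (the package, namespaces and names below are assumptions for illustration):

package com.example.common;

import com.ververica.statefun.sdk.FunctionType;

public final class CallableFunctions {

  private CallableFunctions() {}

  // Function types that other (maven) modules are allowed to message.
  public static final FunctionType PERSISTER =
      new FunctionType("example/persister", "persister-function");

  public static final FunctionType STATISTICS =
      new FunctionType("example/statistics", "statistics-function");
}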

Igal.

On Saturday, January 4, 2020, Dan Pettersson 
wrote:

> Hi,
>
> What is the preferred way to expose functions (FunctionTypes) between
> modules?
>
> For example, let's say I have the following maven modules:
>
> -> common
> -> statistics
> -> persister
> -> tradematching
>
> and I have some FunctionTypes in the common and persister modules that should
> be reachable
> from all modules.
>
> The common module is included everywhere, so I thought about creating
> a helper class, CallableFunctions, that specifies the FunctionTypes
> that can
> be called from all modules. Is this the right approach, or is there a
> better way?
>
> Regards
> Dan
>