Is it possible to specify a join algorithm hint in Flink SQL

2019-04-15 Thread yinhua.dai
Hi team, I know we can specify join algorithm hints with the DataSet API (https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/dataset_transformations.html#join-algorithm-hints), but I am wondering whether this is possible to support with the SQL API? We have market data with a currency id (a
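The DataSet hints referenced above (e.g. JoinHint.BROADCAST_HASH_FIRST) choose the physical join strategy. Conceptually, a broadcast hash join builds an in-memory table from the small input and probes it with the large one; a plain-Java sketch of that idea (illustrative names, not Flink API):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BroadcastHashJoinSketch {

    // Build phase: load the small side into a hash table keyed by join key.
    // Probe phase: stream the large side through it, one lookup per record.
    static List<String> broadcastHashJoin(Map<String, String> smallSide,
                                          List<String[]> largeSide) {
        Map<String, String> buildTable = new HashMap<>(smallSide); // "broadcast" copy
        List<String> joined = new ArrayList<>();
        for (String[] record : largeSide) {
            String key = record[0], payload = record[1];
            String match = buildTable.get(key);
            if (match != null) {
                joined.add(key + ":" + payload + "|" + match);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // Hypothetical data: a small currency dimension joined to trades.
        Map<String, String> currencies = new HashMap<>();
        currencies.put("USD", "US Dollar");
        List<String[]> trades = List.of(new String[]{"USD", "t1"},
                                        new String[]{"JPY", "t2"});
        System.out.println(broadcastHashJoin(currencies, trades));
        // prints [USD:t1|US Dollar]
    }
}
```

This is why the hint pays off when one side (here the currency table) is much smaller than the other: only the small side is replicated.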

Re: RemoteTransportException: Connection unexpectedly closed by remote task manager

2019-03-28 Thread yinhua.dai
I have put the task manager log of the data sink at https://gist.github.com/yinhua2018/7de42ff9c1738d5fdf9d99030db903e2 -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: RemoteTransportException: Connection unexpectedly closed by remote task manager

2019-03-28 Thread yinhua.dai
Hi Qi, I checked that the JVM heap usage of the sink TM is low. I tried to read the Flink source code to identify where exactly the error happens. I think the exception happened inside DataSinkTask.invoke(): // work! while (!this.taskCanceled &&

RemoteTransportException: Connection unexpectedly closed by remote task manager

2019-03-28 Thread yinhua.dai
Hi, I wrote a single Flink job with Flink SQL on version 1.6.1. I have one table source which reads data from a database, and one table sink which outputs an Avro-format file. The table source has a parallelism of 19, and the table sink only has a parallelism of 1. But there is always

Execution sequence for slot sharing

2019-03-26 Thread yinhua.dai
Hi Community, can anyone help me understand the execution sequence in batch mode? 1. Can I set slot isolation in batch mode? I can only find the slotSharingGroup API in streaming mode. 2. When multiple data source parallel instances are allocated to the same slot, how does Flink run those data

Job crashed very soon

2019-03-21 Thread yinhua.dai
Hi Community, I was trying to run a big batch job which uses JDBCInputFormat to retrieve a large amount of data from a MySQL database and does some joins in Flink; the environment is AWS EMR. But it always failed very fast. I'm using Flink on YARN, Flink 1.6.1; my cluster has 1000GB memory, my job

Re: What should I take care of if I enable object reuse

2019-03-17 Thread yinhua.dai
Got it, thanks guys.

Re: What should I take care of if I enable object reuse

2019-03-14 Thread yinhua.dai
Hi Elias, thanks. Would it be good enough as long as we always use a different object when calling the Collector.collect() method in the operator?

What should I take care of if I enable object reuse

2019-03-13 Thread yinhua.dai
Hi Community, I saw from the documentation that we need to be careful about enabling the object reuse feature. So which parts should I check to avoid any issues? Can anyone help summarize? Thank you. // *enableObjectReuse() / disableObjectReuse()* By default, objects are not reused in Flink.
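The pitfall the thread converges on: with object reuse enabled, an operator must not cache or mutate objects it has already emitted, because downstream operators may still hold references to them. The safe pattern is to allocate a fresh object for every Collector.collect() call. A plain-Java illustration of the two patterns (illustrative names, not Flink API):

```java
import java.util.ArrayList;
import java.util.List;

public class ObjectReuseDemo {
    static class Record {
        long value;
        Record(long value) { this.value = value; }
    }

    // UNSAFE with enableObjectReuse(): one shared instance is mutated and
    // "emitted" repeatedly, so earlier emissions are silently overwritten.
    static List<Record> emitReused(long[] inputs) {
        List<Record> out = new ArrayList<>();
        Record shared = new Record(0);
        for (long v : inputs) {
            shared.value = v;   // also mutates the instances already in `out`
            out.add(shared);
        }
        return out;
    }

    // SAFE: a new instance per collect(), so downstream buffering is unaffected.
    static List<Record> emitFresh(long[] inputs) {
        List<Record> out = new ArrayList<>();
        for (long v : inputs) {
            out.add(new Record(v));
        }
        return out;
    }

    public static void main(String[] args) {
        long[] in = {1, 2, 3};
        System.out.println(emitReused(in).get(0).value); // 3 -- first record was clobbered
        System.out.println(emitFresh(in).get(0).value);  // 1 -- as expected
    }
}
```

The same reasoning applies to objects received from upstream: copy them before storing them in operator state.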

Re: flink submit job rest api classpath jar

2019-02-17 Thread yinhua.dai
Maybe you could consider putting your UDF jar into flink/lib before job submission.

Re: Limit in batch flink sql job

2019-02-12 Thread yinhua.dai
OK, thanks. It might be better to update the documentation, which has the following example that confused me: SELECT * FROM Orders LIMIT 3

Limit in batch flink sql job

2019-02-11 Thread yinhua.dai
Why does Flink say "Limiting the result without sorting is not allowed as it could lead to arbitrary results" when I use LIMIT in batch mode? SELECT * FROM csvSource limit 10;

Get UnknownTaskExecutorException when I add a new configuration in flink-conf.yaml

2019-01-31 Thread yinhua.dai
Hi Community, I added the below item in flink-conf.yaml, and I saw an UnknownTaskExecutorException each time I started Flink on Windows via start-cluster.bat: *fs.s3a.aws.credentials.provider: com.tr.apt.sqlengine.tables.aws.HadoopAWSCredentialsProviderChain* I'm sure that this new configuration is

Re: Is there a way to get all flink built-in SQL functions

2019-01-25 Thread yinhua.dai
Thanks guys. I was just wondering if there is another way other than hard-coding the list :) Thanks anyway.

Is there a way to get all flink built-in SQL functions

2019-01-22 Thread yinhua.dai
I would like to put this list into our self-service Flink SQL web UI. Thanks.

Re: The way to write a UDF with generic type

2019-01-08 Thread yinhua.dai
Got it, thanks.

Re: How to get the temp result of each dynamic table when executing Flink-SQL?

2019-01-07 Thread yinhua.dai
In our case, we wrote a console table sink which prints everything to the console, and used "insert into" to write the interim result to the console.

Re: The way to write a UDF with generic type

2019-01-07 Thread yinhua.dai
Hi Timo, can you let me know how the built-in "MAX" function is able to support different types? Thanks.

Re: The way to write a UDF with generic type

2019-01-06 Thread yinhua.dai
Hi Timo, but getResultType should only return a concrete type information, right? How could I implement it with a generic type? I'd like to clarify my question again. Say I want to implement my own "MAX" function, but I want to apply it to different types, e.g. integer, long, double, etc., so I
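The underlying obstacle is Java type erasure: the aggregation logic is trivially generic, but at runtime the JVM no longer knows what T is, which is why Flink asks the UDF for a concrete TypeInformation via getResultType. A plain-Java sketch of the generic part (illustrative, not the Flink AggregateFunction API):

```java
public class GenericMaxSketch {
    // A generic "max" accumulator: type-safe at compile time for any
    // Comparable T, but after erasure the runtime only sees Comparable --
    // a framework like Flink still needs explicit type info to serialize T.
    static class MaxAccumulator<T extends Comparable<T>> {
        private T max;

        void add(T value) {
            if (max == null || value.compareTo(max) > 0) {
                max = value;
            }
        }

        T getValue() { return max; }
    }

    public static void main(String[] args) {
        MaxAccumulator<Integer> intMax = new MaxAccumulator<>();
        intMax.add(3); intMax.add(7); intMax.add(5);
        System.out.println(intMax.getValue()); // 7

        MaxAccumulator<Double> dblMax = new MaxAccumulator<>();
        dblMax.add(1.5); dblMax.add(0.5);
        System.out.println(dblMax.getValue()); // 1.5
    }
}
```

One workaround discussed in threads like this is to register one parameterized UDF instance per type and have each instance's getResultType return the matching concrete TypeInformation.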

Re: The way to write a UDF with generic type

2019-01-04 Thread yinhua.dai
Hi Chesnay, maybe you misunderstood my question. I have the below code: public class MyMaxAggregation extends AggregateFunction { @Override public MyAccumulator createAccumulator() { return new MyAccumulator(); } @Override public T getValue(MyAccumulator accumulator) { return null;

The way to write a UDF with generic type

2019-01-03 Thread yinhua.dai
Hi Community, I tried to write a UDF with a generic type, but it seems that Flink complains about not recognizing the type information when I use it in SQL. I checked the implementation of the native function "MAX" and realized that it's not using the same API (e.g. AggregationFunction) as user-defined

Re: Flink SQL client always takes 2 minutes to submit job to a local cluster

2019-01-03 Thread yinhua.dai
Hi Fabian, it's the submission of the jar file that takes too long. And yes, Hequn's and your suggestions are working, but I'm just curious why a 100M jar file takes so long to submit; is it related to some upload parameter settings of the web layer?

Re: Flink SQL client always takes 2 minutes to submit job to a local cluster

2018-12-30 Thread yinhua.dai
I have to do that for now; however, I have to find another way because the jar sometimes gets updated, and the Flink cluster will be remote in the future.

Flink SQL client always takes 2 minutes to submit job to a local cluster

2018-12-27 Thread yinhua.dai
I am using Flink 1.6.1. I tried to use the Flink SQL client with some of my own jars via --jar and --library. It works for executing SQL queries; however, it always takes around 2 minutes to submit the job to the local cluster. But when I copy my jar to flink/lib and remove the --jar and --library parameters, it can

Re: how to override s3 key config in flink job

2018-11-27 Thread yinhua.dai
It might be difficult, as the task manager and job manager are pre-started in session mode. It seems that the Flink HTTP server will always use the configuration you specified when you started your Flink cluster, i.e. via start-cluster.sh; I can't find a way to override it.

Re: how to override s3 key config in flink job

2018-11-26 Thread yinhua.dai
Which Flink version are you using? I know how it works on YARN, but I'm not very clear about standalone mode.

Re: where can I see logs from code

2018-11-26 Thread yinhua.dai
The code running in your main method will log to the Flink CLI log; other code, like map functions etc., will log to the task manager log. Are you saying that you only see Flink code in http://SERVERADD/#/taskmanager/TM_ID/log? It might be useful if you elaborate on your environment.

Re: your advice please regarding state

2018-11-26 Thread yinhua.dai
Generally, approach #1 is OK, but you may have to use some hash-based key selector if you have heavy data skew.
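When one key dominates the stream, keying directly on it funnels most records to a single subtask. A hash-based selector instead spreads records over a fixed number of buckets; the bucket computation itself is plain Java (the bucket count and key are illustrative assumptions):

```java
public class BucketKeySelector {
    // Maps an arbitrary key to one of `buckets` partitions. Math.floorMod
    // avoids the negative result that hashCode() % buckets can produce.
    static int bucketFor(String key, int buckets) {
        return Math.floorMod(key.hashCode(), buckets);
    }

    public static void main(String[] args) {
        int buckets = 16;
        int b = bucketFor("hot-key", buckets);
        System.out.println(b >= 0 && b < buckets); // true
        // Deterministic: the same key always lands in the same bucket,
        // so per-bucket keyed state stays consistent.
        System.out.println(b == bucketFor("hot-key", buckets)); // true
    }
}
```

Inside a Flink KeySelector you would return something like bucketFor(record.getId(), N) (or a composite of the original key plus the bucket) and then merge the per-bucket partial results in a second aggregation step.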

Re: how to override s3 key config in flink job

2018-11-26 Thread yinhua.dai
Did you try "-Dkey=value"?

Re: TaskManager & task slots

2018-11-21 Thread yinhua.dai
OK, thanks.

Re: About the issue caused by flink's dependency jar package submission method

2018-11-20 Thread yinhua.dai
As far as I know, -yt works for both the job manager and the task manager, and -C works for the Flink CLI. Did you consider putting all your jars in /flink/lib?

Re: TaskManager & task slots

2018-11-20 Thread yinhua.dai
Hi Fabian, does the below description still hold in Flink 1.6? "Slots do not guard CPU time, IO, or JVM memory. At the moment they only isolate managed memory which is only used for batch processing. For streaming applications their only purpose is to limit the number of parallel threads

Re: Always trigger calculation of a tumble window in Flink SQL

2018-11-08 Thread yinhua.dai
I am able to write a single operator as you suggested, thank you. Then I saw ContinuousProcessingTimeTrigger in the Flink source code; it looks like something I am looking for, if there is a way for me to customize the SQL "TUMBLE" window to use this trigger instead of
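ContinuousProcessingTimeTrigger fires a window repeatedly on a fixed processing-time interval instead of only at the window end. The firing decision boils down to comparing the current time against the next scheduled fire time; a plain-Java sketch of that logic (not the Flink Trigger API, interval chosen for illustration):

```java
public class PeriodicFireSketch {
    private final long intervalMs;
    private long nextFireTime = Long.MIN_VALUE;

    PeriodicFireSketch(long intervalMs) { this.intervalMs = intervalMs; }

    // Called on each element or timer: returns true when the interval has
    // elapsed, then schedules the next firing -- mirroring how a continuous
    // trigger registers a fresh processing-time timer after every FIRE.
    boolean onTime(long nowMs) {
        if (nextFireTime == Long.MIN_VALUE) {
            // Align the first firing to the next interval boundary.
            nextFireTime = nowMs - (nowMs % intervalMs) + intervalMs;
            return false;
        }
        if (nowMs >= nextFireTime) {
            nextFireTime += intervalMs;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        PeriodicFireSketch t = new PeriodicFireSketch(60_000); // every minute
        System.out.println(t.onTime(5_000));   // false -- first firing scheduled at 60s
        System.out.println(t.onTime(30_000));  // false -- interval not yet reached
        System.out.println(t.onTime(61_000));  // true  -- fires; next firing at 120s
    }
}
```

Note this still only runs when onTime is invoked; firing with no input at all requires an externally registered timer, which is exactly the difficulty discussed later in this thread.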

Re: Always trigger calculation of a tumble window in Flink SQL

2018-11-08 Thread yinhua.dai
Hi Piotr, thank you for your explanation. I basically understand your meaning: as far as I understand, we can write a custom window assigner and a custom trigger, and we can register the timer when the window processes elements. But how can we register a timer when no elements are received during

Re: Always trigger calculation of a tumble window in Flink SQL

2018-11-06 Thread yinhua.dai
Hi Piotr, can you elaborate more on the solution with the custom operator? I don't think there will be any records from the SQL query if no input data is coming in within the time window, even if we convert the result to a DataStream.

Always trigger calculation of a tumble window in Flink SQL

2018-11-05 Thread yinhua.dai
We have a requirement to always trigger a calculation on a timer basis, e.g. every 1 minute. *If there are records coming into Flink during the time window, then calculate in the normal way, i.e. aggregate each record and getResult() at the end of the time window.* *If there are no

Re: How to configure TaskManager's JVM options through cmdline?

2018-10-23 Thread yinhua.dai
You can try *-yD env.java.opts.taskmanager="-XX:+UseConcMarkSweepGC"* if you are running Flink on YARN.

Re: Reverse the order of fields in Flink SQL

2018-10-23 Thread yinhua.dai
Hi Timo, I wrote simple test code for the issue; please check out https://gist.github.com/yinhua-dai/143304464270afd19b6a926531f9acb1 I wrote a custom table source which just uses RowCsvInputFormat to create the dataset, used the provided CsvTableSink, and can reproduce the issue.

Reverse the order of fields in Flink SQL

2018-10-23 Thread yinhua.dai
I wrote a customized table source, and it emits some fields, let's say f1 and f2. Then I just write to a sink with a reversed order of fields, as below: *select f2, f1 from customTableSource* And I found that it actually doesn't reverse the fields. Then I tried with the Flink-provided

How to access Ftp server with passive mode

2018-10-18 Thread yinhua.dai
I am using Flink to download and process a big file from a remote FTP server in AWS EMR. As Flink supports the FTP protocol via the Hadoop FTP file system, I use the CSVInputFormat with an FTP address (ftp://user:pass@server/path/file). It works correctly on my local machine, but when I run the job in

Re: User jar is present in the flink job manager's class path

2018-10-11 Thread yinhua.dai
Hi Gary, yes, you are right, we are using attach mode. I will try to put my jar into flink/lib to get around the issue. Thanks. I will open a JIRA for the discrepancy between Flink 1.3 and 1.5, thanks a lot.

Re: User jar is present in the flink job manager's class path

2018-10-11 Thread yinhua.dai
Meanwhile, I can see the below code in Flink 1.5: public static final ConfigOption CLASSPATH_INCLUDE_USER_JAR = key("yarn.per-job-cluster.include-user-jar") .defaultValue("ORDER") .withDescription("Defines whether user-jars are included

Re: User jar is present in the flink job manager's class path

2018-10-11 Thread yinhua.dai
Hi Timo, I didn't try to configure the classloader order; according to the documentation, it should only be needed for yarn-session mode, right? I can see the ship files (-yt /path/dir/) are present in the job manager's classpath, so maybe I should put my uber jar in the -yt path so that it will be

User jar is present in the flink job manager's class path

2018-10-11 Thread yinhua.dai
We have a customized log4j layout implementation, so we need the Flink job manager/task manager to be able to load the logger implementation, which is packaged in the uber jar. However, we noticed that in Flink 1.3 the user jar is put at the beginning of the job manager's classpath; when we do the same again in Flink

UnsatisfiedLinkError when using flink-s3-fs-hadoop

2018-09-10 Thread yinhua.dai
Hi, I experienced an UnsatisfiedLinkError when I tried to use flink-s3-fs-hadoop to sink to S3 on my local Windows machine. I googled and tried several solutions, like downloading hadoop.dll and winutils.exe, setting up the HADOOP_HOME and PATH environment variables, and copying hadoop.dll to C:\Windows\System32,

Re: Facing Issues while migrating to S3A

2018-09-09 Thread yinhua.dai
Hi, I still have the same problem; I googled many approaches but still failed. I have downloaded hadoop.dll and winutils.exe and added them to the classpath. To verify that this is working, I called System.loadLibrary("hadoop") at the beginning of my Java program and it succeeded. BTW: I run my program in