[GitHub] flink issue #3769: [FLINK-6367] support custom header settings of allow orig...
Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/3769 Hi @zentol @StephanEwen, it didn't get merged in. @zentol, please check again. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3769: [FLINK-6367] support custom header settings of all...
Github user shijinkui closed the pull request at: https://github.com/apache/flink/pull/3769
[GitHub] flink pull request #3769: [FLINK-6367] support custom header settings of all...
GitHub user shijinkui reopened a pull request: https://github.com/apache/flink/pull/3769 [FLINK-6367] support custom header settings of allow origin `jobmanager.web.access-control-allow-origin`: Enable custom access control parameter for allow origin header, default is `*`. - [X] General - The pull request references the related JIRA issue ("[FLINK-6367] support custom header settings of allow origin") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [X] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [X] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/hwstreaming/flink allow_origin Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3769.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3769 commit b6815edca3d38fed5da32175c32170fbbef084a0 Author: shijinkui <shijin...@huawei.com> Date: 2017-04-25T12:09:15Z [FLINK-6367] support custom header settings of allow origin commit 62cfc7dd5e0575ca64fda2d9b13c4281550383bf Author: Jinkui Shi <shijin...@huawei.com> Date: 2017-04-25T15:18:38Z use ConfigOption wrap key and default value commit bd295833a038ebce45db921b843863d4221cd25d Author: Jinkui Shi <shijin...@huawei.com> Date: 2017-04-25T23:20:42Z code line format
[GitHub] flink issue #3777: [FLINK-6387] [webfrontend]Flink UI support access log
Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/3777 @StephanEwen @zentol The Flink web monitor uses Netty internally. Generally, Netty is used as a TCP transport; for a web or API server, Jetty or the Play Framework may be more suitable. In particular, we would have to fill many gaps and holes in web security if we choose Netty as the web framework. I think we should reconsider whether Netty is suitable. The current Flink UI is hard to extend.
[GitHub] flink issue #3777: [FLINK-6387] [webfrontend]Flink UI support access log
Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/3777 > I can see this flooding the logs like crazy, especially with things like metrics and watermarks that update often. Some kind of filtering is probably necessary here. You are right. The noisy entries can be separated into a new log file. > This seems to be about auditing, so completely different... The main purpose is to record every access, like an audit log.
[GitHub] flink pull request #3777: [FLINK-6387] [webfrontend]Flink UI support access ...
GitHub user shijinkui opened a pull request: https://github.com/apache/flink/pull/3777 [FLINK-6387] [webfrontend]Flink UI support access log Record the user requests to the access log. Append user accesses to the log file. - [X] General - The pull request references the related JIRA issue ("[FLINK-6387] [webfrontend]Flink UI support access log") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [X] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [X] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/hwstreaming/flink access_log_support Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3777.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3777 commit 0d19fb95072c90125152513c9b2a07b518d16b27 Author: shijinkui <shijin...@huawei.com> Date: 2017-02-23T12:06:43Z [FLINK-6387] [webfrontend]Flink UI support access log
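As a rough illustration of what one recorded access-log entry could contain, here is a small self-contained sketch; the line format, field choices, and class name are assumptions for illustration, not the PR's actual output:

```java
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical sketch of the kind of entry an access log would record
// per request; the PR itself wires logging into the web monitor.
public class AccessLogLine {
    static String format(String remoteIp, String method, String uri,
                         int status, ZonedDateTime time) {
        // Common-log-style line: client, timestamp, request, status.
        return String.format("%s [%s] \"%s %s\" %d",
            remoteIp,
            time.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME),
            method, uri, status);
    }

    public static void main(String[] args) {
        System.out.println(format("10.0.0.5", "GET", "/jobs/overview",
            200, ZonedDateTime.now()));
    }
}
```

Writing such lines to a dedicated appender (rather than the main job-manager log) would address the flooding concern raised in the review.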
[jira] [Created] (FLINK-6387) Flink UI support access log
shijinkui created FLINK-6387: Summary: Flink UI support access log Key: FLINK-6387 URL: https://issues.apache.org/jira/browse/FLINK-6387 Project: Flink Issue Type: Improvement Components: Webfrontend Reporter: shijinkui Assignee: shijinkui Record the user requests to the access log. Append user accesses to the log file. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3769: [FLINK-6367] support custom header settings of allow orig...
Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/3769 @zentol fix that
[GitHub] flink pull request #3769: [FLINK-6367] support custom header settings of all...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3769#discussion_r113226235 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java --- @@ -701,6 +701,9 @@ @Deprecated public static final String JOB_MANAGER_WEB_BACK_PRESSURE_DELAY = "jobmanager.web.backpressure.delay-between-samples"; + /** Web response header of Access-Control-Allow-Origin */ + public static final String JOB_MANAGER_WEB_ACCESS_CONTROL_ALLOW_ORIGIN = "jobmanager.web.access-control-allow-origin"; --- End diff -- Done.
[jira] [Assigned] (FLINK-6367) support custom header settings of allow origin
[ https://issues.apache.org/jira/browse/FLINK-6367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui reassigned FLINK-6367: Assignee: shijinkui > support custom header settings of allow origin > -- > > Key: FLINK-6367 > URL: https://issues.apache.org/jira/browse/FLINK-6367 > Project: Flink > Issue Type: Sub-task > Components: Webfrontend >Reporter: shijinkui > Assignee: shijinkui > > `jobmanager.web.access-control-allow-origin`: Enable custom access control > parameter for allow origin header, default is `*`.
[GitHub] flink pull request #3769: [FLINK-6367] support custom header settings of all...
GitHub user shijinkui opened a pull request: https://github.com/apache/flink/pull/3769 [FLINK-6367] support custom header settings of allow origin `jobmanager.web.access-control-allow-origin`: Enable custom access control parameter for allow origin header, default is `*`. - [X] General - The pull request references the related JIRA issue ("[FLINK-6367] support custom header settings of allow origin") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [X] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [X] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/hwstreaming/flink allow_origin Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3769.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3769 commit b6815edca3d38fed5da32175c32170fbbef084a0 Author: shijinkui <shijin...@huawei.com> Date: 2017-04-25T12:09:15Z [FLINK-6367] support custom header settings of allow origin
[jira] [Commented] (FLINK-6367) support custom header settings of allow origin
[ https://issues.apache.org/jira/browse/FLINK-6367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15982764#comment-15982764 ] shijinkui commented on FLINK-6367: -- [~greghogan] We need to configure a specific allow-origin. For example, Flink could set the YARN URL as the allowed origin so that other URLs are forbidden. > support custom header settings of allow origin > -- > > Key: FLINK-6367 > URL: https://issues.apache.org/jira/browse/FLINK-6367 > Project: Flink > Issue Type: Sub-task > Components: Webfrontend >Reporter: shijinkui > > `jobmanager.web.access-control-allow-origin`: Enable custom access control > parameter for allow origin header, default is `*`.
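To make the proposed behavior concrete, here is a minimal, self-contained sketch of reading such an allow-origin setting with a default of `*`; the map-based lookup, class name, and example URL are illustrative stand-ins (the PR itself wraps the key and default value in Flink's ConfigOption):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for reading the proposed setting; the real code
// defines the key and default via Flink's ConfigOption machinery.
public class AllowOriginExample {
    static final String KEY = "jobmanager.web.access-control-allow-origin";
    static final String DEFAULT_VALUE = "*";

    // Returns the configured Access-Control-Allow-Origin value,
    // falling back to "*" (allow all origins) when the key is unset.
    static String allowOrigin(Map<String, String> conf) {
        return conf.getOrDefault(KEY, DEFAULT_VALUE);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(allowOrigin(conf)); // prints "*"

        // Restrict the allowed origin, e.g. to a YARN web proxy URL
        // (this URL is hypothetical).
        conf.put(KEY, "http://yarn-rm.example.com:8088");
        System.out.println(allowOrigin(conf));
    }
}
```

The returned value would then be written into the `Access-Control-Allow-Origin` response header, so only the configured origin may make cross-origin requests to the web frontend.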
[jira] [Created] (FLINK-6367) support custom header settings of allow origin
shijinkui created FLINK-6367: Summary: support custom header settings of allow origin Key: FLINK-6367 URL: https://issues.apache.org/jira/browse/FLINK-6367 Project: Flink Issue Type: Sub-task Components: Webfrontend Reporter: shijinkui `jobmanager.web.access-control-allow-origin`: Enable custom access control parameter for allow origin header, default is `*`.
[jira] [Closed] (FLINK-6193) Flink dist directory normalize
[ https://issues.apache.org/jira/browse/FLINK-6193?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui closed FLINK-6193. Resolution: Fixed > Flink dist directory normalize > -- > > Key: FLINK-6193 > URL: https://issues.apache.org/jira/browse/FLINK-6193 > Project: Flink > Issue Type: Improvement > Components: Examples >Reporter: shijinkui > > The Flink distribution's directory have no very clear responsibility about > what type of files should be in which directory. For example, "opt" > directories are mixed with library jars and example jars. > This mail here: > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-dist-directory-management-td16784.html > After discuss, we determine for the distribution directory style below: > - "examples" directory only contain example jars > - "opt" directory only contain optional library jars in runtime > - "lib" directory only contain library jar that must be loaded at runtime > - "resources" directory only contain resource file used at runtime, such as > web file
[GitHub] flink issue #2460: [FLINK-4562] table examples make an divided module in fli...
Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/2460 @fhueske Thanks : )
[GitHub] flink issue #2460: [FLINK-4562] table examples make an divided module in fli...
Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/2460 > Hi @shijinkui, yes that sounds good to me. Thanks. When will we merge this?
[jira] [Commented] (FLINK-5902) Some images can not show in IE
[ https://issues.apache.org/jira/browse/FLINK-5902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15958261#comment-15958261 ] shijinkui commented on FLINK-5902: -- Hi [~ajithshetty], do we have a solution? > Some images can not show in IE > -- > > Key: FLINK-5902 > URL: https://issues.apache.org/jira/browse/FLINK-5902 > Project: Flink > Issue Type: Sub-task > Components: Webfrontend > Environment: IE >Reporter: Tao Wang > Attachments: chrome is ok.png, IE 11 with problem.png > > > Some images in the Overview page can not show in IE, as it is good in Chrome. > I'm using IE 11, but think same with IE9 I'll paste the screenshot > later.
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950672#comment-15950672 ] shijinkui commented on FLINK-6233: -- Is this the sub-issue of FLINK-4557? > Support rowtime inner equi-join between two streams in the SQL API > -- > > Key: FLINK-6233 > URL: https://issues.apache.org/jira/browse/FLINK-6233 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: hongyuhong >Assignee: hongyuhong > > The goal of this issue is to add support for inner equi-join on proc time > streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime > FROM Orders AS o > JOIN Shipments AS s > ON o.orderId = s.orderId > AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR; > {code} > The following restrictions should initially apply: > * The join hint only support inner join > * The ON clause should include equi-join condition > * The time-condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL > '1' HOUR}} only can use rowtime that is a system attribute, the time > condition only support bounded time range like {{o.rowtime BETWEEN s.rowtime > - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}, not support > unbounded like {{o.rowtime > s.rowtime }}, and should include both two > stream's rowtime attribute, {{o.rowtime between rowtime () and rowtime () + > 1}} should also not be supported. > An row-time streams join will not be able to handle late data, because this > would mean in insert a row into a sorted order shift all other computations. > This would be too expensive to maintain. Therefore, we will throw an error if > a user tries to use an row-time stream join with late data handling. > This issue includes: > * Design of the DataStream operator to deal with stream join > * Translation from Calcite's RelNode representation (LogicalJoin). 
[jira] [Commented] (FLINK-5860) Replace all the file creating from java.io.tmpdir with TemporaryFolder
[ https://issues.apache.org/jira/browse/FLINK-5860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15948410#comment-15948410 ] shijinkui commented on FLINK-5860: -- [~yaroslav.mykhaylov] Thanks for your work. Waiting for your message. > Replace all the file creating from java.io.tmpdir with TemporaryFolder > -- > > Key: FLINK-5860 > URL: https://issues.apache.org/jira/browse/FLINK-5860 > Project: Flink > Issue Type: Test > Components: Tests >Reporter: shijinkui >Assignee: Yaroslav Mykhaylov > Labels: starter > > Search `System.getProperty("java.io.tmpdir")` in whole Flink project. It will > get a Unit test list. Replace all the file creating from `java.io.tmpdir` > with TemporaryFolder. > Who can fix this problem thoroughly? > ``` > $ grep -ri 'System.getProperty("java.io.tmpdir")' . > ./flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java: > env.setStateBackend(new FsStateBackend("file:///" + > System.getProperty("java.io.tmpdir") + "/flink/backend")); > ./flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java: > return getMockEnvironment(new File[] { new > File(System.getProperty("java.io.tmpdir")) }); > 
./flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java: >public static final String DEFAULT_TASK_MANAGER_TMP_PATH = > System.getProperty("java.io.tmpdir"); > ./flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java: > final String tempPath = System.getProperty("java.io.tmpdir"); > ./flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java: > final File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java: > final String outDir = params.get("output", > System.getProperty("java.io.tmpdir")); > ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java: > final String tmpDir = System.getProperty("java.io.tmpdir"); > ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java: > final String outPath = System.getProperty("java.io.tmpdir"); > ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: > File out = new File(System.getProperty("java.io.tmpdir"), > "jarcreatortest.jar"); > ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: > File out = new File(System.getProperty("java.io.tmpdir"), > "jarcreatortest.jar"); > ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: > File out = new File(System.getProperty("java.io.tmpdir"), > "jarcreatortest.jar"); > ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: > File out = new 
File(System.getProperty("java.io.tmpdir"), > "jarcreatortest.jar"); > ./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java: >pub
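The replacement the ticket asks for is JUnit's `@Rule TemporaryFolder`. The following plain-JDK sketch shows the isolation idea (a fresh per-test directory plus the cleanup that the rule performs automatically) without the JUnit dependency; the class name and file name are illustrative:

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Plain-JDK sketch of the isolation TemporaryFolder provides: each test
// gets its own fresh directory instead of the shared java.io.tmpdir.
public class IsolatedTempDir {
    public static void main(String[] args) throws IOException {
        // Fresh directory per test run, never shared between tests.
        Path tempDir = Files.createTempDirectory("flink-test");
        File out = new File(tempDir.toFile(), "jarcreatortest.jar");
        Files.createFile(out.toPath());
        System.out.println(out.exists()); // prints "true"

        // Cleanup that the TemporaryFolder rule does automatically
        // after each test method.
        Files.delete(out.toPath());
        Files.delete(tempDir);
        System.out.println(Files.exists(tempDir)); // prints "false"
    }
}
```

In the actual test classes, declaring `@Rule public TemporaryFolder tempFolder = new TemporaryFolder();` and calling `tempFolder.newFile(...)` / `tempFolder.newFolder(...)` replaces the `System.getProperty("java.io.tmpdir")` calls listed above.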
[jira] [Commented] (FLINK-6204) Improve Event-Time OVER ROWS BETWEEN UNBOUNDED PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-6204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15946477#comment-15946477 ] shijinkui commented on FLINK-6204: -- -1. Hi guys, I want to know the difference between this PR and https://github.com/apache/flink/pull/3386 You had 138 comments there, but are now rewriting #3386. Why not recommend this solution at #3386? Must we waste time on the same problem? > Improve Event-Time OVER ROWS BETWEEN UNBOUNDED PRECEDING aggregation to SQL > --- > > Key: FLINK-6204 > URL: https://issues.apache.org/jira/browse/FLINK-6204 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > > Currently `event time OVER ROWS BETWEEN UNBOUNDED PRECEDING aggregation to > SQL` implementation class: ` UnboundedEventTimeOverProcessFunction` use > data size uncontrollable memory data structures`sortedTimestamps: > util.LinkedList [Long] cache data timestamps and sort timestamps. IMO,It's > not a good way, because in the production environment there are millions of > window data pre millisecond in our application scenario.So, I want to remove > `util.LinkedList [Long] `. Welcome anyone to give me feedback. > What do you think? [~fhueske] and [~Yuhong_kyo]
[GitHub] flink issue #3628: [FLINK-6201][example] move python example files from reso...
Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/3628 > I would also suggest to run an example at least once to make sure you don't break anything. You are right. It only moves the example files.
[GitHub] flink pull request #3628: [FLINK-6201][example] move python example files fr...
GitHub user shijinkui opened a pull request: https://github.com/apache/flink/pull/3628 [FLINK-6201][example] move python example files from resources to the examples Python example in the resource dir is not suitable. Move them to the examples/python dir. - [X] General - The pull request references the related JIRA issue ("[FLINK-6201] move python example files from resources to the examples") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [X] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [X] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/hwstreaming/flink FLINK-6201 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3628.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3628 commit c9eafd960707971d762da2f7d9179404e91b6211 Author: Jinkui Shi <shijin...@huawei.com> Date: 2017-03-28T06:40:26Z [FLINK-6201][example] move python example files from resources to the examples
[jira] [Assigned] (FLINK-6201) move python example files from resources to the examples
[ https://issues.apache.org/jira/browse/FLINK-6201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui reassigned FLINK-6201: Assignee: shijinkui > move python example files from resources to the examples > > > Key: FLINK-6201 > URL: https://issues.apache.org/jira/browse/FLINK-6201 > Project: Flink > Issue Type: Sub-task > Components: Examples >Reporter: shijinkui > Assignee: shijinkui >Priority: Trivial > > Python example in the resource dir is not suitable. Move them to the > examples/python dir. > ``` > > > ../flink-libraries/flink-python/src/main/python/org/apache/flink/python/api > resources/python > 0755 > > ```
[jira] [Updated] (FLINK-6201) move python example files from resources to the examples
[ https://issues.apache.org/jira/browse/FLINK-6201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui updated FLINK-6201: - Priority: Trivial (was: Major) > move python example files from resources to the examples > > > Key: FLINK-6201 > URL: https://issues.apache.org/jira/browse/FLINK-6201 > Project: Flink > Issue Type: Sub-task > Components: Examples >Reporter: shijinkui >Priority: Trivial > > Python example in the resource dir is not suitable. Move them to the > examples/python dir. > ``` > > > ../flink-libraries/flink-python/src/main/python/org/apache/flink/python/api > resources/python > 0755 > > ```
[jira] [Updated] (FLINK-6201) move python example files from resources to the examples
[ https://issues.apache.org/jira/browse/FLINK-6201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui updated FLINK-6201: - Summary: move python example files from resources to the examples (was: move python example files to the examples dir) > move python example files from resources to the examples > > > Key: FLINK-6201 > URL: https://issues.apache.org/jira/browse/FLINK-6201 > Project: Flink > Issue Type: Sub-task > Components: Examples >Reporter: shijinkui > > Python example in the resource dir is not suitable. Move them to the > examples/python dir. > ``` > > > ../flink-libraries/flink-python/src/main/python/org/apache/flink/python/api > resources/python > 0755 > > ```
[jira] [Created] (FLINK-6201) move python example files to the examples dir
shijinkui created FLINK-6201: Summary: move python example files to the examples dir Key: FLINK-6201 URL: https://issues.apache.org/jira/browse/FLINK-6201 Project: Flink Issue Type: Sub-task Components: Examples Reporter: shijinkui Python example in the resource dir is not suitable. Move them to the examples/python dir. ``` ../flink-libraries/flink-python/src/main/python/org/apache/flink/python/api resources/python 0755 ```
[jira] [Commented] (FLINK-4319) Rework Cluster Management (FLIP-6)
[ https://issues.apache.org/jira/browse/FLINK-4319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943198#comment-15943198 ] shijinkui commented on FLINK-4319: -- Is there a schedule for Flink on Kubernetes? > Rework Cluster Management (FLIP-6) > -- > > Key: FLINK-4319 > URL: https://issues.apache.org/jira/browse/FLINK-4319 > Project: Flink > Issue Type: Improvement > Components: Cluster Management >Affects Versions: 1.1.0 >Reporter: Stephan Ewen > > This is the root issue to track progress of the rework of cluster management > (FLIP-6) > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
[jira] [Closed] (FLINK-5217) Deprecated interface Checkpointed make clear suggestion
[ https://issues.apache.org/jira/browse/FLINK-5217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui closed FLINK-5217. Resolution: Fixed > Deprecated interface Checkpointed make clear suggestion > --- > > Key: FLINK-5217 > URL: https://issues.apache.org/jira/browse/FLINK-5217 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: shijinkui > Fix For: 1.2.1 > > > package org.apache.flink.streaming.api.checkpoint; > @Deprecated > @PublicEvolving > public interface Checkpointed extends > CheckpointedRestoring > this interface should have a clear note of the version in which it will be > removed, and which interface can be used instead of it.
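The kind of explicit deprecation notice the issue asks for can be sketched as follows; the interface body mirrors Checkpointed's snapshot/restore methods, and the suggested replacement (CheckpointedFunction) reflects later Flink versions, so treat the exact version numbers and names as assumptions:

```java
// Illustrative sketch of a deprecation notice that names both the removal
// plan and the replacement, as the issue requests.
public class DeprecationNoticeExample {

    /**
     * @deprecated Planned for removal in a future release.
     *             Implement {@code CheckpointedFunction} instead.
     */
    @Deprecated
    public interface Checkpointed<T extends java.io.Serializable> {
        // Draws a snapshot of the function's state.
        T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;

        // Restores the function's state from a previous snapshot.
        void restoreState(T state) throws Exception;
    }

    public static void main(String[] args) {
        // @Deprecated has runtime retention, so tooling can flag usages.
        System.out.println(Checkpointed.class.isAnnotationPresent(Deprecated.class)); // prints "true"
    }
}
```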
[jira] [Closed] (FLINK-5754) released tag missing .gitignore .travis.yml .gitattributes
[ https://issues.apache.org/jira/browse/FLINK-5754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui closed FLINK-5754. Resolution: Won't Fix > released tag missing .gitignore .travis.yml .gitattributes > > > Key: FLINK-5754 > URL: https://issues.apache.org/jira/browse/FLINK-5754 > Project: Flink > Issue Type: Bug > Components: Build System >Reporter: shijinkui > > released tag missing .gitignore .travis.yml .gitattributes. > When making a release version, only the version should be replaced. > for example: https://github.com/apache/spark/tree/v2.1.0
[jira] [Commented] (FLINK-5754) released tag missing .gitignore .travis.yml .gitattributes
[ https://issues.apache.org/jira/browse/FLINK-5754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942804#comment-15942804 ] shijinkui commented on FLINK-5754: -- [~greghogan] It's OK > released tag missing .gitignore .travis.yml .gitattributes > > > Key: FLINK-5754 > URL: https://issues.apache.org/jira/browse/FLINK-5754 > Project: Flink > Issue Type: Bug > Components: Build System >Reporter: shijinkui > > released tag missing .gitignore .travis.yml .gitattributes. > When making a release version, only the version should be replaced. > for example: https://github.com/apache/spark/tree/v2.1.0
[jira] [Updated] (FLINK-4562) table examples make an divided module in flink-examples
[ https://issues.apache.org/jira/browse/FLINK-4562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui updated FLINK-4562: - Issue Type: Sub-task (was: Improvement) Parent: FLINK-6193 > table examples should be a separate module in flink-examples > --- > > Key: FLINK-4562 > URL: https://issues.apache.org/jira/browse/FLINK-4562 > Project: Flink > Issue Type: Sub-task > Components: Examples, Table API & SQL >Reporter: shijinkui >Assignee: shijinkui > Fix For: 1.2.1 > > > example code shouldn't be packaged in the table module.
[jira] [Created] (FLINK-6193) Normalize the Flink dist directory layout
shijinkui created FLINK-6193: Summary: Normalize the Flink dist directory layout Key: FLINK-6193 URL: https://issues.apache.org/jira/browse/FLINK-6193 Project: Flink Issue Type: Improvement Components: Examples Reporter: shijinkui The Flink distribution's directories have no clear responsibility for which types of files belong where. For example, the "opt" directory mixes library jars with example jars. See the mailing-list discussion: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-dist-directory-management-td16784.html After discussion, we settled on the distribution directory layout below: - "examples" directory only contains example jars - "opt" directory only contains optional library jars used at runtime - "lib" directory only contains library jars that must be loaded at runtime - "resources" directory only contains resource files used at runtime, such as web files
[GitHub] flink pull request #3609: [FLINK-6073] - Support for SQL inner queries for p...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3609#discussion_r108059835 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet } +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.AggregateCall +import org.apache.calcite.rel.{ RelNode, RelWriter, BiRel } +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream } +import org.apache.flink.streaming.api.windowing.assigners._ +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.windows.{ Window => DataStreamWindow } +import org.apache.flink.table.api.StreamTableEnvironment +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.expressions._ +import org.apache.flink.table.plan.logical._ +import org.apache.flink.table.plan.nodes.CommonAggregate +import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._ +import org.apache.flink.table.runtime.aggregate.AggregateUtil._ +import org.apache.flink.table.runtime.aggregate._ +import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval +import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo } +import org.apache.flink.types.Row +import org.apache.calcite.rel.logical.LogicalJoin +import org.apache.calcite.rel.core.JoinRelType +import org.apache.flink.table.api.TableException +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.streaming.api.windowing.triggers.Trigger +import org.apache.flink.streaming.api.windowing.windows.GlobalWindow +import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult +import 
org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion +import org.apache.flink.streaming.api.windowing.evictors.Evictor +import org.apache.flink.streaming.api.windowing.evictors.Evictor.EvictorContext +import java.lang.Iterable +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue +import org.apache.flink.api.common.functions.RichFlatJoinFunction +import org.apache.flink.configuration.Configuration +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import org.apache.flink.util.Collector + +class DataStreamJoin( --- End diff -- need scaladoc to describe the class's responsibility --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3609: [FLINK-6073] - Support for SQL inner queries for p...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3609#discussion_r108059875 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala --- @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet } +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.AggregateCall +import org.apache.calcite.rel.{ RelNode, RelWriter, BiRel } +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream } +import org.apache.flink.streaming.api.windowing.assigners._ +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.windows.{ Window => DataStreamWindow } +import org.apache.flink.table.api.StreamTableEnvironment +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.expressions._ +import org.apache.flink.table.plan.logical._ +import org.apache.flink.table.plan.nodes.CommonAggregate +import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._ +import org.apache.flink.table.runtime.aggregate.AggregateUtil._ +import org.apache.flink.table.runtime.aggregate._ +import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval +import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo } +import org.apache.flink.types.Row +import org.apache.calcite.rel.logical.LogicalJoin +import org.apache.calcite.rel.core.JoinRelType +import org.apache.flink.table.api.TableException +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.streaming.api.windowing.triggers.Trigger +import org.apache.flink.streaming.api.windowing.windows.GlobalWindow +import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult +import 
org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion +import org.apache.flink.streaming.api.windowing.evictors.Evictor +import org.apache.flink.streaming.api.windowing.evictors.Evictor.EvictorContext +import java.lang.Iterable +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue +import org.apache.flink.api.common.functions.RichFlatJoinFunction +import org.apache.flink.configuration.Configuration +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import org.apache.flink.util.Collector + +class DataStreamJoin( + calc: LogicalJoin, + cluster: RelOptCluster, + traitSet: RelTraitSet, + inputLeft: RelNode, + inputRight: RelNode, + rowType: RelDataType, + description: String) +extends BiRel(cluster, traitSet, inputLeft, inputRight) with DataStreamRel { + + override def deriveRowType(): RelDataType = rowType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataStreamJoin( + calc, + cluster, + traitSet, + inputs.get(0), + inputs.get(1), + rowType, + description + calc.getId()) + } + + override def toString: String = { +s"Join(${ + if (!calc.getCondition.isAlwaysTrue()) { +s"condition: (${calc.getCondition}), " + } else { +"" + } +}left: ($inputLeft), right($inputRight))" + } + +
[GitHub] flink issue #2460: [FLINK-4562] table examples make an divided module in fli...
Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/2460 > I agree with @wuchong , that we should follow the example of Gelly and add the flink-table-examples JAR file to the opt folder. @fhueske Thanks for your review. IMO, the directory layout below is reasonable: * `examples` directory only contains example jars * `opt` directory only contains optional library jars * `lib` directory only contains library jars that must be loaded at runtime The `opt` directory is noisy; it currently mixes library jars and example jars: ``` flink-cep-scala_2.11-1.3.0.jar flink-gelly_2.11-1.3.0.jar flink-metrics-statsd-1.3.0.jar flink-cep_2.11-1.3.0.jar flink-metrics-dropwizard-1.3.0.jar flink-ml_2.11-1.3.0.jar flink-gelly-examples_2.11-1.3.0.jar flink-metrics-ganglia-1.3.0.jar flink-gelly-scala_2.11-1.3.0.jar flink-metrics-graphite-1.3.0.jar ```
[jira] [Updated] (FLINK-6117) 'zookeeper.sasl.disable' does not take effect when starting CuratorFramework
[ https://issues.apache.org/jira/browse/FLINK-6117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui updated FLINK-6117: - Issue Type: Sub-task (was: Bug) Parent: FLINK-5839 > 'zookeeper.sasl.disable' does not take effect when starting CuratorFramework > > > Key: FLINK-6117 > URL: https://issues.apache.org/jira/browse/FLINK-6117 > Project: Flink > Issue Type: Sub-task > Components: Client, JobManager >Affects Versions: 1.2.0 > Environment: Ubuntu, non-secured >Reporter: CanBin Zheng >Assignee: CanBin Zheng > Labels: security > Original Estimate: 336h > Remaining Estimate: 336h > > The value of 'zookeeper.sasl.disable' is not applied correctly when starting > the CuratorFramework. > Here are all the settings relevant to high-availability in my flink-conf.yaml: > high-availability: zookeeper > high-availability.zookeeper.quorum: localhost:2181 > high-availability.zookeeper.storageDir: hdfs:///flink/ha/ > Obviously, no explicit value is set for 'zookeeper.sasl.disable', so the default > value of 'true' (ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE) would be > applied. But when FlinkYarnSessionCli & FlinkApplicationMasterRunner start, > both logs show that they attempt to connect to ZooKeeper in 'SASL' mode. > The logs look like this: > 2017-03-18 23:53:10,498 INFO org.apache.zookeeper.ZooKeeper > - Initiating client connection, connectString=localhost:2181 > sessionTimeout=6 > watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@5949eba8 > 2017-03-18 23:53:10,498 INFO org.apache.zookeeper.ZooKeeper > - Initiating client connection, connectString=localhost:2181 > sessionTimeout=6 > watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@5949eba8 > 2017-03-18 23:53:10,522 WARN org.apache.zookeeper.ClientCnxn > - SASL configuration failed: > javax.security.auth.login.LoginException: No JAAS configuration section named > 'Client' was found in specified JAAS configuration file: > '/tmp/jaas-3047036396963510842.conf'. Will continue connection to Zookeeper > server without SASL authentication, if Zookeeper server allows it. > 2017-03-18 23:53:10,522 WARN org.apache.zookeeper.ClientCnxn > - SASL configuration failed: > javax.security.auth.login.LoginException: No JAAS configuration section named > 'Client' was found in specified JAAS configuration file: > '/tmp/jaas-3047036396963510842.conf'. Will continue connection to Zookeeper > server without SASL authentication, if Zookeeper server allows it. > 2017-03-18 23:53:10,530 INFO org.apache.zookeeper.ClientCnxn > - Opening socket connection to server localhost/127.0.0.1:2181 > 2017-03-18 23:53:10,530 INFO org.apache.zookeeper.ClientCnxn > - Opening socket connection to server localhost/127.0.0.1:2181 > 2017-03-18 23:53:10,534 ERROR > org.apache.flink.shaded.org.apache.curator.ConnectionState - > Authentication failed
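The report above relies on the default for `zookeeper.sasl.disable`. For a non-secured setup the intent can be stated explicitly; a sketch of the relevant flink-conf.yaml fragment, using only keys quoted in the reports above (quorum and storage paths are examples, and whether an explicit value sidesteps the bug is exactly what this issue is about):

```yaml
# flink-conf.yaml -- sketch for a non-secured HA setup
high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.storageDir: hdfs:///flink/ha/
# state the SASL intent explicitly instead of relying on the default
zookeeper.sasl.disable: true
```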
[jira] [Assigned] (FLINK-6148) The ZooKeeper client reports a SASL error when SASL is disabled
[ https://issues.apache.org/jira/browse/FLINK-6148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui reassigned FLINK-6148: Assignee: (was: shijinkui) > The ZooKeeper client reports a SASL error when SASL is disabled > -- > > Key: FLINK-6148 > URL: https://issues.apache.org/jira/browse/FLINK-6148 > Project: Flink > Issue Type: Bug > Components: Client >Affects Versions: 1.2.0 >Reporter: zhangrucong1982 > > I use Flink 1.2.0 on a YARN cluster. HA is configured in > flink-conf.yaml, but SASL is disabled. The configuration is: > high-availability: zookeeper > high-availability.zookeeper.quorum: > 100.106.40.102:2181,100.106.57.136:2181,100.106.41.233:2181 > high-availability.zookeeper.storageDir: hdfs:/flink > high-availability.zookeeper.client.acl: open > high-availability.zookeeper.path.root: flink0308 > zookeeper.sasl.disable: true > The client, JobManager and TaskManager logs all contain the following error > information: > 2017-03-22 11:18:24,662 WARN org.apache.zookeeper.ClientCnxn > - SASL configuration failed: > javax.security.auth.login.LoginException: No JAAS configuration section named > 'Client' was found in specified JAAS configuration file: > '/tmp/jaas-441937039502263015.conf'. Will continue connection to Zookeeper > server without SASL authentication, if Zookeeper server allows it. > 2017-03-22 11:18:24,663 ERROR > org.apache.flink.shaded.org.apache.curator.ConnectionState - > Authentication failed
[jira] [Assigned] (FLINK-6148) The ZooKeeper client reports a SASL error when SASL is disabled
[ https://issues.apache.org/jira/browse/FLINK-6148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui reassigned FLINK-6148: Assignee: shijinkui > The ZooKeeper client reports a SASL error when SASL is disabled > -- > > Key: FLINK-6148 > URL: https://issues.apache.org/jira/browse/FLINK-6148 > Project: Flink > Issue Type: Bug > Components: Client >Affects Versions: 1.2.0 >Reporter: zhangrucong1982 >Assignee: shijinkui > > I use Flink 1.2.0 on a YARN cluster. HA is configured in > flink-conf.yaml, but SASL is disabled. The configuration is: > high-availability: zookeeper > high-availability.zookeeper.quorum: > 100.106.40.102:2181,100.106.57.136:2181,100.106.41.233:2181 > high-availability.zookeeper.storageDir: hdfs:/flink > high-availability.zookeeper.client.acl: open > high-availability.zookeeper.path.root: flink0308 > zookeeper.sasl.disable: true > The client, JobManager and TaskManager logs all contain the following error > information: > 2017-03-22 11:18:24,662 WARN org.apache.zookeeper.ClientCnxn > - SASL configuration failed: > javax.security.auth.login.LoginException: No JAAS configuration section named > 'Client' was found in specified JAAS configuration file: > '/tmp/jaas-441937039502263015.conf'. Will continue connection to Zookeeper > server without SASL authentication, if Zookeeper server allows it. > 2017-03-22 11:18:24,663 ERROR > org.apache.flink.shaded.org.apache.curator.ConnectionState - > Authentication failed
[jira] [Commented] (FLINK-5217) Deprecated interface Checkpointed should make a clear suggestion
[ https://issues.apache.org/jira/browse/FLINK-5217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941522#comment-15941522 ] shijinkui commented on FLINK-5217: -- ping [~srichter] > Deprecated interface Checkpointed should make a clear suggestion > --- > > Key: FLINK-5217 > URL: https://issues.apache.org/jira/browse/FLINK-5217 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: shijinkui > Fix For: 1.2.1 > > > package org.apache.flink.streaming.api.checkpoint; > @Deprecated > @PublicEvolving > public interface Checkpointed extends > CheckpointedRestoring > this deprecated interface should state clearly in which version it will be > removed, and which interface should be used instead.
[jira] [Commented] (FLINK-5860) Replace all the file creating from java.io.tmpdir with TemporaryFolder
[ https://issues.apache.org/jira/browse/FLINK-5860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941520#comment-15941520 ] shijinkui commented on FLINK-5860: -- ping [~yaroslav.mykhaylov] > Replace all the file creating from java.io.tmpdir with TemporaryFolder > -- > > Key: FLINK-5860 > URL: https://issues.apache.org/jira/browse/FLINK-5860 > Project: Flink > Issue Type: Test > Components: Tests >Reporter: shijinkui >Assignee: Yaroslav Mykhaylov > Labels: starter > > Search `System.getProperty("java.io.tmpdir")` in whole Flink project. It will > get a Unit test list. Replace all the file creating from `java.io.tmpdir` > with TemporaryFolder. > Who can fix this problem thoroughly? > ``` > $ grep -ri 'System.getProperty("java.io.tmpdir")' . > ./flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java: > env.setStateBackend(new FsStateBackend("file:///" + > System.getProperty("java.io.tmpdir") + "/flink/backend")); > ./flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java: > return getMockEnvironment(new File[] { new > File(System.getProperty("java.io.tmpdir")) }); > ./flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java: >public static final String 
DEFAULT_TASK_MANAGER_TMP_PATH = > System.getProperty("java.io.tmpdir"); > ./flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java: > final String tempPath = System.getProperty("java.io.tmpdir"); > ./flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java: > final File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java: > final String outDir = params.get("output", > System.getProperty("java.io.tmpdir")); > ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java: > final String tmpDir = System.getProperty("java.io.tmpdir"); > ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java: > final String outPath = System.getProperty("java.io.tmpdir"); > ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: > File out = new File(System.getProperty("java.io.tmpdir"), > "jarcreatortest.jar"); > ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: > File out = new File(System.getProperty("java.io.tmpdir"), > "jarcreatortest.jar"); > ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: > File out = new File(System.getProperty("java.io.tmpdir"), > "jarcreatortest.jar"); > ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: > File out = new File(System.getProperty("java.io.tmpdir"), > "jarcreatortest.jar"); > 
./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java: >public static final String FLINK_PYTHON_FILE_PATH = > System.getProperty("java
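The fix the issue asks for is JUnit's TemporaryFolder rule. The isolation it provides, each test getting a fresh private directory that is deleted afterwards instead of every test sharing `System.getProperty("java.io.tmpdir")`, can be sketched with plain JDK calls (a minimal sketch; `withIsolatedTempDir` is a name invented here, not Flink test code):

```scala
import java.io.File
import java.nio.file.Files

// Give the test body its own fresh directory and delete it afterwards,
// mirroring what JUnit's TemporaryFolder rule does for a test method.
def withIsolatedTempDir[T](test: File => T): (T, File) = {
  val dir = Files.createTempDirectory("flink-test-").toFile
  try {
    (test(dir), dir)
  } finally {
    dir.listFiles().foreach(_.delete()) // shallow cleanup is enough for this sketch
    dir.delete()
  }
}

val (backendPath, usedDir) = withIsolatedTempDir { dir =>
  // the test may place whatever it likes inside its private directory
  new File(dir, "backend").getAbsolutePath
}
// the directory is gone once the test finishes, so parallel runs cannot collide
println(s"cleaned up: ${!usedDir.exists()}")
```

With TemporaryFolder the rule object handles both creation and recursive deletion, which is why the issue asks for it over hand-rolled `java.io.tmpdir` paths.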
[jira] [Commented] (FLINK-6060) reference nonexistent class in the scaladoc
[ https://issues.apache.org/jira/browse/FLINK-6060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941519#comment-15941519 ] shijinkui commented on FLINK-6060: -- [~aljoscha] Sorry for my unclear description. Take, for example, the class TaskOperationResult referenced in the Scaladoc below. TaskOperationResult no longer exists, or its file has been renamed. In such Scaladoc we should correct the referenced class. /** * Submits a task to the task manager. The result is to this message is a * [[TaskOperationResult]] message. * * @param tasks Descriptor which contains the information to start the task. */ case class SubmitTask(tasks: TaskDeploymentDescriptor) extends TaskMessage with RequiresLeaderSessionID > reference nonexistent class in the scaladoc > --- > > Key: FLINK-6060 > URL: https://issues.apache.org/jira/browse/FLINK-6060 > Project: Flink > Issue Type: Wish > Components: Scala API >Reporter: shijinkui > > TaskMessages.scala > ConnectedStreams.scala > DataStream.scala > Who can fix it?
[jira] [Updated] (FLINK-6060) reference nonexistent class in the scaladoc
[ https://issues.apache.org/jira/browse/FLINK-6060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui updated FLINK-6060: - Summary: reference nonexistent class in the scaladoc (was: not exist class referance in the scala function annotation) > reference nonexistent class in the scaladoc > --- > > Key: FLINK-6060 > URL: https://issues.apache.org/jira/browse/FLINK-6060 > Project: Flink > Issue Type: Wish > Components: Scala API >Reporter: shijinkui > > TaskMessages.scala > ConnectedStreams.scala > DataStream.scala > Who can fix it?
[jira] [Commented] (FLINK-5754) released tag missing .gitignore .travis.yml .gitattributes
[ https://issues.apache.org/jira/browse/FLINK-5754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941516#comment-15941516 ] shijinkui commented on FLINK-5754: -- [~greghogan] At first I assumed the Flink tags were laid out like those of other open-source projects, so I checked out a branch from a tag. We then had to reset it after we had gone too far forward. Unless there is a special reason, could we stop deleting anything from release tags starting with the next milestone, following the common tag/release convention? That would make it very convenient to develop a private Flink version, and merging back into the Flink community code base would then pose no difficulty. Thanks > released tag missing .gitignore .travis.yml .gitattributes > > > Key: FLINK-5754 > URL: https://issues.apache.org/jira/browse/FLINK-5754 > Project: Flink > Issue Type: Bug > Components: Build System >Reporter: shijinkui > > The released tag is missing .gitignore, .travis.yml and .gitattributes. > When cutting a release, only the version number should be replaced. > For example: https://github.com/apache/spark/tree/v2.1.0
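The workflow the comment describes, cutting a private development branch directly from a released tag, is plain git usage. A sketch against a throwaway repository (the repository, tag, and branch names here are made up; the point is that this only works smoothly when the tag carries the full tree, .gitignore and friends included):

```shell
set -e
# throwaway repo standing in for a Flink clone
repo=$(mktemp -d)
cd "$repo"
git init -q
git -c user.name=dev -c user.email=dev@example.com \
    commit -q --allow-empty -m "1.2.0 release"
git tag v1.2.0
# start a private development branch from the released tag
git checkout -q -b private-1.2.0 v1.2.0
git rev-parse --abbrev-ref HEAD   # -> private-1.2.0
```

From here, private work lands on `private-1.2.0`, and merging back to the community code base is an ordinary merge against the same history the tag points at.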
[jira] [Commented] (FLINK-5650) Flink-python tests take too long to execute
[ https://issues.apache.org/jira/browse/FLINK-5650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929946#comment-15929946 ] shijinkui commented on FLINK-5650: -- Good job, thanks. > Flink-python tests take too long to execute > --- > > Key: FLINK-5650 > URL: https://issues.apache.org/jira/browse/FLINK-5650 > Project: Flink > Issue Type: Bug > Components: Python API, Tests >Affects Versions: 1.2.0, 1.3.0 > Reporter: shijinkui >Assignee: Chesnay Schepler >Priority: Critical > Labels: osx > Fix For: 1.3.0, 1.2.1 > > > When executing `mvn clean test` in flink-python, it waits more than half an > hour after the console output below: > --- > T E S T S > --- > Running org.apache.flink.python.api.PythonPlanBinderTest > log4j:WARN No appenders could be found for logger > (org.apache.flink.python.api.PythonPlanBinderTest). > log4j:WARN Please initialize the log4j system properly. > log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more > info. > The stack is below: > "main" prio=5 tid=0x7f8d7780b800 nid=0x1c03 waiting on condition > [0x79fd8000] >java.lang.Thread.State: TIMED_WAITING (sleeping) > at java.lang.Thread.sleep(Native Method) > at > org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.startPython(PythonPlanStreamer.java:70) > at > org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.open(PythonPlanStreamer.java:50) > at > org.apache.flink.python.api.PythonPlanBinder.startPython(PythonPlanBinder.java:211) > at > org.apache.flink.python.api.PythonPlanBinder.runPlan(PythonPlanBinder.java:141) > at > org.apache.flink.python.api.PythonPlanBinder.main(PythonPlanBinder.java:114) > at > org.apache.flink.python.api.PythonPlanBinderTest.testProgram(PythonPlanBinderTest.java:83) > at > org.apache.flink.test.util.JavaProgramTestBase.testJobWithoutObjectReuse(JavaProgramTestBase.java:174) > The full jstack is here: > https://gist.github.com/shijinkui/af47e8bc6c9f748336bf52efd3df94b0
[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106617241 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -119,6 +154,64 @@ class DataStreamOverAggregate( } + def createTimeBoundedProcessingTimeOverWindow( --- End diff -- The line is 93 characters, which does not reach the 100-character limit; IntelliJ IDEA shows this clearly.
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106589672 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+/**
+ * A ProcessFunction to support unbounded event-time over-window
+ *
+ * @param aggregates the aggregate functions
+ * @param aggFields the filed index which the aggregate functions use
+ * @param forwardedFieldCount the input fields count
+ * @param intermediateType the intermediate row tye which the state saved
+ * @param inputType the input row tye which the state saved
+ */
+class UnboundedEventTimeOverProcessFunction(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Int],
+  private val forwardedFieldCount: Int,
+  private val intermediateType: TypeInformation[Row],
+  private val inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowState: ListState[Tuple2[Long, Row]] = _
+
+  override def open(config: Configuration) {
+    output = new Row(forwardedFieldCount + aggregates.length)
+    val valueSerializer: TypeSerializer[Row] =
+      intermediateType.createSerializer(getRuntimeContext.getExecutionConfig)
+    val stateDescriptor: ValueStateDescriptor[Row] =
+      new ValueStateDescriptor[Row]("accumulatorstate", valueSerializer)
+    accumulatorState = getRuntimeContext.getState[Row](stateDescriptor)
+
+    val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] =
+      (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType)).createSerializer(

--- End diff --

The outer parentheses around `new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType)` can be omitted.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
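The parenthesization point can be sketched with a tiny stand-in class (a hypothetical stub, not Flink's actual `TupleTypeInfo`): in Scala, `new C(args).m()` already binds the method call to the freshly constructed instance, so the extra parentheses add nothing.

```scala
// Hypothetical stub standing in for Flink's TupleTypeInfo, for illustration only.
class TupleTypeInfo(first: String, second: String) {
  def createSerializer(): String = s"Serializer[Tuple2[$first, $second]]"
}

object ParenDemo {
  // As written in the reviewed diff, with redundant outer parentheses:
  val withParens = (new TupleTypeInfo("Long", "Row")).createSerializer()
  // The equivalent form the review suggests, without them:
  val withoutParens = new TupleTypeInfo("Long", "Row").createSerializer()
}
```

Both expressions parse to the same call chain, so the shorter form is purely a readability improvement.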
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106590214 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---

@@ -159,6 +167,46 @@ class DataStreamOverAggregate(
     result
   }

+  def createUnboundedAndCurrentRowEventTimeOverWindow(
+    inputDS: DataStream[Row]): DataStream[Row] = {
+
+    val overWindow: Group = logicWindow.groups.get(0)
+    val partitionKeys: Array[Int] = overWindow.keys.toArray
+    val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
+
+    // get the output types
+    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+
+    val result: DataStream[Row] =
+      // partitioned aggregation
+      if (partitionKeys.nonEmpty) {
+        val keyedStream = inputDS.keyBy(partitionKeys: _*)
+        val processFunction = AggregateUtil.CreateUnboundedEventTimeOverProcessFunction(

--- End diff --

`val processFunction = AggregateUtil.CreateUnboundedEventTimeOverProcessFunction(` can be declared before the if/else, because it has no relationship with `partitionKeys`.
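The hoisting suggestion above can be sketched with made-up names (this is an illustrative reduction, not the actual Flink code): a value that does not depend on the branch condition is created once, before the if/else, instead of inside each branch.

```scala
object HoistDemo {
  // Stand-in for AggregateUtil.createUnboundedEventTimeOverProcessFunction.
  private def createProcessFunction(): String = "processFunction"

  def plan(partitionKeys: Array[Int]): String = {
    // Hoisted: independent of partitionKeys, so declared before the branch.
    val processFunction = createProcessFunction()
    if (partitionKeys.nonEmpty) s"keyed($processFunction)"
    else s"global($processFunction)"
  }
}
```

Besides avoiding duplication, hoisting makes it obvious to the reader that both branches use the same process function.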
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106592513 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala ---

@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+/**
+ * A ProcessFunction to support unbounded event-time over-window
+ *
+ * @param aggregates the aggregate functions
+ * @param aggFields the filed index which the aggregate functions use
+ * @param forwardedFieldCount the input fields count
+ * @param intermediateType the intermediate row tye which the state saved
+ * @param inputType the input row tye which the state saved
+ */
+class UnboundedEventTimeOverProcessFunction(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Int],
+  private val forwardedFieldCount: Int,
+  private val intermediateType: TypeInformation[Row],
+  private val inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowState: ListState[Tuple2[Long, Row]] = _
+
+  override def open(config: Configuration) {
+    output = new Row(forwardedFieldCount + aggregates.length)
+    val valueSerializer: TypeSerializer[Row] =
+      intermediateType.createSerializer(getRuntimeContext.getExecutionConfig)
+    val stateDescriptor: ValueStateDescriptor[Row] =
+      new ValueStateDescriptor[Row]("accumulatorstate", valueSerializer)
+    accumulatorState = getRuntimeContext.getState[Row](stateDescriptor)
+
+    val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] =
+      (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType)).createSerializer(
+        getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, Row]]]
+    val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] =
+      new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tupleSerializer)
+    rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor)
+  }
+
+  override def processElement(
+    input: Row,
+    ctx: ProcessFunction[Row, Row]#Context,
+    out: Collector[Row]): Unit = {
+
+    // discard later record
+    if (ctx.timestamp() >= ctx.timerService().currentWatermark()) {
+      // ensure every key just register on timer
+      ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1)
+
+      rowState.add(new Tuple2(ctx.timestamp, input))
+    }
+  }
+
+  override def onTimer(
+    timestamp: Long,
+    ctx: ProcessFunction[Row, Row]#OnTimerContext,
+    out: Collector[Row]): Unit = {
+
+    var rowList = rowState.get.iterator
+    var sortList = new util.LinkedList[Tuple2[Long, Row]]()
+    while (rowList.hasNext) {
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106590692 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala ---
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106590822 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala ---
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106590304 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---

@@ -91,6 +91,35 @@ object AggregateUtil {
   }

   /**
+    * Create an [[ProcessFunction]] to evaluate final aggregate value.
+    *
+    * @param namedAggregates List of calls to aggregate functions and their output field names
+    * @param inputType Input row type
+    * @return [[UnboundedProcessingOverProcessFunction]]
+    */
+  private[flink] def CreateUnboundedEventTimeOverProcessFunction(

--- End diff --

The function name should start with a lowercase letter.
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106590402 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---

@@ -91,6 +91,35 @@ object AggregateUtil {
   }

   /**
+    * Create an [[ProcessFunction]] to evaluate final aggregate value.
+    *
+    * @param namedAggregates List of calls to aggregate functions and their output field names
+    * @param inputType Input row type
+    * @return [[UnboundedProcessingOverProcessFunction]]
+    */
+  private[flink] def CreateUnboundedEventTimeOverProcessFunction(
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    inputType: RelDataType): UnboundedEventTimeOverProcessFunction = {
+
+    val (aggFields, aggregates) =
+      transformToAggregateFunctions(
+        namedAggregates.map(_.getKey),
+        inputType,
+        needRetraction = false)
+
+    val aggregationStateType: RowTypeInfo =

--- End diff --

```
val aggregationStateType: RowTypeInfo =
  createDataSetAggregateBufferDataType(Array(), aggregates, inputType)
```

This would be more readable.
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106592195 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala ---
[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106602209 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataStreamProcTimeAggregateWindowFunction.scala ---

@@ -0,0 +1,108 @@
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.functions.Accumulator
+
+/**
+ * Computes the final aggregate value from incrementally computed aggreagtes.
+ *
+ * @param numGroupingKey The number of grouping keys.
+ * @param numAggregates The number of aggregates.
+ * @param finalRowArity The arity of the final output row.
+ */
+class DataStreamIncrementalAggregateWindowFunction[W <: Window](

--- End diff --

1. The class name `DataStreamIncrementalAggregateWindowFunction` does not match the file name.
2. The Scaladoc parameters `numGroupingKey`, `numAggregates`, and `finalRowArity` do not exist in the constructor.
3. IMO, none of the functions in the `aggregate` package have documentation clear enough to describe what they do, how they work, and their key points. The documentation of the `aggregate` package is lacking.
[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106605456 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataStreamProcTimeAggregateGlobalWindowFunction.scala ---

@@ -0,0 +1,104 @@
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.functions.Accumulator
+
+import java.lang.Iterable
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+ //org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
+
+class DataStreamProcTimeAggregateGlobalWindowFunction[W <: Window](
+  private val aggregates: Array[AggregateFunction[_]],

--- End diff --

`private val` should be chosen if `aggregates` does not need to be accessed directly. The same applies to the other class fields.
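The visibility point can be illustrated with a small, hypothetical class (not the Flink code): marking a constructor parameter `private val` keeps its accessor out of the public API while the class body can still use it.

```scala
// Hypothetical sketch: `aggregates` is usable internally but not exposed
// as a public accessor on instances of the class.
class GlobalWindowDemo(private val aggregates: Array[String]) {
  // Internal use is unaffected by the private modifier.
  def aggregateCount: Int = aggregates.length
}
```

With a plain `val` instead, `aggregates` would become part of the class's public interface, which the review argues should be avoided when callers have no need for it.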
[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106600097 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---

@@ -119,6 +154,64 @@ class DataStreamOverAggregate(
   }

+  def createTimeBoundedProcessingTimeOverWindow(
+    inputDS: DataStream[Row]): DataStream[Row] = {
+
+    val overWindow: Group = logicWindow.groups.get(0)
+    val partitionKeys: Array[Int] = overWindow.keys.toArray
+    val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
+
+    // final long time_boundary =
+    //   Long.parseLong(windowReference.getConstants().get(1).getValue().toString());
+    val index = overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef].getIndex
+    val count = input.getRowType().getFieldCount()
+    val lowerboundIndex = index - count
+    val time_boundary =  logicWindow.constants.get(lowerboundIndex)

--- End diff --

`val time_boundary =  logicWindow.constants.get(lowerboundIndex)` — trim the two spaces, like `val time_boundary = logicWindow.constants.get(lowerboundIndex)`.
[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106411557 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---

@@ -191,3 +287,31 @@ class DataStreamOverAggregate(
   }

+object DataStreamProcTimeCase {
+  class ProcTimeTimestampExtractor
+    extends AssignerWithPunctuatedWatermarks[Row] {
+
+    override def checkAndGetNextWatermark(
+      lastElement: Row,
+      extractedTimestamp: Long): Watermark = {
+      null
+    }
+
+    override def extractTimestamp(
+      element: Row,
+      previousElementTimestamp: Long): Long = {
+      System.currentTimeMillis()
+    }
+  }
+  /*
+  class MyWindowFunction extends AllWindowFunction[Row, Row, GlobalWindow] {

--- End diff --

Delete it if it is unused.
[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106604502 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataStreamProcTimeAggregateWindowFunction.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.aggregate + +import java.lang.Iterable + +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.types.Row +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.util.Collector +import org.apache.flink.table.functions.AggregateFunction +import org.apache.flink.table.functions.Accumulator + +/** + * Computes the final aggregate value from incrementally computed aggreagtes. + * + * @param numGroupingKey The number of grouping keys. + * @param numAggregates The number of aggregates. + * @param finalRowArity The arity of the final output row. 
+ */ +class DataStreamIncrementalAggregateWindowFunction[W <: Window]( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Int], + private val forwardedFieldCount: Int) + extends RichWindowFunction[Row, Row, Tuple, W] { + +private var output: Row = _ +private var accumulators: Row= _ + + override def open(parameters: Configuration): Unit = { + output = new Row(forwardedFieldCount + aggregates.length) + accumulators = new Row(aggregates.length) + var i = 0 + while (i < aggregates.length) { +accumulators.setField(i, aggregates(i).createAccumulator()) +i = i + 1 + } + } + + + /** +* Calculate aggregated values output by aggregate buffer, and set them into output +* Row based on the mapping relation between intermediate aggregate data and output data. +*/ + override def apply( + key: Tuple, + window: W, + records: Iterable[Row], + out: Collector[Row]): Unit = { + + var i = 0 --- End diff -- 1. This class should be formatted first. 2. IMO, every `asInstanceOf` should be guarded by a match/case, unless the object's type is already determined. 3. `Calculate aggregated values output by aggregate buffer, and set them into output` should end with a `.`, and the `@param` tags should be added. 4. The variable `i` should have a clearer name; `i` is also reused across three loops, which makes the code hard to read. ---
[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106605103 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataStreamProcTimeAggregateGlobalWindowFunction.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.configuration.Configuration +import org.apache.flink.table.functions.Accumulator + +import java.lang.Iterable +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.table.functions.AggregateFunction + + + + //org.apache.flink.streaming.api.functions.windowing.AllWindowFunction + --- End diff -- 1. scaladoc is not well format, need a clear and detail scaladoc 2. the class should be format first 3. 
delete the unused import statements, such as `import org.apache.flink.api.java.typeutils.RowTypeInfo` ---
[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106602970 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataStreamProcTimeAggregateWindowFunction.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.aggregate + +import java.lang.Iterable + +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.types.Row +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.util.Collector +import org.apache.flink.table.functions.AggregateFunction +import org.apache.flink.table.functions.Accumulator + +/** + * Computes the final aggregate value from incrementally computed aggreagtes. + * + * @param numGroupingKey The number of grouping keys. + * @param numAggregates The number of aggregates. + * @param finalRowArity The arity of the final output row. 
+ */ +class DataStreamIncrementalAggregateWindowFunction[W <: Window]( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Int], + private val forwardedFieldCount: Int) + extends RichWindowFunction[Row, Row, Tuple, W] { + +private var output: Row = _ +private var accumulators: Row= _ + + override def open(parameters: Configuration): Unit = { + output = new Row(forwardedFieldCount + aggregates.length) + accumulators = new Row(aggregates.length) + var i = 0 --- End diff -- `aggregates` is an array; we can convert it to an `iterator`, and then the index `i` is no longer needed. An iterator is safer. ---
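The iterator suggestion above can be sketched as follows. Note this is a simplified illustration, not Flink code: `Row` and `Agg` below are minimal stand-ins for Flink's `org.apache.flink.types.Row` and `AggregateFunction`.

```scala
// Simplified stand-ins, just enough to show the loop rewrite.
class Row(arity: Int) {
  private val fields = new Array[Any](arity)
  def setField(i: Int, v: Any): Unit = fields(i) = v
  def getField(i: Int): Any = fields(i)
}

trait Agg { def createAccumulator(): Any }
object CountAgg extends Agg { def createAccumulator(): Any = 0L }

val aggregates: Array[Agg] = Array(CountAgg, CountAgg)
val accumulators = new Row(aggregates.length)

// Instead of `var i = 0; while (i < aggregates.length) { ...; i = i + 1 }`,
// iterate over the array directly; zipWithIndex supplies the slot index.
for ((agg, i) <- aggregates.zipWithIndex) {
  accumulators.setField(i, agg.createAccumulator())
}
```

The manual index bookkeeping disappears, which removes the off-by-one and index-reuse hazards the review points out.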
[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106599975 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -119,6 +154,64 @@ class DataStreamOverAggregate( } + def createTimeBoundedProcessingTimeOverWindow( +inputDS: DataStream[Row]): DataStream[Row] = { + +val overWindow: Group = logicWindow.groups.get(0) +val partitionKeys: Array[Int] = overWindow.keys.toArray +val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates + +// final long time_boundary = --- End diff -- if unused, delete the commented-out code lines ---
[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106601075 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -785,7 +785,7 @@ object AggregateUtil { (propPos._1, propPos._2) } - private def transformToAggregateFunctions( --- End diff -- Using `private[flink]` makes this function accessible, and that works. But we should consider one thing: whether this function really needs to be visible across the whole `flink` package. My suggestion is `private[table]`, as applied to the other functions in this class. What do the other reviewers think? ---
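The scoped-private suggestion can be sketched as below. The package and method names are illustrative stand-ins, not the actual Flink sources: a member marked `private[table]` is visible anywhere under the `table` package but not to the rest of `flink`.

```scala
// Scala scoped access modifiers: private[P] opens visibility up to
// the enclosing package P, and no further.
package org.example.flink.table {
  object AggregateUtil {
    private[table] def transformToAggregateFunctions(): String = "aggregates"
  }
  object Caller {
    // Compiles: Caller lives inside the `table` package.
    def use(): String = AggregateUtil.transformToAggregateFunctions()
  }
}
// A caller in org.example.flink (outside `table`) would not compile:
//   AggregateUtil.transformToAggregateFunctions()  // error: not accessible
```

This keeps the helper usable by the table runtime while hiding it from unrelated `flink` packages, which is the trade-off the review raises.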
[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106411491 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -136,13 +229,13 @@ class DataStreamOverAggregate( namedAggregates, inputType) - inputDS +inputDS .keyBy(partitionKeys: _*) .process(processFunction) .returns(rowTypeInfo) .name(aggOpName) .asInstanceOf[DataStream[Row]] -} + } // global non-partitioned aggregation --- End diff -- the comment should be placed before `inputDS` ---
[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106599719 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -119,6 +154,64 @@ class DataStreamOverAggregate( } + def createTimeBoundedProcessingTimeOverWindow( --- End diff -- `def createTimeBoundedProcessingTimeOverWindow(` should be on one line. ---
[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106605509 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataStreamProcTimeAggregateGlobalWindowFunction.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.configuration.Configuration +import org.apache.flink.table.functions.Accumulator + +import java.lang.Iterable +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.table.functions.AggregateFunction + + + + //org.apache.flink.streaming.api.functions.windowing.AllWindowFunction + +class DataStreamProcTimeAggregateGlobalWindowFunction[W <: Window]( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Int], + private val forwardedFieldCount: Int) + extends RichAllWindowFunction[Row, Row, W] { + +private var output: Row = _ +private var accumulators: Row= _ + + + override def open(parameters: Configuration): Unit = { + output = new Row(forwardedFieldCount + aggregates.length) + accumulators = new Row(aggregates.length) + var i = 0 + while (i < aggregates.length) { +accumulators.setField(i, aggregates(i).createAccumulator()) +i = i + 1 + } + } + + override def apply( + window: W, + records: Iterable[Row], + out: Collector[Row]): Unit = { + + + var i = 0 + //initialize the values of the aggregators by re-creating them + //the design of the Accumulator interface should be extended to enable + //a reset function for better performance + while (i < aggregates.length) { +accumulators.setField(i, aggregates(i).createAccumulator()) +i += 1 + } + var reuse:Row = null + //iterate through the elements and aggregate + val iter = records.iterator + while (iter.hasNext) { + reuse = iter.next + i = 0 + while (i < aggregates.length) { + val accumulator = accumulators.getField(i).asInstanceOf[Accumulator] + aggregates(i).accumulate(accumulator, reuse.getField(aggFields(i))) + i += 1 + } + } + +//set the values of the result with 
current elements values if needed +i = 0 +while (i < forwardedFieldCount) { + output.setField(i, reuse.getField(i)) + i += 1 +} + +//set the values of the result with the accumulators +i = 0 +while (i < aggregates.length) { + val index = forwardedFieldCount + i + val accumulator = accumulators.getField(i).asInstanceOf[Accumulator] + output.setField(index, aggregates(i).getValue(accumulator)) + i += 1 +} + +out.collect(output) + --- End diff -- no extra blank line is needed here. ---
[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3550#discussion_r106600464 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -119,6 +154,64 @@ class DataStreamOverAggregate( } + def createTimeBoundedProcessingTimeOverWindow( +inputDS: DataStream[Row]): DataStream[Row] = { + +val overWindow: Group = logicWindow.groups.get(0) +val partitionKeys: Array[Int] = overWindow.keys.toArray +val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates + +// final long time_boundary = +// Long.parseLong(windowReference.getConstants().get(1).getValue().toString()); +val index = overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef].getIndex +val count = input.getRowType().getFieldCount() +val lowerboundIndex = index - count +val time_boundary = logicWindow.constants.get(lowerboundIndex) + .getValue2.asInstanceOf[java.math.BigDecimal].longValue() --- End diff -- `getValue2` returns a `Comparable` value, but must that value always be a `java.math.BigDecimal`? Use match/case to guarantee its type. ---
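The match/case suggestion could look like the sketch below. `toLongBoundary` is a hypothetical helper name, not part of the PR; its argument stands in for `logicWindow.constants.get(lowerboundIndex).getValue2`.

```scala
// Instead of constant.getValue2.asInstanceOf[java.math.BigDecimal],
// pattern-match on the runtime type so an unexpected literal fails
// with a descriptive error rather than a bare ClassCastException.
def toLongBoundary(constantValue: Any): Long = constantValue match {
  case d: java.math.BigDecimal => d.longValue()
  case other => throw new IllegalArgumentException(
    s"Unsupported OVER window boundary literal: $other")
}
```

For example, `toLongBoundary(new java.math.BigDecimal(5000))` yields `5000L`, while a `String` argument raises `IllegalArgumentException` with a message naming the offending value.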
[GitHub] flink issue #3386: [FLINK-5658][table] support unbounded eventtime over wind...
Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/3386 @hongyuhong, IMO, all the functions should have detailed documentation ---
[jira] [Updated] (FLINK-5860) Replace all the file creating from java.io.tmpdir with TemporaryFolder
[ https://issues.apache.org/jira/browse/FLINK-5860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui updated FLINK-5860: - Labels: starter (was: ) > Replace all the file creating from java.io.tmpdir with TemporaryFolder > -- > > Key: FLINK-5860 > URL: https://issues.apache.org/jira/browse/FLINK-5860 > Project: Flink > Issue Type: Test > Components: Tests >Reporter: shijinkui >Assignee: Yaroslav Mykhaylov > Labels: starter > > Search `System.getProperty("java.io.tmpdir")` in whole Flink project. It will > get a Unit test list. Replace all the file creating from `java.io.tmpdir` > with TemporaryFolder. > Who can fix this problem thoroughly? > ``` > $ grep -ri 'System.getProperty("java.io.tmpdir")' . > ./flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java: > env.setStateBackend(new FsStateBackend("file:///" + > System.getProperty("java.io.tmpdir") + "/flink/backend")); > ./flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java: > return getMockEnvironment(new File[] { new > File(System.getProperty("java.io.tmpdir")) }); > ./flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java: >public static final String DEFAULT_TASK_MANAGER_TMP_PATH = > 
System.getProperty("java.io.tmpdir"); > ./flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java: > final String tempPath = System.getProperty("java.io.tmpdir"); > ./flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java: > final File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: > File tempDir = new File(System.getProperty("java.io.tmpdir")); > ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java: > final String outDir = params.get("output", > System.getProperty("java.io.tmpdir")); > ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java: > final String tmpDir = System.getProperty("java.io.tmpdir"); > ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java: > final String outPath = System.getProperty("java.io.tmpdir"); > ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: > File out = new File(System.getProperty("java.io.tmpdir"), > "jarcreatortest.jar"); > ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: > File out = new File(System.getProperty("java.io.tmpdir"), > "jarcreatortest.jar"); > ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: > File out = new File(System.getProperty("java.io.tmpdir"), > "jarcreatortest.jar"); > ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: > File out = new File(System.getProperty("java.io.tmpdir"), > "jarcreatortest.jar"); > 
./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java: >public static final String FLINK_PYTHON_FILE_PATH = > System.getProperty("java.io.tmpdir") + File.separator
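The replacement the JIRA asks for can be sketched as below, assuming JUnit 4 on the test classpath (as in Flink's test modules). The class and method names are illustrative only; they do not correspond to an actual Flink test.

```scala
import java.io.File
import org.junit.{Rule, Test}
import org.junit.rules.TemporaryFolder

class TempDirTestSketch {
  val _folder = new TemporaryFolder()

  // In Scala the @Rule annotation must sit on a method (or getter)
  // returning the rule, not on the underlying field.
  @Rule def folder: TemporaryFolder = _folder

  @Test def writesIntoIsolatedDir(): Unit = {
    // Before: new File(System.getProperty("java.io.tmpdir"))
    // After: a fresh per-test directory that JUnit deletes automatically.
    val tempDir: File = folder.newFolder("flink-test")
    assert(tempDir.isDirectory)
  }
}
```

Unlike a shared `java.io.tmpdir`, each test run gets an isolated directory that is cleaned up even when the test fails, which avoids cross-test interference.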
[jira] [Commented] (FLINK-5754) released tag missing .gitigonore .travis.yml .gitattributes
[ https://issues.apache.org/jira/browse/FLINK-5754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929365#comment-15929365 ] shijinkui commented on FLINK-5754: -- [~greghogan] I had checked out a branch from the 1.2 tag in production. In general, a release tag is the final version, not a branch. Am I right? IMO, the release-tag commit should only change the project version, and shouldn't change anything else. Some other Apache projects I looked at follow this rule. Can we consider this the normal rule? Must the hidden files be deleted in the release tag? https://github.com/apache/spark/commit/cd0a08361e2526519e7c131c42116bf56fa62c76 https://github.com/apache/hadoop/commit/94152e171178d34864ddf6362239f3c2dda0965f https://github.com/apache/storm/commit/eac433b0beb3798c4723deb39b3c4fad446378f4 > released tag missing .gitigonore .travis.yml .gitattributes > > > Key: FLINK-5754 > URL: https://issues.apache.org/jira/browse/FLINK-5754 > Project: Flink > Issue Type: Bug > Components: Build System >Reporter: shijinkui > > released tag missing .gitigonore .travis.yml .gitattributes. > When make a release version, should only replace the version. > for example: https://github.com/apache/spark/tree/v2.1.0 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106411203 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UnboundedRowtimeOverTest.scala --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.scala.stream.sql + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.table.api.{TableEnvironment, TableException} +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.scala.stream.utils.StreamTestData.Small4Tuple +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase} +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit._ + +import scala.collection.mutable + +class UnboundedRowtimeOverTest extends StreamingWithStateTestBase { --- End diff -- @hongyuhong all the Scala UT or IT file names should end with `ITCase` or `Suite` ---
[jira] [Commented] (FLINK-5650) Flink-python tests executing cost too long time
[ https://issues.apache.org/jira/browse/FLINK-5650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927543#comment-15927543 ] shijinkui commented on FLINK-5650: -- [~Zentol], Your PR can work. flink-python UT cost one min. Very good :) [INFO] BUILD SUCCESS [INFO] [INFO] Total time: 01:09 min [INFO] Finished at: 2017-03-16T13:57:12+08:00 [INFO] Final Memory: 22M/268M > Flink-python tests executing cost too long time > --- > > Key: FLINK-5650 > URL: https://issues.apache.org/jira/browse/FLINK-5650 > Project: Flink > Issue Type: Bug > Components: Python API, Tests >Affects Versions: 1.2.0 > Reporter: shijinkui >Priority: Critical > Labels: osx > Fix For: 1.2.1 > > > When execute `mvn clean test` in flink-python, it will wait more than half > hour after the console output below: > --- > T E S T S > --- > Running org.apache.flink.python.api.PythonPlanBinderTest > log4j:WARN No appenders could be found for logger > (org.apache.flink.python.api.PythonPlanBinderTest). > log4j:WARN Please initialize the log4j system properly. > log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more > info. 
> The stack below: > "main" prio=5 tid=0x7f8d7780b800 nid=0x1c03 waiting on condition > [0x79fd8000] >java.lang.Thread.State: TIMED_WAITING (sleeping) > at java.lang.Thread.sleep(Native Method) > at > org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.startPython(PythonPlanStreamer.java:70) > at > org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.open(PythonPlanStreamer.java:50) > at > org.apache.flink.python.api.PythonPlanBinder.startPython(PythonPlanBinder.java:211) > at > org.apache.flink.python.api.PythonPlanBinder.runPlan(PythonPlanBinder.java:141) > at > org.apache.flink.python.api.PythonPlanBinder.main(PythonPlanBinder.java:114) > at > org.apache.flink.python.api.PythonPlanBinderTest.testProgram(PythonPlanBinderTest.java:83) > at > org.apache.flink.test.util.JavaProgramTestBase.testJobWithoutObjectReuse(JavaProgramTestBase.java:174) > this is the jstack: > https://gist.github.com/shijinkui/af47e8bc6c9f748336bf52efd3df94b0 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-5650) Flink-python tests executing cost too long time
[ https://issues.apache.org/jira/browse/FLINK-5650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui updated FLINK-5650: - Labels: osx (was: ) > Flink-python tests executing cost too long time > --- > > Key: FLINK-5650 > URL: https://issues.apache.org/jira/browse/FLINK-5650 > Project: Flink > Issue Type: Bug > Components: Python API, Tests >Affects Versions: 1.2.0 > Reporter: shijinkui >Priority: Critical > Labels: osx > Fix For: 1.2.1 > > > When execute `mvn clean test` in flink-python, it will wait more than half > hour after the console output below: > --- > T E S T S > --- > Running org.apache.flink.python.api.PythonPlanBinderTest > log4j:WARN No appenders could be found for logger > (org.apache.flink.python.api.PythonPlanBinderTest). > log4j:WARN Please initialize the log4j system properly. > log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more > info. > The stack below: > "main" prio=5 tid=0x7f8d7780b800 nid=0x1c03 waiting on condition > [0x79fd8000] >java.lang.Thread.State: TIMED_WAITING (sleeping) > at java.lang.Thread.sleep(Native Method) > at > org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.startPython(PythonPlanStreamer.java:70) > at > org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.open(PythonPlanStreamer.java:50) > at > org.apache.flink.python.api.PythonPlanBinder.startPython(PythonPlanBinder.java:211) > at > org.apache.flink.python.api.PythonPlanBinder.runPlan(PythonPlanBinder.java:141) > at > org.apache.flink.python.api.PythonPlanBinder.main(PythonPlanBinder.java:114) > at > org.apache.flink.python.api.PythonPlanBinderTest.testProgram(PythonPlanBinderTest.java:83) > at > org.apache.flink.test.util.JavaProgramTestBase.testJobWithoutObjectReuse(JavaProgramTestBase.java:174) > this is the jstack: > https://gist.github.com/shijinkui/af47e8bc6c9f748336bf52efd3df94b0 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3540: [FLINK-6056] [build]apache-rat exclude flink directory in...
Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/3540 > I am a bit confused... I think there is no tools/flink* directory that would need an exclusion... When `tools/create_release_files.sh` is executed, it clones the flink project from Apache. We have also added `tools/flink*` to the .gitignore ---
[GitHub] flink issue #3540: [FLINK-6056] [build]apache-rat exclude flink directory in...
Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/3540 > Why exactly is it a problem if the rat-plugin checks the tools directory? There is no need to check the temporary flink project under tools, because it costs extra time when building the main project. ---
[jira] [Created] (FLINK-6060) not exist class referance in the scala function annotation
shijinkui created FLINK-6060: Summary: not exist class referance in the scala function annotation Key: FLINK-6060 URL: https://issues.apache.org/jira/browse/FLINK-6060 Project: Flink Issue Type: Wish Reporter: shijinkui TaskMessages.scala ConnectedStreams.scala DataStream.scala Who can fix it? -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #2460: [FLINK-4562] table examples make an divided module in fli...
Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/2460 ping @wuchong @fhueske @twalthr ---
[jira] [Comment Edited] (FLINK-5650) Flink-python tests executing cost too long time
[ https://issues.apache.org/jira/browse/FLINK-5650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925833#comment-15925833 ] shijinkui edited comment on FLINK-5650 at 3/15/17 10:11 AM: Also we can make PythonPlanStreamer support async executing in the thread pool, instead of blocked single process. [~StephanEwen] I think such bad design unit test shouldn't be merge into the code base. was (Author: shijinkui): Also we can make PythonPlanStreamer support async executing in the thread pool, instead of blocked single process. [~StephanEwen] > Flink-python tests executing cost too long time > --- > > Key: FLINK-5650 > URL: https://issues.apache.org/jira/browse/FLINK-5650 > Project: Flink > Issue Type: Bug > Components: Python API, Tests >Affects Versions: 1.2.0 > Reporter: shijinkui >Priority: Critical > Fix For: 1.2.1 > > > When execute `mvn clean test` in flink-python, it will wait more than half > hour after the console output below: > --- > T E S T S > --- > Running org.apache.flink.python.api.PythonPlanBinderTest > log4j:WARN No appenders could be found for logger > (org.apache.flink.python.api.PythonPlanBinderTest). > log4j:WARN Please initialize the log4j system properly. > log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more > info. 
> The stack below: > "main" prio=5 tid=0x7f8d7780b800 nid=0x1c03 waiting on condition > [0x79fd8000] >java.lang.Thread.State: TIMED_WAITING (sleeping) > at java.lang.Thread.sleep(Native Method) > at > org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.startPython(PythonPlanStreamer.java:70) > at > org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.open(PythonPlanStreamer.java:50) > at > org.apache.flink.python.api.PythonPlanBinder.startPython(PythonPlanBinder.java:211) > at > org.apache.flink.python.api.PythonPlanBinder.runPlan(PythonPlanBinder.java:141) > at > org.apache.flink.python.api.PythonPlanBinder.main(PythonPlanBinder.java:114) > at > org.apache.flink.python.api.PythonPlanBinderTest.testProgram(PythonPlanBinderTest.java:83) > at > org.apache.flink.test.util.JavaProgramTestBase.testJobWithoutObjectReuse(JavaProgramTestBase.java:174) > this is the jstack: > https://gist.github.com/shijinkui/af47e8bc6c9f748336bf52efd3df94b0 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5650) Flink-python tests executing cost too long time
[ https://issues.apache.org/jira/browse/FLINK-5650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925833#comment-15925833 ] shijinkui commented on FLINK-5650: -- We could also make PythonPlanStreamer support asynchronous execution in a thread pool, instead of a blocking single process. [~StephanEwen]
[jira] [Commented] (FLINK-5650) Flink-python tests executing cost too long time
[ https://issues.apache.org/jira/browse/FLINK-5650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925824#comment-15925824 ] shijinkui commented on FLINK-5650: -- For now I have to execute `mvn clean test -pl '!flink-libraries/flink-python'` to exclude the flink-python module.
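The suggestion in this thread, running each test plan asynchronously in a thread pool instead of blocking on a single process, could be sketched roughly as follows. This is a hedged illustration: `runPlan` is a hypothetical stand-in for the blocking `PythonPlanStreamer.startPython()` call, not the real Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class AsyncPlanRunner {
    // Hypothetical stand-in for the blocking PythonPlanStreamer call.
    static String runPlan(String plan) {
        return "done:" + plan;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Future<String>> results = new ArrayList<>();
        // Submit each test plan to the pool instead of running it in a
        // single blocking process.
        for (int i = 0; i < 4; i++) {
            final String plan = "plan-" + i;
            results.add(pool.submit(() -> runPlan(plan)));
        }
        // A bounded wait avoids the half-hour hang described in the ticket.
        for (Future<String> f : results) {
            System.out.println(f.get(30, TimeUnit.SECONDS));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```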
[jira] [Commented] (FLINK-5754) release tag missing .gitignore .travis.yml .gitattributes
[ https://issues.apache.org/jira/browse/FLINK-5754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925816#comment-15925816 ] shijinkui commented on FLINK-5754: -- ping [~greghogan] Where did you create the tag: on GitHub, or via the tools scripts? > release tag missing .gitignore .travis.yml .gitattributes > > > Key: FLINK-5754 > URL: https://issues.apache.org/jira/browse/FLINK-5754 > Project: Flink > Issue Type: Bug > Components: Build System >Reporter: shijinkui > > The release tag is missing .gitignore, .travis.yml and .gitattributes. > When making a release, only the version number should be replaced. > For example: https://github.com/apache/spark/tree/v2.1.0
[GitHub] flink pull request #3540: [FLINK-6056] apache-rat exclude flink directory in...
GitHub user shijinkui opened a pull request: https://github.com/apache/flink/pull/3540 [FLINK-6056] apache-rat exclude flink directory in tools The flink* directory under tools is a temporary clone created when building the distribution, so it should be excluded when building the Flink project. - [X] General - The pull request references the related JIRA issue ("[FLINK-6056] apache-rat exclude flink directory in tools") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [X] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [X] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/hwstreaming/flink FLINK-6056 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3540.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3540 commit ccdd802b333555a36b415dbe1289d04a6470f075 Author: Jinkui Shi <shijin...@huawei.com> Date: 2017-03-15T08:05:20Z [FLINK-6056] apache-rat exclude flink directory in tools
[jira] [Commented] (FLINK-5756) When there are many values under the same key in ListState, RocksDBStateBackend performs poorly
[ https://issues.apache.org/jira/browse/FLINK-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923530#comment-15923530 ] shijinkui commented on FLINK-5756: -- [~StephanEwen] Thanks for your reply. [~SyinchwunLeo] Please run the mini-benchmark. FLINK-5715 is nice. > When there are many values under the same key in ListState, > RocksDBStateBackend performs poorly > - > > Key: FLINK-5756 > URL: https://issues.apache.org/jira/browse/FLINK-5756 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 > Environment: CentOS 7.2 >Reporter: Syinchwun Leo > > When using RocksDB as the StateBackend, if there are many values under the > same key in ListState, the windowState.get() operator performs very poorly. > I also tested RocksDB version 4.11.2; the performance is also very > poor. The problem is likely related to RocksDB's own get() operator > after using merge(). The problem may influence the window operation's > performance when the ListState is very large. I tried to merge 5 > values under the same key in RocksDB; it costs 120 seconds to execute the get() > operation. 
> /// > The flink's code is as follows: > {code} > class SEventSource extends RichSourceFunction [SEvent] { > private var count = 0L > private val alphabet = > "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321" > override def run(sourceContext: SourceContext[SEvent]): Unit = { > while (true) { > for (i <- 0 until 5000) { > sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1)) > count += 1L > } > Thread.sleep(1000) > } > } > } > env.addSource(new SEventSource) > .assignTimestampsAndWatermarks(new > AssignerWithPeriodicWatermarks[SEvent] { > override def getCurrentWatermark: Watermark = { > new Watermark(System.currentTimeMillis()) > } > override def extractTimestamp(t: SEvent, l: Long): Long = { > System.currentTimeMillis() > } > }) > .keyBy(0) > .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(2))) > .apply(new WindowStatistic) > .map(x => (System.currentTimeMillis(), x)) > .print() > {code} > > The RocksDB Test code: > {code} > val stringAppendOperator = new StringAppendOperator > val options = new Options() > options.setCompactionStyle(CompactionStyle.LEVEL) > .setCompressionType(CompressionType.SNAPPY_COMPRESSION) > .setLevelCompactionDynamicLevelBytes(true) > .setIncreaseParallelism(4) > .setUseFsync(true) > .setMaxOpenFiles(-1) > .setCreateIfMissing(true) > .setMergeOperator(stringAppendOperator) > val write_options = new WriteOptions > write_options.setSync(false) > val rocksDB = RocksDB.open(options, "/**/Data/") > val key = "key" > val value = > "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321" > val beginmerge = System.currentTimeMillis() > for(i <- 0 to 5) { > rocksDB.merge(key.getBytes(), ("s"+ i + value).getBytes()) > //rocksDB.put(key.getBytes, value.getBytes) > } > println("finish") > val begin = System.currentTimeMillis() > rocksDB.get(key.getBytes) > val end = System.currentTimeMillis() > println("merge cost:" + (begin - 
beginmerge)) > println("Time consuming:" + (end - begin)) > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5756) When there are many values under the same key in ListState, RocksDBStateBackend performs poorly
[ https://issues.apache.org/jira/browse/FLINK-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15907043#comment-15907043 ] shijinkui commented on FLINK-5756: -- hi, [~StephanEwen] Do we have any tuning techniques for this problem originating in RocksDB's get()?
[GitHub] flink pull request #3459: [FLINK-5654] Add processing time OVER RANGE BETWEE...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3459#discussion_r104282631 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/DataStreamProcTimeTimeAggregate.java --- @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.logical.LogicalWindow; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; +import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; +import org.apache.flink.table.api.StreamTableEnvironment; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.calcite.FlinkTypeFactory; +import org.apache.flink.table.plan.logical.rel.util.WindowAggregateUtil; +import org.apache.flink.table.plan.nodes.datastream.DataStreamRel; +import org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateGlobalWindowFunction; +import org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateWindowFunction; +import org.apache.flink.types.Row; + +public class DataStreamProcTimeTimeAggregate extends DataStreamRelJava { + + private LogicalWindow windowReference; + private String description; + + public DataStreamProcTimeTimeAggregate(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, + RelDataType rowType, String description, LogicalWindow windowReference) { + super(cluster, traitSet, input); + + this.rowType = rowType; + this.description = description; + 
this.windowReference = windowReference; + + } + + @Override + protected RelDataType deriveRowType() { + // TODO Auto-generated method stub + return super.deriveRowType(); + } + + @Override + public RelNode copy(RelTraitSet traitSet, java.util.List inputs) { + + if (inputs.size() != 1) { + System.err.println(this.getClass().getName() + " : Input size must be one!"); + } + + return new DataStreamProcTimeTimeAggregate(getCluster(), traitSet, inputs.get(0), getRowType(), + getDescription(), windowReference); + + } + + @Override + public DataStream translateToPlan(StreamTableEnvironment tableEnv) { + + // Get the general parameters related to the datastream, inputs, result + TableConfig config = tableEnv.getConfig(); + + DataStream inputDataStream = ((DataStreamRel) getInput()).translateToPlan(tableEnv); + + TypeInformation[] rowType = new TypeInformation[getRowType().getFieldList().size()]; --- End diff -- 1. `getRowType().getFieldList()` can be reused; it's better to declare a local variable for it. 2. `getRowType().getFieldList()` implies that `getRowType()` must not be null and that `getFieldList()` must not be empty. These preconditions should be checked first.
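The two review points above could be applied roughly as in this sketch. The types here are simplified stand-ins for the Calcite `RelDataType`/`RelDataTypeField` classes, not the real API: validate the preconditions once, keep the field list in a local variable, and reuse it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class FieldListPrecondition {
    // Simplified stand-in for getRowType().getFieldList(): check preconditions
    // up front, then reuse the local variable instead of re-invoking the getter.
    static int fieldCount(List<String> fieldList) {
        List<String> fields = Objects.requireNonNull(fieldList, "row type must not be null");
        if (fields.isEmpty()) {
            throw new IllegalStateException("row type must contain at least one field");
        }
        return fields.size(); // reuse 'fields' rather than calling the getter again
    }

    public static void main(String[] args) {
        System.out.println(fieldCount(Arrays.asList("a", "b", "c")));
    }
}
```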
[GitHub] flink pull request #3459: [FLINK-5654] Add processing time OVER RANGE BETWEE...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3459#discussion_r104283268 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/StreamAggregator.java --- @@ -0,0 +1,23 @@ +package org.apache.flink.table.plan.nodes.datastream.aggs; --- End diff -- This file has no Apache license header; the Rat check will fail.
[GitHub] flink pull request #3459: [FLINK-5654] Add processing time OVER RANGE BETWEE...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3459#discussion_r104282413 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/DataStreamProcTimeTimeAggregate.java --- @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.logical.LogicalWindow; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; +import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; +import org.apache.flink.table.api.StreamTableEnvironment; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.calcite.FlinkTypeFactory; +import org.apache.flink.table.plan.logical.rel.util.WindowAggregateUtil; +import org.apache.flink.table.plan.nodes.datastream.DataStreamRel; +import org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateGlobalWindowFunction; +import org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateWindowFunction; +import org.apache.flink.types.Row; + +public class DataStreamProcTimeTimeAggregate extends DataStreamRelJava { + + private LogicalWindow windowReference; + private String description; + + public DataStreamProcTimeTimeAggregate(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, + RelDataType rowType, String description, LogicalWindow windowReference) { + super(cluster, traitSet, input); + + this.rowType = rowType; + this.description = description; + 
this.windowReference = windowReference; + + } + + @Override + protected RelDataType deriveRowType() { + // TODO Auto-generated method stub + return super.deriveRowType(); + } + + @Override + public RelNode copy(RelTraitSet traitSet, java.util.List inputs) { + + if (inputs.size() != 1) { + System.err.println(this.getClass().getName() + " : Input size must be one!"); + } + + return new DataStreamProcTimeTimeAggregate(getCluster(), traitSet, inputs.get(0), getRowType(), + getDescription(), windowReference); + + } + + @Override + public DataStream translateToPlan(StreamTableEnvironment tableEnv) { + + // Get the general parameters related to the datastream, inputs, result + TableConfig config = tableEnv.getConfig(); --- End diff -- `config` is not referenced --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3459: [FLINK-5654] Add processing time OVER RANGE BETWEE...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3459#discussion_r104282451 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/DataStreamProcTimeTimeAggregate.java --- @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.logical.LogicalWindow; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; +import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; +import org.apache.flink.table.api.StreamTableEnvironment; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.calcite.FlinkTypeFactory; +import org.apache.flink.table.plan.logical.rel.util.WindowAggregateUtil; +import org.apache.flink.table.plan.nodes.datastream.DataStreamRel; +import org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateGlobalWindowFunction; +import org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateWindowFunction; +import org.apache.flink.types.Row; + +public class DataStreamProcTimeTimeAggregate extends DataStreamRelJava { --- End diff -- The class Javadoc shouldn't be omitted; a better description is needed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] flink issue #3468: [FLINK-5824] Fix String/byte conversions without explicit...
Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/3468 With your current replacement using `ConfigConstants.DEFAULT_CHARSET`, it's easier to switch to other charsets later. UTF_8, however, has clearer semantics. If we'll never change the default charset from UTF-8 to anything else, I'd prefer `ConfigConstants.UTF_8`.
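Whatever the constant is named, the point of the PR is to encode with an explicit charset rather than the platform default. A minimal illustration, using plain JDK names rather than the Flink constants under discussion:

```java
import java.nio.charset.StandardCharsets;

public class CharsetExample {
    // Encoding with an explicit, fixed charset yields the same bytes on every
    // platform, unlike String.getBytes() with the JVM default charset.
    static byte[] encode(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] bytes = encode("Flink");
        System.out.println(bytes.length);
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
}
```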
[jira] [Updated] (FLINK-5902) Some images can not show in IE
[ https://issues.apache.org/jira/browse/FLINK-5902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui updated FLINK-5902: - Issue Type: Sub-task (was: Bug) Parent: FLINK-5839 > Some images can not show in IE > -- > > Key: FLINK-5902 > URL: https://issues.apache.org/jira/browse/FLINK-5902 > Project: Flink > Issue Type: Sub-task > Components: Webfrontend > Environment: IE >Reporter: Tao Wang > Attachments: chrome is ok.png, IE 11 with problem.png > > > Some images on the Overview page do not show in IE, while they are fine in Chrome. > I'm using IE 11, but I think it is the same with IE9. I'll paste the screenshot > later.
[GitHub] flink issue #3468: [FLINK-5824] Fix String/byte conversions without explicit...
Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/3468 Good changes. Changing `ConfigConstants.DEFAULT_CHARSET` to `ConfigConstants.UTF_8` might be clearer.
[GitHub] flink issue #3459: [FLINK-5654] Add processing time OVER RANGE BETWEEN x PRE...
Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/3459 @stefanobortoli A PR should contain only one commit whenever possible. With so many commits, it's hard to review.
[GitHub] flink pull request #3459: [FLINK-5654] Add processing time OVER RANGE BETWEE...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3459#discussion_r104282393 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/DataStreamProcTimeTimeAggregate.java --- @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.logical.LogicalWindow; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; +import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; +import org.apache.flink.table.api.StreamTableEnvironment; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.calcite.FlinkTypeFactory; +import org.apache.flink.table.plan.logical.rel.util.WindowAggregateUtil; +import org.apache.flink.table.plan.nodes.datastream.DataStreamRel; +import org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateGlobalWindowFunction; +import org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateWindowFunction; +import org.apache.flink.types.Row; + +public class DataStreamProcTimeTimeAggregate extends DataStreamRelJava { + + private LogicalWindow windowReference; + private String description; --- End diff -- I didn't find where `description` is used. If one variable is not used, it shouldn't be declared. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3408: [FLINK-5903][YARN]respect taskmanager.numberOfTask...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3408#discussion_r104277608

--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -317,6 +319,10 @@ public AbstractYarnClusterDescriptor createDescriptor(String defaultApplicationN
 if (cmd.hasOption(SLOTS.getOpt())) {
 	int slots = Integer.valueOf(cmd.getOptionValue(SLOTS.getOpt()));
 	yarnClusterDescriptor.setTaskManagerSlots(slots);
+} else if (config.containsKey(ConfigConstants.YARN_VCORES)) {
--- End diff --

@tillrohrmann I agree with you. We need to make sure that `YARN_VCORES` is only honored in YARN mode, and yarn/mesos/standalone should each have their own configuration. Also, `YARN_VCORES` is the sum of the `Resource#getVirtualCores` values in the response from the YARN client.

---
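The precedence discussed in the diff above, where an explicit `-s` CLI value wins over a configured `yarn.containers.vcores` fallback, can be sketched in isolation. This is an illustrative model only; the class, method, and parameter names below are hypothetical and are not Flink's actual API.

```java
import java.util.HashMap;
import java.util.Map;

public class VcoresPrecedence {
    // Resolve the slot count: an explicit CLI value wins, then a configured
    // yarn.containers.vcores entry, then the built-in default.
    public static int resolveVcores(Integer cliSlots, Map<String, String> config, int defaultSlots) {
        if (cliSlots != null) {
            return cliSlots; // -s on the command line always wins
        } else if (config.containsKey("yarn.containers.vcores")) {
            return Integer.parseInt(config.get("yarn.containers.vcores"));
        }
        return defaultSlots; // neither source set, fall back to the default
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("yarn.containers.vcores", "4");
        System.out.println(resolveVcores(null, conf, 1)); // config fallback: 4
        System.out.println(resolveVcores(8, conf, 1));    // CLI value wins: 8
    }
}
```

The point of @tillrohrmann's comment is exactly this ordering: the config fallback branch must not shadow an explicit command-line value.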
[jira] [Assigned] (FLINK-5818) change checkpoint dir permission to 700 for security reason
[ https://issues.apache.org/jira/browse/FLINK-5818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui reassigned FLINK-5818: Assignee: Tao Wang > change checkpoint dir permission to 700 for security reason > --- > > Key: FLINK-5818 > URL: https://issues.apache.org/jira/browse/FLINK-5818 > Project: Flink > Issue Type: Sub-task > Components: Security, State Backends, Checkpointing >Reporter: Tao Wang >Assignee: Tao Wang > > Now checkpoint directory is made w/o specified permission, so it is easy for > another user to delete or read files under it, which will cause restore > failure or information leak. > It's better to lower it down to 700. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
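The 700 permission proposed in FLINK-5818 can be applied with plain JDK NIO. A minimal sketch, assuming a POSIX file system; the class and method names are illustrative, not Flink's checkpoint code:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;

public class PrivateDir {
    // Create a directory and force its permissions to rwx------ (700),
    // so other local users can neither read nor delete the files under it.
    public static Path createPrivateDir(Path dir) throws Exception {
        Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwx------");
        Files.createDirectory(dir);
        // set explicitly after creation so the result does not depend on the process umask
        Files.setPosixFilePermissions(dir, perms);
        return dir;
    }

    public static void main(String[] args) throws Exception {
        Path base = Files.createTempDirectory("ckpt-demo");
        Path dir = createPrivateDir(base.resolve("checkpoints"));
        System.out.println(Files.getPosixFilePermissions(dir)); // owner-only permissions
    }
}
```

Setting the permissions after `createDirectory` (rather than passing them as a creation attribute) makes the 700 mode deterministic regardless of the caller's umask.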
[GitHub] flink issue #3190: [FLINK-5546][build] java.io.tmpdir setted as project buil...
Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/3190

> Can we just use the ${project.build.directory} as java.io.tmpdir ?

@wenlong88 Sorry for the late reply; it's a good question. If we used `${project.build.directory}` without the `tmp` subdirectory, the unit tests would create various directories directly under the build directory, and those could overlap with other output directories such as `classes` and `surefire-reports`. Using a dedicated `tmp` directory avoids that kind of conflict.

---
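The dedicated `tmp` subdirectory discussed above would be wired in through the Surefire plugin's system properties. A minimal pom fragment as a sketch (this is not Flink's exact build configuration):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-surefire-plugin</artifactId>
  <configuration>
    <systemPropertyVariables>
      <!-- point java.io.tmpdir at a per-module directory, away from classes/ and surefire-reports/ -->
      <java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
    </systemPropertyVariables>
  </configuration>
</plugin>
```

With this, each module's tests write their temporary files under its own `target/tmp`, so concurrent builds by different users no longer contend for files in the shared `/tmp`.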
[GitHub] flink issue #3190: [FLINK-5546][build] java.io.tmpdir setted as project buil...
Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/3190

Got it. When a single test is run and no `tmp` directory has been created, it should fall back to the default `java.io.tmpdir` property. Let me check that; the base test class should double-check that the target directory exists.

---
[GitHub] flink issue #2460: [FLINK-4562] table examples make an divided module in fli...
Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/2460

@wuchong @StephanEwen Can you review this pull request? Generating an independent runnable example jar will increase the size of the Flink distribution; this table example jar is around 13 MB. We should reach an agreement on examples here:

1. Should all example code be moved into the flink-examples module, as sub-modules? Yes or no?
2. Should runnable example jars be generated optionally in the Flink distribution? Yes or no?

---
[jira] [Closed] (FLINK-5546) java.io.tmpdir setted as project build directory in surefire plugin
[ https://issues.apache.org/jira/browse/FLINK-5546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui closed FLINK-5546. Resolution: Duplicate resolved in FLINK-5817 > java.io.tmpdir setted as project build directory in surefire plugin > --- > > Key: FLINK-5546 > URL: https://issues.apache.org/jira/browse/FLINK-5546 > Project: Flink > Issue Type: Sub-task > Components: Build System > Environment: CentOS 7.2 >Reporter: Syinchwun Leo >Assignee: shijinkui > Fix For: 1.2.1 > > > When multiple Linux users run test at the same time, flink-runtime module may > fail. User A creates /tmp/cacheFile, and User B will have no permission to > visit the fold. > Failed tests: > FileCacheDeleteValidationTest.setup:79 Error initializing the test: > /tmp/cacheFile (Permission denied) > Tests in error: > IOManagerTest.channelEnumerator:54 » Runtime Could not create storage > director... > Tests run: 1385, Failures: 1, Errors: 1, Skipped: 8 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
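The failure quoted above (`/tmp/cacheFile (Permission denied)`) is the classic symptom of tests sharing fixed paths under `/tmp` across users. JUnit's `TemporaryFolder` rule, the replacement proposed in the related FLINK-5860, gives each test its own directory; the same per-test isolation can be sketched with only the JDK. The class and method names here are illustrative, not Flink's actual test utilities:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class IsolatedTmp {
    // Create a fresh, uniquely named, user-owned directory instead of a fixed
    // /tmp/cacheFile path, so two users (or two concurrent builds) cannot collide.
    public static Path newCacheDir() throws IOException {
        return Files.createTempDirectory("flink-test-");
    }

    public static void main(String[] args) throws IOException {
        Path a = newCacheDir();
        Path b = newCacheDir();
        System.out.println(a.equals(b)); // false: every call yields a unique directory
    }
}
```

On POSIX systems `Files.createTempDirectory` also creates the directory with owner-only permissions, which is why the permission-denied collisions disappear.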
[jira] [Commented] (FLINK-5780) Extend ConfigOption with descriptions
[ https://issues.apache.org/jira/browse/FLINK-5780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875418#comment-15875418 ] shijinkui commented on FLINK-5780:

This just sounds like an extension of Apache Commons CLI: https://commons.apache.org/proper/commons-cli/ IMO, the commons-cli style is the standard and I like it. Is that so?

> Extend ConfigOption with descriptions
> -------------------------------------
>
> Key: FLINK-5780
> URL: https://issues.apache.org/jira/browse/FLINK-5780
> Project: Flink
> Issue Type: Sub-task
> Components: Core, Documentation
> Reporter: Ufuk Celebi
>
> The {{ConfigOption}} type is meant to replace the flat {{ConfigConstants}}.
> As part of automating the generation of a docs config page we need to extend
> {{ConfigOption}} with description fields.
> From the ML discussion, these could be:
> {code}
> void shortDescription(String);
> void longDescription(String);
> {code}
> In practice, the description string should contain HTML/Markdown.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
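A hypothetical sketch of what the proposed description fields could look like on a `ConfigOption`-style type, written in the fluent commons-cli-like style the comment favors. All names below are illustrative; this is not Flink's actual `ConfigOption` API, and it returns `this` (rather than the `void` in the ticket) so calls chain:

```java
public class DescribedOption<T> {
    private final String key;
    private final T defaultValue;
    private String shortDescription = "";
    private String longDescription = "";

    public DescribedOption(String key, T defaultValue) {
        this.key = key;
        this.defaultValue = defaultValue;
    }

    // fluent setters so options read like commons-cli builders
    public DescribedOption<T> shortDescription(String s) { this.shortDescription = s; return this; }
    public DescribedOption<T> longDescription(String s)  { this.longDescription = s;  return this; }

    public String key()              { return key; }
    public T defaultValue()          { return defaultValue; }
    public String shortDescription() { return shortDescription; }
    public String longDescription()  { return longDescription; }

    public static void main(String[] args) {
        DescribedOption<String> opt =
            new DescribedOption<>("jobmanager.web.access-control-allow-origin", "*")
                .shortDescription("Allowed origins for the web frontend.");
        System.out.println(opt.key() + " = " + opt.defaultValue());
    }
}
```

A docs generator can then iterate over such options and emit the key, default, and descriptions, which is the automation the ticket is after.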
[jira] [Updated] (FLINK-5860) Replace all the file creating from java.io.tmpdir with TemporaryFolder
[ https://issues.apache.org/jira/browse/FLINK-5860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui updated FLINK-5860:

Description: Search for `System.getProperty("java.io.tmpdir")` across the whole Flink project and you get the list of hits below, mostly in unit tests. Replace all file creation based on `java.io.tmpdir` with TemporaryFolder. Who can fix this problem thoroughly?

```
$ grep -ri 'System.getProperty("java.io.tmpdir")' .
./flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java: env.setStateBackend(new FsStateBackend("file:///" + System.getProperty("java.io.tmpdir") + "/flink/backend"));
./flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java: File tempDir = new File(System.getProperty("java.io.tmpdir"));
./flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java: File tempDir = new File(System.getProperty("java.io.tmpdir"));
./flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java: File tempDir = new File(System.getProperty("java.io.tmpdir"));
./flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java: return getMockEnvironment(new File[] { new File(System.getProperty("java.io.tmpdir")) });
./flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java: public static final String DEFAULT_TASK_MANAGER_TMP_PATH = System.getProperty("java.io.tmpdir");
./flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java: final String tempPath = System.getProperty("java.io.tmpdir");
./flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java: final File tempDir = new File(System.getProperty("java.io.tmpdir"));
./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: File tempDir = new File(System.getProperty("java.io.tmpdir"));
./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: File tempDir = new File(System.getProperty("java.io.tmpdir"));
./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java: final String outDir = params.get("output", System.getProperty("java.io.tmpdir"));
./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java: final String tmpDir = System.getProperty("java.io.tmpdir");
./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java: final String outPath = System.getProperty("java.io.tmpdir");
./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java: File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar");
./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java: public static final String FLINK_PYTHON_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "flink_plan";
./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java: public static final String FLINK_TMP_DATA_DIR = System.getProperty("java.io.tmpdir") + File.separator + "flink_data";
./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java: FLINK_HDFS_PATH = "file:" + System.getProperty("java.io.tmpdir") + File.separator + "flink";
./flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java: baseDir = new File(System.getProperty("java.io.tmpdir"));
./flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInf