[GitHub] flink issue #3769: [FLINK-6367] support custom header settings of allow orig...

2017-05-03 Thread shijinkui
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/3769
  
Hi @zentol @StephanEwen, this PR wasn't merged in. @zentol, please check again.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3769: [FLINK-6367] support custom header settings of all...

2017-05-02 Thread shijinkui
Github user shijinkui closed the pull request at:

https://github.com/apache/flink/pull/3769




[GitHub] flink pull request #3769: [FLINK-6367] support custom header settings of all...

2017-05-02 Thread shijinkui
GitHub user shijinkui reopened a pull request:

https://github.com/apache/flink/pull/3769

[FLINK-6367] support custom header settings of allow origin

`jobmanager.web.access-control-allow-origin`: enables a custom value for the 
Access-Control-Allow-Origin response header; the default is `*`.
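
The second commit in this PR wraps the key and default value in a `ConfigOption`; a minimal sketch of how that could look (the holder class and its placement are assumptions, not the merged code):

```
import org.apache.flink.configuration.ConfigOption;
import static org.apache.flink.configuration.ConfigOptions.key;

// Hypothetical holder class for the new option.
public class WebMonitorOptions {
    /** Value served in the Access-Control-Allow-Origin response header. */
    public static final ConfigOption<String> ACCESS_CONTROL_ALLOW_ORIGIN =
            key("jobmanager.web.access-control-allow-origin").defaultValue("*");
}
```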

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-6367] 
support custom header settings of allow origin")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hwstreaming/flink allow_origin

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3769.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3769


commit b6815edca3d38fed5da32175c32170fbbef084a0
Author: shijinkui <shijin...@huawei.com>
Date:   2017-04-25T12:09:15Z

[FLINK-6367] support custom header settings of allow origin

commit 62cfc7dd5e0575ca64fda2d9b13c4281550383bf
Author: Jinkui Shi <shijin...@huawei.com>
Date:   2017-04-25T15:18:38Z

use ConfigOption wrap key and default value

commit bd295833a038ebce45db921b843863d4221cd25d
Author: Jinkui Shi <shijin...@huawei.com>
Date:   2017-04-25T23:20:42Z

code line format






[GitHub] flink issue #3777: [FLINK-6387] [webfrontend]Flink UI support access log

2017-04-27 Thread shijinkui
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/3777
  
@StephanEwen @zentol The Flink webmonitor uses Netty internally. In general, 
Netty is used as a TCP transport; for a web or API server, Jetty or the Play 
Framework may be more suitable. In particular, if we choose Netty as the web 
framework we have to fill many gaps and holes in web security ourselves.

I think we should reconsider whether Netty is suitable. The current Flink UI 
is hard to extend.




[GitHub] flink issue #3777: [FLINK-6387] [webfrontend]Flink UI support access log

2017-04-26 Thread shijinkui
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/3777
  
> I can see this flooding the logs like crazy, especially with things like 
metrics and watermarks that update often. Some kind of filtering is probably 
necessary here.

You are right. The noisy entries can be separated into their own log file, as sketched below.

> This seems to be about auditing, so completely different...

The main purpose is to record every access, like an audit log.
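
A hedged log4j.properties sketch of that separation (logger and appender names are assumptions, not Flink's shipped config):

```
# Route a dedicated "access" logger to its own rolling file.
log4j.logger.access=INFO, accessFile
log4j.additivity.access=false
log4j.appender.accessFile=org.apache.log4j.RollingFileAppender
log4j.appender.accessFile.File=${log.file}.access
log4j.appender.accessFile.MaxFileSize=100MB
log4j.appender.accessFile.MaxBackupIndex=10
log4j.appender.accessFile.layout=org.apache.log4j.PatternLayout
log4j.appender.accessFile.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %m%n
```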




[GitHub] flink pull request #3777: [FLINK-6387] [webfrontend]Flink UI support access ...

2017-04-26 Thread shijinkui
GitHub user shijinkui opened a pull request:

https://github.com/apache/flink/pull/3777

[FLINK-6387] [webfrontend]Flink UI support access log

Record each user request in the access log, appending user accesses to a log file.
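
A minimal sketch of the idea (an assumed shape, not necessarily this PR's code; the `access` logger name is hypothetical): a Netty inbound handler that records each HTTP request before passing it along.

```
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AccessLogHandler extends ChannelInboundHandlerAdapter {
    private static final Logger ACCESS_LOG = LoggerFactory.getLogger("access");

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof HttpRequest) {
            HttpRequest request = (HttpRequest) msg;
            // One line per request: remote address, HTTP method, URI.
            ACCESS_LOG.info("{} {} {}",
                    ctx.channel().remoteAddress(), request.getMethod(), request.getUri());
        }
        // Hand the message to the next handler in the pipeline unchanged.
        super.channelRead(ctx, msg);
    }
}
```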

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-6387] 
[webfrontend]Flink UI support access log")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hwstreaming/flink access_log_support

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3777.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3777


commit 0d19fb95072c90125152513c9b2a07b518d16b27
Author: shijinkui <shijin...@huawei.com>
Date:   2017-02-23T12:06:43Z

[FLINK-6387] [webfrontend]Flink UI support access log






[jira] [Created] (FLINK-6387) Flink UI support access log

2017-04-26 Thread shijinkui (JIRA)
shijinkui created FLINK-6387:


 Summary: Flink UI support access log
 Key: FLINK-6387
 URL: https://issues.apache.org/jira/browse/FLINK-6387
 Project: Flink
  Issue Type: Improvement
  Components: Webfrontend
Reporter: shijinkui
Assignee: shijinkui


Record each user request in the access log, appending user accesses to a log file.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3769: [FLINK-6367] support custom header settings of allow orig...

2017-04-25 Thread shijinkui
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/3769
  
@zentol fixed that.




[GitHub] flink pull request #3769: [FLINK-6367] support custom header settings of all...

2017-04-25 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3769#discussion_r113226235
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
@@ -701,6 +701,9 @@
@Deprecated
public static final String JOB_MANAGER_WEB_BACK_PRESSURE_DELAY = "jobmanager.web.backpressure.delay-between-samples";
 
+   /** Web response header of Access-Control-Allow-Origin */
+   public static final String JOB_MANAGER_WEB_ACCESS_CONTROL_ALLOW_ORIGIN = "jobmanager.web.access-control-allow-origin";
--- End diff --

Done.




[jira] [Assigned] (FLINK-6367) support custom header settings of allow origin

2017-04-25 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui reassigned FLINK-6367:


Assignee: shijinkui

> support custom header settings of allow origin
> --
>
> Key: FLINK-6367
> URL: https://issues.apache.org/jira/browse/FLINK-6367
> Project: Flink
>  Issue Type: Sub-task
>  Components: Webfrontend
>Reporter: shijinkui
>    Assignee: shijinkui
>
> `jobmanager.web.access-control-allow-origin`: enables a custom value for the 
> Access-Control-Allow-Origin response header; the default is `*`.





[GitHub] flink pull request #3769: [FLINK-6367] support custom header settings of all...

2017-04-25 Thread shijinkui
GitHub user shijinkui opened a pull request:

https://github.com/apache/flink/pull/3769

[FLINK-6367] support custom header settings of allow origin

`jobmanager.web.access-control-allow-origin`: enables a custom value for the 
Access-Control-Allow-Origin response header; the default is `*`.

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-6367] 
support custom header settings of allow origin")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hwstreaming/flink allow_origin

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3769.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3769


commit b6815edca3d38fed5da32175c32170fbbef084a0
Author: shijinkui <shijin...@huawei.com>
Date:   2017-04-25T12:09:15Z

[FLINK-6367] support custom header settings of allow origin






[jira] [Commented] (FLINK-6367) support custom header settings of allow origin

2017-04-25 Thread shijinkui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15982764#comment-15982764
 ] 

shijinkui commented on FLINK-6367:
--

[~greghogan] We need to configure a specific allowed origin in some deployments. For example, 
Flink could set the YARN URL as the allowed origin so that all other origins are forbidden.
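
In code terms, the response handler would then attach the configured value instead of a hard-coded `*`; a small sketch under that assumption (the utility class is hypothetical, and `allowOrigin` stands for the configured value):

```
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponse;

public final class CorsHeaderUtil {
    /** allowOrigin would be read from jobmanager.web.access-control-allow-origin. */
    public static void setAllowOrigin(HttpResponse response, String allowOrigin) {
        response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, allowOrigin);
    }
}
```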

> support custom header settings of allow origin
> --
>
> Key: FLINK-6367
> URL: https://issues.apache.org/jira/browse/FLINK-6367
> Project: Flink
>  Issue Type: Sub-task
>  Components: Webfrontend
>Reporter: shijinkui
>
> `jobmanager.web.access-control-allow-origin`: enables a custom value for the 
> Access-Control-Allow-Origin response header; the default is `*`.





[jira] [Created] (FLINK-6367) support custom header settings of allow origin

2017-04-24 Thread shijinkui (JIRA)
shijinkui created FLINK-6367:


 Summary: support custom header settings of allow origin
 Key: FLINK-6367
 URL: https://issues.apache.org/jira/browse/FLINK-6367
 Project: Flink
  Issue Type: Sub-task
  Components: Webfrontend
Reporter: shijinkui


`jobmanager.web.access-control-allow-origin`: enables a custom value for the 
Access-Control-Allow-Origin response header; the default is `*`.





[jira] [Closed] (FLINK-6193) Flink dist directory normalize

2017-04-21 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6193?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui closed FLINK-6193.

Resolution: Fixed

> Flink dist directory normalize
> --
>
> Key: FLINK-6193
> URL: https://issues.apache.org/jira/browse/FLINK-6193
> Project: Flink
>  Issue Type: Improvement
>  Components: Examples
>Reporter: shijinkui
>
> The Flink distribution's directories have no clear responsibility for which 
> types of files belong where. For example, the "opt" directory mixes library 
> jars and example jars.
> See the mail thread: 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-dist-directory-management-td16784.html
> After discussion, we settled on the distribution directory layout below:
> - "examples" only contains example jars
> - "opt" only contains optional library jars used at runtime
> - "lib" only contains library jars that must be loaded at runtime
> - "resources" only contains resource files used at runtime, such as web files





[GitHub] flink issue #2460: [FLINK-4562] table examples make an divided module in fli...

2017-04-21 Thread shijinkui
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/2460
  
@fhueske Thanks : )




[GitHub] flink issue #2460: [FLINK-4562] table examples make an divided module in fli...

2017-04-21 Thread shijinkui
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/2460
  
> Hi @shijinkui, yes that sounds good to me.

Thanks. When will we merge this?




[jira] [Commented] (FLINK-5902) Some images can not show in IE

2017-04-05 Thread shijinkui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15958261#comment-15958261
 ] 

shijinkui commented on FLINK-5902:
--

Hi [~ajithshetty], do we have a solution for this?

> Some images can not show in IE
> --
>
> Key: FLINK-5902
> URL: https://issues.apache.org/jira/browse/FLINK-5902
> Project: Flink
>  Issue Type: Sub-task
>  Components: Webfrontend
> Environment: IE
>Reporter: Tao Wang
> Attachments: chrome is ok.png, IE 11 with problem.png
>
>
> Some images on the Overview page cannot be shown in IE, although they display fine in Chrome.
> I'm using IE 11, but I think the same happens with IE 9. I'll paste the screenshots 
> later.





[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API

2017-03-31 Thread shijinkui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950672#comment-15950672
 ] 

shijinkui commented on FLINK-6233:
--

Is this a sub-issue of FLINK-4557?

> Support rowtime inner equi-join between two streams in the SQL API
> --
>
> Key: FLINK-6233
> URL: https://issues.apache.org/jira/browse/FLINK-6233
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: hongyuhong
>Assignee: hongyuhong
>
> The goal of this issue is to add support for inner equi-join on proc time 
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.rowtime , o.productId, o.orderId, s.rowtime AS shipTime 
> FROM Orders AS o 
> JOIN Shipments AS s 
> ON o.orderId = s.orderId 
> AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins.
> * The ON clause should include an equi-join condition.
> * The time condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL 
> '1' HOUR}} can only use rowtime, which is a system attribute. The time 
> condition only supports bounded time ranges like {{o.rowtime BETWEEN s.rowtime 
> - INTERVAL '1' HOUR AND s.rowtime + INTERVAL '1' HOUR}}, not unbounded 
> conditions like {{o.rowtime > s.rowtime}}, and it should include the rowtime 
> attributes of both streams; {{o.rowtime between rowtime() and rowtime() + 1}} 
> should also not be supported.
> A row-time stream join will not be able to handle late data, because inserting 
> a row into a sorted order would shift all other computations, which would be 
> too expensive to maintain. Therefore, we will throw an error if a user tries 
> to use a row-time stream join with late data handling.
> This issue includes:
> * Design of the DataStream operator dealing with the stream join
> * Translation from Calcite's RelNode representation (LogicalJoin).





[jira] [Commented] (FLINK-5860) Replace all the file creating from java.io.tmpdir with TemporaryFolder

2017-03-29 Thread shijinkui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15948410#comment-15948410
 ] 

shijinkui commented on FLINK-5860:
--

[~yaroslav.mykhaylov] Thanks for your work. I'll wait for your message.

> Replace all the file creating from java.io.tmpdir with TemporaryFolder
> --
>
> Key: FLINK-5860
> URL: https://issues.apache.org/jira/browse/FLINK-5860
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Reporter: shijinkui
>Assignee: Yaroslav Mykhaylov
>  Labels: starter
>
> Searching for `System.getProperty("java.io.tmpdir")` across the whole Flink 
> project yields the list of unit tests below. Replace all file creation under 
> `java.io.tmpdir` with TemporaryFolder.
> Who can fix this problem thoroughly?
> ```
> $ grep -ri 'System.getProperty("java.io.tmpdir")' .
> ./flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java:
>   env.setStateBackend(new FsStateBackend("file:///" + 
> System.getProperty("java.io.tmpdir") + "/flink/backend"));
> ./flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
> File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
>  File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
>  File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java:
>  return getMockEnvironment(new File[] { new 
> File(System.getProperty("java.io.tmpdir")) });
> ./flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java:
>public static final String DEFAULT_TASK_MANAGER_TMP_PATH = 
> System.getProperty("java.io.tmpdir");
> ./flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java:
>   final String tempPath = System.getProperty("java.io.tmpdir");
> ./flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java:   
> final File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: 
> File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: 
> File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java:
>   final String outDir = params.get("output", 
> System.getProperty("java.io.tmpdir"));
> ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java:
> final String tmpDir = System.getProperty("java.io.tmpdir");
> ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java:
>   final String outPath = System.getProperty("java.io.tmpdir");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java:
>pub
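
A minimal sketch of the suggested replacement, using JUnit 4's TemporaryFolder rule (the test and file names here are illustrative only):

```
import java.io.File;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class TempDirTest {

    // JUnit creates an isolated directory per test and deletes it afterwards.
    @Rule
    public final TemporaryFolder tempFolder = new TemporaryFolder();

    @Test
    public void writesIntoManagedTempDir() throws Exception {
        File tempDir = tempFolder.newFolder("flink-test");
        File out = new File(tempDir, "jarcreatortest.jar");
        // ... use 'out' as before, without touching java.io.tmpdir ...
    }
}
```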

[jira] [Commented] (FLINK-6204) Improve Event-Time OVER ROWS BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-28 Thread shijinkui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15946477#comment-15946477
 ] 

shijinkui commented on FLINK-6204:
--

-1.
Hi guys, I want to know the difference between this PR and 
https://github.com/apache/flink/pull/3386
That PR has 138 comments, but the work is now being rewritten here. Why not recommend this 
solution in #3386? Must we spend time twice on the same problem?

> Improve Event-Time OVER ROWS BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> ---
>
> Key: FLINK-6204
> URL: https://issues.apache.org/jira/browse/FLINK-6204
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Currently the `event time OVER ROWS BETWEEN UNBOUNDED PRECEDING aggregation to 
> SQL` implementation class `UnboundedEventTimeOverProcessFunction` uses a memory 
> data structure of uncontrollable size, `sortedTimestamps: util.LinkedList[Long]`, 
> to cache and sort data timestamps. IMO, that is not a good approach, because in 
> our production scenario there are millions of window records per millisecond. 
> So I want to remove `util.LinkedList[Long]`. Anyone is welcome to give me feedback.
> What do you think? [~fhueske] and [~Yuhong_kyo]
> What do you think? [~fhueske] and [~Yuhong_kyo]





[GitHub] flink issue #3628: [FLINK-6201][example] move python example files from reso...

2017-03-28 Thread shijinkui
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/3628
  
> I would also suggest to run an example at least once to make sure you 
don't break anything.

You are right. This PR only moves the example files.




[GitHub] flink pull request #3628: [FLINK-6201][example] move python example files fr...

2017-03-28 Thread shijinkui
GitHub user shijinkui opened a pull request:

https://github.com/apache/flink/pull/3628

[FLINK-6201][example] move python example files from resources to the 
examples

The Python examples do not belong in the resources dir. Move them to the 
examples/python dir.

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-6201] move 
python example files from resources to the examples")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hwstreaming/flink FLINK-6201

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3628.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3628


commit c9eafd960707971d762da2f7d9179404e91b6211
Author: Jinkui Shi <shijin...@huawei.com>
Date:   2017-03-28T06:40:26Z

[FLINK-6201][example] move python example files from resources to the 
examples






[jira] [Assigned] (FLINK-6201) move python example files from resources to the examples

2017-03-27 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui reassigned FLINK-6201:


Assignee: shijinkui

> move python example files from resources to the examples
> 
>
> Key: FLINK-6201
> URL: https://issues.apache.org/jira/browse/FLINK-6201
> Project: Flink
>  Issue Type: Sub-task
>  Components: Examples
>Reporter: shijinkui
>    Assignee: shijinkui
>Priority: Trivial
>
> The Python examples do not belong in the resources dir. Move them to the 
> examples/python dir.
> ```
> <fileSet>
>   <directory>../flink-libraries/flink-python/src/main/python/org/apache/flink/python/api</directory>
>   <outputDirectory>resources/python</outputDirectory>
>   <fileMode>0755</fileMode>
> </fileSet>
> ```





[jira] [Updated] (FLINK-6201) move python example files from resources to the examples

2017-03-27 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui updated FLINK-6201:
-
Priority: Trivial  (was: Major)

> move python example files from resources to the examples
> 
>
> Key: FLINK-6201
> URL: https://issues.apache.org/jira/browse/FLINK-6201
> Project: Flink
>  Issue Type: Sub-task
>  Components: Examples
>Reporter: shijinkui
>Priority: Trivial
>
> The Python examples do not belong in the resources dir. Move them to the 
> examples/python dir.
> ```
> <fileSet>
>   <directory>../flink-libraries/flink-python/src/main/python/org/apache/flink/python/api</directory>
>   <outputDirectory>resources/python</outputDirectory>
>   <fileMode>0755</fileMode>
> </fileSet>
> ```





[jira] [Updated] (FLINK-6201) move python example files from resources to the examples

2017-03-27 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui updated FLINK-6201:
-
Summary: move python example files from resources to the examples  (was: 
move python example files to the examples dir)

> move python example files from resources to the examples
> 
>
> Key: FLINK-6201
> URL: https://issues.apache.org/jira/browse/FLINK-6201
> Project: Flink
>  Issue Type: Sub-task
>  Components: Examples
>Reporter: shijinkui
>
> The Python examples do not belong in the resources dir. Move them to the 
> examples/python dir.
> ```
> <fileSet>
>   <directory>../flink-libraries/flink-python/src/main/python/org/apache/flink/python/api</directory>
>   <outputDirectory>resources/python</outputDirectory>
>   <fileMode>0755</fileMode>
> </fileSet>
> ```





[jira] [Created] (FLINK-6201) move python example files to the examples dir

2017-03-27 Thread shijinkui (JIRA)
shijinkui created FLINK-6201:


 Summary: move python example files to the examples dir
 Key: FLINK-6201
 URL: https://issues.apache.org/jira/browse/FLINK-6201
 Project: Flink
  Issue Type: Sub-task
  Components: Examples
Reporter: shijinkui


The Python examples do not belong in the resources dir. Move them to the 
examples/python dir.
```
<fileSet>
  <directory>../flink-libraries/flink-python/src/main/python/org/apache/flink/python/api</directory>
  <outputDirectory>resources/python</outputDirectory>
  <fileMode>0755</fileMode>
</fileSet>
```





[jira] [Commented] (FLINK-4319) Rework Cluster Management (FLIP-6)

2017-03-27 Thread shijinkui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943198#comment-15943198
 ] 

shijinkui commented on FLINK-4319:
--

Flink on Kubernetes: do we have a schedule for this?

> Rework Cluster Management (FLIP-6)
> --
>
> Key: FLINK-4319
> URL: https://issues.apache.org/jira/browse/FLINK-4319
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management
>Affects Versions: 1.1.0
>Reporter: Stephan Ewen
>
> This is the root issue to track progress of the rework of cluster management 
> (FLIP-6) 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077





[jira] [Closed] (FLINK-5217) Deprecated interface Checkpointed make clear suggestion

2017-03-27 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui closed FLINK-5217.

Resolution: Fixed

> Deprecated interface Checkpointed make clear suggestion
> ---
>
> Key: FLINK-5217
> URL: https://issues.apache.org/jira/browse/FLINK-5217
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: shijinkui
> Fix For: 1.2.1
>
>
> package org.apache.flink.streaming.api.checkpoint;
> @Deprecated
> @PublicEvolving
> public interface Checkpointed<T extends Serializable> extends CheckpointedRestoring<T>
> This interface should clearly state in which version it will be removed and 
> which interface should be used instead.
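
A hedged sketch of the clearer deprecation note being asked for (the exact wording, replacement, and target version are assumptions):

```
import java.io.Serializable;
import org.apache.flink.annotation.PublicEvolving;

/**
 * @deprecated Use {@link CheckpointedFunction} instead; this interface is
 *             planned for removal in a future release.
 */
@Deprecated
@PublicEvolving
public interface Checkpointed<T extends Serializable> extends CheckpointedRestoring<T> {

    /** Returns a snapshot of the operator state for the given checkpoint. */
    T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
}
```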





[jira] [Closed] (FLINK-5754) released tag missing .gitignore .travis.yml .gitattributes

2017-03-27 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui closed FLINK-5754.

Resolution: Won't Fix

> released tag missing .gitignore .travis.yml .gitattributes
> 
>
> Key: FLINK-5754
> URL: https://issues.apache.org/jira/browse/FLINK-5754
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: shijinkui
>
> The released tag is missing .gitignore, .travis.yml, and .gitattributes.
> When making a release version, only the version number should be replaced.
> For example: https://github.com/apache/spark/tree/v2.1.0





[jira] [Commented] (FLINK-5754) released tag missing .gitignore .travis.yml .gitattributes

2017-03-27 Thread shijinkui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942804#comment-15942804
 ] 

shijinkui commented on FLINK-5754:
--

[~greghogan] It's OK

> released tag missing .gitignore .travis.yml .gitattributes
> 
>
> Key: FLINK-5754
> URL: https://issues.apache.org/jira/browse/FLINK-5754
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: shijinkui
>
> The released tag is missing .gitignore, .travis.yml, and .gitattributes.
> When making a release version, only the version number should be replaced.
> For example: https://github.com/apache/spark/tree/v2.1.0





[jira] [Updated] (FLINK-4562) table examples make an divided module in flink-examples

2017-03-27 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui updated FLINK-4562:
-
Issue Type: Sub-task  (was: Improvement)
Parent: FLINK-6193

> table examples make an divided module in flink-examples
> ---
>
> Key: FLINK-4562
> URL: https://issues.apache.org/jira/browse/FLINK-4562
> Project: Flink
>  Issue Type: Sub-task
>  Components: Examples, Table API & SQL
>Reporter: shijinkui
>Assignee: shijinkui
> Fix For: 1.2.1
>
>
> Example code shouldn't be packaged in the table module.





[jira] [Created] (FLINK-6193) Flink dist directory normalize

2017-03-27 Thread shijinkui (JIRA)
shijinkui created FLINK-6193:


 Summary: Flink dist directory normalize
 Key: FLINK-6193
 URL: https://issues.apache.org/jira/browse/FLINK-6193
 Project: Flink
  Issue Type: Improvement
  Components: Examples
Reporter: shijinkui


The Flink distribution's directories have no clear responsibility for which type 
of files belongs where. For example, the "opt" directory mixes library jars and 
example jars.

See the mail thread: 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-dist-directory-management-td16784.html

After discussion, we settled on the distribution directory layout below:
- "examples" only contains example jars
- "opt" only contains optional library jars used at runtime
- "lib" only contains library jars that must be loaded at runtime
- "resources" only contains resource files used at runtime, such as web files





[GitHub] flink pull request #3609: [FLINK-6073] - Support for SQL inner queries for p...

2017-03-26 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3609#discussion_r108059835
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{ RelNode, RelWriter, BiRel }
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream }
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{ Window => DataStreamWindow }
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo }
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.flink.table.api.TableException
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.windowing.triggers.Trigger
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
+import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult
+import org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion
+import org.apache.flink.streaming.api.windowing.evictors.Evictor
+import org.apache.flink.streaming.api.windowing.evictors.Evictor.EvictorContext
+import java.lang.Iterable
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue
+import org.apache.flink.api.common.functions.RichFlatJoinFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.util.Collector
+
+class DataStreamJoin(
--- End diff --

This needs Scaladoc describing the class's responsibility.




[GitHub] flink pull request #3609: [FLINK-6073] - Support for SQL inner queries for p...

2017-03-26 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3609#discussion_r108059875
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{ RelNode, RelWriter, BiRel }
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.datastream.{ AllWindowedStream, DataStream, KeyedStream, WindowedStream }
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{ Window => DataStreamWindow }
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{ RowIntervalTypeInfo, TimeIntervalTypeInfo }
+import org.apache.flink.types.Row
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.flink.table.api.TableException
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.windowing.triggers.Trigger
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
+import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult
+import org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion
+import org.apache.flink.streaming.api.windowing.evictors.Evictor
+import org.apache.flink.streaming.api.windowing.evictors.Evictor.EvictorContext
+import java.lang.Iterable
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue
+import org.apache.flink.api.common.functions.RichFlatJoinFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.util.Collector
+
+class DataStreamJoin(
+  calc: LogicalJoin,
+  cluster: RelOptCluster,
+  traitSet: RelTraitSet,
+  inputLeft: RelNode,
+  inputRight: RelNode,
+  rowType: RelDataType,
+  description: String)
+extends BiRel(cluster, traitSet, inputLeft, inputRight) with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+new DataStreamJoin(
+  calc,
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputs.get(1),
+  rowType,
+  description + calc.getId())
+  }
+
+  override def toString: String = {
+s"Join(${
+  if (!calc.getCondition.isAlwaysTrue()) {
+s"condition: (${calc.getCondition}), "
+  } else {
+""
+  }
+}left: ($inputLeft), right($inputRight))"
+  }
+
+

[GitHub] flink issue #2460: [FLINK-4562] table examples make an divided module in fli...

2017-03-25 Thread shijinkui
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/2460
  
> I agree with @wuchong , that we should follow the example of Gelly and 
add the flink-table-examples JAR file to the opt folder.

@fhueske Thanks for your review.

IMO, the directory layout below is reasonable:
* the `examples` directory only contains example jars
* the `opt` directory only contains optional library jars
* the `lib` directory only contains library jars that must be loaded at runtime

The `opt` directory is currently noisy; it mixes library jars and example jars:

``` 
flink-cep-scala_2.11-1.3.0.jar 
flink-gelly_2.11-1.3.0.jar  
flink-metrics-statsd-1.3.0.jar
flink-cep_2.11-1.3.0.jar   
flink-metrics-dropwizard-1.3.0.jar  
flink-ml_2.11-1.3.0.jar
flink-gelly-examples_2.11-1.3.0.jar 
flink-metrics-ganglia-1.3.0.jar
flink-gelly-scala_2.11-1.3.0.jar
flink-metrics-graphite-1.3.0.jar
```




[jira] [Updated] (FLINK-6117) 'zookeeper.sasl.disable' not takes effet when starting CuratorFramework

2017-03-24 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui updated FLINK-6117:
-
Issue Type: Sub-task  (was: Bug)
Parent: FLINK-5839

> 'zookeeper.sasl.disable'  not takes effet when starting CuratorFramework
> 
>
> Key: FLINK-6117
> URL: https://issues.apache.org/jira/browse/FLINK-6117
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client, JobManager
>Affects Versions: 1.2.0
> Environment: Ubuntu, non-secured
>Reporter: CanBin Zheng
>Assignee: CanBin Zheng
>  Labels: security
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> The value of 'zookeeper.sasl.disable' is not used correctly when starting the
> CuratorFramework.
> Here are all the settings relevant to high availability in my flink-conf.yaml:
>   high-availability: zookeeper
>   high-availability.zookeeper.quorum: localhost:2181
>   high-availability.zookeeper.storageDir: hdfs:///flink/ha/
> Obviously, no explicit value is set for 'zookeeper.sasl.disable', so the default 
> value of 'true' (ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE) should be 
> applied. But when FlinkYarnSessionCli and FlinkApplicationMasterRunner start,
> both logs show that they attempt to connect to ZooKeeper in 'SASL' mode.
> logs are like this:
> 2017-03-18 23:53:10,498 INFO  org.apache.zookeeper.ZooKeeper  
>   - Initiating client connection, connectString=localhost:2181 
> sessionTimeout=6 
> watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@5949eba8
> 2017-03-18 23:53:10,498 INFO  org.apache.zookeeper.ZooKeeper  
>   - Initiating client connection, connectString=localhost:2181 
> sessionTimeout=6 
> watcher=org.apache.flink.shaded.org.apache.curator.ConnectionState@5949eba8
> 2017-03-18 23:53:10,522 WARN  org.apache.zookeeper.ClientCnxn 
>   - SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/tmp/jaas-3047036396963510842.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it.
> 2017-03-18 23:53:10,522 WARN  org.apache.zookeeper.ClientCnxn 
>   - SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/tmp/jaas-3047036396963510842.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it.
> 2017-03-18 23:53:10,530 INFO  org.apache.zookeeper.ClientCnxn 
>   - Opening socket connection to server localhost/127.0.0.1:2181
> 2017-03-18 23:53:10,530 INFO  org.apache.zookeeper.ClientCnxn 
>   - Opening socket connection to server localhost/127.0.0.1:2181
> 2017-03-18 23:53:10,534 ERROR 
> org.apache.flink.shaded.org.apache.curator.ConnectionState- 
> Authentication failed





[jira] [Assigned] (FLINK-6148) The Zookeeper client occur SASL error when the sasl is disable

2017-03-24 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui reassigned FLINK-6148:


Assignee: (was: shijinkui)

> The Zookeeper client occur SASL error when the sasl is disable
> --
>
> Key: FLINK-6148
> URL: https://issues.apache.org/jira/browse/FLINK-6148
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.2.0
>Reporter: zhangrucong1982
>
> I use Flink 1.2.0 in a YARN cluster. HA is configured in 
> flink-conf.yaml, but SASL is disabled. The configuration is:
> high-availability: zookeeper
> high-availability.zookeeper.quorum: 
> 100.106.40.102:2181,100.106.57.136:2181,100.106.41.233:2181
> high-availability.zookeeper.storageDir: hdfs:/flink
> high-availability.zookeeper.client.acl: open
> high-availability.zookeeper.path.root:  flink0308
> zookeeper.sasl.disable: true
> The client, JobManager, and TaskManager logs contain the following error 
> information:
> 2017-03-22 11:18:24,662 WARN  org.apache.zookeeper.ClientCnxn 
>   - SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/tmp/jaas-441937039502263015.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it.
> 2017-03-22 11:18:24,663 ERROR 
> org.apache.flink.shaded.org.apache.curator.ConnectionState- 
> Authentication failed





[jira] [Assigned] (FLINK-6148) The Zookeeper client occur SASL error when the sasl is disable

2017-03-24 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui reassigned FLINK-6148:


Assignee: shijinkui

> The Zookeeper client occur SASL error when the sasl is disable
> --
>
> Key: FLINK-6148
> URL: https://issues.apache.org/jira/browse/FLINK-6148
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.2.0
>Reporter: zhangrucong1982
>Assignee: shijinkui
>
> I use Flink 1.2.0 in a YARN cluster. HA is configured in 
> flink-conf.yaml, but SASL is disabled. The configuration is:
> high-availability: zookeeper
> high-availability.zookeeper.quorum: 
> 100.106.40.102:2181,100.106.57.136:2181,100.106.41.233:2181
> high-availability.zookeeper.storageDir: hdfs:/flink
> high-availability.zookeeper.client.acl: open
> high-availability.zookeeper.path.root:  flink0308
> zookeeper.sasl.disable: true
> The client, JobManager, and TaskManager logs contain the following error 
> information:
> 2017-03-22 11:18:24,662 WARN  org.apache.zookeeper.ClientCnxn 
>   - SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/tmp/jaas-441937039502263015.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it.
> 2017-03-22 11:18:24,663 ERROR 
> org.apache.flink.shaded.org.apache.curator.ConnectionState- 
> Authentication failed





[jira] [Commented] (FLINK-5217) Deprecated interface Checkpointed make clear suggestion

2017-03-24 Thread shijinkui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941522#comment-15941522
 ] 

shijinkui commented on FLINK-5217:
--

ping [~srichter]

> Deprecated interface Checkpointed make clear suggestion
> ---
>
> Key: FLINK-5217
> URL: https://issues.apache.org/jira/browse/FLINK-5217
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: shijinkui
> Fix For: 1.2.1
>
>
> package org.apache.flink.streaming.api.checkpoint;
> @Deprecated
> @PublicEvolving
> public interface Checkpointed<T extends Serializable> extends CheckpointedRestoring<T>
> This interface should clearly state in which version it will be removed and 
> which interface should be used instead.





[jira] [Commented] (FLINK-5860) Replace all the file creating from java.io.tmpdir with TemporaryFolder

2017-03-24 Thread shijinkui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941520#comment-15941520
 ] 

shijinkui commented on FLINK-5860:
--

ping [~yaroslav.mykhaylov] 

> Replace all the file creating from java.io.tmpdir with TemporaryFolder
> --
>
> Key: FLINK-5860
> URL: https://issues.apache.org/jira/browse/FLINK-5860
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Reporter: shijinkui
>Assignee: Yaroslav Mykhaylov
>  Labels: starter
>
> Searching for `System.getProperty("java.io.tmpdir")` across the whole Flink 
> project yields the list of unit tests below. Replace all file creation under 
> `java.io.tmpdir` with TemporaryFolder.
> Who can fix this problem thoroughly?
> ```
> $ grep -ri 'System.getProperty("java.io.tmpdir")' .
> ./flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java:
>   env.setStateBackend(new FsStateBackend("file:///" + 
> System.getProperty("java.io.tmpdir") + "/flink/backend"));
> ./flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
> File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
>  File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
>  File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java:
>  return getMockEnvironment(new File[] { new 
> File(System.getProperty("java.io.tmpdir")) });
> ./flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java:
>public static final String DEFAULT_TASK_MANAGER_TMP_PATH = 
> System.getProperty("java.io.tmpdir");
> ./flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java:
>   final String tempPath = System.getProperty("java.io.tmpdir");
> ./flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java:   
> final File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: 
> File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: 
> File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java:
>   final String outDir = params.get("output", 
> System.getProperty("java.io.tmpdir"));
> ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java:
> final String tmpDir = System.getProperty("java.io.tmpdir");
> ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java:
>   final String outPath = System.getProperty("java.io.tmpdir");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java:
>public static final String FLINK_PYTHON_FILE_PATH = 
> System.getProperty("java.io.tmpdir") …
> ```

[jira] [Commented] (FLINK-6060) reference nonexistent class in the scaladoc

2017-03-24 Thread shijinkui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941519#comment-15941519
 ] 

shijinkui commented on FLINK-6060:
--

[~aljoscha] Sorry for my unclear description.

For example, take the class TaskOperationResult referenced in the scaladoc 
below. TaskOperationResult does not actually exist any more, or its file has 
been renamed. In such scaladoc, we should correct the referenced class; a 
possible fix is sketched after the snippet.

  /**
   * Submits a task to the task manager. The result is to this message is a
   * [[TaskOperationResult]] message.
   *
   * @param tasks Descriptor which contains the information to start the task.
   */
  case class SubmitTask(tasks: TaskDeploymentDescriptor)
extends TaskMessage with RequiresLeaderSessionID
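
A possible fix, assuming the actual reply to `SubmitTask` is now an 
`Acknowledge` message:

  /**
   * Submits a task to the task manager. The response to this message is an
   * [[org.apache.flink.runtime.messages.Acknowledge]] message.
   *
   * @param tasks Descriptor which contains the information to start the task.
   */
  case class SubmitTask(tasks: TaskDeploymentDescriptor)
    extends TaskMessage with RequiresLeaderSessionID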


> reference nonexistent class in the scaladoc
> ---
>
> Key: FLINK-6060
> URL: https://issues.apache.org/jira/browse/FLINK-6060
> Project: Flink
>  Issue Type: Wish
>  Components: Scala API
>Reporter: shijinkui
>
> TaskMessages.scala
> ConnectedStreams.scala
> DataStream.scala
> Who can fix it?





[jira] [Updated] (FLINK-6060) reference nonexistent class in the scaladoc

2017-03-24 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui updated FLINK-6060:
-
Summary: reference nonexistent class in the scaladoc  (was: not exist class 
referance in the scala function annotation)

> reference nonexistent class in the scaladoc
> ---
>
> Key: FLINK-6060
> URL: https://issues.apache.org/jira/browse/FLINK-6060
> Project: Flink
>  Issue Type: Wish
>  Components: Scala API
>Reporter: shijinkui
>
> TaskMessages.scala
> ConnectedStreams.scala
> DataStream.scala
> Who can fix it?





[jira] [Commented] (FLINK-5754) released tag missing .gitignore .travis.yml .gitattributes

2017-03-24 Thread shijinkui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941516#comment-15941516
 ] 

shijinkui commented on FLINK-5754:
--

[~greghogan] At first, I assumed the Flink tag worked the same way as in other 
open source projects, so I checked out a branch from the tag. We then had to 
reset it, because we had already moved too far ahead.

If there is no special reason not to, can we avoid deleting anything from the 
release tag at the next milestone, following the common tag/release convention?
If so, it will be very convenient to maintain a private Flink version, and 
there will be no difficulty merging back into the Flink community code base.

Thanks



> released tag missing .gitignore  .travis.yml .gitattributes
> 
>
> Key: FLINK-5754
> URL: https://issues.apache.org/jira/browse/FLINK-5754
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: shijinkui
>
> The released tag is missing .gitignore, .travis.yml, and .gitattributes.
> When making a release version, only the version number should be replaced.
> For example: https://github.com/apache/spark/tree/v2.1.0





[jira] [Commented] (FLINK-5650) Flink-python tests executing cost too long time

2017-03-17 Thread shijinkui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929946#comment-15929946
 ] 

shijinkui commented on FLINK-5650:
--

Good job, Thanks.

> Flink-python tests executing cost too long time
> ---
>
> Key: FLINK-5650
> URL: https://issues.apache.org/jira/browse/FLINK-5650
> Project: Flink
>  Issue Type: Bug
>  Components: Python API, Tests
>Affects Versions: 1.2.0, 1.3.0
>    Reporter: shijinkui
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: osx
> Fix For: 1.3.0, 1.2.1
>
>
> When executing `mvn clean test` in flink-python, it waits for more than half 
> an hour after printing the console output below:
> ---
>  T E S T S
> ---
> Running org.apache.flink.python.api.PythonPlanBinderTest
> log4j:WARN No appenders could be found for logger 
> (org.apache.flink.python.api.PythonPlanBinderTest).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
> info.
> The stack trace is below:
> "main" prio=5 tid=0x7f8d7780b800 nid=0x1c03 waiting on condition 
> [0x79fd8000]
>java.lang.Thread.State: TIMED_WAITING (sleeping)
>   at java.lang.Thread.sleep(Native Method)
>   at 
> org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.startPython(PythonPlanStreamer.java:70)
>   at 
> org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.open(PythonPlanStreamer.java:50)
>   at 
> org.apache.flink.python.api.PythonPlanBinder.startPython(PythonPlanBinder.java:211)
>   at 
> org.apache.flink.python.api.PythonPlanBinder.runPlan(PythonPlanBinder.java:141)
>   at 
> org.apache.flink.python.api.PythonPlanBinder.main(PythonPlanBinder.java:114)
>   at 
> org.apache.flink.python.api.PythonPlanBinderTest.testProgram(PythonPlanBinderTest.java:83)
>   at 
> org.apache.flink.test.util.JavaProgramTestBase.testJobWithoutObjectReuse(JavaProgramTestBase.java:174)
> This is the full jstack output:
> https://gist.github.com/shijinkui/af47e8bc6c9f748336bf52efd3df94b0





[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...

2017-03-17 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106617241
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -119,6 +154,64 @@ class DataStreamOverAggregate(
 
   }
 
+  def createTimeBoundedProcessingTimeOverWindow(
--- End diff --

93 characters, which does not reach 100. IntelliJ IDEA shows the result clearly.




[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-17 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106589672
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+
+/**
+  * A ProcessFunction to support unbounded event-time over-window
+  *
+  * @param aggregates the aggregate functions
+  * @param aggFields  the filed index which the aggregate functions use
+  * @param forwardedFieldCount the input fields count
+  * @param intermediateType the intermediate row tye which the state saved
+  * @param inputType the input row tye which the state saved
+  *
+  */
+class UnboundedEventTimeOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val intermediateType: TypeInformation[Row],
+private val inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row]{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowState: ListState[Tuple2[Long, Row]] = _
+
+
+  override def open(config: Configuration) {
+output = new Row(forwardedFieldCount + aggregates.length)
+val valueSerializer: TypeSerializer[Row] =
+  
intermediateType.createSerializer(getRuntimeContext.getExecutionConfig)
+val stateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("accumulatorstate", valueSerializer)
+accumulatorState = getRuntimeContext.getState[Row](stateDescriptor)
+
+val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] =
+  (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, 
inputType)).createSerializer(
--- End diff --

The outer parentheses in `(new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, 
inputType))` can be omitted, as sketched below.
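
A sketch of the same expression from `open()` without the extra parentheses:
```
val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] =
  new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType)
    .createSerializer(getRuntimeContext.getExecutionConfig)
    .asInstanceOf[TypeSerializer[Tuple2[Long, Row]]]
```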




[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-17 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106590214
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -159,6 +167,46 @@ class DataStreamOverAggregate(
 result
   }
 
+  def createUnboundedAndCurrentRowEventTimeOverWindow(
+inputDS: DataStream[Row]): DataStream[Row]  = {
+
+val overWindow: Group = logicWindow.groups.get(0)
+val partitionKeys: Array[Int] = overWindow.keys.toArray
+val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = 
generateNamedAggregates
+
+// get the output types
+val rowTypeInfo = 
FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+
+val result: DataStream[Row] =
+  // partitioned aggregation
+  if (partitionKeys.nonEmpty) {
+val keyedStream = inputDS.keyBy(partitionKeys: _*)
+val processFunction = 
AggregateUtil.CreateUnboundedEventTimeOverProcessFunction(
--- End diff --

The line `val processFunction = 
AggregateUtil.CreateUnboundedEventTimeOverProcessFunction(...)` can be declared 
before the if/else, because it has no dependency on `partitionKeys`. See the 
sketch below.
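
A sketch of the suggested restructuring (the non-partitioned branch is only 
assumed here for illustration):
```
val processFunction = AggregateUtil.CreateUnboundedEventTimeOverProcessFunction(
  namedAggregates,
  inputType)

val result: DataStream[Row] =
  if (partitionKeys.nonEmpty) {
    // partitioned aggregation
    inputDS.keyBy(partitionKeys: _*).process(processFunction)
  } else {
    // global aggregation, assumed to run with parallelism 1
    inputDS.process(processFunction).setParallelism(1)
  }
```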




[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-17 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106592513
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+
+/**
+  * A ProcessFunction to support unbounded event-time over-window
+  *
+  * @param aggregates the aggregate functions
+  * @param aggFields  the filed index which the aggregate functions use
+  * @param forwardedFieldCount the input fields count
+  * @param intermediateType the intermediate row tye which the state saved
+  * @param inputType the input row tye which the state saved
+  *
+  */
+class UnboundedEventTimeOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val intermediateType: TypeInformation[Row],
+private val inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row]{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowState: ListState[Tuple2[Long, Row]] = _
+
+
+  override def open(config: Configuration) {
+output = new Row(forwardedFieldCount + aggregates.length)
+val valueSerializer: TypeSerializer[Row] =
+  
intermediateType.createSerializer(getRuntimeContext.getExecutionConfig)
+val stateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("accumulatorstate", valueSerializer)
+accumulatorState = getRuntimeContext.getState[Row](stateDescriptor)
+
+val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] =
+  (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, 
inputType)).createSerializer(
+
getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, 
Row]]]
+val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] =
+  new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", 
tupleSerializer)
+rowState = getRuntimeContext.getListState[Tuple2[Long, 
Row]](tupleStateDescriptor)
+
+  }
+
+  override def processElement(
+ input: Row,
+ ctx:  ProcessFunction[Row, Row]#Context,
+ out: Collector[Row]): Unit = {
+
+// discard later record
+if (ctx.timestamp() >= ctx.timerService().currentWatermark()) {
+  // ensure every key just register on timer
+  
ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1)
+
+  rowState.add(new Tuple2(ctx.timestamp, input))
+}
+  }
+
+  override def onTimer(
+  timestamp: Long,
+  ctx: ProcessFunction[Row, Row]#OnTimerContext,
+  out: Collector[Row]): Unit = {
+
+var rowList = rowState.get.iterator
+var sortList = new util.LinkedList[Tuple2[Long, Row]]()
+while (rowList.hasNext) {
+

[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-17 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106590692
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+
+/**
+  * A ProcessFunction to support unbounded event-time over-window
+  *
+  * @param aggregates the aggregate functions
+  * @param aggFields  the filed index which the aggregate functions use
+  * @param forwardedFieldCount the input fields count
+  * @param intermediateType the intermediate row tye which the state saved
+  * @param inputType the input row tye which the state saved
+  *
+  */
+class UnboundedEventTimeOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val intermediateType: TypeInformation[Row],
+private val inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row]{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowState: ListState[Tuple2[Long, Row]] = _
+
+
+  override def open(config: Configuration) {
+output = new Row(forwardedFieldCount + aggregates.length)
+val valueSerializer: TypeSerializer[Row] =
+  
intermediateType.createSerializer(getRuntimeContext.getExecutionConfig)
+val stateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("accumulatorstate", valueSerializer)
+accumulatorState = getRuntimeContext.getState[Row](stateDescriptor)
+
+val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] =
+  (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, 
inputType)).createSerializer(
+
getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, 
Row]]]
+val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] =
+  new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", 
tupleSerializer)
+rowState = getRuntimeContext.getListState[Tuple2[Long, 
Row]](tupleStateDescriptor)
+
+  }
+
+  override def processElement(
+ input: Row,
+ ctx:  ProcessFunction[Row, Row]#Context,
+ out: Collector[Row]): Unit = {
+
+// discard later record
+if (ctx.timestamp() >= ctx.timerService().currentWatermark()) {
+  // ensure every key just register on timer
+  
ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1)
+
+  rowState.add(new Tuple2(ctx.timestamp, input))
+}
+  }
+
+  override def onTimer(
+  timestamp: Long,
+  ctx: ProcessFunction[Row, Row]#OnTimerContext,
+  out: Collector[Row]): Unit = {
+
+var rowList = rowState.get.iterator
+var sortList = new util.LinkedList[Tuple2[Long, Row]]()
+while (rowList.hasNext) {
--- End diff --

[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-17 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106590822
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+
+/**
+  * A ProcessFunction to support unbounded event-time over-window
+  *
+  * @param aggregates the aggregate functions
+  * @param aggFields  the filed index which the aggregate functions use
+  * @param forwardedFieldCount the input fields count
+  * @param intermediateType the intermediate row tye which the state saved
+  * @param inputType the input row tye which the state saved
+  *
+  */
+class UnboundedEventTimeOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val intermediateType: TypeInformation[Row],
+private val inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row]{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowState: ListState[Tuple2[Long, Row]] = _
+
+
+  override def open(config: Configuration) {
+output = new Row(forwardedFieldCount + aggregates.length)
+val valueSerializer: TypeSerializer[Row] =
+  
intermediateType.createSerializer(getRuntimeContext.getExecutionConfig)
+val stateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("accumulatorstate", valueSerializer)
+accumulatorState = getRuntimeContext.getState[Row](stateDescriptor)
+
+val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] =
+  (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, 
inputType)).createSerializer(
+
getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, 
Row]]]
+val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] =
+  new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", 
tupleSerializer)
+rowState = getRuntimeContext.getListState[Tuple2[Long, 
Row]](tupleStateDescriptor)
+
+  }
+
+  override def processElement(
+ input: Row,
+ ctx:  ProcessFunction[Row, Row]#Context,
+ out: Collector[Row]): Unit = {
+
+// discard later record
+if (ctx.timestamp() >= ctx.timerService().currentWatermark()) {
+  // ensure every key just register on timer
+  
ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1)
+
+  rowState.add(new Tuple2(ctx.timestamp, input))
+}
+  }
+
+  override def onTimer(
+  timestamp: Long,
+  ctx: ProcessFunction[Row, Row]#OnTimerContext,
+  out: Collector[Row]): Unit = {
+
+var rowList = rowState.get.iterator
+var sortList = new util.LinkedList[Tuple2[Long, Row]]()
+while (rowList.hasNext) {
+

[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-17 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106590304
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -91,6 +91,35 @@ object AggregateUtil {
   }
 
   /**
+* Create an [[ProcessFunction]] to evaluate final aggregate value.
+*
+* @param namedAggregates List of calls to aggregate functions and 
their output field names
+* @param inputType Input row type
+* @return [[UnboundedProcessingOverProcessFunction]]
+*/
+  private[flink] def CreateUnboundedEventTimeOverProcessFunction(
--- End diff --

The function name should start with a lowercase letter, e.g. 
`createUnboundedEventTimeOverProcessFunction`.




[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-17 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106590402
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -91,6 +91,35 @@ object AggregateUtil {
   }
 
   /**
+* Create an [[ProcessFunction]] to evaluate final aggregate value.
+*
+* @param namedAggregates List of calls to aggregate functions and 
their output field names
+* @param inputType Input row type
+* @return [[UnboundedProcessingOverProcessFunction]]
+*/
+  private[flink] def CreateUnboundedEventTimeOverProcessFunction(
+   namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+   inputType: RelDataType): UnboundedEventTimeOverProcessFunction = {
+
+val (aggFields, aggregates) =
+  transformToAggregateFunctions(
+namedAggregates.map(_.getKey),
+inputType,
+needRetraction = false)
+
+val aggregationStateType: RowTypeInfo =
--- End diff --

```
val aggregationStateType: RowTypeInfo =
  createDataSetAggregateBufferDataType(Array(), aggregates, inputType)
```
This will be more readable.




[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-17 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106592195
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+
+/**
+  * A ProcessFunction to support unbounded event-time over-window
+  *
+  * @param aggregates the aggregate functions
+  * @param aggFields  the filed index which the aggregate functions use
+  * @param forwardedFieldCount the input fields count
+  * @param intermediateType the intermediate row tye which the state saved
+  * @param inputType the input row tye which the state saved
+  *
+  */
+class UnboundedEventTimeOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val intermediateType: TypeInformation[Row],
+private val inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row]{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowState: ListState[Tuple2[Long, Row]] = _
+
+
+  override def open(config: Configuration) {
+output = new Row(forwardedFieldCount + aggregates.length)
+val valueSerializer: TypeSerializer[Row] =
+  
intermediateType.createSerializer(getRuntimeContext.getExecutionConfig)
+val stateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("accumulatorstate", valueSerializer)
+accumulatorState = getRuntimeContext.getState[Row](stateDescriptor)
+
+val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] =
+  (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, 
inputType)).createSerializer(
+
getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, 
Row]]]
+val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] =
+  new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", 
tupleSerializer)
+rowState = getRuntimeContext.getListState[Tuple2[Long, 
Row]](tupleStateDescriptor)
+
+  }
+
+  override def processElement(
+ input: Row,
+ ctx:  ProcessFunction[Row, Row]#Context,
+ out: Collector[Row]): Unit = {
+
+// discard later record
+if (ctx.timestamp() >= ctx.timerService().currentWatermark()) {
+  // ensure every key just register on timer
+  
ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1)
+
+  rowState.add(new Tuple2(ctx.timestamp, input))
+}
+  }
+
+  override def onTimer(
+  timestamp: Long,
+  ctx: ProcessFunction[Row, Row]#OnTimerContext,
+  out: Collector[Row]): Unit = {
+
+var rowList = rowState.get.iterator
+var sortList = new util.LinkedList[Tuple2[Long, Row]]()
+while (rowList.hasNext) {
+

[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...

2017-03-17 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106602209
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataStreamProcTimeAggregateWindowFunction.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import 
org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.functions.Accumulator
+
+/**
+  * Computes the final aggregate value from incrementally computed 
aggreagtes.
+  *
+  * @param numGroupingKey The number of grouping keys.
+  * @param numAggregates The number of aggregates.
+  * @param finalRowArity The arity of the final output row.
+  */
+class DataStreamIncrementalAggregateWindowFunction[W <: Window](
--- End diff --

1. The class name `DataStreamIncrementalAggregateWindowFunction` does not match 
this file's name.
2. The scaladoc parameters `numGroupingKey`, `numAggregates`, and 
`finalRowArity` do not exist (a corrected header is sketched below).
3. IMO, none of the functions in the aggregate package are documented clearly 
enough to describe what they do, how they work, and their key points; the 
`aggregate` package is under-documented.
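
A hedged sketch of a corrected scaladoc header, assuming the class keeps its 
current name and constructor parameters:
```
/**
  * Computes the final aggregate values from incrementally computed aggregates.
  *
  * @param aggregates          The aggregate functions.
  * @param aggFields           The input field index used by each aggregate function.
  * @param forwardedFieldCount The number of input fields forwarded to the output row.
  */
class DataStreamIncrementalAggregateWindowFunction[W <: Window](
    private val aggregates: Array[AggregateFunction[_]],
    private val aggFields: Array[Int],
    private val forwardedFieldCount: Int)
  extends RichWindowFunction[Row, Row, Tuple, W] {
  // ... body unchanged ...
}
```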




[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...

2017-03-17 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106605456
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataStreamProcTimeAggregateGlobalWindowFunction.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate
+
+import 
org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.functions.Accumulator
+
+import java.lang.Iterable
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+
+ //org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
+
+class DataStreamProcTimeAggregateGlobalWindowFunction[W <: Window](
+ private val aggregates: Array[AggregateFunction[_]],
--- End diff --

`private val` should be chosen if `aggregates` does not need to be accessed 
directly. The same applies to the other class fields.




[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...

2017-03-17 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106600097
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -119,6 +154,64 @@ class DataStreamOverAggregate(
 
   }
 
+  def createTimeBoundedProcessingTimeOverWindow(
+inputDS: DataStream[Row]): DataStream[Row] = {
+
+val overWindow: Group = logicWindow.groups.get(0)
+val partitionKeys: Array[Int] = overWindow.keys.toArray
+val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = 
generateNamedAggregates
+
+// final long time_boundary =
+// 
Long.parseLong(windowReference.getConstants().get(1).getValue().toString());
+val index = 
overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef].getIndex
+val count = input.getRowType().getFieldCount()
+val lowerboundIndex = index - count
+val time_boundary =  logicWindow.constants.get(lowerboundIndex)
--- End diff --

`val time_boundary =  logicWindow.constants.get(lowerboundIndex)`
Trim the double space after the `=`, like
`val time_boundary = logicWindow.constants.get(lowerboundIndex)`





[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...

2017-03-17 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106411557
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -191,3 +287,31 @@ class DataStreamOverAggregate(
 
 }
 
+object DataStreamProcTimeCase {
+  class ProcTimeTimestampExtractor
+  extends AssignerWithPunctuatedWatermarks[Row] {
+
+override def checkAndGetNextWatermark(
+  lastElement: Row,
+  extractedTimestamp: Long): Watermark = {
+  null
+}
+
+override def extractTimestamp(
+  element: Row,
+  previousElementTimestamp: Long): Long = {
+  System.currentTimeMillis()
+}
+  }
+  /*
+  class MyWindowFunction extends AllWindowFunction[Row, Row, GlobalWindow] 
{
--- End diff --

Delete it if it is unused.




[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...

2017-03-17 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106604502
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataStreamProcTimeAggregateWindowFunction.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import 
org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.functions.Accumulator
+
+/**
+  * Computes the final aggregate value from incrementally computed 
aggreagtes.
+  *
+  * @param numGroupingKey The number of grouping keys.
+  * @param numAggregates The number of aggregates.
+  * @param finalRowArity The arity of the final output row.
+  */
+class DataStreamIncrementalAggregateWindowFunction[W <: Window](
+ private val aggregates: Array[AggregateFunction[_]],
+ private val aggFields: Array[Int],
+ private val forwardedFieldCount: Int)
+  extends RichWindowFunction[Row, Row, Tuple, W] {
+
+private var output: Row = _
+private var accumulators: Row= _
+
+  override def open(parameters: Configuration): Unit = {
+ output = new Row(forwardedFieldCount + aggregates.length)
+ accumulators = new Row(aggregates.length)
+ var i = 0
+ while (i < aggregates.length) {
+accumulators.setField(i, aggregates(i).createAccumulator())
+i = i + 1
+ }
+  }
+  
+ 
+  /**
+* Calculate aggregated values output by aggregate buffer, and set them 
into output
+* Row based on the mapping relation between intermediate aggregate 
data and output data.
+*/
+  override def apply(
+  key: Tuple,
+  window: W,
+  records: Iterable[Row],
+  out: Collector[Row]): Unit = {
+
+   var i = 0
--- End diff --

1. This class should be formatted first.
2. IMO, every `asInstanceOf` should be guarded by a match/case unless the 
object's type is already determined (see the sketch below).
3. The doc line `Calculate aggregated values output by aggregate buffer, and 
set them into output` should end with `.`, and the `@param` tags should be 
added.
4. The variable `i` should have a clearer name. It is also reused across three 
loops, which makes the code hard to read.
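
For point 2, a sketch of guarding the cast (names taken from the quoted code):
```
// fail fast with a clear message instead of a raw ClassCastException
val accumulator: Accumulator = accumulators.getField(i) match {
  case acc: Accumulator => acc
  case other => throw new IllegalStateException(s"Expected an Accumulator, got: $other")
}
```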




[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...

2017-03-17 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106605103
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataStreamProcTimeAggregateGlobalWindowFunction.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate
+
+import 
org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.functions.Accumulator
+
+import java.lang.Iterable
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+
+ //org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
+
--- End diff --

1. The scaladoc is not well formatted; it needs to be clear and detailed.
2. The class should be formatted first.
3. Delete the unused imports, such as `import 
org.apache.flink.api.java.typeutils.RowTypeInfo`.




[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...

2017-03-17 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106602970
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataStreamProcTimeAggregateWindowFunction.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import 
org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.functions.Accumulator
+
+/**
+  * Computes the final aggregate value from incrementally computed 
aggreagtes.
+  *
+  * @param numGroupingKey The number of grouping keys.
+  * @param numAggregates The number of aggregates.
+  * @param finalRowArity The arity of the final output row.
+  */
+class DataStreamIncrementalAggregateWindowFunction[W <: Window](
+ private val aggregates: Array[AggregateFunction[_]],
+ private val aggFields: Array[Int],
+ private val forwardedFieldCount: Int)
+  extends RichWindowFunction[Row, Row, Tuple, W] {
+
+private var output: Row = _
+private var accumulators: Row= _
+
+  override def open(parameters: Configuration): Unit = {
+ output = new Row(forwardedFieldCount + aggregates.length)
+ accumulators = new Row(aggregates.length)
+ var i = 0
--- End diff --

`aggregates` is an array, so we can also iterate over it directly; then the 
hand-maintained index `i` is not needed. Iteration is safer (see the sketch 
below).
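
A sketch of the suggested loop, iterating over the array instead of 
maintaining `i` by hand:
```
for ((aggregate, idx) <- aggregates.zipWithIndex) {
  accumulators.setField(idx, aggregate.createAccumulator())
}
```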




[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...

2017-03-17 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106599975
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -119,6 +154,64 @@ class DataStreamOverAggregate(
 
   }
 
+  def createTimeBoundedProcessingTimeOverWindow(
+inputDS: DataStream[Row]): DataStream[Row] = {
+
+val overWindow: Group = logicWindow.groups.get(0)
+val partitionKeys: Array[Int] = overWindow.keys.toArray
+val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = 
generateNamedAggregates
+
+// final long time_boundary =
--- End diff --

If they are unused, delete the commented-out code lines.




[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...

2017-03-17 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106601075
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -785,7 +785,7 @@ object AggregateUtil {
 (propPos._1, propPos._2)
   }
 
-  private def transformToAggregateFunctions(
--- End diff --

Using `private[flink]` makes this function accessible, and that works. But we 
should consider one thing: whether this function really needs to be visible 
across the whole `flink` package.
My suggestion is `private[table]`, matching the other functions in this class.
What do the other reviewers think about that?




[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...

2017-03-17 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106411491
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -136,13 +229,13 @@ class DataStreamOverAggregate(
 namedAggregates,
 inputType)
 
-  inputDS
+inputDS
   .keyBy(partitionKeys: _*)
   .process(processFunction)
   .returns(rowTypeInfo)
   .name(aggOpName)
   .asInstanceOf[DataStream[Row]]
-}
+  } // global non-partitioned aggregation
--- End diff --

The comment should be placed before `inputDS`.




[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...

2017-03-17 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106599719
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -119,6 +154,64 @@ class DataStreamOverAggregate(
 
   }
 
+  def createTimeBoundedProcessingTimeOverWindow(
--- End diff --

Put `def createTimeBoundedProcessingTimeOverWindow(` and its parameter list on 
one line.




[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...

2017-03-17 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106605509
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataStreamProcTimeAggregateGlobalWindowFunction.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate
+
+import 
org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.functions.Accumulator
+
+import java.lang.Iterable
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+
+ //org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
+
+class DataStreamProcTimeAggregateGlobalWindowFunction[W <: Window](
+ private val aggregates: Array[AggregateFunction[_]],
+ private val aggFields: Array[Int],
+ private val forwardedFieldCount: Int)
+ extends RichAllWindowFunction[Row, Row, W] {
+  
+private var output: Row = _
+private var accumulators: Row= _
+ 
+
+ override def open(parameters: Configuration): Unit = {
+ output = new Row(forwardedFieldCount + aggregates.length)
+ accumulators = new Row(aggregates.length)
+ var i = 0
+ while (i < aggregates.length) {
+accumulators.setField(i, aggregates(i).createAccumulator())
+i = i + 1
+ }
+  }
+   
+ override def apply(
+  window: W,
+  records: Iterable[Row],
+  out: Collector[Row]): Unit = {
+
+  
+ var i = 0
+ //initialize the values of the aggregators by re-creating them
+ //the design of the Accumulator interface should be extended to 
enable 
+ //a reset function for better performance
+ while (i < aggregates.length) {
+accumulators.setField(i, aggregates(i).createAccumulator())
+i += 1
+ }
+ var reuse:Row = null
+ //iterate through the elements and aggregate
+ val iter = records.iterator
+ while (iter.hasNext) {
+   reuse = iter.next
+   i = 0
+   while (i < aggregates.length) {
+  val accumulator = 
accumulators.getField(i).asInstanceOf[Accumulator]
+  aggregates(i).accumulate(accumulator, 
reuse.getField(aggFields(i)))
+  i += 1
+   }
+ }
+
+//set the values of the result with current elements values if needed
+i = 0
+while (i < forwardedFieldCount) {
+  output.setField(i, reuse.getField(i))
+  i += 1
+}
+
+//set the values of the result with the accumulators
+i = 0
+while (i < aggregates.length) {
+  val index = forwardedFieldCount + i
+  val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+  output.setField(index, aggregates(i).getValue(accumulator))
+  i += 1
+}
+
+out.collect(output)
+
--- End diff --

This blank line is not needed.




[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...

2017-03-17 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106600464
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -119,6 +154,64 @@ class DataStreamOverAggregate(
 
   }
 
+  def createTimeBoundedProcessingTimeOverWindow(
+inputDS: DataStream[Row]): DataStream[Row] = {
+
+val overWindow: Group = logicWindow.groups.get(0)
+val partitionKeys: Array[Int] = overWindow.keys.toArray
+val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = 
generateNamedAggregates
+
+// final long time_boundary =
+// 
Long.parseLong(windowReference.getConstants().get(1).getValue().toString());
+val index = 
overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef].getIndex
+val count = input.getRowType().getFieldCount()
+val lowerboundIndex = index - count
+val time_boundary =  logicWindow.constants.get(lowerboundIndex)
+ .getValue2.asInstanceOf[java.math.BigDecimal].longValue()
--- End diff --

`getValue2` returns a `Comparable` value, but must the value always be a
`java.math.BigDecimal`?
Use a match/case to guarantee its type.
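
For illustration, a minimal sketch of the suggested guard, reusing names from
the diff above (the exception type here is just an example):

    val timeBoundary = logicWindow.constants.get(lowerboundIndex).getValue2 match {
      case b: java.math.BigDecimal => b.longValue() // the expected literal type
      case other => throw new IllegalArgumentException(
        s"Unexpected lower-bound literal: $other")  // fail fast with a clear message
    }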




[GitHub] flink issue #3386: [FLINK-5658][table] support unbounded eventtime over wind...

2017-03-17 Thread shijinkui
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/3386
  
@hongyuhong, IMO, all the functions should have detailed docs.




[jira] [Updated] (FLINK-5860) Replace all the file creating from java.io.tmpdir with TemporaryFolder

2017-03-16 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui updated FLINK-5860:
-
Labels: starter  (was: )

> Replace all the file creating from java.io.tmpdir with TemporaryFolder
> --
>
> Key: FLINK-5860
> URL: https://issues.apache.org/jira/browse/FLINK-5860
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Reporter: shijinkui
>Assignee: Yaroslav Mykhaylov
>  Labels: starter
>
> Search for `System.getProperty("java.io.tmpdir")` in the whole Flink project;
> it yields the list of unit tests below. Replace all file creation based on
> `java.io.tmpdir` with JUnit's `TemporaryFolder` (see the sketch after the list).
> Who can fix this problem thoroughly?
> ```
> $ grep -ri 'System.getProperty("java.io.tmpdir")' .
> ./flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java:
>   env.setStateBackend(new FsStateBackend("file:///" + 
> System.getProperty("java.io.tmpdir") + "/flink/backend"));
> ./flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
> File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
>  File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
>  File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java:
>  return getMockEnvironment(new File[] { new 
> File(System.getProperty("java.io.tmpdir")) });
> ./flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java:
>public static final String DEFAULT_TASK_MANAGER_TMP_PATH = 
> System.getProperty("java.io.tmpdir");
> ./flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java:
>   final String tempPath = System.getProperty("java.io.tmpdir");
> ./flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java:   
> final File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: 
> File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: 
> File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java:
>   final String outDir = params.get("output", 
> System.getProperty("java.io.tmpdir"));
> ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java:
> final String tmpDir = System.getProperty("java.io.tmpdir");
> ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java:
>   final String outPath = System.getProperty("java.io.tmpdir");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java:
>public static final String FLINK_PYTHON_FILE_PATH = 
> System.getProperty("java.io.tmpdir") + File.separator 
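
For illustration, a minimal sketch of the suggested replacement with JUnit 4's
TemporaryFolder rule (the test class and names here are hypothetical):

{code}
import org.junit.{Rule, Test}
import org.junit.rules.TemporaryFolder

class TmpDirCleanupTest {
  // JUnit requires the rule to be reachable as a public member;
  // in Scala, expose it through an annotated method.
  private val _tempFolder = new TemporaryFolder()
  @Rule def tempFolder: TemporaryFolder = _tempFolder

  @Test def writesIntoManagedTempDir(): Unit = {
    // instead of: new File(System.getProperty("java.io.tmpdir"), "flink/backend")
    val backendDir = tempFolder.newFolder("flink", "backend")
    assert(backendDir.exists()) // deleted automatically after each test
  }
}
{code}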

[jira] [Commented] (FLINK-5754) released tag missing .gitignore .travis.yml .gitattributes

2017-03-16 Thread shijinkui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929365#comment-15929365
 ] 

shijinkui commented on FLINK-5754:
--

[~greghogan] I checked out a branch from the 1.2 tag in production. In general,
the release tag is the final version, not a branch. Am I right?

IMO, the release-tag commit should only change the project version, and
shouldn't change anything else except the version number.

I looked at some other Apache projects, and they follow this rule. Can we
consider this the normal rule? Must the hidden files be deleted in the release
tag?

https://github.com/apache/spark/commit/cd0a08361e2526519e7c131c42116bf56fa62c76
https://github.com/apache/hadoop/commit/94152e171178d34864ddf6362239f3c2dda0965f
https://github.com/apache/storm/commit/eac433b0beb3798c4723deb39b3c4fad446378f4

> released tag missing .gitignore  .travis.yml .gitattributes
> 
>
> Key: FLINK-5754
> URL: https://issues.apache.org/jira/browse/FLINK-5754
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: shijinkui
>
> The released tag is missing .gitignore, .travis.yml and .gitattributes.
> When making a release version, only the version number should be replaced.
> For example: https://github.com/apache/spark/tree/v2.1.0





[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-16 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106411203
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UnboundedRowtimeOverTest.scala
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala._
+import 
org.apache.flink.table.api.scala.stream.utils.StreamTestData.Small4Tuple
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, 
StreamTestData, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+
+import scala.collection.mutable
+
+class UnboundedRowtimeOverTest extends StreamingWithStateTestBase {
--- End diff --

@hongyuhong All the Scala UT or IT file names should end with `ITCase` or
`Suite`.




[jira] [Commented] (FLINK-5650) Flink-python tests executing cost too long time

2017-03-16 Thread shijinkui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927543#comment-15927543
 ] 

shijinkui commented on FLINK-5650:
--

[~Zentol], your PR works. The flink-python UT now costs one minute. Very good :)

[INFO] BUILD SUCCESS
[INFO] 
[INFO] Total time: 01:09 min
[INFO] Finished at: 2017-03-16T13:57:12+08:00
[INFO] Final Memory: 22M/268M

> Flink-python tests executing cost too long time
> ---
>
> Key: FLINK-5650
> URL: https://issues.apache.org/jira/browse/FLINK-5650
> Project: Flink
>  Issue Type: Bug
>  Components: Python API, Tests
>Affects Versions: 1.2.0
>    Reporter: shijinkui
>Priority: Critical
>  Labels: osx
> Fix For: 1.2.1
>
>
> When executing `mvn clean test` in flink-python, it will wait for more than
> half an hour after the console output below:
> ---
>  T E S T S
> ---
> Running org.apache.flink.python.api.PythonPlanBinderTest
> log4j:WARN No appenders could be found for logger 
> (org.apache.flink.python.api.PythonPlanBinderTest).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
> info.
> The stack below:
> "main" prio=5 tid=0x7f8d7780b800 nid=0x1c03 waiting on condition 
> [0x79fd8000]
>java.lang.Thread.State: TIMED_WAITING (sleeping)
>   at java.lang.Thread.sleep(Native Method)
>   at 
> org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.startPython(PythonPlanStreamer.java:70)
>   at 
> org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.open(PythonPlanStreamer.java:50)
>   at 
> org.apache.flink.python.api.PythonPlanBinder.startPython(PythonPlanBinder.java:211)
>   at 
> org.apache.flink.python.api.PythonPlanBinder.runPlan(PythonPlanBinder.java:141)
>   at 
> org.apache.flink.python.api.PythonPlanBinder.main(PythonPlanBinder.java:114)
>   at 
> org.apache.flink.python.api.PythonPlanBinderTest.testProgram(PythonPlanBinderTest.java:83)
>   at 
> org.apache.flink.test.util.JavaProgramTestBase.testJobWithoutObjectReuse(JavaProgramTestBase.java:174)
> this is the jstack:
> https://gist.github.com/shijinkui/af47e8bc6c9f748336bf52efd3df94b0





[jira] [Updated] (FLINK-5650) Flink-python tests executing cost too long time

2017-03-15 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui updated FLINK-5650:
-
Labels: osx  (was: )

> Flink-python tests executing cost too long time
> ---
>
> Key: FLINK-5650
> URL: https://issues.apache.org/jira/browse/FLINK-5650
> Project: Flink
>  Issue Type: Bug
>  Components: Python API, Tests
>Affects Versions: 1.2.0
>    Reporter: shijinkui
>Priority: Critical
>  Labels: osx
> Fix For: 1.2.1
>
>
> When executing `mvn clean test` in flink-python, it will wait for more than
> half an hour after the console output below:
> ---
>  T E S T S
> ---
> Running org.apache.flink.python.api.PythonPlanBinderTest
> log4j:WARN No appenders could be found for logger 
> (org.apache.flink.python.api.PythonPlanBinderTest).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
> info.
> The stack below:
> "main" prio=5 tid=0x7f8d7780b800 nid=0x1c03 waiting on condition 
> [0x79fd8000]
>java.lang.Thread.State: TIMED_WAITING (sleeping)
>   at java.lang.Thread.sleep(Native Method)
>   at 
> org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.startPython(PythonPlanStreamer.java:70)
>   at 
> org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.open(PythonPlanStreamer.java:50)
>   at 
> org.apache.flink.python.api.PythonPlanBinder.startPython(PythonPlanBinder.java:211)
>   at 
> org.apache.flink.python.api.PythonPlanBinder.runPlan(PythonPlanBinder.java:141)
>   at 
> org.apache.flink.python.api.PythonPlanBinder.main(PythonPlanBinder.java:114)
>   at 
> org.apache.flink.python.api.PythonPlanBinderTest.testProgram(PythonPlanBinderTest.java:83)
>   at 
> org.apache.flink.test.util.JavaProgramTestBase.testJobWithoutObjectReuse(JavaProgramTestBase.java:174)
> this is the jstack:
> https://gist.github.com/shijinkui/af47e8bc6c9f748336bf52efd3df94b0





[GitHub] flink issue #3540: [FLINK-6056] [build]apache-rat exclude flink directory in...

2017-03-15 Thread shijinkui
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/3540
  
> I am a bit confused... I think there is no tools/flink* directory that
> would need an exclusion...

When `tools/create_release_files.sh` is executed, it clones the Flink project
from Apache. We have also already added `tools/flink*` to .gitignore.




[GitHub] flink issue #3540: [FLINK-6056] [build]apache-rat exclude flink directory in...

2017-03-15 Thread shijinkui
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/3540
  
> Why exactly is it a problem if the rat-plugin checks the tools directory?

There is no need to check the temporary flink project under tools, because it
adds extra build time for the main project.
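
For reference, a sketch of the kind of rat exclusion meant here (the exact
pattern used in the PR is not quoted in this thread, so treat it as an
assumption):

    <plugin>
      <groupId>org.apache.rat</groupId>
      <artifactId>apache-rat-plugin</artifactId>
      <configuration>
        <excludes>
          <!-- temporary clone created by tools/create_release_files.sh -->
          <exclude>tools/flink*/**</exclude>
        </excludes>
      </configuration>
    </plugin>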




[jira] [Created] (FLINK-6060) non-existent class reference in the Scala function annotation

2017-03-15 Thread shijinkui (JIRA)
shijinkui created FLINK-6060:


 Summary: non-existent class reference in the Scala function annotation
 Key: FLINK-6060
 URL: https://issues.apache.org/jira/browse/FLINK-6060
 Project: Flink
  Issue Type: Wish
Reporter: shijinkui


TaskMessages.scala
ConnectedStreams.scala
DataStream.scala

Who can fix it?





[GitHub] flink issue #2460: [FLINK-4562] table examples make an divided module in fli...

2017-03-15 Thread shijinkui
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/2460
  
ping @wuchong @fhueske @twalthr 




[jira] [Comment Edited] (FLINK-5650) Flink-python tests executing cost too long time

2017-03-15 Thread shijinkui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925833#comment-15925833
 ] 

shijinkui edited comment on FLINK-5650 at 3/15/17 10:11 AM:


Also, we can make PythonPlanStreamer support asynchronous execution in a thread
pool instead of a blocking single process. [~StephanEwen]

I think such a badly designed unit test shouldn't be merged into the code base.
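
As a rough illustration of the suggested direction (this is not Flink's actual
API; the startup work is simulated):

{code}
import java.util.concurrent.{Executors, TimeUnit, TimeoutException}

object AsyncStartSketch {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newCachedThreadPool()
    // stand-in for PythonPlanStreamer.startPython(): simulate startup work
    val startup = pool.submit(new Runnable {
      override def run(): Unit = Thread.sleep(500)
    })
    try {
      // bounded wait instead of open-ended sleeping on the caller's thread
      startup.get(60, TimeUnit.SECONDS)
    } catch {
      case _: TimeoutException => sys.error("python process did not start in time")
    } finally {
      pool.shutdown()
    }
  }
}
{code}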


was (Author: shijinkui):
Also, we can make PythonPlanStreamer support asynchronous execution in a thread
pool instead of a blocking single process. [~StephanEwen]

> Flink-python tests executing cost too long time
> ---
>
> Key: FLINK-5650
> URL: https://issues.apache.org/jira/browse/FLINK-5650
> Project: Flink
>  Issue Type: Bug
>  Components: Python API, Tests
>Affects Versions: 1.2.0
>    Reporter: shijinkui
>Priority: Critical
> Fix For: 1.2.1
>
>
> When executing `mvn clean test` in flink-python, it will wait for more than
> half an hour after the console output below:
> ---
>  T E S T S
> ---
> Running org.apache.flink.python.api.PythonPlanBinderTest
> log4j:WARN No appenders could be found for logger 
> (org.apache.flink.python.api.PythonPlanBinderTest).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
> info.
> The stack below:
> "main" prio=5 tid=0x7f8d7780b800 nid=0x1c03 waiting on condition 
> [0x79fd8000]
>java.lang.Thread.State: TIMED_WAITING (sleeping)
>   at java.lang.Thread.sleep(Native Method)
>   at 
> org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.startPython(PythonPlanStreamer.java:70)
>   at 
> org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.open(PythonPlanStreamer.java:50)
>   at 
> org.apache.flink.python.api.PythonPlanBinder.startPython(PythonPlanBinder.java:211)
>   at 
> org.apache.flink.python.api.PythonPlanBinder.runPlan(PythonPlanBinder.java:141)
>   at 
> org.apache.flink.python.api.PythonPlanBinder.main(PythonPlanBinder.java:114)
>   at 
> org.apache.flink.python.api.PythonPlanBinderTest.testProgram(PythonPlanBinderTest.java:83)
>   at 
> org.apache.flink.test.util.JavaProgramTestBase.testJobWithoutObjectReuse(JavaProgramTestBase.java:174)
> this is the jstack:
> https://gist.github.com/shijinkui/af47e8bc6c9f748336bf52efd3df94b0





[jira] [Commented] (FLINK-5650) Flink-python tests executing cost too long time

2017-03-15 Thread shijinkui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925833#comment-15925833
 ] 

shijinkui commented on FLINK-5650:
--

Also, we can make PythonPlanStreamer support asynchronous execution in a thread
pool instead of a blocking single process. [~StephanEwen]

> Flink-python tests executing cost too long time
> ---
>
> Key: FLINK-5650
> URL: https://issues.apache.org/jira/browse/FLINK-5650
> Project: Flink
>  Issue Type: Bug
>  Components: Python API, Tests
>Affects Versions: 1.2.0
>    Reporter: shijinkui
>Priority: Critical
> Fix For: 1.2.1
>
>
> When executing `mvn clean test` in flink-python, it will wait for more than
> half an hour after the console output below:
> ---
>  T E S T S
> ---
> Running org.apache.flink.python.api.PythonPlanBinderTest
> log4j:WARN No appenders could be found for logger 
> (org.apache.flink.python.api.PythonPlanBinderTest).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
> info.
> The stack below:
> "main" prio=5 tid=0x7f8d7780b800 nid=0x1c03 waiting on condition 
> [0x79fd8000]
>java.lang.Thread.State: TIMED_WAITING (sleeping)
>   at java.lang.Thread.sleep(Native Method)
>   at 
> org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.startPython(PythonPlanStreamer.java:70)
>   at 
> org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.open(PythonPlanStreamer.java:50)
>   at 
> org.apache.flink.python.api.PythonPlanBinder.startPython(PythonPlanBinder.java:211)
>   at 
> org.apache.flink.python.api.PythonPlanBinder.runPlan(PythonPlanBinder.java:141)
>   at 
> org.apache.flink.python.api.PythonPlanBinder.main(PythonPlanBinder.java:114)
>   at 
> org.apache.flink.python.api.PythonPlanBinderTest.testProgram(PythonPlanBinderTest.java:83)
>   at 
> org.apache.flink.test.util.JavaProgramTestBase.testJobWithoutObjectReuse(JavaProgramTestBase.java:174)
> this is the jstack:
> https://gist.github.com/shijinkui/af47e8bc6c9f748336bf52efd3df94b0





[jira] [Commented] (FLINK-5650) Flink-python tests executing cost too long time

2017-03-15 Thread shijinkui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925824#comment-15925824
 ] 

shijinkui commented on FLINK-5650:
--

Now I have to execute `mvn clean test -pl '!flink-libraries/flink-python'` to
exclude the flink-python module.

> Flink-python tests executing cost too long time
> ---
>
> Key: FLINK-5650
> URL: https://issues.apache.org/jira/browse/FLINK-5650
> Project: Flink
>  Issue Type: Bug
>  Components: Python API, Tests
>Affects Versions: 1.2.0
>    Reporter: shijinkui
>Priority: Critical
> Fix For: 1.2.1
>
>
> When executing `mvn clean test` in flink-python, it will wait for more than
> half an hour after the console output below:
> ---
>  T E S T S
> ---
> Running org.apache.flink.python.api.PythonPlanBinderTest
> log4j:WARN No appenders could be found for logger 
> (org.apache.flink.python.api.PythonPlanBinderTest).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
> info.
> The stack below:
> "main" prio=5 tid=0x7f8d7780b800 nid=0x1c03 waiting on condition 
> [0x79fd8000]
>java.lang.Thread.State: TIMED_WAITING (sleeping)
>   at java.lang.Thread.sleep(Native Method)
>   at 
> org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.startPython(PythonPlanStreamer.java:70)
>   at 
> org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.open(PythonPlanStreamer.java:50)
>   at 
> org.apache.flink.python.api.PythonPlanBinder.startPython(PythonPlanBinder.java:211)
>   at 
> org.apache.flink.python.api.PythonPlanBinder.runPlan(PythonPlanBinder.java:141)
>   at 
> org.apache.flink.python.api.PythonPlanBinder.main(PythonPlanBinder.java:114)
>   at 
> org.apache.flink.python.api.PythonPlanBinderTest.testProgram(PythonPlanBinderTest.java:83)
>   at 
> org.apache.flink.test.util.JavaProgramTestBase.testJobWithoutObjectReuse(JavaProgramTestBase.java:174)
> this is the jstack:
> https://gist.github.com/shijinkui/af47e8bc6c9f748336bf52efd3df94b0





[jira] [Commented] (FLINK-5754) released tag missing .gitignore .travis.yml .gitattributes

2017-03-15 Thread shijinkui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925816#comment-15925816
 ] 

shijinkui commented on FLINK-5754:
--

ping [~greghogan] Where did you make the tag: on GitHub, or via the tools
scripts?

> released tag missing .gitignore  .travis.yml .gitattributes
> 
>
> Key: FLINK-5754
> URL: https://issues.apache.org/jira/browse/FLINK-5754
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: shijinkui
>
> The released tag is missing .gitignore, .travis.yml and .gitattributes.
> When making a release version, only the version number should be replaced.
> For example: https://github.com/apache/spark/tree/v2.1.0





[GitHub] flink pull request #3540: [FLINK-6056] apache-rat exclude flink directory in...

2017-03-15 Thread shijinkui
GitHub user shijinkui opened a pull request:

https://github.com/apache/flink/pull/3540

[FLINK-6056] apache-rat exclude flink directory in tools

The flink* directory under tools is a temporary clone created when building the
distribution.
So when building the Flink project, we should exclude the flink* directory.

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-6056] 
apache-rat exclude flink directory in tools")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hwstreaming/flink FLINK-6056

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3540.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3540


commit ccdd802b333555a36b415dbe1289d04a6470f075
Author: Jinkui Shi <shijin...@huawei.com>
Date:   2017-03-15T08:05:20Z

[FLINK-6056] apache-rat exclude flink directory in tools






[jira] [Commented] (FLINK-5756) When there are many values under the same key in ListState, RocksDBStateBackend performances poor

2017-03-13 Thread shijinkui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923530#comment-15923530
 ] 

shijinkui commented on FLINK-5756:
--

[~StephanEwen] Thanks for your reply. [~SyinchwunLeo] Please test the
mini-benchmark.
FLINK-5715 is nice.

> When there are many values under the same key in ListState, 
> RocksDBStateBackend performances poor
> -
>
> Key: FLINK-5756
> URL: https://issues.apache.org/jira/browse/FLINK-5756
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
> Environment: CentOS 7.2
>Reporter: Syinchwun Leo
>
> When using RocksDB as the StateBackend, if there are many values under the
> same key in ListState, the windowState.get() operator performs very poorly.
> I also tried RocksDB version 4.11.2, and the performance is just as poor.
> The problem is likely related to RocksDB's own get() operator after using
> merge(). The problem may affect the window operator's performance when the
> state is very large and uses ListState. I tried to merge 5 values under the
> same key in RocksDB; it costs 120 seconds to execute the get() operation.
> ///
> The flink's code is as follows:
> {code}
> class SEventSource extends RichSourceFunction [SEvent] {
>   private var count = 0L
>   private val alphabet = 
> "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"
>   override def run(sourceContext: SourceContext[SEvent]): Unit = {
> while (true) {
>   for (i <- 0 until 5000) {
> sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1))
> count += 1L
>   }
>   Thread.sleep(1000)
> }
>   }
> }
> env.addSource(new SEventSource)
>   .assignTimestampsAndWatermarks(new 
> AssignerWithPeriodicWatermarks[SEvent] {
> override def getCurrentWatermark: Watermark = {
>   new Watermark(System.currentTimeMillis())
> }
> override def extractTimestamp(t: SEvent, l: Long): Long = {
>   System.currentTimeMillis()
> }
>   })
>   .keyBy(0)
>   .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(2)))
>   .apply(new WindowStatistic)
>   .map(x => (System.currentTimeMillis(), x))
>   .print()
> {code}
> 
> The RocksDB Test code:
> {code}
> val stringAppendOperator = new StringAppendOperator
> val options = new Options()
> options.setCompactionStyle(CompactionStyle.LEVEL)
>   .setCompressionType(CompressionType.SNAPPY_COMPRESSION)
>   .setLevelCompactionDynamicLevelBytes(true)
>   .setIncreaseParallelism(4)
>   .setUseFsync(true)
>   .setMaxOpenFiles(-1)
>   .setCreateIfMissing(true)
>   .setMergeOperator(stringAppendOperator)
> val write_options = new WriteOptions
> write_options.setSync(false)
> val rocksDB = RocksDB.open(options, "/**/Data/")
> val key = "key"
> val value = 
> "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"
> val beginmerge = System.currentTimeMillis()
> for(i <- 0 to 5) {
>   rocksDB.merge(key.getBytes(), ("s"+ i + value).getBytes())
>   //rocksDB.put(key.getBytes, value.getBytes)
> }
> println("finish")
> val begin = System.currentTimeMillis()
> rocksDB.get(key.getBytes)
> val end = System.currentTimeMillis()
> println("merge cost:" + (begin - beginmerge))
> println("Time consuming:" + (end - begin))
>   }
> }
> {code}





[jira] [Commented] (FLINK-5756) When there are many values under the same key in ListState, RocksDBStateBackend performances poor

2017-03-13 Thread shijinkui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15907043#comment-15907043
 ] 

shijinkui commented on FLINK-5756:
--

hi, [~StephanEwen]
Do we have any tuning techniques for this problem, which originates in
RocksDB's get()?

> When there are many values under the same key in ListState, 
> RocksDBStateBackend performances poor
> -
>
> Key: FLINK-5756
> URL: https://issues.apache.org/jira/browse/FLINK-5756
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
> Environment: CentOS 7.2
>Reporter: Syinchwun Leo
>
> When using RocksDB as the StateBackend, if there are many values under the
> same key in ListState, the windowState.get() operator performs very poorly.
> I also tried RocksDB version 4.11.2, and the performance is just as poor.
> The problem is likely related to RocksDB's own get() operator after using
> merge(). The problem may affect the window operator's performance when the
> state is very large and uses ListState. I tried to merge 5 values under the
> same key in RocksDB; it costs 120 seconds to execute the get() operation.
> ///
> The flink's code is as follows:
> class SEventSource extends RichSourceFunction [SEvent] {
>   private var count = 0L
>   private val alphabet = 
> "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"
>   override def run(sourceContext: SourceContext[SEvent]): Unit = {
> while (true) {
>   for (i <- 0 until 5000) {
> sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1))
> count += 1L
>   }
>   Thread.sleep(1000)
> }
>   }
> }
> env.addSource(new SEventSource)
>   .assignTimestampsAndWatermarks(new 
> AssignerWithPeriodicWatermarks[SEvent] {
> override def getCurrentWatermark: Watermark = {
>   new Watermark(System.currentTimeMillis())
> }
> override def extractTimestamp(t: SEvent, l: Long): Long = {
>   System.currentTimeMillis()
> }
>   })
>   .keyBy(0)
>   .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(2)))
>   .apply(new WindowStatistic)
>   .map(x => (System.currentTimeMillis(), x))
>   .print()
> 
> The RocksDB Test code:
> val stringAppendOperator = new StringAppendOperator
> val options = new Options()
> options.setCompactionStyle(CompactionStyle.LEVEL)
>   .setCompressionType(CompressionType.SNAPPY_COMPRESSION)
>   .setLevelCompactionDynamicLevelBytes(true)
>   .setIncreaseParallelism(4)
>   .setUseFsync(true)
>   .setMaxOpenFiles(-1)
>   .setCreateIfMissing(true)
>   .setMergeOperator(stringAppendOperator)
> val write_options = new WriteOptions
> write_options.setSync(false)
> val rocksDB = RocksDB.open(options, "/**/Data/")
> val key = "key"
> val value = 
> "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"
> val beginmerge = System.currentTimeMillis()
> for(i <- 0 to 5) {
>   rocksDB.merge(key.getBytes(), ("s"+ i + value).getBytes())
>   //rocksDB.put(key.getBytes, value.getBytes)
> }
> println("finish")
> val begin = System.currentTimeMillis()
> rocksDB.get(key.getBytes)
> val end = System.currentTimeMillis()
> println("merge cost:" + (begin - beginmerge))
> println("Time consuming:" + (end - begin))
>   }
> }





[GitHub] flink pull request #3459: [FLINK-5654] Add processing time OVER RANGE BETWEE...

2017-03-06 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3459#discussion_r104282631
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/DataStreamProcTimeTimeAggregate.java
 ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.table.api.StreamTableEnvironment;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
+import org.apache.flink.table.plan.logical.rel.util.WindowAggregateUtil;
+import org.apache.flink.table.plan.nodes.datastream.DataStreamRel;
+import 
org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateGlobalWindowFunction;
+import 
org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateWindowFunction;
+import org.apache.flink.types.Row;
+
+public class DataStreamProcTimeTimeAggregate extends DataStreamRelJava {
+
+   private LogicalWindow windowReference;
+   private String description;
+
+   public DataStreamProcTimeTimeAggregate(RelOptCluster cluster, 
RelTraitSet traitSet, RelNode input,
+   RelDataType rowType, String description, LogicalWindow 
windowReference) {
+   super(cluster, traitSet, input);
+
+   this.rowType = rowType;
+   this.description = description;
+   this.windowReference = windowReference;
+
+   }
+
+   @Override
+   protected RelDataType deriveRowType() {
+   // TODO Auto-generated method stub
+   return super.deriveRowType();
+   }
+
+   @Override
+   public RelNode copy(RelTraitSet traitSet, java.util.List 
inputs) {
+
+   if (inputs.size() != 1) {
+   System.err.println(this.getClass().getName() + " : 
Input size must be one!");
+   }
+
+   return new DataStreamProcTimeTimeAggregate(getCluster(), 
traitSet, inputs.get(0), getRowType(),
+   getDescription(), windowReference);
+
+   }
+
+   @Override
+   public DataStream translateToPlan(StreamTableEnvironment tableEnv) 
{
+
+   // Get the general parameters related to the datastream, 
inputs, result
+   TableConfig config = tableEnv.getConfig();
+
+   DataStream inputDataStream = ((DataStreamRel) 
getInput()).translateToPlan(tableEnv);
+
+   TypeInformation[] rowType = new 
TypeInformation[getRowType().getFieldList().size()];
--- End diff --

1. `getRowType().getFieldList()` is used repeatedly; it's better to assign it
to a local variable.
2. `getRowType().getFieldList()` implies that `getRowType()` must not return
null and that `getFieldList()` must not be empty. Such preconditions should be
guaranteed first.
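
For illustration, a small sketch of both points (Scala-style, names taken from
the diff above):

    // 1. hoist the repeated call into a local; 2. check the precondition once
    val fieldList = getRowType.getFieldList
    require(fieldList != null && !fieldList.isEmpty,
      "row type must contain at least one field")
    val rowTypes = new Array[TypeInformation[_]](fieldList.size)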



[GitHub] flink pull request #3459: [FLINK-5654] Add processing time OVER RANGE BETWEE...

2017-03-06 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3459#discussion_r104283268
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/StreamAggregator.java
 ---
@@ -0,0 +1,23 @@
+package org.apache.flink.table.plan.nodes.datastream.aggs;
--- End diff --

This file has no Apache license header, so the Rat check fails.




[GitHub] flink pull request #3459: [FLINK-5654] Add processing time OVER RANGE BETWEE...

2017-03-06 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3459#discussion_r104282413
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/DataStreamProcTimeTimeAggregate.java
 ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.table.api.StreamTableEnvironment;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
+import org.apache.flink.table.plan.logical.rel.util.WindowAggregateUtil;
+import org.apache.flink.table.plan.nodes.datastream.DataStreamRel;
+import 
org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateGlobalWindowFunction;
+import 
org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateWindowFunction;
+import org.apache.flink.types.Row;
+
+public class DataStreamProcTimeTimeAggregate extends DataStreamRelJava {
+
+   private LogicalWindow windowReference;
+   private String description;
+
+   public DataStreamProcTimeTimeAggregate(RelOptCluster cluster, 
RelTraitSet traitSet, RelNode input,
+   RelDataType rowType, String description, LogicalWindow 
windowReference) {
+   super(cluster, traitSet, input);
+
+   this.rowType = rowType;
+   this.description = description;
+   this.windowReference = windowReference;
+
+   }
+
+   @Override
+   protected RelDataType deriveRowType() {
+   // TODO Auto-generated method stub
+   return super.deriveRowType();
+   }
+
+   @Override
+   public RelNode copy(RelTraitSet traitSet, java.util.List 
inputs) {
+
+   if (inputs.size() != 1) {
+   System.err.println(this.getClass().getName() + " : 
Input size must be one!");
+   }
+
+   return new DataStreamProcTimeTimeAggregate(getCluster(), 
traitSet, inputs.get(0), getRowType(),
+   getDescription(), windowReference);
+
+   }
+
+   @Override
+   public DataStream translateToPlan(StreamTableEnvironment tableEnv) 
{
+
+   // Get the general parameters related to the datastream, 
inputs, result
+   TableConfig config = tableEnv.getConfig();
--- End diff --

`config` is declared but never referenced.




[GitHub] flink pull request #3459: [FLINK-5654] Add processing time OVER RANGE BETWEE...

2017-03-06 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3459#discussion_r104282451
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/DataStreamProcTimeTimeAggregate.java
 ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.table.api.StreamTableEnvironment;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
+import org.apache.flink.table.plan.logical.rel.util.WindowAggregateUtil;
+import org.apache.flink.table.plan.nodes.datastream.DataStreamRel;
+import 
org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateGlobalWindowFunction;
+import 
org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateWindowFunction;
+import org.apache.flink.types.Row;
+
+public class DataStreamProcTimeTimeAggregate extends DataStreamRelJava {
--- End diff --

The class-level doc comment shouldn't be omitted; a better description is needed.




[GitHub] flink issue #3468: [FLINK-5824] Fix String/byte conversions without explicit...

2017-03-06 Thread shijinkui
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/3468
  
Your current replacement with `ConfigConstants.DEFAULT_CHARSET` is better if we
ever want to switch to another charset, but `UTF_8` has clearer semantics. If
we will never change the default charset from UTF-8 to another charset, I
prefer `ConfigConstants.UTF_8`.
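
For illustration, a minimal sketch: an explicit constant charset makes the
conversion deterministic regardless of the platform default:

    import java.nio.charset.StandardCharsets

    val bytes = "Flink".getBytes(StandardCharsets.UTF_8)  // always UTF-8
    val text = new String(bytes, StandardCharsets.UTF_8)  // round-trips exactly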




[jira] [Updated] (FLINK-5902) Some images can not show in IE

2017-03-05 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui updated FLINK-5902:
-
Issue Type: Sub-task  (was: Bug)
Parent: FLINK-5839

> Some images can not show in IE
> --
>
> Key: FLINK-5902
> URL: https://issues.apache.org/jira/browse/FLINK-5902
> Project: Flink
>  Issue Type: Sub-task
>  Components: Webfrontend
> Environment: IE
>Reporter: Tao Wang
> Attachments: chrome is ok.png, IE 11 with problem.png
>
>
> Some images on the Overview page cannot be shown in IE, while they display
> fine in Chrome.
> I'm using IE 11, but I think it is the same with IE9. I'll paste the
> screenshots later.





[GitHub] flink issue #3468: [FLINK-5824] Fix String/byte conversions without explicit...

2017-03-04 Thread shijinkui
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/3468
  
Good changes. Changing `ConfigConstants.DEFAULT_CHARSET` to
`ConfigConstants.UTF_8` may be clearer.




[GitHub] flink issue #3459: [FLINK-5654] Add processing time OVER RANGE BETWEEN x PRE...

2017-03-03 Thread shijinkui
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/3459
  
@stefanobortoli A PR should have as few commits as possible, ideally one. With
so many commits, it's hard to review.
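
For reference, one way to collapse a review branch into a single commit before
pushing (branch names are hypothetical):

    $ git checkout my-feature
    $ git rebase -i master          # mark all but the first commit as "squash"
    $ git push -f origin my-feature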





[GitHub] flink pull request #3459: [FLINK-5654] Add processing time OVER RANGE BETWEE...

2017-03-03 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3459#discussion_r104282393
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/DataStreamProcTimeTimeAggregate.java
 ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.table.api.StreamTableEnvironment;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
+import org.apache.flink.table.plan.logical.rel.util.WindowAggregateUtil;
+import org.apache.flink.table.plan.nodes.datastream.DataStreamRel;
+import 
org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateGlobalWindowFunction;
+import 
org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateWindowFunction;
+import org.apache.flink.types.Row;
+
+public class DataStreamProcTimeTimeAggregate extends DataStreamRelJava {
+
+   private LogicalWindow windowReference;
+   private String description;
--- End diff --

I didn't find where `description` is used. If a variable is not used, it
shouldn't be declared.




[GitHub] flink pull request #3408: [FLINK-5903][YARN]respect taskmanager.numberOfTask...

2017-03-03 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3408#discussion_r104277608
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -317,6 +319,10 @@ public AbstractYarnClusterDescriptor 
createDescriptor(String defaultApplicationN
if (cmd.hasOption(SLOTS.getOpt())) {
int slots = 
Integer.valueOf(cmd.getOptionValue(SLOTS.getOpt()));
yarnClusterDescriptor.setTaskManagerSlots(slots);
+   } else if (config.containsKey(ConfigConstants.YARN_VCORES)) {
--- End diff --

@tillrohrmann I agree with you. We need to make sure that YARN_VCORES only
applies in YARN mode, and yarn/mesos/standalone should have separate
configuration options. Also, `YARN_VCORES` is the sum of the
`Resource#getVirtualCores` responses from the YARN client.




[jira] [Assigned] (FLINK-5818) change checkpoint dir permission to 700 for security reason

2017-02-27 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui reassigned FLINK-5818:


Assignee: Tao Wang

> change checkpoint dir permission to 700 for security reason
> ---
>
> Key: FLINK-5818
> URL: https://issues.apache.org/jira/browse/FLINK-5818
> Project: Flink
>  Issue Type: Sub-task
>  Components: Security, State Backends, Checkpointing
>Reporter: Tao Wang
>Assignee: Tao Wang
>
> Now the checkpoint directory is created without a specified permission, so it 
> is easy for another user to delete or read files under it, which can cause 
> restore failures or information leaks.
> It's better to lower it to 700.
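
For illustration, a minimal sketch of creating the directory with owner-only 
permissions (rwx------, i.e. 700), assuming Hadoop's FileSystem API; the 
checkpoint path below is hypothetical:

{code}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class OwnerOnlyCheckpointDir {
    public static void main(String[] args) throws Exception {
        Path checkpointDir = new Path("hdfs:///flink/checkpoints/job-1");
        FileSystem fs = checkpointDir.getFileSystem(new Configuration());
        FsPermission ownerOnly = new FsPermission((short) 0700);
        // mkdirs applies the process umask, so set the permission explicitly afterwards.
        fs.mkdirs(checkpointDir, ownerOnly);
        fs.setPermission(checkpointDir, ownerOnly);
    }
}
{code}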



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3190: [FLINK-5546][build] java.io.tmpdir setted as project buil...

2017-02-23 Thread shijinkui
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/3190
  
> Can we just use the ${project.build.directory} as java.io.tmpdir ?
@wenlong88 Sorry for the late reply. 
It's a good question. If we used `${project.build.directory}` without the `tmp` 
subdirectory, the unit tests would create various directories that might overlap 
with existing output directories such as `classes` or `surefire-reports`. 

Using a dedicated `tmp` directory avoids the possibility of such conflicts.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3190: [FLINK-5546][build] java.io.tmpdir setted as project buil...

2017-02-22 Thread shijinkui
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/3190
  
Got it. When a single test runs and no `tmp` directory has been created, it 
should fall back to the default java.io.tmpdir property.
Let me check that.
The base test class should have a double check that the target directory 
exists; see the sketch below.
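
For illustration, a minimal sketch of such a double check, assuming a 
hypothetical shared base test class:

    import java.io.File;

    public abstract class TestBase {
        static {
            // If surefire did not set java.io.tmpdir to <build-dir>/tmp (e.g. when
            // a single test is run from the IDE), create the directory ourselves.
            File tmpDir = new File(System.getProperty("java.io.tmpdir"));
            if (!tmpDir.exists() && !tmpDir.mkdirs()) {
                throw new IllegalStateException("Cannot create temp directory: " + tmpDir);
            }
        }
    }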


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2460: [FLINK-4562] table examples make an divided module in fli...

2017-02-21 Thread shijinkui
Github user shijinkui commented on the issue:

https://github.com/apache/flink/pull/2460
  
@wuchong @StephanEwen could you review this pull request? 
Generating independent runnable example jars will increase the size of the 
Flink distribution; this table example jar alone is 13 MB or so.

We can reach an agreement on examples here: 
1. All example code is moved into the flink-examples module, as sub-modules. 
Yes or No?
2. Runnable example jars are only optionally generated in the Flink 
distribution. Yes or No?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-5546) java.io.tmpdir setted as project build directory in surefire plugin

2017-02-21 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui closed FLINK-5546.

Resolution: Duplicate

resolved in FLINK-5817

> java.io.tmpdir setted as project build directory in surefire plugin
> ---
>
> Key: FLINK-5546
> URL: https://issues.apache.org/jira/browse/FLINK-5546
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
> Environment: CentOS 7.2
>Reporter: Syinchwun Leo
>Assignee: shijinkui
> Fix For: 1.2.1
>
>
> When multiple Linux users run tests at the same time, the flink-runtime module 
> may fail: user A creates /tmp/cacheFile, and user B then has no permission to 
> access the folder.  
> Failed tests: 
> FileCacheDeleteValidationTest.setup:79 Error initializing the test: 
> /tmp/cacheFile (Permission denied)
> Tests in error: 
> IOManagerTest.channelEnumerator:54 » Runtime Could not create storage 
> director...
> Tests run: 1385, Failures: 1, Errors: 1, Skipped: 8



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5780) Extend ConfigOption with descriptions

2017-02-20 Thread shijinkui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875418#comment-15875418
 ] 

shijinkui commented on FLINK-5780:
--

This just sounds like an extension of Apache Commons CLI: 
https://commons.apache.org/proper/commons-cli/
IMO, the Commons CLI style is the standard, and I like it.
Is that so?
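
For illustration, a minimal sketch of what a Commons-CLI-style extension could 
look like; the method names below are hypothetical, not the actual API:

{code}
public final class ConfigOption<T> {
    private final String key;
    private final T defaultValue;
    private String shortDescription = "";
    private String longDescription = "";

    public ConfigOption(String key, T defaultValue) {
        this.key = key;
        this.defaultValue = defaultValue;
    }

    // Fluent setters, so options read like Commons CLI's builder chains.
    public ConfigOption<T> shortDescription(String description) {
        this.shortDescription = description;
        return this;
    }

    public ConfigOption<T> longDescription(String description) {
        this.longDescription = description;
        return this;
    }
}
{code}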

> Extend ConfigOption with descriptions
> -
>
> Key: FLINK-5780
> URL: https://issues.apache.org/jira/browse/FLINK-5780
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core, Documentation
>Reporter: Ufuk Celebi
>
> The {{ConfigOption}} type is meant to replace the flat {{ConfigConstants}}. 
> As part of automating the generation of a docs config page we need to extend  
> {{ConfigOption}} with description fields.
> From the ML discussion, these could be:
> {code}
> void shortDescription(String);
> void longDescription(String);
> {code}
> In practice, the description string should contain HTML/Markdown.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5860) Replace all the file creating from java.io.tmpdir with TemporaryFolder

2017-02-20 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui updated FLINK-5860:
-
Description: 
Searching for `System.getProperty("java.io.tmpdir")` in the whole Flink project 
yields the list of files below, mostly unit tests. Replace all file creation 
based on `java.io.tmpdir` with JUnit's TemporaryFolder.

Who can fix this problem thoroughly?
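
A minimal sketch of the intended replacement, using JUnit 4's TemporaryFolder 
rule (the test class below is hypothetical; see e.g. FileCacheDeleteValidationTest 
for a real candidate):

```
import java.io.File;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class CacheFileTest {

    // Each test method gets a fresh, user-private directory that JUnit deletes
    // afterwards, so concurrent runs by different Linux users cannot collide
    // on a shared /tmp/cacheFile.
    @Rule
    public final TemporaryFolder tempFolder = new TemporaryFolder();

    @Test
    public void testWriteCacheFile() throws Exception {
        File cacheFile = tempFolder.newFile("cacheFile");
        // ... write to cacheFile instead of
        // new File(System.getProperty("java.io.tmpdir"), "cacheFile")
    }
}
```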

```

$ grep -ri 'System.getProperty("java.io.tmpdir")' .
./flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java:
env.setStateBackend(new FsStateBackend("file:///" + 
System.getProperty("java.io.tmpdir") + "/flink/backend"));
./flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
  File tempDir = new File(System.getProperty("java.io.tmpdir"));
./flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
   File tempDir = new File(System.getProperty("java.io.tmpdir"));
./flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
   File tempDir = new File(System.getProperty("java.io.tmpdir"));
./flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java:
   return getMockEnvironment(new File[] { new 
File(System.getProperty("java.io.tmpdir")) });
./flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java: 
public static final String DEFAULT_TASK_MANAGER_TMP_PATH = 
System.getProperty("java.io.tmpdir");
./flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java:
final String tempPath = System.getProperty("java.io.tmpdir");
./flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java: 
final File tempDir = new File(System.getProperty("java.io.tmpdir"));
./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java:   
File tempDir = new File(System.getProperty("java.io.tmpdir"));
./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java:   
File tempDir = new File(System.getProperty("java.io.tmpdir"));
./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java:
final String outDir = params.get("output", 
System.getProperty("java.io.tmpdir"));
./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java:
  final String tmpDir = System.getProperty("java.io.tmpdir");
./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java:
final String outPath = System.getProperty("java.io.tmpdir");
./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
File out = new File(System.getProperty("java.io.tmpdir"), 
"jarcreatortest.jar");
./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
File out = new File(System.getProperty("java.io.tmpdir"), 
"jarcreatortest.jar");
./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
File out = new File(System.getProperty("java.io.tmpdir"), 
"jarcreatortest.jar");
./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
File out = new File(System.getProperty("java.io.tmpdir"), 
"jarcreatortest.jar");
./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java:
 public static final String FLINK_PYTHON_FILE_PATH = 
System.getProperty("java.io.tmpdir") + File.separator + "flink_plan";
./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java:
 public static final String FLINK_TMP_DATA_DIR = 
System.getProperty("java.io.tmpdir") + File.separator + "flink_data";
./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java:
 FLINK_HDFS_PATH = "file:" + 
System.getProperty("java.io.tmpdir") + File.separator + "flink";
./flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java: 
baseDir = new File(System.getProperty("java.io.tmpdir"));
./flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInf
