[GitHub] flink pull request #2471: Broken links on website

2016-09-03 Thread apivovarov
GitHub user apivovarov opened a pull request:

https://github.com/apache/flink/pull/2471

Broken links on website

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following checklist into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

The website has the following broken links:

DataSet API for static data embedded in Java, Scala, and Python.
http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html

Table API with a SQL-like expression language embedded in Java and Scala.
http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/table.html

Gelly, a graph processing API and library.
http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/gelly.html

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apivovarov/flink patch-1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2471.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2471


commit b6d56682a55ae46e70cba33326ec58eb753fa73a
Author: Alexander Pivovarov <apivova...@gmail.com>
Date:   2016-09-03T21:14:58Z

Broken links on website

The website has the following broken links:

DataSet API for static data embedded in Java, Scala, and Python.
http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html

Table API with a SQL-like expression language embedded in Java and Scala.
http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/table.html

Gelly, a graph processing API and library.
http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/gelly.html






[GitHub] flink pull request #2471: Broken links on website

2016-09-06 Thread apivovarov
Github user apivovarov closed the pull request at:

https://github.com/apache/flink/pull/2471




[GitHub] flink issue #2471: Broken links on website

2016-09-06 Thread apivovarov
Github user apivovarov commented on the issue:

https://github.com/apache/flink/pull/2471
  
https://issues.apache.org/jira/browse/FLINK-4585
I'll submit a PR to flink-web




[GitHub] flink pull request #2478: [FLINK-4595] Close FileOutputStream in ParameterTo...

2016-09-07 Thread apivovarov
GitHub user apivovarov opened a pull request:

https://github.com/apache/flink/pull/2478

[FLINK-4595] Close FileOutputStream in ParameterTool

https://issues.apache.org/jira/browse/FLINK-4595
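
A minimal sketch of the fix pattern, assuming the usual shape of such a leak (the helper below is illustrative, not ParameterTool's actual code): wrapping the `FileOutputStream` in try-with-resources guarantees it is closed even when `store()` throws.

```
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Properties;

public class PropertiesWriterSketch {
    // The stream is closed on every exit path instead of leaking on error.
    static void writeProperties(Properties props, File file) throws IOException {
        try (OutputStream out = new FileOutputStream(file)) {
            props.store(out, null);
        } // out.close() runs automatically here
    }
}
```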

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apivovarov/flink FLINK-4595

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2478.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2478


commit 46f68d5fc621324368ad31fbba52bdf13abfae48
Author: Alexander Pivovarov <apivova...@gmail.com>
Date:   2016-09-07T21:11:06Z

[FLINK-4595] Close FileOutputStream in ParameterTool






[GitHub] flink pull request #2492: [FLINK-4612] Close FileWriter using try with resou...

2016-09-11 Thread apivovarov
GitHub user apivovarov opened a pull request:

https://github.com/apache/flink/pull/2492

[FLINK-4612] Close FileWriter using try with resources

https://issues.apache.org/jira/browse/FLINK-4612
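
The before/after idea, as a standalone sketch (the method shown is an assumption; the real change is visible in the review diff further down): try-with-resources replaces a manual try/finally and closes the writer on every exit path.

```
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;

public class FileWriterSketch {
    static void writeContents(File f, String contents) throws IOException {
        // Closed automatically, even if write() throws.
        try (BufferedWriter out = new BufferedWriter(new FileWriter(f))) {
            out.write(contents);
        }
    }
}
```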

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apivovarov/flink FLINK-4612

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2492.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2492


commit 3b1a73cb47fb4581e169f2ea5cfaa69d9f4a1c64
Author: Alexander Pivovarov <apivova...@gmail.com>
Date:   2016-09-12T05:46:55Z

[FLINK-4612] Close FileWriter using try with resources






[GitHub] flink issue #2489: [FLINK-4608] Use short-circuit AND in Max/Min Aggregation...

2016-09-13 Thread apivovarov
Github user apivovarov commented on the issue:

https://github.com/apache/flink/pull/2489
  
@zentol Can you merge it?




[GitHub] flink issue #2488: [FLINK-4607] Close FileInputStream in ParameterTool and o...

2016-09-13 Thread apivovarov
Github user apivovarov commented on the issue:

https://github.com/apache/flink/pull/2488
  
@zentol Time to merge it?




[GitHub] flink pull request #2490: [FLINK-4609] Remove redundant check for null in Cr...

2016-09-13 Thread apivovarov
Github user apivovarov commented on a diff in the pull request:

https://github.com/apache/flink/pull/2490#discussion_r78631478
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java ---
@@ -129,14 +129,11 @@ private String getDefaultName() {
 
	public DefaultCross(DataSet<I1> input1, DataSet<I2> input2, CrossHint hint, String defaultName) {

-		super(input1, input2, new DefaultCrossFunction<I1, I2>(),
+		super(Preconditions.checkNotNull(input1, "input1 is null"),
--- End diff --

@greghogan 




[GitHub] flink pull request #2491: [FLINK-4610] Replace keySet/getValue with entrySet...

2016-09-10 Thread apivovarov
GitHub user apivovarov opened a pull request:

https://github.com/apache/flink/pull/2491

[FLINK-4610] Replace keySet/getValue with entrySet in UdfAnalyzerUtils

https://issues.apache.org/jira/browse/FLINK-4610
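
The general pattern behind the change, as a hedged sketch (the actual map and value types in UdfAnalyzerUtils differ): iterating `entrySet()` avoids the extra per-key lookup that `keySet()` plus `get()` incurs.

```
import java.util.HashMap;
import java.util.Map;

public class EntrySetSketch {
    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("a", 1);
        counts.put("b", 2);

        // Before: one extra hash lookup per key.
        for (String key : counts.keySet()) {
            System.out.println(key + "=" + counts.get(key));
        }

        // After: entrySet() delivers key and value together.
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            System.out.println(e.getKey() + "=" + e.getValue());
        }
    }
}
```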

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apivovarov/flink FLINK-4610

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2491.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2491


commit 7440989c09fae5325dbb3cebf0cf9d10f59dcbdd
Author: Alexander Pivovarov <apivova...@gmail.com>
Date:   2016-09-10T06:10:12Z

[FLINK-4610] Replace keySet/getValue with entrySet in UdfAnalyzerUtils






[GitHub] flink pull request #2483: [FLINK-4601] Check for empty string properly

2016-09-09 Thread apivovarov
GitHub user apivovarov opened a pull request:

https://github.com/apache/flink/pull/2483

[FLINK-4601] Check for empty string properly

https://issues.apache.org/jira/browse/FLINK-4601
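
A sketch of the pitfall the title suggests (the exact expression Flink used is not shown here, so this illustrates the general bug class): reference comparison with `==` does not test string content.

```
public class EmptyStringSketch {
    public static void main(String[] args) {
        String s = new String("");

        // Broken: == compares references, so a distinct empty string fails.
        System.out.println(s == "");      // false

        // Correct: compare content.
        System.out.println(s.isEmpty());  // true
        System.out.println("".equals(s)); // true, and also null-safe
    }
}
```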

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apivovarov/flink FLINK-4601

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2483.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2483


commit 325d40e4145c8cfc33782c005b803d5de9d571a6
Author: Alexander Pivovarov <apivova...@gmail.com>
Date:   2016-09-09T06:58:45Z

[FLINK-4601] Check for empty string properly






[GitHub] flink issue #2491: [FLINK-4610] Replace keySet/getValue with entrySet in Udf...

2016-09-10 Thread apivovarov
Github user apivovarov commented on the issue:

https://github.com/apache/flink/pull/2491
  
@twalthr Can you look at this PR?




[GitHub] flink pull request #2490: [FLINK-4609] Remove redundant check for null in Cr...

2016-09-10 Thread apivovarov
Github user apivovarov commented on a diff in the pull request:

https://github.com/apache/flink/pull/2490#discussion_r78283317
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java ---
@@ -133,10 +133,6 @@ public DefaultCross(DataSet input1, DataSet input2, CrossHint hint, Stri
		new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()),
		hint, defaultName);
 
-	if (input1 == null || input2 == null) {
-		throw new NullPointerException();
-	}
-
--- End diff --

If input1 and/or input2 is null, CrossOperator will throw an NPE on line 133.

I also added a null check to TwoInputOperator.





[GitHub] flink pull request #2490: [FLINK-4609] Remove redundant check for null in Cr...

2016-09-10 Thread apivovarov
Github user apivovarov commented on a diff in the pull request:

https://github.com/apache/flink/pull/2490#discussion_r78285322
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java ---
@@ -133,10 +133,6 @@ public DefaultCross(DataSet input1, DataSet input2, CrossHint hint, Stri
		new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()),
		hint, defaultName);
 
-	if (input1 == null || input2 == null) {
-		throw new NullPointerException();
-	}
-
--- End diff --

OK, I added a null check for input1 and input2 with a message, inline with the call to super in DefaultCross.




[GitHub] flink issue #2490: [FLINK-4609] Remove redundant check for null in CrossOper...

2016-09-10 Thread apivovarov
Github user apivovarov commented on the issue:

https://github.com/apache/flink/pull/2490
  
OK, I added a null check for input1 and input2 with a message before calling super in DefaultCross.





[GitHub] flink pull request #2490: [FLINK-4609] Remove redundant check for null in Cr...

2016-09-10 Thread apivovarov
Github user apivovarov commented on a diff in the pull request:

https://github.com/apache/flink/pull/2490#discussion_r78287335
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java ---
@@ -129,14 +129,11 @@ private String getDefaultName() {
 
	public DefaultCross(DataSet<I1> input1, DataSet<I2> input2, CrossHint hint, String defaultName) {

-		super(input1, input2, new DefaultCrossFunction<I1, I2>(),
+		super(Preconditions.checkNotNull(input1, "input1 is null"),
--- End diff --

DefaultCross calls `input1.getType()` and `input2.getType()` before calling super() on line 134. So if we add the null check to a super class (e.g. TwoInputOperator), it will not help DefaultCross.
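
The evaluation-order point, as a small self-contained sketch (the classes are hypothetical stand-ins, not the actual Flink types): arguments to `super(...)` are evaluated before the superclass constructor body runs, so a null check inside the superclass fires too late, while an inline `checkNotNull` fails first with a useful message.

```
public class EvaluationOrderSketch {
    static class Input {
        Object getType() { return "type"; }
    }

    static class Base {
        Base(Object type1, Object type2) {
            // A null check here never sees the original inputs:
            // getType() already ran (or already threw) in the subclass.
        }
    }

    static class Derived extends Base {
        Derived(Input input1, Input input2) {
            // Evaluated BEFORE Base's constructor body runs.
            super(checkNotNull(input1, "input1 is null").getType(),
                  checkNotNull(input2, "input2 is null").getType());
        }
    }

    static <T> T checkNotNull(T ref, String msg) {
        if (ref == null) {
            throw new NullPointerException(msg);
        }
        return ref;
    }

    public static void main(String[] args) {
        new Derived(new Input(), new Input()); // fine
        try {
            new Derived(null, new Input());
        } catch (NullPointerException e) {
            System.out.println(e.getMessage()); // prints "input1 is null"
        }
    }
}
```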




[GitHub] flink pull request #2490: [FLINK-4609] Remove redundant check for null in Cr...

2016-09-10 Thread apivovarov
Github user apivovarov commented on a diff in the pull request:

https://github.com/apache/flink/pull/2490#discussion_r78288200
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java ---
@@ -129,14 +129,11 @@ private String getDefaultName() {
 
	public DefaultCross(DataSet<I1> input1, DataSet<I2> input2, CrossHint hint, String defaultName) {

-		super(input1, input2, new DefaultCrossFunction<I1, I2>(),
+		super(Preconditions.checkNotNull(input1, "input1 is null"),
--- End diff --

I also added a null check to TwoInputOperator, and removed the `input1` and `input2` fields from DefaultCross because TwoInputOperator already has them.




[GitHub] flink pull request #2492: [FLINK-4612] Close FileWriter using try with resou...

2016-09-12 Thread apivovarov
Github user apivovarov commented on a diff in the pull request:

https://github.com/apache/flink/pull/2492#discussion_r78447379
  
--- Diff: flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java ---
@@ -55,11 +55,8 @@ public static String createTempFileInDirectory(String dir, String contents) thro
	f.createNewFile();
	f.deleteOnExit();
 
-	BufferedWriter out = new BufferedWriter(new FileWriter(f));
-	try {
+	try(BufferedWriter out = new BufferedWriter(new FileWriter(f))) {
--- End diff --

Thank you. Just fixed that




[GitHub] flink pull request #2488: [FLINK-4607] Close FileInputStream in ParameterToo...

2016-09-09 Thread apivovarov
GitHub user apivovarov opened a pull request:

https://github.com/apache/flink/pull/2488

[FLINK-4607] Close FileInputStream in ParameterTool and other

https://issues.apache.org/jira/browse/FLINK-4607
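
The reading-side counterpart of the same leak pattern, as an illustrative sketch (not ParameterTool's exact code): the stream is closed even when `load()` throws.

```
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class PropertiesReaderSketch {
    static Properties readProperties(File file) throws IOException {
        Properties props = new Properties();
        // Closed automatically on both the normal and the error path.
        try (InputStream in = new FileInputStream(file)) {
            props.load(in);
        }
        return props;
    }
}
```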

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apivovarov/flink FLINK-4607

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2488.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2488


commit dc0e636f630f8ab96d7cc71ede0a5a3ea5ce24a4
Author: Alexander Pivovarov <apivova...@gmail.com>
Date:   2016-09-10T03:32:28Z

[FLINK-4607] Close FileInputStream in ParameterTool and other






[GitHub] flink pull request #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a l...

2016-09-09 Thread apivovarov
Github user apivovarov commented on a diff in the pull request:

https://github.com/apache/flink/pull/2487#discussion_r78270750
  
--- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.siddhi;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException;
+import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Siddhi CEP Execution Environment
+ */
+@PublicEvolving
+public class SiddhiCEP {
+   private final StreamExecutionEnvironment executionEnvironment;
+   private final Map<String, DataStream> dataStreams;
+   private final Map<String, SiddhiStreamSchema> dataStreamSchemas;
+   private final Map<String,Class> extensions = new HashMap<>();
+
+   public Map<String, DataStream> getDataStreams(){
+   return this.dataStreams;
+   }
+
+   public Map<String, SiddhiStreamSchema> getDataStreamSchemas(){
+   return this.dataStreamSchemas;
+   }
+
+   public boolean isStreamDefined(String streamId){
+   return dataStreams.containsKey(streamId);
+   }
+
+   public Map<String,Class> getExtensions(){
+   return this.extensions;
+   }
+
+   public void checkStreamDefined(String streamId) throws UndefinedStreamException {
+      if(!isStreamDefined(streamId)){
+         throw new UndefinedStreamException("Stream (streamId: "+streamId+") not defined");
+      }
+   }
+
+   public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
+   this.executionEnvironment = streamExecutionEnvironment;
+   this.dataStreams = new HashMap<>();
+   this.dataStreamSchemas = new HashMap<>();
--- End diff --

Lines 64 and 65 can be removed. Add `= new HashMap<>();` to lines 36 and 37, similar to what was done on line 38. A sketch of the result follows below.
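
Roughly what that amounts to (field names follow the reviewed class; the value types are simplified to `Object` so the snippet stands alone):

```
import java.util.HashMap;
import java.util.Map;

public class SiddhiCepFieldsSketch {
    // All three maps initialized at declaration, matching `extensions`.
    private final Map<String, Object> dataStreams = new HashMap<>();
    private final Map<String, Object> dataStreamSchemas = new HashMap<>();
    private final Map<String, Class> extensions = new HashMap<>();

    public SiddhiCepFieldsSketch() {
        // The two constructor assignments (former lines 64 and 65) are gone.
    }
}
```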




[GitHub] flink pull request #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a l...

2016-09-09 Thread apivovarov
Github user apivovarov commented on a diff in the pull request:

https://github.com/apache/flink/pull/2487#discussion_r78270764
  

[GitHub] flink pull request #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a l...

2016-09-09 Thread apivovarov
Github user apivovarov commented on a diff in the pull request:

https://github.com/apache/flink/pull/2487#discussion_r78270757
  

[GitHub] flink pull request #2485: [Flink 4599] - Add 'explain()' also to StreamTable...

2016-09-09 Thread apivovarov
Github user apivovarov commented on a diff in the pull request:

https://github.com/apache/flink/pull/2485#discussion_r78270796
  
--- Diff: docs/dev/table_api.md ---
@@ -2457,3 +2457,27 @@ The Table API provides a configuration (the so-called `TableConfig`) to modify r
 By default, the Table API supports `null` values. Null handling can be disabled to improve performance by setting the `nullCheck` property in the `TableConfig` to `false`.
 
 {% top %}
+
+Explaining a Table
+
+The Table API provides a mechanism to describe the graph of operations that leads to the resulting output. This is done through the `TableEnvironment#explain(table)` method. It returns a string describing two graphs: the Abstract Syntax Tree of the relational algebra query and Flink's execution plan of the equivalent Flink job.
+
+Table `explain` is supported for both `BatchTableEnvironment` and `StreamTableEnvironment`. Currently `StreamTableEnvironment` doesn't support the explanation of the execution plan.
+
+
+
+{% highlight scala %}
+   val env = StreamExecutionEnvironment.getExecutionEnvironment
+   val tEnv = TableEnvironment.getTableEnvironment(env)
+
+   val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+   val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+   val table = table1.unionAll(table2)
+
+   val explanation:String = tEnv.explain(table)
--- End diff --

Put a space before `String`.




[GitHub] flink pull request #2485: [Flink 4599] - Add 'explain()' also to StreamTable...

2016-09-09 Thread apivovarov
Github user apivovarov commented on a diff in the pull request:

https://github.com/apache/flink/pull/2485#discussion_r78270826
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala ---
@@ -271,5 +271,21 @@ abstract class StreamTableEnvironment(
 }
 
   }
+  /*
+   * Returns the AST of the specified Table API and SQL queries and the execution plan to compute
+   * the result of the given [[Table]].
+   *
+   * @param table The table for which the AST and execution plan will be returned.
+   * @param extended Flag to include detailed optimizer estimates.
+   */
+   def explain(table: Table): String = {
+
+     val ast = RelOptUtil.toString(table.getRelNode)
+
+     s"== Abstract Syntax Tree ==" +
+       System.lineSeparator +
+       s"$ast"
--- End diff --

Maybe this? 
```
s"== Abstract Syntax Tree ==${System.lineSeparator}$ast"
```




[GitHub] flink pull request #2485: [Flink 4599] - Add 'explain()' also to StreamTable...

2016-09-09 Thread apivovarov
Github user apivovarov commented on a diff in the pull request:

https://github.com/apache/flink/pull/2485#discussion_r78270846
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala ---
@@ -271,5 +271,21 @@ abstract class StreamTableEnvironment(
 }
 
   }
+  /*
+   * Returns the AST of the specified Table API and SQL queries and the execution plan to compute
+   * the result of the given [[Table]].
+   *
+   * @param table The table for which the AST and execution plan will be returned.
+   * @param extended Flag to include detailed optimizer estimates.
+   */
+   def explain(table: Table): String = {
+
+     val ast = RelOptUtil.toString(table.getRelNode)
+
+     s"== Abstract Syntax Tree ==" +
+       System.lineSeparator +
+       s"$ast"
--- End diff --

No need for the `s` interpolator on line 285.




[GitHub] flink pull request #2485: [Flink 4599] - Add 'explain()' also to StreamTable...

2016-09-09 Thread apivovarov
Github user apivovarov commented on a diff in the pull request:

https://github.com/apache/flink/pull/2485#discussion_r78270898
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.stream
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.junit.Assert.assertEquals
+import org.junit._
+
+class ExplainStreamTest
+  extends StreamingMultipleProgramsTestBase {
+
+  val testFilePath = ExplainStreamTest.this.getClass.getResource("/").getFile
+
+  @Test
+  def testFilter() : Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+
+val table = env.fromElements((1, "hello"))
+  .toTable(tEnv, 'a, 'b)
+  .filter("a % 2 = 0")
+
+val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
+val source = scala.io.Source.fromFile(testFilePath +
+  "../../src/test/scala/resources/testFilterStream0.out").mkString.replaceAll("\\r\\n", "\n")
+assertEquals(result, source)
+  }
+
+  @Test
+  def testUnion() : Unit = {
--- End diff --

No need for the space after `()` on lines 35 and 50.




[GitHub] flink pull request #2489: [FLINK-4608] Use short-circuit AND in Max/Min Aggr...

2016-09-09 Thread apivovarov
GitHub user apivovarov opened a pull request:

https://github.com/apache/flink/pull/2489

[FLINK-4608] Use short-circuit AND in Max/Min AggregationFunction

https://issues.apache.org/jira/browse/FLINK-4608
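
What the title means in practice, as a hedged illustration (the real Max/Min aggregation code differs): `&` always evaluates both operands, while `&&` skips the right-hand side once the left is false.

```
public class ShortCircuitSketch {
    static boolean expensiveCompare() {
        System.out.println("compare evaluated");
        return true;
    }

    public static void main(String[] args) {
        boolean shouldUpdate = false;

        // Before: non-short-circuit & always evaluates the right operand.
        boolean eager = shouldUpdate & expensiveCompare();  // prints

        // After: && skips the right operand when the left is false.
        boolean lazy = shouldUpdate && expensiveCompare();  // silent

        System.out.println(eager + " " + lazy); // false false
    }
}
```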

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apivovarov/flink FLINK-4608

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2489.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2489


commit ceb3aa05bae3d05b9f6c5a7a55e6e43fc91a9450
Author: Alexander Pivovarov <apivova...@gmail.com>
Date:   2016-09-10T04:52:36Z

[FLINK-4608] Use short-circuit AND in Max/Min AggregationFunction






[GitHub] flink pull request #2490: [FLINK-4609] Remove redundant check for null in Cr...

2016-09-09 Thread apivovarov
GitHub user apivovarov opened a pull request:

https://github.com/apache/flink/pull/2490

[FLINK-4609] Remove redundant check for null in CrossOperator

https://issues.apache.org/jira/browse/FLINK-4609

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apivovarov/flink FLINK-4609

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2490.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2490


commit 272007fc8f350b1b998f28762aade6760b588c73
Author: Alexander Pivovarov <apivova...@gmail.com>
Date:   2016-09-10T05:24:43Z

[FLINK-4609] Remove redundant check for null in CrossOperator






[GitHub] flink pull request #2490: [FLINK-4609] Remove redundant check for null in Cr...

2016-09-19 Thread apivovarov
Github user apivovarov commented on a diff in the pull request:

https://github.com/apache/flink/pull/2490#discussion_r79441284
  
--- Diff: flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java ---
@@ -971,10 +971,7 @@ public void testForwardWithAtLeastOneIterationAssumptionForJavac() {
 public void reduce(Iterable<Tuple2<Long, Long>> values, Collector out) throws Exception {
 	Long id = 0L;
 	for (Tuple2<Long, Long> value : values) {
-		id = value.f0;
--- End diff --

Just reverted this change. Thank you!




[GitHub] flink pull request #2538: [FLINK-4666] Make constants to be final in Paramet...

2016-09-22 Thread apivovarov
GitHub user apivovarov opened a pull request:

https://github.com/apache/flink/pull/2538

[FLINK-4666] Make constants to be final in ParameterTool

https://issues.apache.org/jira/browse/FLINK-4666
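
The change in miniature (the constant name is borrowed from ParameterTool for flavor; which fields the PR actually touches is in the diff itself):

```
public class ConstantsSketch {
    // Before: reassignable from anywhere, despite acting as a constant.
    public static String MUTABLE_KEY = "__NO_VALUE_KEY";

    // After: final makes reassignment a compile error and documents intent.
    public static final String NO_VALUE_KEY = "__NO_VALUE_KEY";
}
```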

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apivovarov/flink FLINK-4666

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2538.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2538


commit 11fdcddb932c5474b506f9338592f142fdf1cee3
Author: Alexander Pivovarov <apivova...@gmail.com>
Date:   2016-09-23T05:54:29Z

[FLINK-4666] Make constants to be final in ParameterTool






[GitHub] flink pull request #2537: [FLINK-4665] Remove boxing/unboxing to parse a pri...

2016-09-22 Thread apivovarov
GitHub user apivovarov opened a pull request:

https://github.com/apache/flink/pull/2537

[FLINK-4665] Remove boxing/unboxing to parse a primitive

https://issues.apache.org/jira/browse/FLINK-4665
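
The pattern the title names, sketched (the call sites in Flink may differ): `Integer.valueOf(String)` allocates a boxed `Integer` that is immediately unboxed, while `Integer.parseInt` returns the primitive directly.

```
public class ParsePrimitiveSketch {
    public static void main(String[] args) {
        String s = "42";

        // Before: boxes, then unboxes.
        int boxed = Integer.valueOf(s);

        // After: no temporary object.
        int primitive = Integer.parseInt(s);

        System.out.println(boxed + " " + primitive);
    }
}
```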

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apivovarov/flink FLINK-4665

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2537.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2537


commit eeefde68405d27a828d15443b5815a4e53a6b34c
Author: Alexander Pivovarov <apivova...@gmail.com>
Date:   2016-09-23T03:27:08Z

[FLINK-4665] Remove boxing/unboxing to parse a primitive






[GitHub] flink pull request #2539: [FLINK-4668] Fix positive random int generation

2016-09-23 Thread apivovarov
GitHub user apivovarov opened a pull request:

https://github.com/apache/flink/pull/2539

[FLINK-4668] Fix positive random int generation

https://issues.apache.org/jira/browse/FLINK-4668
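
The classic pitfall this title points at (whether Flink's code used `Math.abs` this way is an assumption on my part): `Math.abs(Integer.MIN_VALUE)` is still negative, so "abs of a random int" is not guaranteed positive, while the bounded `nextInt(bound)` overload is.

```
import java.util.Random;

public class PositiveRandomIntSketch {
    public static void main(String[] args) {
        Random rnd = new Random();

        // Broken: nextInt() can return Integer.MIN_VALUE, whose abs overflows.
        System.out.println(Math.abs(Integer.MIN_VALUE)); // -2147483648

        // Safe: the bounded overload returns values in [0, bound).
        int positive = rnd.nextInt(Integer.MAX_VALUE);
        System.out.println(positive >= 0); // always true
    }
}
```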

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apivovarov/flink FLINK-4668

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2539.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2539


commit 4ab1e4fa97fa41054817955230559766acbdf698
Author: Alexander Pivovarov <apivova...@gmail.com>
Date:   2016-09-23T06:26:27Z

[FLINK-4668] Fix positive random int generation






[GitHub] flink issue #2539: [FLINK-4668] Fix positive random int generation

2016-09-23 Thread apivovarov
Github user apivovarov commented on the issue:

https://github.com/apache/flink/pull/2539
  
It's not even convenient to work with files whose names start with `-`, e.g.
```
$ vi -44.txt
VIM - Vi IMproved 7.4 (2013 Aug 10, compiled Aug  1 2016 19:37:21)
Unknown option argument: "-44.txt"
More info with: "vim -h"
```

```
$ rm -rf "-4.txt"
rm: illegal option -- 4
usage: rm [-f | -i] [-dPRrvW] file ...
       unlink file
```

