[GitHub] flink pull request #2471: Broken links on website
GitHub user apivovarov opened a pull request: https://github.com/apache/flink/pull/2471

Broken links on website

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

The website has the following broken links:

- DataSet API for static data embedded in Java, Scala, and Python: http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html
- Table API with a SQL-like expression language embedded in Java and Scala: http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/table.html
- Gelly, a graph processing API and library: http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/gelly.html

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apivovarov/flink patch-1

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2471.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2471

commit b6d56682a55ae46e70cba33326ec58eb753fa73a
Author: Alexander Pivovarov <apivova...@gmail.com>
Date: 2016-09-03T21:14:58Z

Broken links on website

The website has the following broken links:

- DataSet API for static data embedded in Java, Scala, and Python: http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html
- Table API with a SQL-like expression language embedded in Java and Scala: http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/table.html
- Gelly, a graph processing API and library: http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/gelly.html

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2471: Broken links on website
Github user apivovarov closed the pull request at: https://github.com/apache/flink/pull/2471
[GitHub] flink issue #2471: Broken links on website
Github user apivovarov commented on the issue: https://github.com/apache/flink/pull/2471 https://issues.apache.org/jira/browse/FLINK-4585 I'll submit a PR to flink-web
[GitHub] flink pull request #2478: [FLINK-4595] Close FileOutputStream in ParameterTo...
GitHub user apivovarov opened a pull request: https://github.com/apache/flink/pull/2478

[FLINK-4595] Close FileOutputStream in ParameterTool

https://issues.apache.org/jira/browse/FLINK-4595

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apivovarov/flink FLINK-4595

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2478.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2478

commit 46f68d5fc621324368ad31fbba52bdf13abfae48
Author: Alexander Pivovarov <apivova...@gmail.com>
Date: 2016-09-07T21:11:06Z

[FLINK-4595] Close FileOutputStream in ParameterTool
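The mail does not include the diff, but the general shape of this kind of fix is to move the `FileOutputStream` into a try-with-resources block so it is closed on every path, including when `Properties.store()` throws. A minimal sketch (class and method names here are hypothetical, not the actual ParameterTool code):

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Properties;

public class PropertiesFileDemo {
    // Before the fix, code like `FileOutputStream fos = new FileOutputStream(file);
    // props.store(fos, null);` leaked the stream if store() threw.
    // try-with-resources guarantees fos.close() runs on success and failure alike.
    static void store(File file, Properties props) {
        try (FileOutputStream fos = new FileOutputStream(file)) {
            props.store(fos, null);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Properties load(File file) {
        Properties props = new Properties();
        try (FileInputStream fis = new FileInputStream(file)) {
            props.load(fis);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }
}
```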
[GitHub] flink pull request #2492: [FLINK-4612] Close FileWriter using try with resou...
GitHub user apivovarov opened a pull request: https://github.com/apache/flink/pull/2492

[FLINK-4612] Close FileWriter using try with resources

https://issues.apache.org/jira/browse/FLINK-4612

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apivovarov/flink FLINK-4612

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2492.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2492

commit 3b1a73cb47fb4581e169f2ea5cfaa69d9f4a1c64
Author: Alexander Pivovarov <apivova...@gmail.com>
Date: 2016-09-12T05:46:55Z

[FLINK-4612] Close FileWriter using try with resources
[GitHub] flink issue #2489: [FLINK-4608] Use short-circuit AND in Max/Min Aggregation...
Github user apivovarov commented on the issue: https://github.com/apache/flink/pull/2489 @zentol Can you merge it?
[GitHub] flink issue #2488: [FLINK-4607] Close FileInputStream in ParameterTool and o...
Github user apivovarov commented on the issue: https://github.com/apache/flink/pull/2488 @zentol Time to merge it?
[GitHub] flink pull request #2490: [FLINK-4609] Remove redundant check for null in Cr...
Github user apivovarov commented on a diff in the pull request: https://github.com/apache/flink/pull/2490#discussion_r78631478 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java --- @@ -129,14 +129,11 @@ private String getDefaultName() { public DefaultCross(DataSet input1, DataSet input2, CrossHint hint, String defaultName) { - super(input1, input2, new DefaultCrossFunction<I1, I2>(), + super(Preconditions.checkNotNull(input1, "input1 is null"), --- End diff -- @greghogan
[GitHub] flink pull request #2491: [FLINK-4610] Replace keySet/getValue with entrySet...
GitHub user apivovarov opened a pull request: https://github.com/apache/flink/pull/2491

[FLINK-4610] Replace keySet/getValue with entrySet in UdfAnalyzerUtils

https://issues.apache.org/jira/browse/FLINK-4610

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apivovarov/flink FLINK-4610

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2491.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2491

commit 7440989c09fae5325dbb3cebf0cf9d10f59dcbdd
Author: Alexander Pivovarov <apivova...@gmail.com>
Date: 2016-09-10T06:10:12Z

[FLINK-4610] Replace keySet/getValue with entrySet in UdfAnalyzerUtils
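The pattern behind FLINK-4610: iterating `keySet()` and calling `get(key)` inside the loop performs a redundant hash lookup per entry, while `entrySet()` hands back key and value together. A minimal sketch (names are illustrative, not the UdfAnalyzerUtils code):

```java
import java.util.Map;

public class EntrySetDemo {
    // Before: one extra map lookup per key on top of the iteration itself.
    static long sumViaKeySet(Map<String, Long> m) {
        long sum = 0;
        for (String key : m.keySet()) {
            sum += m.get(key); // redundant lookup
        }
        return sum;
    }

    // After: entrySet() yields key and value together, no extra lookup.
    static long sumViaEntrySet(Map<String, Long> m) {
        long sum = 0;
        for (Map.Entry<String, Long> e : m.entrySet()) {
            sum += e.getValue();
        }
        return sum;
    }
}
```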
[GitHub] flink pull request #2483: [FLINK-4601] Check for empty string properly
GitHub user apivovarov opened a pull request: https://github.com/apache/flink/pull/2483

[FLINK-4601] Check for empty string properly

https://issues.apache.org/jira/browse/FLINK-4601

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apivovarov/flink FLINK-4601

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2483.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2483

commit 325d40e4145c8cfc33782c005b803d5de9d571a6
Author: Alexander Pivovarov <apivova...@gmail.com>
Date: 2016-09-09T06:58:45Z

[FLINK-4601] Check for empty string properly
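The diff is not included in the mail, so the exact change is in the linked PR; a common version of this kind of cleanup replaces checks like `s.equals("")` with the idiomatic, null-safe form. A sketch under that assumption (method name is hypothetical):

```java
public class StringChecks {
    // Idiomatic empty-string check: guard against null first,
    // then use isEmpty() instead of comparing with "".
    static boolean hasContent(String s) {
        return s != null && !s.isEmpty();
    }
}
```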
[GitHub] flink issue #2491: [FLINK-4610] Replace keySet/getValue with entrySet in Udf...
Github user apivovarov commented on the issue: https://github.com/apache/flink/pull/2491 @twalthr Can you look at this PR?
[GitHub] flink pull request #2490: [FLINK-4609] Remove redundant check for null in Cr...
Github user apivovarov commented on a diff in the pull request: https://github.com/apache/flink/pull/2490#discussion_r78283317 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java --- @@ -133,10 +133,6 @@ public DefaultCross(DataSet input1, DataSet input2, CrossHint hint, Stri new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()), hint, defaultName); - if (input1 == null || input2 == null) { - throw new NullPointerException(); - } - --- End diff -- If input1 and/or input2 is null, CrossOperator will throw an NPE on line 133. I also added a null check to TwoInputOperator
[GitHub] flink pull request #2490: [FLINK-4609] Remove redundant check for null in Cr...
Github user apivovarov commented on a diff in the pull request: https://github.com/apache/flink/pull/2490#discussion_r78285322 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java --- @@ -133,10 +133,6 @@ public DefaultCross(DataSet input1, DataSet input2, CrossHint hint, Stri new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()), hint, defaultName); - if (input1 == null || input2 == null) { - throw new NullPointerException(); - } - --- End diff -- Ok, I added a null check for input1 and input2, with a message, inline with the call to super in DefaultCross
[GitHub] flink issue #2490: [FLINK-4609] Remove redundant check for null in CrossOper...
Github user apivovarov commented on the issue: https://github.com/apache/flink/pull/2490 Ok, I added a null check for input1 and input2, with a message, before calling super in DefaultCross
[GitHub] flink pull request #2490: [FLINK-4609] Remove redundant check for null in Cr...
Github user apivovarov commented on a diff in the pull request: https://github.com/apache/flink/pull/2490#discussion_r78287335 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java --- @@ -129,14 +129,11 @@ private String getDefaultName() { public DefaultCross(DataSet input1, DataSet input2, CrossHint hint, String defaultName) { - super(input1, input2, new DefaultCrossFunction<I1, I2>(), + super(Preconditions.checkNotNull(input1, "input1 is null"), --- End diff -- DefaultCross calls `input1.getType()` and `input2.getType()` before calling super() on line 134, so adding the null check to the super class (e.g. TwoInputOperator) would not work for DefaultCross
[GitHub] flink pull request #2490: [FLINK-4609] Remove redundant check for null in Cr...
Github user apivovarov commented on a diff in the pull request: https://github.com/apache/flink/pull/2490#discussion_r78288200 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java --- @@ -129,14 +129,11 @@ private String getDefaultName() { public DefaultCross(DataSet input1, DataSet input2, CrossHint hint, String defaultName) { - super(input1, input2, new DefaultCrossFunction<I1, I2>(), + super(Preconditions.checkNotNull(input1, "input1 is null"), --- End diff -- I also added the null check to TwoInputOperator, and removed the `input1` and `input2` fields from DefaultCross because TwoInputOperator already has them
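The constraint discussed in this thread is a Java language rule: `super(...)` must be the first statement in a constructor, so any validation of arguments that the super call consumes has to happen inline, inside the argument list. A self-contained sketch (the classes below are illustrative stand-ins, not Flink's DefaultCross/TwoInputOperator; `Objects.requireNonNull` plays the role of Flink's `Preconditions.checkNotNull`):

```java
import java.util.Objects;

public class InlineNullCheck {
    static class Base {
        final String name;
        Base(String name) { this.name = name; }
    }

    static class Derived extends Base {
        Derived(String name) {
            // requireNonNull throws an NPE with the given message
            // before Base's constructor ever runs; there is no legal
            // place to put an if-check ahead of super(...).
            super(Objects.requireNonNull(name, "name is null"));
        }
    }
}
```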
[GitHub] flink pull request #2492: [FLINK-4612] Close FileWriter using try with resou...
Github user apivovarov commented on a diff in the pull request: https://github.com/apache/flink/pull/2492#discussion_r78447379 --- Diff: flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java --- @@ -55,11 +55,8 @@ public static String createTempFileInDirectory(String dir, String contents) thro f.createNewFile(); f.deleteOnExit(); - BufferedWriter out = new BufferedWriter(new FileWriter(f)); - try { + try(BufferedWriter out = new BufferedWriter(new FileWriter(f))) { --- End diff -- Thank you. Just fixed that
[GitHub] flink pull request #2488: [FLINK-4607] Close FileInputStream in ParameterToo...
GitHub user apivovarov opened a pull request: https://github.com/apache/flink/pull/2488

[FLINK-4607] Close FileInputStream in ParameterTool and other

https://issues.apache.org/jira/browse/FLINK-4607

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apivovarov/flink FLINK-4607

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2488.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2488

commit dc0e636f630f8ab96d7cc71ede0a5a3ea5ce24a4
Author: Alexander Pivovarov <apivova...@gmail.com>
Date: 2016-09-10T03:32:28Z

[FLINK-4607] Close FileInputStream in ParameterTool and other
[GitHub] flink pull request #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a l...
Github user apivovarov commented on a diff in the pull request: https://github.com/apache/flink/pull/2487#discussion_r78270750 --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java --- @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.siddhi; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException; +import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException; +import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.util.HashMap; +import java.util.Map; + +/** + * Siddhi CEP Execution Environment + */ +@PublicEvolving +public class SiddhiCEP { + private final StreamExecutionEnvironment executionEnvironment; + private final Map<String, DataStream> dataStreams; + private final Map<String, SiddhiStreamSchema> dataStreamSchemas; + private final Map<String,Class> extensions = new HashMap<>(); + + public Map<String, DataStream> getDataStreams(){ + return this.dataStreams; + } + + public Map<String, SiddhiStreamSchema> getDataStreamSchemas(){ + return this.dataStreamSchemas; + } + + public boolean isStreamDefined(String streamId){ + return dataStreams.containsKey(streamId); + } + + public Map<String,Class> getExtensions(){ + return this.extensions; + } + + public void checkStreamDefined(String streamId) throws UndefinedStreamException { + if(!isStreamDefined(streamId)){ + throw new UndefinedStreamException("Stream (streamId: "+streamId+") not defined"); + } + } + + public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) { + this.executionEnvironment = streamExecutionEnvironment; + this.dataStreams = new HashMap<>(); + this.dataStreamSchemas = new HashMap<>(); --- End diff -- lines 64 and 65 can be removed. Add `= new HashMap<>();` to lines 36 and 37 similar as it was done on line 38 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
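The cleanup suggested in the review above is to initialize the `final` map fields at their declaration, the way `extensions` already is, and drop the redundant assignments from the constructor. A compilable sketch of the suggested shape (value types are simplified placeholders, not the real Flink/Siddhi types):

```java
import java.util.HashMap;
import java.util.Map;

public class SiddhiCepSketch {
    // All three maps initialized at the declaration; the constructor
    // no longer needs any assignments for them.
    private final Map<String, Object> dataStreams = new HashMap<>();
    private final Map<String, Object> dataStreamSchemas = new HashMap<>();
    private final Map<String, Class<?>> extensions = new HashMap<>();

    public SiddhiCepSketch() {
        // intentionally empty
    }

    public boolean isStreamDefined(String streamId) {
        return dataStreams.containsKey(streamId);
    }

    public void registerStream(String streamId, Object dataStream) {
        dataStreams.put(streamId, dataStream);
    }
}
```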
[GitHub] flink pull request #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a l...
Github user apivovarov commented on a diff in the pull request: https://github.com/apache/flink/pull/2487#discussion_r78270764 --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java --- @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.siddhi; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException; +import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException; +import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.util.HashMap; +import java.util.Map; + +/** + * Siddhi CEP Execution Environment + */ +@PublicEvolving +public class SiddhiCEP { + private final StreamExecutionEnvironment executionEnvironment; + private final Map<String, DataStream> dataStreams; + private final Map<String, SiddhiStreamSchema> dataStreamSchemas; + private final Map<String,Class> extensions = new HashMap<>(); + + public Map<String, DataStream> getDataStreams(){ + return this.dataStreams; + } + + public Map<String, SiddhiStreamSchema> getDataStreamSchemas(){ + return this.dataStreamSchemas; + } + + public boolean isStreamDefined(String streamId){ + return dataStreams.containsKey(streamId); + } + + public Map<String,Class> getExtensions(){ + return this.extensions; + } + + public void checkStreamDefined(String streamId) throws UndefinedStreamException { + if(!isStreamDefined(streamId)){ + throw new UndefinedStreamException("Stream (streamId: "+streamId+") not defined"); + } + } + + public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) { + this.executionEnvironment = streamExecutionEnvironment; + this.dataStreams = new HashMap<>(); + this.dataStreamSchemas = new HashMap<>(); + } + + public static SiddhiStream.SingleSiddhiStream define(String streamId, DataStream inStream, String... 
fieldNames) { + SiddhiCEP environment = SiddhiCEP.getSiddhiEnvironment(inStream.getExecutionEnvironment()); + return environment.from(streamId,inStream,fieldNames); + } + + public SiddhiStream.SingleSiddhiStream from(String streamId, DataStream inStream, String... fieldNames){ + this.registerStream(streamId,inStream,fieldNames); + return new SiddhiStream.SingleSiddhiStream<>(streamId, this); + } + + public SiddhiStream.SingleSiddhiStream from(String streamId){ + return new SiddhiStream.SingleSiddhiStream<>(streamId, this); + } + + public SiddhiStream.UnionSiddhiStream union(String firstStreamId,String ... unionStreamIds){ + return new SiddhiStream.SingleSiddhiStream(firstStreamId,this).union(unionStreamIds); + } + + public void registerStream(final String streamId, DataStream dataStream, String... fieldNames) { + if (isStreamDefined(streamId)) { + throw new DuplicatedStreamException("Input stream: " + streamId + " already exists"); + } + dataStreams.put(streamId, dataStream); + SiddhiStreamSchema schema = new SiddhiStreamSchema<>(dataStream.getType(), fieldNames); + schema.setTypeSerializer(schema.getTypeInfo().createSerializer(dataStream.getExecutionConfig())); + dataStreamSchemas.put(streamId, schema); + } + + public StreamExecutionEnvironment getExecutionEnvironment() { +
[GitHub] flink pull request #2487: [FLINK-4520][flink-siddhi] Integrate Siddhi as a l...
Github user apivovarov commented on a diff in the pull request: https://github.com/apache/flink/pull/2487#discussion_r78270757 --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java --- @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.siddhi; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException; +import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException; +import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.util.HashMap; +import java.util.Map; + +/** + * Siddhi CEP Execution Environment + */ +@PublicEvolving +public class SiddhiCEP { + private final StreamExecutionEnvironment executionEnvironment; + private final Map<String, DataStream> dataStreams; + private final Map<String, SiddhiStreamSchema> dataStreamSchemas; + private final Map<String,Class> extensions = new HashMap<>(); + + public Map<String, DataStream> getDataStreams(){ + return this.dataStreams; + } + + public Map<String, SiddhiStreamSchema> getDataStreamSchemas(){ + return this.dataStreamSchemas; + } + + public boolean isStreamDefined(String streamId){ + return dataStreams.containsKey(streamId); + } + + public Map<String,Class> getExtensions(){ + return this.extensions; + } + + public void checkStreamDefined(String streamId) throws UndefinedStreamException { + if(!isStreamDefined(streamId)){ + throw new UndefinedStreamException("Stream (streamId: "+streamId+") not defined"); + } + } + + public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) { + this.executionEnvironment = streamExecutionEnvironment; + this.dataStreams = new HashMap<>(); + this.dataStreamSchemas = new HashMap<>(); + } + + public static SiddhiStream.SingleSiddhiStream define(String streamId, DataStream inStream, String... 
fieldNames) { + SiddhiCEP environment = SiddhiCEP.getSiddhiEnvironment(inStream.getExecutionEnvironment()); + return environment.from(streamId,inStream,fieldNames); + } + + public SiddhiStream.SingleSiddhiStream from(String streamId, DataStream inStream, String... fieldNames){ + this.registerStream(streamId,inStream,fieldNames); + return new SiddhiStream.SingleSiddhiStream<>(streamId, this); + } + + public SiddhiStream.SingleSiddhiStream from(String streamId){ + return new SiddhiStream.SingleSiddhiStream<>(streamId, this); + } + + public SiddhiStream.UnionSiddhiStream union(String firstStreamId,String ... unionStreamIds){ + return new SiddhiStream.SingleSiddhiStream(firstStreamId,this).union(unionStreamIds); + } + + public void registerStream(final String streamId, DataStream dataStream, String... fieldNames) { + if (isStreamDefined(streamId)) { + throw new DuplicatedStreamException("Input stream: " + streamId + " already exists"); + } + dataStreams.put(streamId, dataStream); + SiddhiStreamSchema schema = new SiddhiStreamSchema<>(dataStream.getType(), fieldNames); + schema.setTypeSerializer(schema.getTypeInfo().createSerializer(dataStream.getExecutionConfig())); + dataStreamSchemas.put(streamId, schema); + } + + public StreamExecutionEnvironment getExecutionEnvironment() { +
[GitHub] flink pull request #2485: [Flink 4599] - Add 'explain()' also to StreamTable...
Github user apivovarov commented on a diff in the pull request: https://github.com/apache/flink/pull/2485#discussion_r78270796 --- Diff: docs/dev/table_api.md --- @@ -2457,3 +2457,27 @@ The Table API provides a configuration (the so-called `TableConfig`) to modify r By default, the Table API supports `null` values. Null handling can be disabled to improve preformance by setting the `nullCheck` property in the `TableConfig` to `false`. {% top %} + +Explaining a Table + +The Table API provides a mechanism to describe the graph of operations that leads to the resulting output. This is done through the `TableEnvironment#explain(table)` method. It returns a string describing two graphs: the Abstract Syntax Tree of the relational algebra query and the Flink's Execution Plan of the equivalent Flink's Job. + +Table `explain` is supported for both `BatchTableEnvironment` and `StreamTableEnvironment`. Currently `StreamTableEnvironment` doesn't support the explanation of the Execution Plan. + + + +{% highlight scala %} + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word) + val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word) + val table = table1.unionAll(table2) + + val explanation:String = tEnv.explain(table) --- End diff -- put space before String
[GitHub] flink pull request #2485: [Flink 4599] - Add 'explain()' also to StreamTable...
Github user apivovarov commented on a diff in the pull request: https://github.com/apache/flink/pull/2485#discussion_r78270826 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala --- @@ -271,5 +271,21 @@ abstract class StreamTableEnvironment( } } + /* + * Returns the AST of the specified Table API and SQL queries and the execution plan to compute +* the result of the given [[Table]]. + * + * @param table The table for which the AST and execution plan will be returned. +* @param extended Flag to include detailed optimizer estimates. + */ + def explain(table: Table): String = { + +val ast = RelOptUtil.toString(table.getRelNode) + +s"== Abstract Syntax Tree ==" + + System.lineSeparator + + s"$ast" --- End diff -- Maybe this? ``` s"== Abstract Syntax Tree ==${System.lineSeparator}$ast" ```
[GitHub] flink pull request #2485: [Flink 4599] - Add 'explain()' also to StreamTable...
Github user apivovarov commented on a diff in the pull request: https://github.com/apache/flink/pull/2485#discussion_r78270846

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala ---

@@ -271,5 +271,21 @@ abstract class StreamTableEnvironment(
     }
   }

+  /*
+   * Returns the AST of the specified Table API and SQL queries and the execution plan to compute
+   * the result of the given [[Table]].
+   *
+   * @param table The table for which the AST and execution plan will be returned.
+   * @param extended Flag to include detailed optimizer estimates.
+   */
+  def explain(table: Table): String = {
+
+    val ast = RelOptUtil.toString(table.getRelNode)
+
+    s"== Abstract Syntax Tree ==" +
+      System.lineSeparator +
+      s"$ast"

--- End diff --

No need for `s` on line 285.
[GitHub] flink pull request #2485: [Flink 4599] - Add 'explain()' also to StreamTable...
Github user apivovarov commented on a diff in the pull request: https://github.com/apache/flink/pull/2485#discussion_r78270898

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala ---

@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.stream
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.junit.Assert.assertEquals
+import org.junit._
+
+class ExplainStreamTest
+  extends StreamingMultipleProgramsTestBase {
+
+  val testFilePath = ExplainStreamTest.this.getClass.getResource("/").getFile
+
+  @Test
+  def testFilter() : Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table = env.fromElements((1, "hello"))
+      .toTable(tEnv, 'a, 'b)
+      .filter("a % 2 = 0")
+
+    val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testFilterStream0.out").mkString.replaceAll("\\r\\n", "\n")
+    assertEquals(result, source)
+  }
+
+  @Test
+  def testUnion() : Unit = {

--- End diff --

No need for a space after `()` on lines 35 and 50.
[GitHub] flink pull request #2489: [FLINK-4608] Use short-circuit AND in Max/Min Aggr...
GitHub user apivovarov opened a pull request: https://github.com/apache/flink/pull/2489

[FLINK-4608] Use short-circuit AND in Max/Min AggregationFunction

https://issues.apache.org/jira/browse/FLINK-4608

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apivovarov/flink FLINK-4608

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2489.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2489

commit ceb3aa05bae3d05b9f6c5a7a55e6e43fc91a9450
Author: Alexander Pivovarov <apivova...@gmail.com>
Date: 2016-09-10T04:52:36Z

[FLINK-4608] Use short-circuit AND in Max/Min AggregationFunction
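The change the PR title describes — replacing a non-short-circuit `&` with `&&` — can be sketched as follows. This is illustrative code only, not the actual Max/Min `AggregationFunction` source:

```java
public class ShortCircuitSketch {
    public static void main(String[] args) {
        Integer value = null;

        // With the eager operator `&`, both operands are always evaluated,
        // so `value.intValue()` would throw a NullPointerException here.
        // With `&&`, evaluation stops as soon as the left side is false.
        boolean isPositive = (value != null) && (value.intValue() > 0);

        System.out.println(isPositive); // prints "false"; the right side never runs
    }
}
```

Besides safety, `&&` also avoids evaluating an expensive right-hand operand when the left already decides the result, which is the performance angle of the JIRA.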
[GitHub] flink pull request #2490: [FLINK-4609] Remove redundant check for null in Cr...
GitHub user apivovarov opened a pull request: https://github.com/apache/flink/pull/2490

[FLINK-4609] Remove redundant check for null in CrossOperator

https://issues.apache.org/jira/browse/FLINK-4609

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apivovarov/flink FLINK-4609

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2490.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2490

commit 272007fc8f350b1b998f28762aade6760b588c73
Author: Alexander Pivovarov <apivova...@gmail.com>
Date: 2016-09-10T05:24:43Z

[FLINK-4609] Remove redundant check for null in CrossOperator
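A generic sketch of the kind of redundancy the PR title refers to — hypothetical code, not the actual `CrossOperator` source: a reference that is already guaranteed non-null gets checked again.

```java
public class RedundantNullCheckSketch {
    static String pickName(String customName) {
        // `name` can never be null here: both branches yield a non-null value.
        String name = (customName == null) ? "DefaultCross" : customName;

        // Redundant check — the condition is always true and can be removed.
        if (name != null) {
            return name;
        }
        return "unreachable";
    }

    public static void main(String[] args) {
        System.out.println(pickName(null));      // prints "DefaultCross"
        System.out.println(pickName("MyCross")); // prints "MyCross"
    }
}
```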
[GitHub] flink pull request #2490: [FLINK-4609] Remove redundant check for null in Cr...
Github user apivovarov commented on a diff in the pull request: https://github.com/apache/flink/pull/2490#discussion_r79441284

--- Diff: flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java ---

@@ -971,10 +971,7 @@ public void testForwardWithAtLeastOneIterationAssumptionForJavac() {
 	public void reduce(Iterable<Tuple2<Long, Long>> values, Collector out) throws Exception {
 		Long id = 0L;
 		for (Tuple2<Long, Long> value : values) {
-			id = value.f0;

--- End diff --

Just reverted this change. Thank you!
[GitHub] flink pull request #2538: [FLINK-4666] Make constants to be final in Paramet...
GitHub user apivovarov opened a pull request: https://github.com/apache/flink/pull/2538

[FLINK-4666] Make constants to be final in ParameterTool

https://issues.apache.org/jira/browse/FLINK-4666

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apivovarov/flink FLINK-4666

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2538.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2538

commit 11fdcddb932c5474b506f9338592f142fdf1cee3
Author: Alexander Pivovarov <apivova...@gmail.com>
Date: 2016-09-23T05:54:29Z

[FLINK-4666] Make constants to be final in ParameterTool
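The pattern the PR title describes, sketched with hypothetical field names and values (not necessarily the actual `ParameterTool` constants): a shared constant declared without `final` can be reassigned at runtime, while `static final` makes reassignment a compile-time error.

```java
public class FinalConstantSketch {
    // before: mutable — any code in the package hierarchy could reassign it
    protected static String MUTABLE_KEY = "__NO_VALUE_KEY";

    // after: final — the value is fixed for the lifetime of the class
    protected static final String CONSTANT_KEY = "__NO_VALUE_KEY";

    public static void main(String[] args) {
        MUTABLE_KEY = "oops";             // compiles — the "constant" silently changed
        // CONSTANT_KEY = "oops";         // would not compile
        System.out.println(MUTABLE_KEY);  // prints "oops"
    }
}
```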
[GitHub] flink pull request #2537: [FLINK-4665] Remove boxing/unboxing to parse a pri...
GitHub user apivovarov opened a pull request: https://github.com/apache/flink/pull/2537

[FLINK-4665] Remove boxing/unboxing to parse a primitive

https://issues.apache.org/jira/browse/FLINK-4665

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apivovarov/flink FLINK-4665

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2537.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2537

commit eeefde68405d27a828d15443b5815a4e53a6b34c
Author: Alexander Pivovarov <apivova...@gmail.com>
Date: 2016-09-23T03:27:08Z

[FLINK-4665] Remove boxing/unboxing to parse a primitive
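The cleanup the PR title describes can be sketched as follows (illustrative values; the actual call sites are elsewhere in the Flink codebase): `Integer.valueOf(String)` produces a wrapper object that is immediately auto-unboxed when assigned to an `int`, whereas `Integer.parseInt(String)` returns the primitive directly.

```java
public class ParsePrimitiveSketch {
    public static void main(String[] args) {
        String s = "42";

        // before: parse to an Integer wrapper, then auto-unbox to int
        int viaBoxing = Integer.valueOf(s);

        // after: parse straight to the primitive — no intermediate object
        int direct = Integer.parseInt(s);

        System.out.println(viaBoxing == direct); // prints "true"
    }
}
```

The same applies to the other wrappers: `Long.parseLong`, `Double.parseDouble`, and so on.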
[GitHub] flink pull request #2539: [FLINK-4668] Fix positive random int generation
GitHub user apivovarov opened a pull request: https://github.com/apache/flink/pull/2539

[FLINK-4668] Fix positive random int generation

https://issues.apache.org/jira/browse/FLINK-4668

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apivovarov/flink FLINK-4668

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2539.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2539

commit 4ab1e4fa97fa41054817955230559766acbdf698
Author: Alexander Pivovarov <apivova...@gmail.com>
Date: 2016-09-23T06:26:27Z

[FLINK-4668] Fix positive random int generation
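The bug class the PR title refers to, sketched under the assumption that the original code derived a non-negative number with `Math.abs(nextInt())` (a common anti-pattern): `Math.abs(Integer.MIN_VALUE)` overflows and returns a negative number, so a file name built from it can start with `-`. A bounded `nextInt(bound)` is always in `[0, bound)` and needs no `abs` at all.

```java
import java.util.Random;

public class PositiveRandomSketch {
    public static void main(String[] args) {
        // The overflow case: +2147483648 does not fit in an int,
        // so abs() hands back Integer.MIN_VALUE unchanged.
        System.out.println(Math.abs(Integer.MIN_VALUE)); // prints "-2147483648"

        // Safe alternative: a bounded draw is non-negative by contract.
        Random rnd = new Random();
        int n = rnd.nextInt(Integer.MAX_VALUE);
        System.out.println(n >= 0); // prints "true"
    }
}
```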
[GitHub] flink issue #2539: [FLINK-4668] Fix positive random int generation
Github user apivovarov commented on the issue: https://github.com/apache/flink/pull/2539

It's not even convenient to work with files whose names start with `-`, e.g.

```
$ vi -44.txt
VIM - Vi IMproved 7.4 (2013 Aug 10, compiled Aug 1 2016 19:37:21)
Unknown option argument: "-44.txt"
More info with: "vim -h"
```

```
$ rm -rf "-4.txt"
rm: illegal option -- 4
usage: rm [-f | -i] [-dPRrvW] file ...
       unlink file
```