[GitHub] flink issue #6136: [FLINK-4303] [CEP] Add CEP examples

2018-06-07 Thread kisimple
Github user kisimple commented on the issue:

https://github.com/apache/flink/pull/6136
  
cc @twalthr 


---


[jira] [Commented] (FLINK-4303) Add CEP examples

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505698#comment-16505698
 ] 

ASF GitHub Bot commented on FLINK-4303:
---

Github user kisimple commented on the issue:

https://github.com/apache/flink/pull/6136
  
cc @twalthr 


> Add CEP examples
> 
>
> Key: FLINK-4303
> URL: https://issues.apache.org/jira/browse/FLINK-4303
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.1.0
>Reporter: Timo Walther
>Assignee: blues zheng
>Priority: Major
>
> Neither CEP Java nor CEP Scala contains a runnable example. The example on the 
> website is also not runnable without adding some additional code.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-4303) Add CEP examples

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505696#comment-16505696
 ] 

ASF GitHub Bot commented on FLINK-4303:
---

Github user kisimple commented on the issue:

https://github.com/apache/flink/pull/6136
  
@medcv Thanks for your review :) Updated as per your suggestions.


> Add CEP examples
> 
>
> Key: FLINK-4303
> URL: https://issues.apache.org/jira/browse/FLINK-4303
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.1.0
>Reporter: Timo Walther
>Assignee: blues zheng
>Priority: Major
>
> Neither CEP Java nor CEP Scala contains a runnable example. The example on the 
> website is also not runnable without adding some additional code.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6136: [FLINK-4303] [CEP] Add CEP examples

2018-06-07 Thread kisimple
Github user kisimple commented on the issue:

https://github.com/apache/flink/pull/6136
  
@medcv Thanks for your review :) Updated as per your suggestions.


---


[GitHub] flink pull request #6141: flink-metrics-datadog: beautify metric name by exc...

2018-06-07 Thread DmitryBe
GitHub user DmitryBe opened a pull request:

https://github.com/apache/flink/pull/6141

flink-metrics-datadog: beautify metric name by excluding host_name, tm_id, 
and task|job_manager values from metric name and providing them as tags

This allows aggregating values by tags in Datadog.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/DmitryBe/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6141.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6141


commit 72adcfc0be04dc98863916676cc964e3bce17372
Author: Dmitry B 
Date:   2018-06-08T03:58:32Z

exclude host_name, job|task_manager name, and tm_id from the metric name; 
these values are provided as tags
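
A minimal, hypothetical Java sketch of the idea (illustrative names only, not 
the PR's actual flink-metrics-datadog code): keep the metric name stable and 
carry host_name, tm_id, and the task/job manager scope as Datadog tags, so 
Datadog can aggregate across hosts and TaskManagers by tag.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch, not the PR's code: a metric with a stable name,
// carrying host_name / tm_id as Datadog tags instead of name segments.
public class TaggedMetricSketch {

    static final class DMetric {
        final String name;        // stable across hosts, e.g. "taskmanager.numRecordsIn"
        final double value;
        final List<String> tags;  // e.g. ["host:node-1", "tm_id:tm-42"]

        DMetric(String name, double value, List<String> tags) {
            this.name = name;
            this.value = value;
            this.tags = tags;
        }
    }

    public static void main(String[] args) {
        // Before: "node-1.tm-42.taskmanager.numRecordsIn" -- host and tm_id
        // baked into the name prevent aggregation across TaskManagers.
        // After: a stable name plus tags, so Datadog can sum/avg by tag.
        DMetric metric = new DMetric(
                "taskmanager.numRecordsIn",
                1234.0,
                Arrays.asList("host:node-1", "tm_id:tm-42"));
        System.out.println(metric.name + "=" + metric.value + " tags=" + metric.tags);
    }
}
```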




---


[jira] [Assigned] (FLINK-9552) NPE in SpanningRecordSerializer during checkpoint

2018-06-07 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-9552:
---

Assignee: vinoyang

> NPE in SpanningRecordSerializer during checkpoint
> -
>
> Key: FLINK-9552
> URL: https://issues.apache.org/jira/browse/FLINK-9552
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 1.5.0
>Reporter: Truong Duc Kien
>Assignee: vinoyang
>Priority: Major
>
> We're intermittently encountering an NPE inside SpanningRecordSerializer 
> during checkpointing.
>  
> {code:java}
> 2018-06-08 08:31:35,741 [ka.actor.default-dispatcher-83] INFO  
> o.a.f.r.e.ExecutionGraph IterationSource-22 (44/120) 
> (c1b94ef849db0e5fb9fb7b85c17073ce) switched from RUNNING to FAILED.
> java.lang.RuntimeException
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamIterationHead.run(StreamIterationHead.java:91)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:98)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
>   at 
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>   ... 5 more
> 2018-06-08 08:31:35,746 [ka.actor.default-dispatcher-83] INFO  
> o.a.f.r.e.ExecutionGraph Job xxx (8a4eaf02c46dc21c7d6f3f70657dbb17) switched 
> from state RUNNING to FAILING.
> java.lang.RuntimeException
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamIterationHead.run(StreamIterationHead.java:91)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:98)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
>   at 
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>   ... 5 more
> {code}
> This issue is probably concurrency-related, because the relevant Flink code 
> appears to have proper null checking: 
> https://github.com/apache/flink/blob/fa024726bb801fc71cec5cc303cac1d4a03f555e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java#L98
> {code:java}
> // Copy from intermediate buffers to current target memory segment
> if (targetBuffer != null) {
> targetBuffer.append(lengthBuffer);
> targetBuffer.append(dataBuffer);
> targetBuffer.commit();
> }
> {code}
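
The null check alone does not rule this out if another thread can clear the 
buffer between the check and the append. A minimal sketch of such a 
check-then-act race (illustrative only, not Flink's actual code):

{code:java}
// Illustrative check-then-act race, not Flink code: the field is re-read
// after the null check, so a concurrent writer can null it in between.
public class CheckThenActRace {

    private volatile StringBuilder targetBuffer = new StringBuilder();

    void addRecord(String data) {
        if (targetBuffer != null) {     // check
            targetBuffer.append(data);  // act: re-reads the field; may NPE
        }
    }

    void recycleBuffer() {
        targetBuffer = null;            // e.g. concurrent buffer recycling
    }

    public static void main(String[] args) throws InterruptedException {
        CheckThenActRace race = new CheckThenActRace();
        Thread writer = new Thread(() -> {
            try {
                for (int i = 0; i < 1_000_000; i++) {
                    race.addRecord("x");
                }
            } catch (NullPointerException npe) {
                System.out.println("hit the race window: " + npe);
            }
        });
        writer.start();
        race.recycleBuffer();           // may fall between check and act
        writer.join();
    }
}
{code}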



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-4303) Add CEP examples

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505638#comment-16505638
 ] 

ASF GitHub Bot commented on FLINK-4303:
---

Github user kisimple commented on a diff in the pull request:

https://github.com/apache/flink/pull/6136#discussion_r193941724
  
--- Diff: 
flink-examples/flink-examples-cep/src/main/java/org/apache/flink/cep/examples/java/monitoring/TemperatureMonitoring.java
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.examples.java.monitoring;
+
+import org.apache.flink.cep.CEP;
+import org.apache.flink.cep.PatternFlatSelectFunction;
+import org.apache.flink.cep.PatternSelectFunction;
+import org.apache.flink.cep.PatternStream;
+import 
org.apache.flink.cep.examples.java.monitoring.events.MonitoringEvent;
+import 
org.apache.flink.cep.examples.java.monitoring.events.TemperatureAlert;
+import 
org.apache.flink.cep.examples.java.monitoring.events.TemperatureEvent;
+import 
org.apache.flink.cep.examples.java.monitoring.events.TemperatureWarning;
+import 
org.apache.flink.cep.examples.java.monitoring.sources.MonitoringEventSource;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * CEP example monitoring program.
+ * This example program generates a stream of monitoring events which are 
analyzed using
+ * Flink's CEP library. The input event stream consists of temperature and 
power events
+ * from a set of racks. The goal is to detect when a rack is about to 
overheat.
+ * In order to do that, we create a CEP pattern which generates a 
TemperatureWarning
+ * whenever it sees two consecutive temperature events in a given time 
interval whose temperatures
+ * are higher than a given threshold value. A warning itself is not 
critical but if we see
+ * two warnings for the same rack whose temperatures are rising, we want to 
generate an alert.
+ * This is achieved by defining another CEP pattern which analyzes the 
stream of generated
+ * temperature warnings.
+ */
+public class TemperatureMonitoring {
+
+   private static final double TEMPERATURE_THRESHOLD = 100;
+
+   public static void main(String[] args) throws Exception {
+   System.out.println("Executing temperature monitoring Java 
example.");
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   // Use ingestion time => TimeCharacteristic == EventTime + 
IngestionTimeExtractor
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+   // Input stream of monitoring events
+   DataStream<MonitoringEvent> inputEventStream = 
env.addSource(new MonitoringEventSource())
+   .assignTimestampsAndWatermarks(new 
IngestionTimeExtractor<>());
+
+   // Warning pattern: Two consecutive temperature events whose 
temperature is higher
+   // than the given threshold appearing within a time interval of 
10 seconds
+   Pattern<MonitoringEvent, ?> warningPattern = Pattern
+   .<MonitoringEvent>begin("first")
+   .subtype(TemperatureEvent.class)
+   .where(new SimpleCondition<TemperatureEvent>() {
+   @Override
+   public boolean filter(TemperatureEvent 
event) throws Exception {
+   return 

[GitHub] flink pull request #6136: [FLINK-4303] [CEP] Add CEP examples

2018-06-07 Thread kisimple
Github user kisimple commented on a diff in the pull request:

https://github.com/apache/flink/pull/6136#discussion_r193941724
  
--- Diff: 
flink-examples/flink-examples-cep/src/main/java/org/apache/flink/cep/examples/java/monitoring/TemperatureMonitoring.java
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.examples.java.monitoring;
+
+import org.apache.flink.cep.CEP;
+import org.apache.flink.cep.PatternFlatSelectFunction;
+import org.apache.flink.cep.PatternSelectFunction;
+import org.apache.flink.cep.PatternStream;
+import 
org.apache.flink.cep.examples.java.monitoring.events.MonitoringEvent;
+import 
org.apache.flink.cep.examples.java.monitoring.events.TemperatureAlert;
+import 
org.apache.flink.cep.examples.java.monitoring.events.TemperatureEvent;
+import 
org.apache.flink.cep.examples.java.monitoring.events.TemperatureWarning;
+import 
org.apache.flink.cep.examples.java.monitoring.sources.MonitoringEventSource;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * CEP example monitoring program.
+ * This example program generates a stream of monitoring events which are 
analyzed using
+ * Flink's CEP library. The input event stream consists of temperature and 
power events
+ * from a set of racks. The goal is to detect when a rack is about to 
overheat.
+ * In order to do that, we create a CEP pattern which generates a 
TemperatureWarning
+ * whenever it sees two consecutive temperature events in a given time 
interval whose temperatures
+ * are higher than a given threshold value. A warning itself is not 
critical but if we see
+ * two warnings for the same rack whose temperatures are rising, we want to 
generate an alert.
+ * This is achieved by defining another CEP pattern which analyzes the 
stream of generated
+ * temperature warnings.
+ */
+public class TemperatureMonitoring {
+
+   private static final double TEMPERATURE_THRESHOLD = 100;
+
+   public static void main(String[] args) throws Exception {
+   System.out.println("Executing temperature monitoring Java 
example.");
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   // Use ingestion time => TimeCharacteristic == EventTime + 
IngestionTimeExtractor
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+   // Input stream of monitoring events
+   DataStream<MonitoringEvent> inputEventStream = 
env.addSource(new MonitoringEventSource())
+   .assignTimestampsAndWatermarks(new 
IngestionTimeExtractor<>());
+
+   // Warning pattern: Two consecutive temperature events whose 
temperature is higher
+   // than the given threshold appearing within a time interval of 
10 seconds
+   Pattern<MonitoringEvent, ?> warningPattern = Pattern
+   .<MonitoringEvent>begin("first")
+   .subtype(TemperatureEvent.class)
+   .where(new SimpleCondition<TemperatureEvent>() {
+   @Override
+   public boolean filter(TemperatureEvent 
event) throws Exception {
+   return event.getTemperature() > 
TEMPERATURE_THRESHOLD;
+   }
+   })
+   .next("second")
+   .subtype(TemperatureEvent.class)
+
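
The diff is cut off by the archive here; judging from the Scala version quoted 
later in this thread, the warning pattern likely continues along these lines 
(a reconstruction under that assumption, not the PR's verbatim Java code):

```java
// Reconstructed continuation, mirroring the Scala example quoted later in
// this thread; not the PR's verbatim Java code.
        .where(new SimpleCondition<TemperatureEvent>() {
            @Override
            public boolean filter(TemperatureEvent event) throws Exception {
                return event.getTemperature() > TEMPERATURE_THRESHOLD;
            }
        })
        .within(Time.seconds(10));

// Key by rack so patterns are matched per rack, then attach the pattern.
PatternStream<MonitoringEvent> tempPatternStream =
        CEP.pattern(inputEventStream.keyBy("rackID"), warningPattern);

// Each (first, second) match yields a TemperatureWarning whose value is the
// average of the two temperatures, as in the Scala version.
DataStream<TemperatureWarning> warnings = tempPatternStream.select(
        new PatternSelectFunction<MonitoringEvent, TemperatureWarning>() {
            @Override
            public TemperatureWarning select(Map<String, List<MonitoringEvent>> pattern) {
                TemperatureEvent first = (TemperatureEvent) pattern.get("first").get(0);
                TemperatureEvent second = (TemperatureEvent) pattern.get("second").get(0);
                return new TemperatureWarning(first.getRackID(),
                        (first.getTemperature() + second.getTemperature()) / 2);
            }
        });
```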

[jira] [Commented] (FLINK-9554) flink scala shell doesn't work in yarn mode

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505637#comment-16505637
 ] 

ASF GitHub Bot commented on FLINK-9554:
---

Github user zjffdu commented on the issue:

https://github.com/apache/flink/pull/6140
  
@tillrohrmann Could you help review it? Thanks


> flink scala shell doesn't work in yarn mode
> ---
>
> Key: FLINK-9554
> URL: https://issues.apache.org/jira/browse/FLINK-9554
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.5.0
>Reporter: Jeff Zhang
>Priority: Major
>
> It still tries to use StandaloneCluster even when I specify yarn mode.
>  
> Command I Use: bin/start-scala-shell.sh yarn -n 1
>  
> {code:java}
> Starting Flink Shell:
> 2018-06-06 12:30:02,672 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: jobmanager.rpc.address, localhost
> 2018-06-06 12:30:02,673 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: jobmanager.rpc.port, 6123
> 2018-06-06 12:30:02,674 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: jobmanager.heap.mb, 1024
> 2018-06-06 12:30:02,674 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: taskmanager.heap.mb, 1024
> 2018-06-06 12:30:02,674 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: taskmanager.numberOfTaskSlots, 1
> 2018-06-06 12:30:02,674 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: parallelism.default, 1
> 2018-06-06 12:30:02,675 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: rest.port, 8081
> Exception in thread "main" java.lang.UnsupportedOperationException: Can't 
> deploy a standalone cluster.
> at 
> org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:57)
> at 
> org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:31)
> at 
> org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:272)
> at 
> org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:164)
> at org.apache.flink.api.scala.FlinkShell$.liftedTree1$1(FlinkShell.scala:194)
> at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:193)
> at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:135)
> at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6140: [FLINK-9554] flink scala shell doesn't work in yarn mode

2018-06-07 Thread zjffdu
Github user zjffdu commented on the issue:

https://github.com/apache/flink/pull/6140
  
@tillrohrmann Could you help review it? Thanks


---


[jira] [Commented] (FLINK-9554) flink scala shell doesn't work in yarn mode

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505627#comment-16505627
 ] 

ASF GitHub Bot commented on FLINK-9554:
---

GitHub user zjffdu opened a pull request:

https://github.com/apache/flink/pull/6140

[FLINK-9554] flink scala shell doesn't work in yarn mode

## What is the purpose of the change

This PR fixes the issue of the Scala shell being unable to run in YARN mode.

## Brief change log

The root cause is that the options of the CustomCommandLine were missing, 
which caused "-m yarn-cluster" to not be parsed correctly.

## Verifying this change

I verified it manually on a single-node Hadoop cluster. It would be better 
to add an integration test so that we can avoid regressions in the future. 
If necessary, I can create a ticket for it and do it in another PR. 

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): ( no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no )
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? ( not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zjffdu/flink FLINK-9554

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6140.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6140


commit f3d37d4799e51323fbbcd52cf972ac1d874d2514
Author: Jeff Zhang 
Date:   2018-06-07T09:47:32Z

save

commit d65058133ce220e2d4212b906d521b38b9ef53dd
Author: Jeff Zhang 
Date:   2018-06-08T02:39:27Z

[FLINK-9554] flink scala shell doesn't work in yarn mode




> flink scala shell doesn't work in yarn mode
> ---
>
> Key: FLINK-9554
> URL: https://issues.apache.org/jira/browse/FLINK-9554
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.5.0
>Reporter: Jeff Zhang
>Priority: Major
>
> It still tries to use StandaloneCluster even when I specify yarn mode.
>  
> Command I Use: bin/start-scala-shell.sh yarn -n 1
>  
> {code:java}
> Starting Flink Shell:
> 2018-06-06 12:30:02,672 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: jobmanager.rpc.address, localhost
> 2018-06-06 12:30:02,673 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: jobmanager.rpc.port, 6123
> 2018-06-06 12:30:02,674 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: jobmanager.heap.mb, 1024
> 2018-06-06 12:30:02,674 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: taskmanager.heap.mb, 1024
> 2018-06-06 12:30:02,674 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: taskmanager.numberOfTaskSlots, 1
> 2018-06-06 12:30:02,674 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: parallelism.default, 1
> 2018-06-06 12:30:02,675 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: rest.port, 8081
> Exception in thread "main" java.lang.UnsupportedOperationException: Can't 
> deploy a standalone cluster.
> at 
> org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:57)
> at 
> org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:31)
> at 
> org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:272)
> at 
> org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:164)
> at org.apache.flink.api.scala.FlinkShell$.liftedTree1$1(FlinkShell.scala:194)
> at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:193)
> at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:135)
> at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6140: [FLINK-9554] flink scala shell doesn't work in yar...

2018-06-07 Thread zjffdu
GitHub user zjffdu opened a pull request:

https://github.com/apache/flink/pull/6140

[FLINK-9554] flink scala shell doesn't work in yarn mode

## What is the purpose of the change

This PR fixes the issue of the Scala shell being unable to run in YARN mode.

## Brief change log

The root cause is that the options of the CustomCommandLine were missing, 
which caused "-m yarn-cluster" to not be parsed correctly.

## Verifying this change

I verified it manually on a single-node Hadoop cluster. It would be better 
to add an integration test so that we can avoid regressions in the future. 
If necessary, I can create a ticket for it and do it in another PR. 

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): ( no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no )
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? ( not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zjffdu/flink FLINK-9554

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6140.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6140


commit f3d37d4799e51323fbbcd52cf972ac1d874d2514
Author: Jeff Zhang 
Date:   2018-06-07T09:47:32Z

save

commit d65058133ce220e2d4212b906d521b38b9ef53dd
Author: Jeff Zhang 
Date:   2018-06-08T02:39:27Z

[FLINK-9554] flink scala shell doesn't work in yarn mode




---


[jira] [Closed] (FLINK-9329) hasRowtimeAttribute will throw NPE if user uses setProctimeAttribute for table source

2018-06-07 Thread yuemeng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yuemeng closed FLINK-9329.
--
Resolution: Invalid

> hasRowtimeAttribute will throw NPE if user uses setProctimeAttribute for table 
> source
> 
>
> Key: FLINK-9329
> URL: https://issues.apache.org/jira/browse/FLINK-9329
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Critical
>
> Example:
> {code:java}
> KafkaTableSource source = Kafka010JsonTableSource.builder()
>     .withSchema(TableSchema.builder()
>         .field("sensorId", Types.LONG())
>         .field("temp", Types.DOUBLE())
>         .field("ptime", Types.SQL_TIMESTAMP()).build())
>     .withProctimeAttribute("ptime")
>     .build();
> tableEnv.registerTableSource("flights", source);
> {code}
> Kafka010JsonTableSource implements DefinedRowtimeAttributes, so when
> TableSourceUtil calls hasRowtimeAttribute(source) to check, it invokes the
> following code:
> {code:scala}
> /** Returns a list with all rowtime attribute names of the [[TableSource]]. */
> private def getRowtimeAttributes(tableSource: TableSource[_]): Array[String] = {
>   tableSource match {
>     case r: DefinedRowtimeAttributes =>
>       r.getRowtimeAttributeDescriptors.asScala.map(_.getAttributeName).toArray
>     case _ =>
>       Array()
>   }
> }
> {code}
> r.getRowtimeAttributeDescriptors will throw an NPE because we use a
> ProctimeAttribute here.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9554) flink scala shell doesn't work in yarn mode

2018-06-07 Thread Jeff Zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505619#comment-16505619
 ] 

Jeff Zhang commented on FLINK-9554:
---

FLINK-8795 is about the Scala shell hanging when running a Flink job in local 
mode, which is different from this ticket. So I have created this ticket to 
resolve the YARN issue separately. 

> flink scala shell doesn't work in yarn mode
> ---
>
> Key: FLINK-9554
> URL: https://issues.apache.org/jira/browse/FLINK-9554
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.5.0
>Reporter: Jeff Zhang
>Priority: Major
>
> It still tries to use StandaloneCluster even when I specify yarn mode.
>  
> Command I Use: bin/start-scala-shell.sh yarn -n 1
>  
> {code:java}
> Starting Flink Shell:
> 2018-06-06 12:30:02,672 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: jobmanager.rpc.address, localhost
> 2018-06-06 12:30:02,673 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: jobmanager.rpc.port, 6123
> 2018-06-06 12:30:02,674 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: jobmanager.heap.mb, 1024
> 2018-06-06 12:30:02,674 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: taskmanager.heap.mb, 1024
> 2018-06-06 12:30:02,674 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: taskmanager.numberOfTaskSlots, 1
> 2018-06-06 12:30:02,674 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: parallelism.default, 1
> 2018-06-06 12:30:02,675 INFO  
> org.apache.flink.configuration.GlobalConfiguration            - Loading 
> configuration property: rest.port, 8081
> Exception in thread "main" java.lang.UnsupportedOperationException: Can't 
> deploy a standalone cluster.
> at 
> org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:57)
> at 
> org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:31)
> at 
> org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:272)
> at 
> org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:164)
> at org.apache.flink.api.scala.FlinkShell$.liftedTree1$1(FlinkShell.scala:194)
> at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:193)
> at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:135)
> at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9554) flink scala shell doesn't work in yarn mode

2018-06-07 Thread Jeff Zhang (JIRA)
Jeff Zhang created FLINK-9554:
-

 Summary: flink scala shell doesn't work in yarn mode
 Key: FLINK-9554
 URL: https://issues.apache.org/jira/browse/FLINK-9554
 Project: Flink
  Issue Type: Bug
  Components: Scala Shell
Affects Versions: 1.5.0
Reporter: Jeff Zhang


It still tries to use StandaloneCluster even when I specify yarn mode.
 
Command I Use: bin/start-scala-shell.sh yarn -n 1
 
{code:java}
Starting Flink Shell:
2018-06-06 12:30:02,672 INFO  
org.apache.flink.configuration.GlobalConfiguration            - Loading 
configuration property: jobmanager.rpc.address, localhost
2018-06-06 12:30:02,673 INFO  
org.apache.flink.configuration.GlobalConfiguration            - Loading 
configuration property: jobmanager.rpc.port, 6123
2018-06-06 12:30:02,674 INFO  
org.apache.flink.configuration.GlobalConfiguration            - Loading 
configuration property: jobmanager.heap.mb, 1024
2018-06-06 12:30:02,674 INFO  
org.apache.flink.configuration.GlobalConfiguration            - Loading 
configuration property: taskmanager.heap.mb, 1024
2018-06-06 12:30:02,674 INFO  
org.apache.flink.configuration.GlobalConfiguration            - Loading 
configuration property: taskmanager.numberOfTaskSlots, 1
2018-06-06 12:30:02,674 INFO  
org.apache.flink.configuration.GlobalConfiguration            - Loading 
configuration property: parallelism.default, 1
2018-06-06 12:30:02,675 INFO  
org.apache.flink.configuration.GlobalConfiguration            - Loading 
configuration property: rest.port, 8081
Exception in thread "main" java.lang.UnsupportedOperationException: Can't 
deploy a standalone cluster.
at 
org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:57)
at 
org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:31)
at 
org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:272)
at 
org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:164)
at org.apache.flink.api.scala.FlinkShell$.liftedTree1$1(FlinkShell.scala:194)
at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:193)
at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:135)
at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9553) Migrate integration tests for DataSinkITCase

2018-06-07 Thread Deepak Sharma (JIRA)
Deepak Sharma created FLINK-9553:


 Summary: Migrate integration tests for DataSinkITCase
 Key: FLINK-9553
 URL: https://issues.apache.org/jira/browse/FLINK-9553
 Project: Flink
  Issue Type: Sub-task
Reporter: Deepak Sharma
Assignee: Deepak Sharma






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9552) NPE in SpanningRecordSerializer during checkpoint

2018-06-07 Thread Truong Duc Kien (JIRA)
Truong Duc Kien created FLINK-9552:
--

 Summary: NPE in SpanningRecordSerializer during checkpoint
 Key: FLINK-9552
 URL: https://issues.apache.org/jira/browse/FLINK-9552
 Project: Flink
  Issue Type: Bug
  Components: Type Serialization System
Affects Versions: 1.5.0
Reporter: Truong Duc Kien


We're intermittently encountering an NPE inside SpanningRecordSerializer during 
checkpointing.

 
{code:java}
2018-06-08 08:31:35,741 [ka.actor.default-dispatcher-83] INFO  
o.a.f.r.e.ExecutionGraph IterationSource-22 (44/120) 
(c1b94ef849db0e5fb9fb7b85c17073ce) switched from RUNNING to FAILED.
java.lang.RuntimeException
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at 
org.apache.flink.streaming.runtime.tasks.StreamIterationHead.run(StreamIterationHead.java:91)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:98)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
at 
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
... 5 more
2018-06-08 08:31:35,746 [ka.actor.default-dispatcher-83] INFO  
o.a.f.r.e.ExecutionGraph Job xxx (8a4eaf02c46dc21c7d6f3f70657dbb17) switched 
from state RUNNING to FAILING.
java.lang.RuntimeException
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at 
org.apache.flink.streaming.runtime.tasks.StreamIterationHead.run(StreamIterationHead.java:91)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:98)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
at 
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
... 5 more
{code}
This issue is probably concurrency-related, because the relevant Flink code 
appears to have proper null checking: 

https://github.com/apache/flink/blob/fa024726bb801fc71cec5cc303cac1d4a03f555e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java#L98
{code:java}
// Copy from intermediate buffers to current target memory segment
if (targetBuffer != null) {
targetBuffer.append(lengthBuffer);
targetBuffer.append(dataBuffer);
targetBuffer.commit();
}
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-4303) Add CEP examples

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505602#comment-16505602
 ] 

ASF GitHub Bot commented on FLINK-4303:
---

Github user medcv commented on a diff in the pull request:

https://github.com/apache/flink/pull/6136#discussion_r193934744
  
--- Diff: 
flink-examples/flink-examples-cep/src/main/java/org/apache/flink/cep/examples/java/monitoring/TemperatureMonitoring.java
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.examples.java.monitoring;
+
+import org.apache.flink.cep.CEP;
+import org.apache.flink.cep.PatternFlatSelectFunction;
+import org.apache.flink.cep.PatternSelectFunction;
+import org.apache.flink.cep.PatternStream;
+import 
org.apache.flink.cep.examples.java.monitoring.events.MonitoringEvent;
+import 
org.apache.flink.cep.examples.java.monitoring.events.TemperatureAlert;
+import 
org.apache.flink.cep.examples.java.monitoring.events.TemperatureEvent;
+import 
org.apache.flink.cep.examples.java.monitoring.events.TemperatureWarning;
+import 
org.apache.flink.cep.examples.java.monitoring.sources.MonitoringEventSource;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * CEP example monitoring program.
+ * This example program generates a stream of monitoring events which are 
analyzed using
+ * Flink's CEP library. The input event stream consists of temperature and 
power events
+ * from a set of racks. The goal is to detect when a rack is about to 
overheat.
+ * In order to do that, we create a CEP pattern which generates a 
TemperatureWarning
+ * whenever it sees two consecutive temperature events in a given time 
interval whose temperatures
+ * are higher than a given threshold value. A warning itself is not 
critical but if we see
+ * two warnings for the same rack whose temperatures are rising, we want to 
generate an alert.
+ * This is achieved by defining another CEP pattern which analyzes the 
stream of generated
+ * temperature warnings.
+ */
+public class TemperatureMonitoring {
+
+   private static final double TEMPERATURE_THRESHOLD = 100;
+
+   public static void main(String[] args) throws Exception {
+   System.out.println("Executing temperature monitoring Java 
example.");
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   // Use ingestion time => TimeCharacteristic == EventTime + 
IngestionTimeExtractor
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+   // Input stream of monitoring events
+   DataStream<MonitoringEvent> inputEventStream = 
env.addSource(new MonitoringEventSource())
+   .assignTimestampsAndWatermarks(new 
IngestionTimeExtractor<>());
+
+   // Warning pattern: Two consecutive temperature events whose 
temperature is higher
+   // than the given threshold appearing within a time interval of 
10 seconds
+   Pattern<MonitoringEvent, ?> warningPattern = Pattern
+   .<MonitoringEvent>begin("first")
+   .subtype(TemperatureEvent.class)
+   .where(new SimpleCondition<TemperatureEvent>() {
+   @Override
+   public boolean filter(TemperatureEvent 
event) throws Exception {
+   return 

[GitHub] flink pull request #6136: [FLINK-4303] [CEP] Add CEP examples

2018-06-07 Thread medcv
Github user medcv commented on a diff in the pull request:

https://github.com/apache/flink/pull/6136#discussion_r193934789
  
--- Diff: 
flink-examples/flink-examples-cep/src/main/scala/org/apache/flink/cep/examples/scala/monitoring/TemperatureMonitoring.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.examples.scala.monitoring
+
+import 
org.apache.flink.cep.examples.scala.monitoring.events.{MonitoringEvent, 
TemperatureAlert, TemperatureEvent, TemperatureWarning}
+import 
org.apache.flink.cep.examples.scala.monitoring.sources.MonitoringEventSource
+import org.apache.flink.cep.scala.CEP
+import org.apache.flink.cep.scala.pattern.Pattern
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.IngestionTimeExtractor
+import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment, createTypeInformation}
+import org.apache.flink.streaming.api.windowing.time.Time
+
+/**
+  * CEP example monitoring program.
+  * This example program generates a stream of monitoring events which are 
analyzed using
+  * Flink's CEP library. The input event stream consists of temperature 
and power events
+  * from a set of racks. The goal is to detect when a rack is about to 
overheat.
+  * In order to do that, we create a CEP pattern which generates a 
TemperatureWarning
+  * whenever it sees two consecutive temperature events in a given time 
interval whose temperatures
+  * are higher than a given threshold value. A warning itself is not 
critical but if we see
+  * two warnings for the same rack whose temperatures are rising, we want 
to generate an alert.
+  * This is achieved by defining another CEP pattern which analyzes the 
stream of generated
+  * temperature warnings.
+  */
+object TemperatureMonitoring {
+
+  private val TEMPERATURE_THRESHOLD = 100
+
+  def main(args: Array[String]) {
+println("Executing temperature monitoring Scala example.")
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+// Use ingestion time => TimeCharacteristic == EventTime + 
IngestionTimeExtractor
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+// Input stream of monitoring events
+val inputEventStream = env.addSource(new MonitoringEventSource())
+  .assignTimestampsAndWatermarks(new 
IngestionTimeExtractor[MonitoringEvent])
+
+// Warning pattern: Two consecutive temperature events whose 
temperature is higher
+// than the given threshold appearing within a time interval of 10 
seconds
+val warningPattern = Pattern
+  .begin[MonitoringEvent]("first")
+.subtype(classOf[TemperatureEvent])
+.where(_.temperature > TEMPERATURE_THRESHOLD)
+  .next("second")
+.subtype(classOf[TemperatureEvent])
+.where(_.temperature > TEMPERATURE_THRESHOLD)
+  .within(Time.seconds(10))
+
+// Create a pattern stream from our warning pattern
+val tempPatternStream = CEP.pattern(inputEventStream.keyBy(_.rackID), 
warningPattern)
+
+// Generate temperature warnings for each matched warning pattern
+val warnings: DataStream[TemperatureWarning] = 
tempPatternStream.select( pattern => {
+val first = pattern("first").head.asInstanceOf[TemperatureEvent]
+val second = pattern("second").head.asInstanceOf[TemperatureEvent]
+new TemperatureWarning(first.rackID, (first.temperature + 
second.temperature) / 2)
+  }
+)
+
+// Alert pattern: Two consecutive temperature warnings
+// appearing within a time interval of 20 seconds
+val alertPattern = Pattern
+  .begin[TemperatureWarning]("first")
+  .next("second")
+  .within(Time.seconds(20))
+
+// Create a pattern stream from our alert pattern
+val alertPatternStream = 

[jira] [Commented] (FLINK-4303) Add CEP examples

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505601#comment-16505601
 ] 

ASF GitHub Bot commented on FLINK-4303:
---

Github user medcv commented on a diff in the pull request:

https://github.com/apache/flink/pull/6136#discussion_r193934789
  
--- Diff: 
flink-examples/flink-examples-cep/src/main/scala/org/apache/flink/cep/examples/scala/monitoring/TemperatureMonitoring.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.examples.scala.monitoring
+
+import 
org.apache.flink.cep.examples.scala.monitoring.events.{MonitoringEvent, 
TemperatureAlert, TemperatureEvent, TemperatureWarning}
+import 
org.apache.flink.cep.examples.scala.monitoring.sources.MonitoringEventSource
+import org.apache.flink.cep.scala.CEP
+import org.apache.flink.cep.scala.pattern.Pattern
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.IngestionTimeExtractor
+import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment, createTypeInformation}
+import org.apache.flink.streaming.api.windowing.time.Time
+
+/**
+  * CEP example monitoring program.
+  * This example program generates a stream of monitoring events which are 
analyzed using
+  * Flink's CEP library. The input event stream consists of temperature 
and power events
+  * from a set of racks. The goal is to detect when a rack is about to 
overheat.
+  * In order to do that, we create a CEP pattern which generates a 
TemperatureWarning
+  * whenever it sees two consecutive temperature events in a given time 
interval whose temperatures
+  * are higher than a given threshold value. A warning itself is not 
critical but if we see
+  * two warnings for the same rack whose temperatures are rising, we want 
to generate an alert.
+  * This is achieved by defining another CEP pattern which analyzes the 
stream of generated
+  * temperature warnings.
+  */
+object TemperatureMonitoring {
+
+  private val TEMPERATURE_THRESHOLD = 100
+
+  def main(args: Array[String]) {
+println("Executing temperature monitoring Scala example.")
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+// Use ingestion time => TimeCharacteristic == EventTime + 
IngestionTimeExtractor
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+// Input stream of monitoring events
+val inputEventStream = env.addSource(new MonitoringEventSource())
+  .assignTimestampsAndWatermarks(new 
IngestionTimeExtractor[MonitoringEvent])
+
+// Warning pattern: Two consecutive temperature events whose 
temperature is higher
+// than the given threshold appearing within a time interval of 10 
seconds
+val warningPattern = Pattern
+  .begin[MonitoringEvent]("first")
+.subtype(classOf[TemperatureEvent])
+.where(_.temperature > TEMPERATURE_THRESHOLD)
+  .next("second")
+.subtype(classOf[TemperatureEvent])
+.where(_.temperature > TEMPERATURE_THRESHOLD)
+  .within(Time.seconds(10))
+
+// Create a pattern stream from our warning pattern
+val tempPatternStream = CEP.pattern(inputEventStream.keyBy(_.rackID), 
warningPattern)
+
+// Generate temperature warnings for each matched warning pattern
+val warnings: DataStream[TemperatureWarning] = 
tempPatternStream.select( pattern => {
+val first = pattern("first").head.asInstanceOf[TemperatureEvent]
+val second = pattern("second").head.asInstanceOf[TemperatureEvent]
+new TemperatureWarning(first.rackID, (first.temperature + 
second.temperature) / 2)
+  }
+)
+
+// Alert pattern: Two consecutive temperature warnings
+// appearing within a time interval of 20 seconds
+val alertPattern = 

[GitHub] flink pull request #6136: [FLINK-4303] [CEP] Add CEP examples

2018-06-07 Thread medcv
Github user medcv commented on a diff in the pull request:

https://github.com/apache/flink/pull/6136#discussion_r193934744
  
--- Diff: 
flink-examples/flink-examples-cep/src/main/java/org/apache/flink/cep/examples/java/monitoring/TemperatureMonitoring.java
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.examples.java.monitoring;
+
+import org.apache.flink.cep.CEP;
+import org.apache.flink.cep.PatternFlatSelectFunction;
+import org.apache.flink.cep.PatternSelectFunction;
+import org.apache.flink.cep.PatternStream;
+import 
org.apache.flink.cep.examples.java.monitoring.events.MonitoringEvent;
+import 
org.apache.flink.cep.examples.java.monitoring.events.TemperatureAlert;
+import 
org.apache.flink.cep.examples.java.monitoring.events.TemperatureEvent;
+import 
org.apache.flink.cep.examples.java.monitoring.events.TemperatureWarning;
+import 
org.apache.flink.cep.examples.java.monitoring.sources.MonitoringEventSource;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * CEP example monitoring program.
+ * This example program generates a stream of monitoring events which are 
analyzed using
+ * Flink's CEP library. The input event stream consists of temperature and 
power events
+ * from a set of racks. The goal is to detect when a rack is about to 
overheat.
+ * In order to do that, we create a CEP pattern which generates a 
TemperatureWarning
+ * whenever it sees two consecutive temperature events in a given time 
interval whose temperatures
+ * are higher than a given threshold value. A warning itself is not 
critical but if we see
+ * two warnings for the same rack whose temperatures are rising, we want to 
generate an alert.
+ * This is achieved by defining another CEP pattern which analyzes the 
stream of generated
+ * temperature warnings.
+ */
+public class TemperatureMonitoring {
+
+   private static final double TEMPERATURE_THRESHOLD = 100;
+
+   public static void main(String[] args) throws Exception {
+   System.out.println("Executing temperature monitoring Java 
example.");
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   // Use ingestion time => TimeCharacteristic == EventTime + 
IngestionTimeExtractor
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+   // Input stream of monitoring events
+   DataStream<MonitoringEvent> inputEventStream = 
env.addSource(new MonitoringEventSource())
+   .assignTimestampsAndWatermarks(new 
IngestionTimeExtractor<>());
+
+   // Warning pattern: Two consecutive temperature events whose 
temperature is higher
+   // than the given threshold appearing within a time interval of 
10 seconds
+   Pattern<MonitoringEvent, ?> warningPattern = Pattern
+   .<MonitoringEvent>begin("first")
+   .subtype(TemperatureEvent.class)
+   .where(new SimpleCondition<TemperatureEvent>() {
+   @Override
+   public boolean filter(TemperatureEvent 
event) throws Exception {
+   return event.getTemperature() > 
TEMPERATURE_THRESHOLD;
+   }
+   })
+   .next("second")
+   .subtype(TemperatureEvent.class)
+   

[jira] [Commented] (FLINK-4303) Add CEP examples

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505598#comment-16505598
 ] 

ASF GitHub Bot commented on FLINK-4303:
---

Github user medcv commented on the issue:

https://github.com/apache/flink/pull/6136
  
@kisimple Recently I started working on the same example. I ran your code 
locally and it worked perfectly. I could see some checkstyle errors that I 
think Travis will pick up. 

I have a suggestion for the example scenario. 
For the `alerts` event you are using `LocalTime`, which indicates when the 
alert occurred; it would be useful to also add the `warnings` timestamp to the 
`alerts` event in order to show when the rack's temperature passed the 
threshold, something like this on the following 
[line](https://github.com/kisimple/flink/blob/5cd3a374b84b2a7aaedb4c4184caded073e19295/flink-examples/flink-examples-cep/src/main/java/org/apache/flink/cep/examples/java/monitoring/TemperatureMonitoring.java#L133)
 
`out.collect(new TemperatureAlert(first.getRackID(), second.getDatetime()));
`

Of course, this also requires updating the `TemperatureAlert` model.
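
A hypothetical sketch of what the updated `TemperatureAlert` model could look 
like under that suggestion (field names and types are assumptions, not the 
PR's code):

{code:java}
// Hypothetical sketch of the suggested model change: carry the timestamp of
// the second warning so the alert shows when the threshold was passed.
// The rackID/datetime types are assumptions, not the PR's actual code.
public class TemperatureAlert {

    private final int rackID;
    private final long warningDatetime;

    public TemperatureAlert(int rackID, long warningDatetime) {
        this.rackID = rackID;
        this.warningDatetime = warningDatetime;
    }

    public int getRackID() {
        return rackID;
    }

    public long getWarningDatetime() {
        return warningDatetime;
    }

    @Override
    public String toString() {
        return "TemperatureAlert(" + rackID + ", " + warningDatetime + ")";
    }
}
{code}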


> Add CEP examples
> 
>
> Key: FLINK-4303
> URL: https://issues.apache.org/jira/browse/FLINK-4303
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.1.0
>Reporter: Timo Walther
>Assignee: blues zheng
>Priority: Major
>
> Neither CEP Java nor CEP Scala contains a runnable example. The example on the 
> website is also not runnable without adding some additional code.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6136: [FLINK-4303] [CEP] Add CEP examples

2018-06-07 Thread medcv
Github user medcv commented on the issue:

https://github.com/apache/flink/pull/6136
  
@kisimple Recently I started to work on the same example. I ran your code on my 
local machine and it worked perfectly. I could see some checkstyle errors that I 
think Travis might pick up. 

I have a suggestion for the sake of the example scenario. 
For the `alerts` event you are using `LocalTime`, which indicates when the alert 
occurred; it would be useful to also add the `warnings` timestamp to the `alerts` 
event in order to show when the rack's temperature passed the threshold, something 
like this on 
[line](https://github.com/kisimple/flink/blob/5cd3a374b84b2a7aaedb4c4184caded073e19295/flink-examples/flink-examples-cep/src/main/java/org/apache/flink/cep/examples/java/monitoring/TemperatureMonitoring.java#L133):
 
`out.collect(new TemperatureAlert(first.getRackID(), second.getDatetime()));
`

Of course this also requires updating the `TemperatureAlert` model.


---


[jira] [Closed] (FLINK-9482) Not applicable functions for TIME

2018-06-07 Thread Shuyi Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuyi Chen closed FLINK-9482.
-
Resolution: Fixed

> Not applicable functions for TIME
> -
>
> Key: FLINK-9482
> URL: https://issues.apache.org/jira/browse/FLINK-9482
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Viktor Vlasov
>Assignee: Viktor Vlasov
>Priority: Minor
>
> While working on https://issues.apache.org/jira/browse/FLINK-9432 I faced the 
> question of how to check the DECADE function with tests in
> _org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala._
>  
> Because I used the CENTURY function as an example, I checked it first. During 
> the test I figured out that when we use it with TIME it returns 0.
> I suppose the arguments for such functions (this also applies to YEAR, MONTH, 
> MILLENNIUM, etc.) need to be checked, and some exception should be thrown if the 
> type is not suitable.
> As an example, in the Apache Calcite project (checked in the sqlline shell), when 
> I try to use CENTURY with TIME it throws:
> {code:java}
> java.lang.AssertionError: unexpected TIME
> {code}
> We need to determine why such a check does not exist and add it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9482) Not applicable functions for TIME

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505575#comment-16505575
 ] 

ASF GitHub Bot commented on FLINK-9482:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6121


> Not applicable functions for TIME
> -
>
> Key: FLINK-9482
> URL: https://issues.apache.org/jira/browse/FLINK-9482
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Viktor Vlasov
>Assignee: Viktor Vlasov
>Priority: Minor
>
> While working on https://issues.apache.org/jira/browse/FLINK-9432 I faced the 
> question of how to check the DECADE function with tests in
> _org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala._
>  
> Because I used the CENTURY function as an example, I checked it first. During 
> the test I figured out that when we use it with TIME it returns 0.
> I suppose the arguments for such functions (this also applies to YEAR, MONTH, 
> MILLENNIUM, etc.) need to be checked, and some exception should be thrown if the 
> type is not suitable.
> As an example, in the Apache Calcite project (checked in the sqlline shell), when 
> I try to use CENTURY with TIME it throws:
> {code:java}
> java.lang.AssertionError: unexpected TIME
> {code}
> We need to determine why such a check does not exist and add it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6121: [FLINK-9482] [table] EXTRACT function argument val...

2018-06-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6121


---


[jira] [Commented] (FLINK-9061) add entropy to s3 path for better scalability

2018-06-07 Thread Indrajit Roychoudhury (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505512#comment-16505512
 ] 

Indrajit Roychoudhury commented on FLINK-9061:
--

[~neoeahit] [~jgrier] I'll pick this up next week; the basic structure exists, so 
it won't take much time to alter it. I'm targeting two weeks from now, worst case 
end of the month.

> add entropy to s3 path for better scalability
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jamie Grier
>Assignee: Indrajit Roychoudhury
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) add entropy to s3 path for better scalability

2018-06-07 Thread Jamie Grier (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505461#comment-16505461
 ] 

Jamie Grier commented on FLINK-9061:


[~neoeahit] This will affect all versions.

> add entropy to s3 path for better scalability
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jamie Grier
>Assignee: Indrajit Roychoudhury
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) add entropy to s3 path for better scalability

2018-06-07 Thread Monal Daxini (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505447#comment-16505447
 ] 

Monal Daxini commented on FLINK-9061:
-

In addition to what [~stevenz3wu] and [~jgrier] suggest, it would be good to 
make the entropy generation pluggable. This way users can override the default 
entropy generation if they need to, e.g.:
{code:java}
// code placeholder
state.backend.fs.checkpointdir.injectEntropy.strategy=com.foo.MyStaticEntropyGenerator
{code}
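
A rough sketch of what the pluggable hook could look like, matching the placeholder config above (all names here are illustrative, not an existing Flink API):

{code:java}
// Illustrative sketch only -- neither this interface nor the wiring exists in Flink.
public interface EntropyGenerator {

    /** Returns an entropy segment to inject into the given checkpoint path. */
    String entropyFor(String originalPath);
}

// A default implementation could hash the original path, as proposed in the issue:
// s3://bucket/flink/checkpoints -> s3://bucket/<hash>/flink/checkpoints
public class MyStaticEntropyGenerator implements EntropyGenerator {

    @Override
    public String entropyFor(String originalPath) {
        return Integer.toHexString(originalPath.hashCode());
    }
}
{code}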

> add entropy to s3 path for better scalability
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jamie Grier
>Assignee: Indrajit Roychoudhury
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6138: [FLINK-9550][DOC]FlinkCEP snippet example has some...

2018-06-07 Thread medcv
GitHub user medcv opened a pull request:

https://github.com/apache/flink/pull/6138

[FLINK-9550][DOC]FlinkCEP snippet example has some syntax errors


## What is the purpose of the change

Fixing FlinkCEP snippet code syntax errors and data type mismatches 

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/medcv/flink FLINK-9550

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6138.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6138


commit 4a5bc91bc2c326c1a38fd231d24b300b491b75cc
Author: Yadan.JS 
Date:   2018-06-07T21:52:27Z

[FLINK-9550][DOC]FlinkCEP snippet example has some syntax errors




---


[jira] [Commented] (FLINK-9539) Integrate flink-shaded 4.0

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505396#comment-16505396
 ] 

ASF GitHub Bot commented on FLINK-9539:
---

Github user medcv commented on the issue:

https://github.com/apache/flink/pull/6128
  
+1


> Integrate flink-shaded 4.0
> --
>
> Key: FLINK-9539
> URL: https://issues.apache.org/jira/browse/FLINK-9539
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> With the recent release of flink-shaded 4.0 we should bump the versions for 
> all dependencies (except netty which is handled in FLINK-3952).
> We can now remove the exclusions from the jackson dependencies as they are 
> now properly hidden.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6128: [FLINK-9539][build] Integrate flink-shaded 4.0

2018-06-07 Thread medcv
Github user medcv commented on the issue:

https://github.com/apache/flink/pull/6128
  
+1


---


[jira] [Commented] (FLINK-9551) FlinkCEP Scala Combining Patterns table has a missing pattern

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505380#comment-16505380
 ] 

ASF GitHub Bot commented on FLINK-9551:
---

GitHub user medcv opened a pull request:

https://github.com/apache/flink/pull/6139

[FLINK-9551][DOCS]FlinkCEP Scala Combining Patterns table has a missing 
pattern


## What is the purpose of the change

In the FlinkCEP documentation, the section on Combining Patterns has a Scala 
table that is missing a pattern compared to the Java table:

`begin(#pattern_sequence)`

Also, the `begin()` pattern is missing the `#name` param.
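
For reference, a minimal Java sketch of the two `begin` variants the table documents (assuming an `Event` base type; the Scala API is analogous):

    // begin(#name): start a new pattern sequence
    Pattern<Event, ?> start = Pattern.<Event>begin("start");

    // begin(#pattern_sequence): start a new pattern taking an existing sequence as a group
    Pattern<Event, Event> sequence = Pattern.<Event>begin("first").next("second");
    Pattern<Event, Event> grouped = Pattern.begin(sequence);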


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/medcv/flink FLINK-9551

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6139.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6139


commit 83d709f3ce9c73435d8a2c423f79c3d323a74604
Author: Yadan.JS 
Date:   2018-06-07T22:07:57Z

[FLINK-9551][DOCS]FlinkCEP Scala Combining Patterns table has a missing 
pattern




> FlinkCEP Scala Combining Patterns table has a missing pattern
> -
>
> Key: FLINK-9551
> URL: https://issues.apache.org/jira/browse/FLINK-9551
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Yazdan Shirvany
>Assignee: Yazdan Shirvany
>Priority: Minor
>
> In the FlinkCEP documentation, the section on {{Combining Patterns}} has a Scala 
> table that is missing a pattern compared to the Java table:
> {{begin(#pattern_sequence)}}
> Also, the {{begin()}} pattern is missing the {{#name}} param.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6139: [FLINK-9551][DOCS]FlinkCEP Scala Combining Pattern...

2018-06-07 Thread medcv
GitHub user medcv opened a pull request:

https://github.com/apache/flink/pull/6139

[FLINK-9551][DOCS]FlinkCEP Scala Combining Patterns table has a missing 
pattern


## What is the purpose of the change

In the FlinkCEP documentation, the section on Combining Patterns has a Scala 
table that is missing a pattern compared to the Java table:

`begin(#pattern_sequence)`

Also, the `begin()` pattern is missing the `#name` param.


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/medcv/flink FLINK-9551

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6139.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6139


commit 83d709f3ce9c73435d8a2c423f79c3d323a74604
Author: Yadan.JS 
Date:   2018-06-07T22:07:57Z

[FLINK-9551][DOCS]FlinkCEP Scala Combining Patterns table has a missing 
pattern




---


[jira] [Commented] (FLINK-9550) FlinkCEP snippet example has some syntax errors

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505367#comment-16505367
 ] 

ASF GitHub Bot commented on FLINK-9550:
---

GitHub user medcv opened a pull request:

https://github.com/apache/flink/pull/6138

[FLINK-9550][DOC]FlinkCEP snippet example has some syntax errors


## What is the purpose of the change

Fixing FlinkCEP snippet code syntax errors and data type mismatches 

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/medcv/flink FLINK-9550

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6138.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6138


commit 4a5bc91bc2c326c1a38fd231d24b300b491b75cc
Author: Yadan.JS 
Date:   2018-06-07T21:52:27Z

[FLINK-9550][DOC]FlinkCEP snippet example has some syntax errors




> FlinkCEP snippet example has some syntax errors
> ---
>
> Key: FLINK-9550
> URL: https://issues.apache.org/jira/browse/FLINK-9550
> Project: Flink
>  Issue Type: Improvement
>Reporter: Yazdan Shirvany
>Assignee: Yazdan Shirvany
>Priority: Minor
>
> I tried to work with the FlinkCEP documentation and my assumption was that the 
> snippet examples should work, but I got some syntax errors that needed to be 
> fixed.  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9551) FlinkCEP Scala Combining Patterns table has a missing pattern

2018-06-07 Thread Yazdan Shirvany (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yazdan Shirvany updated FLINK-9551:
---
Summary: FlinkCEP Scala Combining Patterns table has a missing pattern  
(was: FlinkCEP Scala Combining Patterns table has two missing pattern)

> FlinkCEP Scala Combining Patterns table has a missing pattern
> -
>
> Key: FLINK-9551
> URL: https://issues.apache.org/jira/browse/FLINK-9551
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Yazdan Shirvany
>Assignee: Yazdan Shirvany
>Priority: Minor
>
> In the FlinkCEP documentation, the section on {{Combining Patterns}} has a Scala 
> table that is missing a pattern compared to the Java table:
> {{begin(#pattern_sequence)}}
> Also, the {{begin()}} pattern is missing the {{#name}} param.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9551) FlinkCEP Scala Combining Patterns table has two missing pattern

2018-06-07 Thread Yazdan Shirvany (JIRA)
Yazdan Shirvany created FLINK-9551:
--

 Summary: FlinkCEP Scala Combining Patterns table has two missing 
pattern
 Key: FLINK-9551
 URL: https://issues.apache.org/jira/browse/FLINK-9551
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Yazdan Shirvany
Assignee: Yazdan Shirvany


In the FlinkCEP documentation, the section on {{Combining Patterns}} has a Scala 
table that is missing a pattern compared to the Java table:

{{begin(#pattern_sequence)}}

Also, the {{begin()}} pattern is missing the {{#name}} param.




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9550) FlinkCEP snippet example has some syntax errors

2018-06-07 Thread Yazdan Shirvany (JIRA)
Yazdan Shirvany created FLINK-9550:
--

 Summary: FlinkCEP snippet example has some syntax errors
 Key: FLINK-9550
 URL: https://issues.apache.org/jira/browse/FLINK-9550
 Project: Flink
  Issue Type: Improvement
Reporter: Yazdan Shirvany
Assignee: Yazdan Shirvany


I tried to work with the FlinkCEP documentation and my assumption was that the 
snippet examples should work, but I got some syntax errors that needed to be fixed.  




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9549) Fix FlinkCEP Docs broken link and minor style changes

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505345#comment-16505345
 ] 

ASF GitHub Bot commented on FLINK-9549:
---

GitHub user medcv opened a pull request:

https://github.com/apache/flink/pull/6137

[FLINK-9549][DOC]Fix FlinkCEP Docs broken link and minor style changes


## What is the purpose of the change

Fixing a broken FlinkCEP docs link and minor style changes

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/medcv/flink FLINK-9549

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6137.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6137


commit 2a9b6ff7b4dc4e2ec624ed18ba37b46d82f93e6d
Author: Yadan.JS 
Date:   2018-06-07T21:33:49Z

[FLINK-9549][DOC]Fix FlinkCEP Docs broken link and minor style changes




> Fix FlinkCEP Docs broken link and minor style changes
> -
>
> Key: FLINK-9549
> URL: https://issues.apache.org/jira/browse/FLINK-9549
> Project: Flink
>  Issue Type: Improvement
>Reporter: Yazdan Shirvany
>Assignee: Yazdan Shirvany
>Priority: Trivial
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6137: [FLINK-9549][DOC]Fix FlinkCEP Docs broken link and...

2018-06-07 Thread medcv
GitHub user medcv opened a pull request:

https://github.com/apache/flink/pull/6137

[FLINK-9549][DOC]Fix FlinkCEP Docs broken link and minor style changes


## What is the purpose of the change

Fixing a broken FlinkCEP docs link and minor style changes

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/medcv/flink FLINK-9549

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6137.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6137


commit 2a9b6ff7b4dc4e2ec624ed18ba37b46d82f93e6d
Author: Yadan.JS 
Date:   2018-06-07T21:33:49Z

[FLINK-9549][DOC]Fix FlinkCEP Docs broken link and minor style changes




---


[jira] [Created] (FLINK-9549) Fix FlinkCEP Docs broken link and minor style changes

2018-06-07 Thread Yazdan Shirvany (JIRA)
Yazdan Shirvany created FLINK-9549:
--

 Summary: Fix FlinkCEP Docs broken link and minor style changes
 Key: FLINK-9549
 URL: https://issues.apache.org/jira/browse/FLINK-9549
 Project: Flink
  Issue Type: Improvement
Reporter: Yazdan Shirvany
Assignee: Yazdan Shirvany






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6131: [hotfix][docs] Fix Table API scala example code

2018-06-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6131


---


[jira] [Created] (FLINK-9548) Flink Apache Kudu Connector

2018-06-07 Thread Sandish Kumar HN (JIRA)
Sandish Kumar HN created FLINK-9548:
---

 Summary: Flink Apache Kudu Connector
 Key: FLINK-9548
 URL: https://issues.apache.org/jira/browse/FLINK-9548
 Project: Flink
  Issue Type: New Feature
Reporter: Sandish Kumar HN


A Flink Apache Kudu connector would be a good addition. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-4303) Add CEP examples

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505047#comment-16505047
 ] 

ASF GitHub Bot commented on FLINK-4303:
---

Github user kisimple commented on the issue:

https://github.com/apache/flink/pull/6136
  
cc @kl0u 


> Add CEP examples
> 
>
> Key: FLINK-4303
> URL: https://issues.apache.org/jira/browse/FLINK-4303
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.1.0
>Reporter: Timo Walther
>Assignee: blues zheng
>Priority: Major
>
> Neither CEP Java nor CEP Scala contain a runnable example. The example on the 
> website is also not runnable without adding some additional code.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6136: [FLINK-4303] [CEP] Add CEP examples

2018-06-07 Thread kisimple
Github user kisimple commented on the issue:

https://github.com/apache/flink/pull/6136
  
cc @kl0u 


---


[jira] [Commented] (FLINK-4303) Add CEP examples

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505046#comment-16505046
 ] 

ASF GitHub Bot commented on FLINK-4303:
---

GitHub user kisimple opened a pull request:

https://github.com/apache/flink/pull/6136

[FLINK-4303] [CEP] Add CEP examples

## What is the purpose of the change

Currently neither CEP Java nor CEP Scala contain a runnable example. This 
PR fixes the problem by adding a `flink-examples-cep` module. The change is 
based on #2937

## Brief change log

  - Add a `flink-examples-cep` module
  - Add a `TemperatureMonitoring.java` example
  - Add a `TemperatureMonitoring.scala` example

## Verifying this change

  - Build the project and run the example by the following commands:
`./bin/flink run -c 
org.apache.flink.cep.examples.java.monitoring.TemperatureMonitoring 
./examples/cep/flink-examples-cep-with-dependencies.jar`
and
`./bin/flink run -c 
org.apache.flink.cep.examples.scala.monitoring.TemperatureMonitoring 
./examples/cep/flink-examples-cep-with-dependencies.jar`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kisimple/flink FLINK-4303

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6136.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6136


commit f9f382780e0975c83f2a957132c1c09c769cf3a3
Author: Aleksandr Chermenin 
Date:   2016-12-05T09:44:39Z

[FLINK-4303] Added examples for CEP library.

commit 7b96b9d5ffd221c0ff4169ac84c601eb34f95876
Author: Aleksandr Chermenin 
Date:   2016-12-05T10:09:50Z

[FLINK-4303] Fixed code style.

commit f26e7f403b7d7a74cd9a0d129086468789aa2747
Author: Aleksandr Chermenin 
Date:   2016-12-05T10:28:59Z

[FLINK-4303] Another small code style fix.

commit 3a214a38f4cda5a3e965ce4512b960ac5a7fe59f
Author: Aleksandr Chermenin 
Date:   2016-12-05T11:24:14Z

[FLINK-4303] Fixed Scala code style.

commit 134492dc23a5eaa2173669ed213c3f933dcc4391
Author: Aleksandr Chermenin 
Date:   2017-02-22T06:59:28Z

[FLINK-4303] Fixed hashCode methods for events.

commit ccf26763acf363134ef4bfa5127c36af226a3ed2
Author: blueszheng 
Date:   2018-06-07T17:35:18Z

[FLINK-4303] Add CEP examples




> Add CEP examples
> 
>
> Key: FLINK-4303
> URL: https://issues.apache.org/jira/browse/FLINK-4303
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.1.0
>Reporter: Timo Walther
>Assignee: blues zheng
>Priority: Major
>
> Neither CEP Java nor CEP Scala contain a runnable example. The example on the 
> website is also not runnable without adding some additional code.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6136: [FLINK-4303] [CEP] Add CEP examples

2018-06-07 Thread kisimple
GitHub user kisimple opened a pull request:

https://github.com/apache/flink/pull/6136

[FLINK-4303] [CEP] Add CEP examples

## What is the purpose of the change

Currently neither CEP Java nor CEP Scala contain a runnable example. This 
PR fixes the problem by adding a `flink-examples-cep` module. The change is 
based on #2937

## Brief change log

  - Add a `flink-examples-cep` module
  - Add a `TemperatureMonitoring.java` example
  - Add a `TemperatureMonitoring.scala` example

## Verifying this change

  - Build the project and run the example by the following commands:
`./bin/flink run -c 
org.apache.flink.cep.examples.java.monitoring.TemperatureMonitoring 
./examples/cep/flink-examples-cep-with-dependencies.jar`
and
`./bin/flink run -c 
org.apache.flink.cep.examples.scala.monitoring.TemperatureMonitoring 
./examples/cep/flink-examples-cep-with-dependencies.jar`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kisimple/flink FLINK-4303

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6136.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6136


commit f9f382780e0975c83f2a957132c1c09c769cf3a3
Author: Aleksandr Chermenin 
Date:   2016-12-05T09:44:39Z

[FLINK-4303] Added examples for CEP library.

commit 7b96b9d5ffd221c0ff4169ac84c601eb34f95876
Author: Aleksandr Chermenin 
Date:   2016-12-05T10:09:50Z

[FLINK-4303] Fixed code style.

commit f26e7f403b7d7a74cd9a0d129086468789aa2747
Author: Aleksandr Chermenin 
Date:   2016-12-05T10:28:59Z

[FLINK-4303] Another small code style fix.

commit 3a214a38f4cda5a3e965ce4512b960ac5a7fe59f
Author: Aleksandr Chermenin 
Date:   2016-12-05T11:24:14Z

[FLINK-4303] Fixed Scala code style.

commit 134492dc23a5eaa2173669ed213c3f933dcc4391
Author: Aleksandr Chermenin 
Date:   2017-02-22T06:59:28Z

[FLINK-4303] Fixed hashCode methods for events.

commit ccf26763acf363134ef4bfa5127c36af226a3ed2
Author: blueszheng 
Date:   2018-06-07T17:35:18Z

[FLINK-4303] Add CEP examples




---


[jira] [Commented] (FLINK-9061) add entropy to s3 path for better scalability

2018-06-07 Thread Vipul Singh (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505042#comment-16505042
 ] 

Vipul Singh commented on FLINK-9061:


We also seem to be affected by this. I wanted to check on the timeline for this 
feature. 

[~jgrier] a quick question regarding the affected versions: does this only affect 
>= 1.4? We seem to be seeing a similar issue on Flink 1.3.

> add entropy to s3 path for better scalability
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jamie Grier
>Assignee: Indrajit Roychoudhury
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9547) CEP pattern not called on windowed stream

2018-06-07 Thread Lucas Resch (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505021#comment-16505021
 ] 

Lucas Resch edited comment on FLINK-9547 at 6/7/18 6:07 PM:


[~dawidwys] I created a small example that does something similar. Somehow the 
behavior is different though. Now it doesn't call the initial pattern but the 
one on the windowed stream is called. Something is definitely wrong.

 
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<Integer> objectDataStreamSource = env.fromElements(
        1, 2, 3, 4, 5, 6, 7, 8, 9, 10
);

SingleOutputStreamOperator<Long> forces = objectDataStreamSource
        .filter((FilterFunction<Integer>) Objects::nonNull)
        .process(new ProcessFunction<Integer, Long>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Long> out) throws Exception {
                out.collect(value.longValue());
            }
        });

Pattern<Long, Long> forcesMock = Pattern.<Long>begin("start").where(new SimpleCondition<Long>() {
    @Override
    public boolean filter(Long value) {
        return true;
    }
});

CEP.pattern(forces, forcesMock)
        .select(new PatternSelectFunction<Long, String>() {
            @Override
            public String select(Map<String, List<Long>> pattern) throws Exception {
                return String.format("Prints %d as expected", pattern.get("start").get(0));
            }
        }).print();

SingleOutputStreamOperator<Long> intervals = forces
        .countWindowAll(2, 1)
        .process(new ProcessAllWindowFunction<Long, Long, GlobalWindow>() {
            @Override
            public void process(Context context, Iterable<Long> elements, Collector<Long> out) throws Exception {
                List<Long> items = new ArrayList<>();
                elements.forEach(items::add);
                if (items.size() == 2) {
                    out.collect(items.get(0));
                }
            }
        });

Pattern<Long, Long> intervalMock = Pattern.<Long>begin("start").where(new SimpleCondition<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return true;
    }
});

CEP.pattern(intervals, intervalMock)
        .select(new PatternSelectFunction<Long, String>() {
            @Override
            public String select(Map<String, List<Long>> pattern) throws Exception {
                return String.format("Doesn't print %d", pattern.get("start").get(0));
            }
        }).print();

env.execute();
{code}


was (Author: mlnotw):
[~dawidwys] I created a small example that does something similar. Somehow the 
behavior is different though. Now it doesn't call the initial pattern but the 
one on the windowed stream is called. Something is definitely wrong.

 
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<Integer> objectDataStreamSource = env.fromElements(
        1, 2, 3, 4, 5, 6, 7, 8, 9, 10
);

SingleOutputStreamOperator<Long> forces = objectDataStreamSource
        .filter((FilterFunction<Integer>) Objects::nonNull)
        .process(new ProcessFunction<Integer, Long>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Long> out) throws Exception {
                out.collect(value.longValue());
            }
        });

Pattern<Long, Long> forcesMock = Pattern.<Long>begin("start").where(new SimpleCondition<Long>() {
    @Override
    public boolean filter(Long value) {
        return true;
    }
});

CEP.pattern(forces, forcesMock)
        .select(new PatternSelectFunction<Long, String>() {
            @Override
            public String select(Map<String, List<Long>> pattern) throws Exception {
                return String.format("Prints %d as expected", pattern.get("start").get(0));
            }
        }).print();

// Create another stream based on a sliding window over the input stream
SingleOutputStreamOperator<Long> intervals = forces
        .countWindowAll(2, 1)
        .process(new ProcessAllWindowFunction<Long, Long, GlobalWindow>() {
            @Override
            public void process(Context context, Iterable<Long> elements, Collector<Long> out) throws Exception {
                List<Long> items = new ArrayList<>();
                elements.forEach(items::add);
                if (items.size() == 2) {
                    out.collect(items.get(0));
                }
            }
        });

Pattern<Long, Long> intervalMock = Pattern.<Long>begin("start").where(new SimpleCondition<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return true;
    }
});

CEP.pattern(intervals, intervalMock)
        .select(new PatternSelectFunction<Long, String>() {
            @Override
            public String select(Map<String, List<Long>> pattern) throws Exception {
                return String.format("Doesn't print %d", pattern.get("start").get(0));
            }
        }).print();

env.execute();
{code}

> CEP pattern not called on windowed stream
> 

[jira] [Comment Edited] (FLINK-9547) CEP pattern not called on windowed stream

2018-06-07 Thread Lucas Resch (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505021#comment-16505021
 ] 

Lucas Resch edited comment on FLINK-9547 at 6/7/18 6:06 PM:


[~dawidwys] I created a small example that does something similar. Somehow the 
behavior is different though. Now it doesn't call the initial pattern but the 
one on the windowed stream is called. Something is definitely wrong.

 
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<Integer> objectDataStreamSource = env.fromElements(
        1, 2, 3, 4, 5, 6, 7, 8, 9, 10
);

SingleOutputStreamOperator<Long> forces = objectDataStreamSource
        .filter((FilterFunction<Integer>) Objects::nonNull)
        .process(new ProcessFunction<Integer, Long>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Long> out) throws Exception {
                out.collect(value.longValue());
            }
        });

Pattern<Long, Long> forcesMock = Pattern.<Long>begin("start").where(new SimpleCondition<Long>() {
    @Override
    public boolean filter(Long value) {
        return true;
    }
});

CEP.pattern(forces, forcesMock)
        .select(new PatternSelectFunction<Long, String>() {
            @Override
            public String select(Map<String, List<Long>> pattern) throws Exception {
                return String.format("Prints %d as expected", pattern.get("start").get(0));
            }
        }).print();

// Create another stream based on a sliding window over the input stream
SingleOutputStreamOperator<Long> intervals = forces
        .countWindowAll(2, 1)
        .process(new ProcessAllWindowFunction<Long, Long, GlobalWindow>() {
            @Override
            public void process(Context context, Iterable<Long> elements, Collector<Long> out) throws Exception {
                List<Long> items = new ArrayList<>();
                elements.forEach(items::add);
                if (items.size() == 2) {
                    out.collect(items.get(0));
                }
            }
        });

Pattern<Long, Long> intervalMock = Pattern.<Long>begin("start").where(new SimpleCondition<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return true;
    }
});

CEP.pattern(intervals, intervalMock)
        .select(new PatternSelectFunction<Long, String>() {
            @Override
            public String select(Map<String, List<Long>> pattern) throws Exception {
                return String.format("Doesn't print %d", pattern.get("start").get(0));
            }
        }).print();

env.execute();
{code}


was (Author: mlnotw):
I created a small example that does something similar. Somehow the behavior is 
different though. Now it doesn't call the initial pattern but the one on the 
windowed stream is called. Something is definitely wrong.

 
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<Integer> objectDataStreamSource = env.fromElements(
        1, 2, 3, 4, 5, 6, 7, 8, 9, 10
);

SingleOutputStreamOperator<Long> forces = objectDataStreamSource
        .filter((FilterFunction<Integer>) Objects::nonNull)
        .process(new ProcessFunction<Integer, Long>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Long> out) throws Exception {
                out.collect(value.longValue());
            }
        });

Pattern<Long, Long> forcesMock = Pattern.<Long>begin("start").where(new SimpleCondition<Long>() {
    @Override
    public boolean filter(Long value) {
        return true;
    }
});

CEP.pattern(forces, forcesMock)
        .select(new PatternSelectFunction<Long, String>() {
            @Override
            public String select(Map<String, List<Long>> pattern) throws Exception {
                return String.format("Prints %d as expected", pattern.get("start").get(0));
            }
        }).print();

// Create another stream based on a sliding window over the input stream
SingleOutputStreamOperator<Long> intervals = forces
        .countWindowAll(2, 1)
        .process(new ProcessAllWindowFunction<Long, Long, GlobalWindow>() {
            @Override
            public void process(Context context, Iterable<Long> elements, Collector<Long> out) throws Exception {
                List<Long> items = new ArrayList<>();
                elements.forEach(items::add);
                if (items.size() == 2) {
                    out.collect(items.get(0));
                }
            }
        });

Pattern<Long, Long> intervalMock = Pattern.<Long>begin("start").where(new SimpleCondition<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return true;
    }
});

CEP.pattern(intervals, intervalMock)
        .select(new PatternSelectFunction<Long, String>() {
            @Override
            public String select(Map<String, List<Long>> pattern) throws Exception {
                return String.format("Doesn't print %d", pattern.get("start").get(0));
            }
        }).print();

env.execute();
{code}

> 

[jira] [Commented] (FLINK-9547) CEP pattern not called on windowed stream

2018-06-07 Thread Lucas Resch (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505021#comment-16505021
 ] 

Lucas Resch commented on FLINK-9547:


I created a small example that does something similar. Somehow the behavior is 
different though. Now it doesn't call the initial pattern but the one on the 
windowed stream is called. Something is definitely wrong.

 
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<Integer> objectDataStreamSource = env.fromElements(
        1, 2, 3, 4, 5, 6, 7, 8, 9, 10
);

SingleOutputStreamOperator<Long> forces = objectDataStreamSource
        .filter((FilterFunction<Integer>) Objects::nonNull)
        .process(new ProcessFunction<Integer, Long>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Long> out) throws Exception {
                out.collect(value.longValue());
            }
        });

Pattern<Long, Long> forcesMock = Pattern.<Long>begin("start").where(new SimpleCondition<Long>() {
    @Override
    public boolean filter(Long value) {
        return true;
    }
});

CEP.pattern(forces, forcesMock)
        .select(new PatternSelectFunction<Long, String>() {
            @Override
            public String select(Map<String, List<Long>> pattern) throws Exception {
                return String.format("Prints %d as expected", pattern.get("start").get(0));
            }
        }).print();

// Create another stream based on a sliding window over the input stream
SingleOutputStreamOperator<Long> intervals = forces
        .countWindowAll(2, 1)
        .process(new ProcessAllWindowFunction<Long, Long, GlobalWindow>() {
            @Override
            public void process(Context context, Iterable<Long> elements, Collector<Long> out) throws Exception {
                List<Long> items = new ArrayList<>();
                elements.forEach(items::add);
                if (items.size() == 2) {
                    out.collect(items.get(0));
                }
            }
        });

Pattern<Long, Long> intervalMock = Pattern.<Long>begin("start").where(new SimpleCondition<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return true;
    }
});

CEP.pattern(intervals, intervalMock)
        .select(new PatternSelectFunction<Long, String>() {
            @Override
            public String select(Map<String, List<Long>> pattern) throws Exception {
                return String.format("Doesn't print %d", pattern.get("start").get(0));
            }
        }).print();

env.execute();
{code}

> CEP pattern not called on windowed stream
> -
>
> Key: FLINK-9547
> URL: https://issues.apache.org/jira/browse/FLINK-9547
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.2, 1.5.0
>Reporter: Lucas Resch
>Priority: Major
>
> When trying to match a pattern on a stream that was windowed the pattern will 
> not be called. The following shows example code where the issue was noticed:
> {code:java}
> // Set up stream
> SingleOutputStreamOperator<ForceZ> forces = ...
>     .filter(new FilterForcesFunction())
>     .process(new ProcessForcesFunction());
> // Define mock pattern
> Pattern<ForceZ, ForceZ> forcesMock = Pattern.<ForceZ>begin("start").where(new SimpleCondition<ForceZ>() {
>     @Override
>     public boolean filter(ForceZ value) {
>         // This is called as expected
>         return true;
>     }
> });
> // Print pattern results
> // This actually prints all incoming events as expected
> CEP.pattern(forces, forcesMock)
>     .select(new PatternSelectFunction<ForceZ, ForceZ>() {
>         @Override
>         public ForceZ select(Map<String, List<ForceZ>> pattern) {
>             return pattern.get("start").get(0);
>         }
>     }).print();
> // Create another stream based on a sliding window over the input stream
> SingleOutputStreamOperator<Interval> intervals = forces
>     .countWindowAll(2, 1)
>     .process(new ForceWindowFunction());
> // Define mock pattern
> Pattern<Interval, Interval> intervalMock = Pattern.<Interval>begin("start").where(new SimpleCondition<Interval>() {
>     @Override
>     public boolean filter(Interval value) throws Exception {
>         // This is never called
>         return true;
>     }
> });
> // Print pattern results
> // Doesn't print anything since the mock condition is never called
> CEP.pattern(intervals, intervalMock)
>     .select(new PatternSelectFunction<Interval, Interval>() {
>         @Override
>         public Interval select(Map<String, List<Interval>> pattern) throws Exception {
>             return pattern.get("start").get(0);
>         }
>     }).print();
> {code}
> Either I'm doing something wrong or this is a major bug.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6130: [FLINK-9545] Support read a file multiple times in Flink ...

2018-06-07 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6130
  
You're effectively only explaining what this feature is, but not why it is 
actually needed. We have to gauge whether this feature is useful for other 
users as well before we decide to maintain it. 


---


[jira] [Commented] (FLINK-9545) Support read a file multiple times in Flink DataStream

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505017#comment-16505017
 ] 

ASF GitHub Bot commented on FLINK-9545:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6130
  
You're effectively only explaining what this feature is, but not why it is 
actually needed. We have to gauge whether this feature is useful for other 
users as well before we decide to maintain it. 


> Support read a file multiple times in Flink DataStream 
> ---
>
> Key: FLINK-9545
> URL: https://issues.apache.org/jira/browse/FLINK-9545
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.6.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.6.0
>
>
> Motivation: We have a requirement to read a bunch of files, each file multiple 
> times, to feed our streams.
> Specifically we need {{StreamExecutionEnvironment.readFile/readTextFile}} to 
> be able to read a file a specified {{N}} times, but currently it only 
> supports reading a file once.
> We've implemented this internally. Would be good to get it back to the 
> community version. This jira is to add support for the feature. 
> Plan:
> add a new processing mode as PROCESSING_N_TIMES, and add additional parameter 
> {{numTimes}} for {{StreamExecutionEnvironment.readFile/readTextFile}}
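
A sketch of how the proposed API could look ({{FileProcessingMode.PROCESSING_N_TIMES}} and the {{numTimes}} parameter are this ticket's proposal, not an existing Flink API):

{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// today: the file is read exactly once
DataStream<String> once = env.readTextFile("file:///tmp/input.txt");

// proposed (hypothetical signature, per the plan above):
// DataStream<String> nTimes = env.readFile(
//         new TextInputFormat(new Path("file:///tmp/input.txt")),
//         "file:///tmp/input.txt",
//         FileProcessingMode.PROCESSING_N_TIMES, // proposed new mode
//         3 /* numTimes */);
{code}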



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6130: [FLINK-9545] Support read a file multiple times in Flink ...

2018-06-07 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/6130
  
@aljoscha 

Motivation: We have a requirement to read a bunch of files, each file multiple 
times, to feed our streams.

Specifically we need `StreamExecutionEnvironment.readFile/readTextFile` to 
be able to read a file a specified `N` times, but currently it only 
supports reading a file once.

We've implemented this internally. Would be good to get it back to the 
community version. This jira is to add support for the feature. 


---


[jira] [Commented] (FLINK-9545) Support read a file multiple times in Flink DataStream

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504916#comment-16504916
 ] 

ASF GitHub Bot commented on FLINK-9545:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/6130
  
@aljoscha 

Motivation: We have a requirement to read a bunch of files, each file 
multiple times, to feed our streams.

Specifically we need `StreamExecutionEnvironment.readFile/readTextFile` to 
be able to read a file a specified `N` times, but currently it only 
supports reading a file once.

We've implemented this internally. Would be good to get it back to the 
community version. This jira is to add support for the feature. 


> Support read a file multiple times in Flink DataStream 
> ---
>
> Key: FLINK-9545
> URL: https://issues.apache.org/jira/browse/FLINK-9545
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.6.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.6.0
>
>
> Motivation: We have a requirement to read a bunch of files, each file multiple 
> times, to feed our streams.
> Specifically we need {{StreamExecutionEnvironment.readFile/readTextFile}} to 
> be able to read a file a specified {{N}} times, but currently it only 
> supports reading a file once.
> We've implemented this internally. Would be good to get it back to the 
> community version. This jira is to add support for the feature. 
> Plan:
> add a new processing mode as PROCESSING_N_TIMES, and add additional parameter 
> {{numTimes}} for {{StreamExecutionEnvironment.readFile/readTextFile}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9545) Support read a file multiple times in Flink DataStream

2018-06-07 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-9545:

Description: 
Motivation: We have a requirement to read a bunch of files, each file multiple 
times, to feed our streams.

Specifically we need {{StreamExecutionEnvironment.readFile/readTextFile}} to be 
able to read a file a specified {{N}} times, but currently it only supports 
reading a file once.

We've implemented this internally. Would be good to get it back to the 
community version. This jira is to add support for the feature. 

Plan:

add a new processing mode as PROCESSING_N_TIMES, and add additional parameter 
{{numTimes}} for {{StreamExecutionEnvironment.readFile/readTextFile}}

  was:
Motivation: We have a requirement to read a bunch of files, each file multiple 
times, to feed our streams.

Specifically we need {{StreamExecutionEnvironment.readFile/readTextFile}} to be 
able to read a file a specified {{N}} times, but currently it only supports 
reading a file once.

We've implemented this internally. Would be good to back it back to the 
community version.

Feature: add support for the feature.

Plan:

add a new processing mode as PROCESSING_N_TIMES, and add additional parameter 
{{numTimes}} for {{StreamExecutionEnvironment.readFile/readTextFile}}


> Support read a file multiple times in Flink DataStream 
> ---
>
> Key: FLINK-9545
> URL: https://issues.apache.org/jira/browse/FLINK-9545
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.6.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.6.0
>
>
> Motivation: We have a requirement to read a bunch of files, each file multiple 
> times, to feed our streams.
> Specifically we need {{StreamExecutionEnvironment.readFile/readTextFile}} to 
> be able to read a file a specified {{N}} times, but currently it only 
> supports reading a file once.
> We've implemented this internally. Would be good to get it back to the 
> community version. This jira is to add support for the feature. 
> Plan:
> add a new processing mode as PROCESSING_N_TIMES, and add additional parameter 
> {{numTimes}} for {{StreamExecutionEnvironment.readFile/readTextFile}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9545) Support read a file multiple times in Flink DataStream

2018-06-07 Thread Bowen Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504913#comment-16504913
 ] 

Bowen Li commented on FLINK-9545:
-

[~aljoscha] I updated the description

> Support read a file multiple times in Flink DataStream 
> ---
>
> Key: FLINK-9545
> URL: https://issues.apache.org/jira/browse/FLINK-9545
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.6.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.6.0
>
>
> Motivation: We have a requirement to read a bunch of files, each file multiple 
> times, to feed our streams.
> Specifically we need {{StreamExecutionEnvironment.readFile/readTextFile}} to 
> be able to read a file a specified {{N}} times, but currently it only 
> supports reading a file once.
> We've implemented this internally. Would be good to get it back to the 
> community version. This jira is to add support for the feature. 
> Plan:
> add a new processing mode as PROCESSING_N_TIMES, and add additional parameter 
> {{numTimes}} for {{StreamExecutionEnvironment.readFile/readTextFile}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9545) Support read a file multiple times in Flink DataStream

2018-06-07 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-9545:

Description: 
Motivation: We have a requirement to read a bunch of files, each file multiple 
times, to feed our streams.

Specifically, we need {{StreamExecutionEnvironment.readFile/readTextFile}} to be 
able to read a file a specified {{N}} times, but currently it only supports 
reading a file once.

We've implemented this internally. It would be good to bring it back to the 
community version.

Feature: add support for the feature.

Plan:

add a new processing mode, PROCESSING_N_TIMES, and an additional parameter 
{{numTimes}} for {{StreamExecutionEnvironment.readFile/readTextFile}}

  was:
we need {{StreamExecutionEnvironment.readFile/readTextFile}} to read each file 
N times, but currently it only supports reading a file once.

add support for the feature.

Plan:

add a new processing mode, PROCESSING_N_TIMES, and an additional parameter 
{{numTimes}} for {{StreamExecutionEnvironment.readFile/readTextFile}}


> Support read a file multiple times in Flink DataStream 
> ---
>
> Key: FLINK-9545
> URL: https://issues.apache.org/jira/browse/FLINK-9545
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.6.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.6.0
>
>
> Motivation: We have a requirement to read a bunch of files, each file multiple 
> times, to feed our streams.
> Specifically, we need {{StreamExecutionEnvironment.readFile/readTextFile}} to 
> be able to read a file a specified {{N}} times, but currently it only 
> supports reading a file once.
> We've implemented this internally. It would be good to bring it back to the 
> community version.
> Feature: add support for the feature.
> Plan:
> add a new processing mode, PROCESSING_N_TIMES, and an additional parameter 
> {{numTimes}} for {{StreamExecutionEnvironment.readFile/readTextFile}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9538) Make KeyedStateFunction an interface

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504863#comment-16504863
 ] 

ASF GitHub Bot commented on FLINK-9538:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/6134
  
Sorry @yanghua , will have a look tomorrow. You really don't need to ping 
me every hour, will get to it as soon as I have some time.


> Make KeyedStateFunction an interface
> 
>
> Key: FLINK-9538
> URL: https://issues.apache.org/jira/browse/FLINK-9538
> Project: Flink
>  Issue Type: Improvement
>Reporter: Dawid Wysakowicz
>Assignee: vinoyang
>Priority: Major
>
> I suggest changing the KeyedStateFunction from an abstract class to an interface 
> (a FunctionalInterface in particular) to enable passing lambdas.
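
A sketch of what the change would enable (assuming {{KeyedStateFunction}} becomes 
a {{@FunctionalInterface}}; {{KeyedStateBackend#applyToAllKeys}} is the existing 
entry point that takes a {{KeyedStateFunction}}, and the state descriptor here is 
illustrative):

{code:java}
keyedStateBackend.applyToAllKeys(
    VoidNamespace.INSTANCE,
    VoidNamespaceSerializer.INSTANCE,
    new ValueStateDescriptor<>("my-state", Long.class),
    (key, state) -> state.clear());  // lambda instead of an anonymous subclass
{code}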



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6134: [FLINK-9538] Make KeyedStateFunction an interface

2018-06-07 Thread dawidwys
Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/6134
  
Sorry @yanghua , will have a look tomorrow. You really don't need to ping 
me every hour, will get to it as soon as I have some time.


---


[GitHub] flink issue #6134: [FLINK-9538] Make KeyedStateFunction an interface

2018-06-07 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6134
  
cc @dawidwys refactored code, please review again~


---


[jira] [Commented] (FLINK-9538) Make KeyedStateFunction an interface

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504859#comment-16504859
 ] 

ASF GitHub Bot commented on FLINK-9538:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6134
  
cc @dawidwys refactored code, please review again~


> Make KeyedStateFunction an interface
> 
>
> Key: FLINK-9538
> URL: https://issues.apache.org/jira/browse/FLINK-9538
> Project: Flink
>  Issue Type: Improvement
>Reporter: Dawid Wysakowicz
>Assignee: vinoyang
>Priority: Major
>
> I suggest changing the KeyedStateFunction from an abstract class to an interface 
> (a FunctionalInterface in particular) to enable passing lambdas.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9547) CEP pattern not called on windowed stream

2018-06-07 Thread Dawid Wysakowicz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504834#comment-16504834
 ] 

Dawid Wysakowicz commented on FLINK-9547:
-

Hi [~MLNotW]. Could you provide a runnable example that we could use to 
reproduce this bug? With inputs and ProcessFunctions?

> CEP pattern not called on windowed stream
> -
>
> Key: FLINK-9547
> URL: https://issues.apache.org/jira/browse/FLINK-9547
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.2, 1.5.0
>Reporter: Lucas Resch
>Priority: Major
>
> When trying to match a pattern on a stream that was windowed, the pattern will 
> not be called. The following shows example code where the issue was noticed:
> {code:java}
> // Set up stream
> SingleOutputStreamOperator<ForceZ> forces = ...
>     .filter(new FilterForcesFunction())
>     .process(new ProcessForcesFunction());
> // Define mock pattern
> Pattern<ForceZ, ForceZ> forcesMock = Pattern.<ForceZ>begin("start")
>     .where(new SimpleCondition<ForceZ>() {
>         @Override
>         public boolean filter(ForceZ value) {
>             // This is called as expected
>             return true;
>         }
>     });
> // Print pattern results
> // This actually prints all incoming events as expected
> CEP.pattern(forces, forcesMock)
>     .select(new PatternSelectFunction<ForceZ, ForceZ>() {
>         @Override
>         public ForceZ select(Map<String, List<ForceZ>> pattern) {
>             return pattern.get("start").get(0);
>         }
>     }).print();
> // Create another stream based on a sliding window over the input stream
> SingleOutputStreamOperator<Interval> intervals = forces
>     .countWindowAll(2, 1)
>     .process(new ForceWindowFunction());
> // Define mock pattern
> Pattern<Interval, Interval> intervalMock = Pattern.<Interval>begin("start")
>     .where(new SimpleCondition<Interval>() {
>         @Override
>         public boolean filter(Interval value) throws Exception {
>             // This is never called
>             return true;
>         }
>     });
> // Print pattern results
> // Doesn't print anything since the mock condition is never called
> CEP.pattern(intervals, intervalMock)
>     .select(new PatternSelectFunction<Interval, Interval>() {
>         @Override
>         public Interval select(Map<String, List<Interval>> pattern) throws Exception {
>             return pattern.get("start").get(0);
>         }
>     }).print();
> {code}
> Either I'm doing something wrong or this is a major bug.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9168) Pulsar Sink Connector

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504826#comment-16504826
 ] 

ASF GitHub Bot commented on FLINK-9168:
---

Github user surryr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5845#discussion_r193793061
  
--- Diff: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
 ---
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pulsar;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import org.apache.flink.util.SerializableObject;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Flink Sink to produce data into a Pulsar topic.
+ */
+public class FlinkPulsarProducer<IN>
+   extends RichSinkFunction<IN>
+   implements CheckpointedFunction {
+
+   private static final Logger LOG = LoggerFactory.getLogger(FlinkPulsarProducer.class);
+
+   /**
+    * The pulsar service url.
+    */
+   protected final String serviceUrl;
+
+   /**
+    * User defined configuration for the producer.
+    */
+   protected final ProducerConfiguration producerConfig;
+
+   /**
+    * The name of the default topic this producer is writing data to.
+    */
+   protected final String defaultTopicName;
+
+   /**
+    * (Serializable) SerializationSchema for turning objects used with Flink into
+    * byte[] for Pulsar.
+    */
+   protected final SerializationSchema<IN> schema;
+
+   /**
+    * User-provided key extractor for assigning a key to a pulsar message.
+    */
+   protected final PulsarKeyExtractor<IN> flinkPulsarKeyExtractor;
+
+   /**
+    * Produce Mode.
+    */
+   protected PulsarProduceMode produceMode = PulsarProduceMode.AT_LEAST_ONE;
+
+   /**
+    * If true, the producer will wait until all outstanding records have been sent to the broker.
+    */
+   protected boolean flushOnCheckpoint;
+
+   // ---------------------------------- Runtime fields ----------------------------------
+
+   /** Pulsar Producer instance. */
+   protected transient Producer producer;
+
+   /** The callback that handles error propagation or logging callbacks. */
+   protected transient Function<MessageId, MessageId> successCallback = msgId -> {
+       acknowledgeMessage();
+       return msgId;
+   };
+
+   protected transient Function failureCallback;
+
+   /** Errors encountered in the async producer are stored here. */
+   protected transient volatile Exception asyncException;
+
+   /** Lock for accessing the pending records. */
+   protected final SerializableObject pendingRecordsLock = new SerializableObject();
+

[GitHub] flink pull request #5845: [FLINK-9168][flink-connectors]Pulsar Sink connecto...

2018-06-07 Thread surryr
Github user surryr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5845#discussion_r193793061
  
--- Diff: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
 ---
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pulsar;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
+import org.apache.flink.util.SerializableObject;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Flink Sink to produce data into a Pulsar topic.
+ */
+public class FlinkPulsarProducer<IN>
+   extends RichSinkFunction<IN>
+   implements CheckpointedFunction {
+
+   private static final Logger LOG = LoggerFactory.getLogger(FlinkPulsarProducer.class);
+
+   /**
+    * The pulsar service url.
+    */
+   protected final String serviceUrl;
+
+   /**
+    * User defined configuration for the producer.
+    */
+   protected final ProducerConfiguration producerConfig;
+
+   /**
+    * The name of the default topic this producer is writing data to.
+    */
+   protected final String defaultTopicName;
+
+   /**
+    * (Serializable) SerializationSchema for turning objects used with Flink into
+    * byte[] for Pulsar.
+    */
+   protected final SerializationSchema<IN> schema;
+
+   /**
+    * User-provided key extractor for assigning a key to a pulsar message.
+    */
+   protected final PulsarKeyExtractor<IN> flinkPulsarKeyExtractor;
+
+   /**
+    * Produce Mode.
+    */
+   protected PulsarProduceMode produceMode = PulsarProduceMode.AT_LEAST_ONE;
+
+   /**
+    * If true, the producer will wait until all outstanding records have been sent to the broker.
+    */
+   protected boolean flushOnCheckpoint;
+
+   // ---------------------------------- Runtime fields ----------------------------------
+
+   /** Pulsar Producer instance. */
+   protected transient Producer producer;
+
+   /** The callback that handles error propagation or logging callbacks. */
+   protected transient Function<MessageId, MessageId> successCallback = msgId -> {
+       acknowledgeMessage();
+       return msgId;
+   };
+
+   protected transient Function failureCallback;
+
+   /** Errors encountered in the async producer are stored here. */
+   protected transient volatile Exception asyncException;
+
+   /** Lock for accessing the pending records. */
+   protected final SerializableObject pendingRecordsLock = new SerializableObject();
+
+   /** Number of unacknowledged records. */
+   protected long pendingRecords;
+
+   public FlinkPulsarProducer(String serviceUrl,
+                              String defaultTopicName,
+  

[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-07 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504820#comment-16504820
 ] 

swy commented on FLINK-9506:


Thanks for the response [~sihuazhou], the key length is around 50 chars. We will 
change to hashCode as suggested and test again :)

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found that application performance drops more than 100% when 
> ReducingState.add is used in the source code. In the test, checkpointing is 
> disabled, and filesystem (hdfs) is used as the state backend.
> It can easily be reproduced with a simple app, without checkpointing, that just 
> keeps storing records, with a simple reduce function (in fact, an 
> empty function shows the same result). Any idea would be appreciated. 
> What an unbelievably obvious issue.
> Basically the app just keeps storing records into the state, and we measure how 
> many records per second pass through "JsonTranslator", which is shown in the 
> graph. The difference between the two runs is just one line, 
> commenting/un-commenting "recStore.add(r)".
> {code}
> DataStream<String> stream = env.addSource(new GeneratorSource(loop));
> DataStream<Record> convert = stream.map(new JsonTranslator())
>     .keyBy(r -> r.getUNIQUE_KEY())
>     .process(new ProcessAggregation())
>     .map(new PassthruFunction());
>
> public class ProcessAggregation extends ProcessFunction<Record, Record> {
>     private ReducingState<Record> recStore;
>     public void processElement(Record r, Context ctx, Collector<Record> out) {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 private String members.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9547) CEP pattern not called on windowed stream

2018-06-07 Thread Lucas Resch (JIRA)
Lucas Resch created FLINK-9547:
--

 Summary: CEP pattern not called on windowed stream
 Key: FLINK-9547
 URL: https://issues.apache.org/jira/browse/FLINK-9547
 Project: Flink
  Issue Type: Bug
  Components: CEP
Affects Versions: 1.5.0, 1.3.2
Reporter: Lucas Resch


When trying to match a pattern on a stream that was windowed, the pattern will 
not be called. The following shows example code where the issue was noticed:
{code:java}
// Set up stream
SingleOutputStreamOperator<ForceZ> forces = ...
    .filter(new FilterForcesFunction())
    .process(new ProcessForcesFunction());

// Define mock pattern
Pattern<ForceZ, ForceZ> forcesMock = Pattern.<ForceZ>begin("start")
    .where(new SimpleCondition<ForceZ>() {
        @Override
        public boolean filter(ForceZ value) {
            // This is called as expected
            return true;
        }
    });

// Print pattern results
// This actually prints all incoming events as expected
CEP.pattern(forces, forcesMock)
    .select(new PatternSelectFunction<ForceZ, ForceZ>() {
        @Override
        public ForceZ select(Map<String, List<ForceZ>> pattern) {
            return pattern.get("start").get(0);
        }
    }).print();

// Create another stream based on a sliding window over the input stream
SingleOutputStreamOperator<Interval> intervals = forces
    .countWindowAll(2, 1)
    .process(new ForceWindowFunction());

// Define mock pattern
Pattern<Interval, Interval> intervalMock = Pattern.<Interval>begin("start")
    .where(new SimpleCondition<Interval>() {
        @Override
        public boolean filter(Interval value) throws Exception {
            // This is never called
            return true;
        }
    });

// Print pattern results
// Doesn't print anything since the mock condition is never called
CEP.pattern(intervals, intervalMock)
    .select(new PatternSelectFunction<Interval, Interval>() {
        @Override
        public Interval select(Map<String, List<Interval>> pattern) throws Exception {
            return pattern.get("start").get(0);
        }
    }).print();
{code}
Either I'm doing something wrong or this is a major bug.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-07 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504729#comment-16504729
 ] 

Sihua Zhou commented on FLINK-9506:
---

Hi [~yow] Off the top of my head, here are the answers:

- >> 1. Just to confirm, RocksDB is needed to setup in every TM machine? Any 
other option?

RocksDB needs to be set up for every sub-task that uses KeyedState if you 
are using the RocksDB backend.

- >> 2. What is the recommendation for RocksDB's statebackend? We are using 
tmpfs with checkpoint now with savepoint persists to hdfs.

Q1. I think the default configuration of the RocksDB backend is quite good for 
most jobs.
Q2. I'm not sure whether I got you correctly; a savepoint is triggered 
manually, while a checkpoint is triggered automatically. Do you mean that you 
trigger savepoints manually on a periodic basis?

- >> 3. By source code, rocksdb options like parallelism and certain predefined 
option could be configured, any corresponding parameter in flink_config.yaml?

AFAIK, RocksDB's options need to be set in source code if you need to customize 
them. The default parallelism of the operator can be configured in flink-conf.yaml.

- >> 4. related to your RocksDB config.

I see you are using "file:///tmp/rocksdb_simple_example/checkpoints" as the 
checkpoint directory; I'm not sure whether it's accessible to all TMs. If yes, I 
think that is ok, and also I didn't see your checkpoint interval...

BTW, you said you are using {{r.getUNIQUE_KEY();}} as the key; I'm a bit 
curious about its length in general. If it's too long and you don't need an 
exact result, you could use {{r.getUNIQUE_KEY().hashCode();}} instead, 
which may also help to improve the performance. And in fact, I also agree with 
[~kkrugler] that this type of question is best asked on the user mailing list; 
that way more people can take part and you might also get more ideas from 
them. ;)
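
A minimal sketch of the key-hashing idea (an approximation: hash collisions would 
merge distinct keys, so it only fits jobs that can tolerate an approximate result; 
{{Record}} and {{getUNIQUE_KEY}} follow the snippets in this thread):

{code:java}
// key by a 4-byte hash instead of the ~50-char String key
stream.map(new JsonTranslator())
    .keyBy(r -> r.getUNIQUE_KEY().hashCode())
    .process(new ProcessAggregation());
{code}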



> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found that application performance drops more than 100% when 
> ReducingState.add is used in the source code. In the test, checkpointing is 
> disabled, and filesystem (hdfs) is used as the state backend.
> It can easily be reproduced with a simple app, without checkpointing, that just 
> keeps storing records, with a simple reduce function (in fact, an 
> empty function shows the same result). Any idea would be appreciated. 
> What an unbelievably obvious issue.
> Basically the app just keeps storing records into the state, and we measure how 
> many records per second pass through "JsonTranslator", which is shown in the 
> graph. The difference between the two runs is just one line, 
> commenting/un-commenting "recStore.add(r)".
> {code}
> DataStream<String> stream = env.addSource(new GeneratorSource(loop));
> DataStream<Record> convert = stream.map(new JsonTranslator())
>     .keyBy(r -> r.getUNIQUE_KEY())
>     .process(new ProcessAggregation())
>     .map(new PassthruFunction());
>
> public class ProcessAggregation extends ProcessFunction<Record, Record> {
>     private ReducingState<Record> recStore;
>     public void processElement(Record r, Context ctx, Collector<Record> out) {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 private String members.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9262) KeyedOneInputStreamOperatorTestHarness throws NPE creating snapshot

2018-06-07 Thread Aljoscha Krettek (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504725#comment-16504725
 ] 

Aljoscha Krettek commented on FLINK-9262:
-

What dependencies do you have set in your pom?

I think you need
{code}

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-test-utils-junit</artifactId>
  <version>${flink.version}</version>
</dependency>

<dependency>
  <groupId>org.mockito</groupId>
  <artifactId>mockito-all</artifactId>
  <version>1.10.19</version>
  <type>jar</type>
  <scope>test</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>${flink.version}</version>
  <scope>test</scope>
  <type>test-jar</type>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-runtime_2.11</artifactId>
  <version>${flink.version}</version>
  <scope>test</scope>
  <type>test-jar</type>
</dependency>
{code}

> KeyedOneInputStreamOperatorTestHarness throws NPE creating snapshot
> ---
>
> Key: FLINK-9262
> URL: https://issues.apache.org/jira/browse/FLINK-9262
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming, Tests
>Affects Versions: 1.4.0
> Environment: macOS X High Sierra 10.13.4
> (ancient) Eclipse Luna v.4.4.1 
> JRE System Library [Java SE 8 [1.8.0_131]]
> Java 8 Update 171 build 11
>Reporter: Chris Schneider
>Priority: Blocker
>
> Although KeyedOneInputStreamOperatorTestHarness and other 
> AbstractStreamOperatorTestHarness subclasses are not yet part of the public 
> Flink API, we have been trying to make use of them for unit testing our map 
> functions. The following code throws NPE from the attempt to collect a 
> snapshot on Flink 1.4.0 (even after applying [the 
> fix|https://github.com/apache/flink/pull/5193/commits/ba676d7de5536e32e0c48c3db511bec1758f4e80]
>  for FLINK-8268), but appears to work properly on Flink 1.5-SNAPSHOT:
> {code:java}
> package com.scaleunlimited.flinkcrawler.functions;
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.streaming.api.operators.StreamFlatMap;
> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
> import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
> import org.apache.flink.util.Collector;
> import org.junit.Test;
> public class FlinkIssueTest {
>
>     @SuppressWarnings("serial")
>     private static class MyProcessFunction extends RichFlatMapFunction<String, String> {
>         @Override
>         public void flatMap(String input, Collector<String> collector) throws Exception {
>             collector.collect(input);
>         }
>     }
>
>     @SuppressWarnings({
>         "serial", "hiding"
>     })
>     private static class MyKeySelector implements KeySelector<String, String> {
>         @Override
>         public String getKey(String input) throws Exception {
>             return input;
>         }
>     }
>
>     @Test
>     public void test() throws Throwable {
>         KeyedOneInputStreamOperatorTestHarness<String, String, String> testHarness =
>             new KeyedOneInputStreamOperatorTestHarness<String, String, String>(
>                 new StreamFlatMap<>(new MyProcessFunction()),
>                 new MyKeySelector(),
>                 BasicTypeInfo.STRING_TYPE_INFO,
>                 1,
>                 1,
>                 0);
>         testHarness.setup();
>         testHarness.open();
>
>         for (int i = 0; i < 10; i++) {
>             String urlString = String.format("https://domain-%d.com/page1", i);
>             testHarness.processElement(new StreamRecord<>(urlString));
>         }
>         testHarness.snapshot(0L, 0L);
>     }
> }
> {code}
> Output:
> {noformat}
> java.lang.Exception: Could not complete snapshot 0 for operator MockTask 
> (1/1).
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>     at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459)
>     at 
> com.scaleunlimited.flinkcrawler.functions.FlinkIssueTest.test(FlinkIssueTest.java:51)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>     at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>     at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>     at 
> 

[jira] [Commented] (FLINK-9540) Apache Flink 1.4.2 S3 Hadoop library for Hadoop 2.7 is built for Hadoop 2.8 and fails

2018-06-07 Thread Razvan (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504723#comment-16504723
 ] 

Razvan commented on FLINK-9540:
---

Hi [~aljoscha],

  Not sure I understand the question :) What I'm suggesting is that 
flink-s3-fs-hadoop-1.4.2.jar, delivered as built for Hadoop 2.7, is actually 
built for Hadoop 2.8, so the Apache Flink cluster won't start, failing with the 
error mentioned above. These jars require a lot of other jars (whose list in the 
documentation is not accurate, btw) to enable Apache Flink to work with S3.

> Apache Flink 1.4.2 S3 Hadoop library for Hadoop 2.7 is built for Hadoop 2.8 
> and fails
> -
>
> Key: FLINK-9540
> URL: https://issues.apache.org/jira/browse/FLINK-9540
> Project: Flink
>  Issue Type: Bug
>Reporter: Razvan
>Priority: Blocker
>
> Download Apache Flink 1.4.2 for Hadoop 2.7 or earlier. Go to opt/, and inside 
> flink-s3-fs-hadoop-1.4.2.jar open common-version-info.properties; inside you 
> get:
>  
> #
>  # Licensed to the Apache Software Foundation (ASF) under one
>  # or more contributor license agreements. See the NOTICE file
>  # distributed with this work for additional information
>  # regarding copyright ownership. The ASF licenses this file
>  # to you under the Apache License, Version 2.0 (the
>  # "License"); you may not use this file except in compliance
>  # with the License. You may obtain a copy of the License at
>  #
>  # [http://www.apache.org/licenses/LICENSE-2.0]
>  #
>  # Unless required by applicable law or agreed to in writing, software
>  # distributed under the License is distributed on an "AS IS" BASIS,
>  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  # See the License for the specific language governing permissions and
>  # limitations under the License.
>  #
> *version=2.8.1*
>  revision=1e6296df38f9cd3d9581c8af58a2a03a6e4312be
>  *branch=branch-2.8.1-private*
>  user=vinodkv
>  date=2017-06-07T21:22Z
>  *url=[https://git-wip-us.apache.org/repos/asf/hadoop.git]*
>  srcChecksum=60125541c2b3e266cbf3becc5bda666
>  protocVersion=2.5.0
>  
> This will fail to work with dependencies described in 
> ([https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html)]
>  
> "Depending on which file system you use, please add the following 
> dependencies. You can find these as part of the Hadoop binaries in 
> {{hadoop-2.7/share/hadoop/tools/lib}}:
>  * {{S3AFileSystem}}:
>  ** {{hadoop-aws-2.7.3.jar}}
>  ** {{aws-java-sdk-s3-1.11.183.jar}} and its dependencies:
>  *** {{aws-java-sdk-core-1.11.183.jar}}
>  *** {{aws-java-sdk-kms-1.11.183.jar}}
>  *** {{jackson-annotations-2.6.7.jar}}
>  *** {{jackson-core-2.6.7.jar}}
>  *** {{jackson-databind-2.6.7.jar}}
>  *** {{joda-time-2.8.1.jar}}
>  *** {{httpcore-4.4.4.jar}}
>  *** {{httpclient-4.5.3.jar}}
>  * {{NativeS3FileSystem}}:
>  ** {{hadoop-aws-2.7.3.jar}}
>  ** {{guava-11.0.2.jar}}
>  
> I presume the build task is flawed; it should be built for Apache Hadoop 
> 2.7. Can someone please check it?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504685#comment-16504685
 ] 

ASF GitHub Bot commented on FLINK-9444:
---

Github user tragicjun commented on the issue:

https://github.com/apache/flink/pull/6082
  
The latest commit uses **Types.OBJECT_ARRAY** to map the Avro array type. 
Hence, Avro **GenericData.Array** has to be converted to and from regular Java 
arrays (see **AvroRowSerializationSchema** and 
**AvroRowDeserializationSchema**). Moreover, nested records within Avro 
map/array are also supported.

The unit tests and my local integration tests have passed. Would you please 
review? @fhueske @twalthr @suez1224 
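
A rough sketch of the conversion direction described above (simplified; the 
actual logic lives in the PR's **AvroRowDeserializationSchema** / 
**AvroRowSerializationSchema**, and the `Utf8` handling here is an assumption):

```java
// Avro GenericData.Array -> plain Object[] as expected by Types.OBJECT_ARRAY
private static Object[] convertAvroArray(GenericData.Array<?> avroArray) {
    Object[] javaArray = new Object[avroArray.size()];
    for (int i = 0; i < avroArray.size(); i++) {
        Object element = avroArray.get(i);
        // Avro strings arrive as Utf8; convert them to java.lang.String
        javaArray[i] = (element instanceof Utf8) ? element.toString() : element;
    }
    return javaArray;
}
```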


> KafkaAvroTableSource failed to work for map and array fields
> 
>
> Key: FLINK-9444
> URL: https://issues.apache.org/jira/browse/FLINK-9444
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Table API  SQL
>Affects Versions: 1.6.0
>Reporter: Jun Zhang
>Priority: Blocker
>  Labels: patch
> Fix For: 1.6.0
>
> Attachments: flink-9444.patch
>
>
> When an Avro schema has map/array fields and the corresponding TableSchema 
> declares *MapTypeInfo/ListTypeInfo* for these fields, an exception is 
> thrown when registering the *KafkaAvroTableSource*, complaining:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Type Map of table field 'event' does not match with type 
> GenericType of the field 'event' of the TableSource return 
> type.
>  at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:74)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:92)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:71)
>  at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:71)
>  at 
> org.apache.flink.table.plan.schema.StreamTableSourceTable.(StreamTableSourceTable.scala:33)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:124)
>  at 
> org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:438)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...

2018-06-07 Thread tragicjun
Github user tragicjun commented on the issue:

https://github.com/apache/flink/pull/6082
  
The latest commit uses **Types.OBJECT_ARRAY** to map the Avro array type. 
Hence, Avro **GenericData.Array** has to be converted to and from regular Java 
arrays (see **AvroRowSerializationSchema** and 
**AvroRowDeserializationSchema**). Moreover, nested records within Avro 
map/array are also supported.

The unit tests and my local integration tests have passed. Would you please 
review? @fhueske @twalthr @suez1224 


---


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-07 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504675#comment-16504675
 ] 

swy commented on FLINK-9506:


[~sihuazhou] your idea is brilliant, but surprisingly the first test result does 
not show much change. Let us do more tests to confirm. But thank you! 
Do you mind answering my questions above regarding the RocksDB setup? I believe 
it is crucial in this performance test.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found that application performance drops more than 100% when 
> ReducingState.add is used in the source code. In the test, checkpointing is 
> disabled, and filesystem (hdfs) is used as the state backend.
> It can easily be reproduced with a simple app, without checkpointing, that just 
> keeps storing records, with a simple reduce function (in fact, an 
> empty function shows the same result). Any idea would be appreciated. 
> What an unbelievably obvious issue.
> Basically the app just keeps storing records into the state, and we measure how 
> many records per second pass through "JsonTranslator", which is shown in the 
> graph. The difference between the two runs is just one line, 
> commenting/un-commenting "recStore.add(r)".
> {code}
> DataStream<String> stream = env.addSource(new GeneratorSource(loop));
> DataStream<Record> convert = stream.map(new JsonTranslator())
>     .keyBy(r -> r.getUNIQUE_KEY())
>     .process(new ProcessAggregation())
>     .map(new PassthruFunction());
>
> public class ProcessAggregation extends ProcessFunction<Record, Record> {
>     private ReducingState<Record> recStore;
>     public void processElement(Record r, Context ctx, Collector<Record> out) {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 private String members.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9503) Migrate integration tests for iterative aggregators

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504535#comment-16504535
 ] 

ASF GitHub Bot commented on FLINK-9503:
---

Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6129#discussion_r193707442
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
 ---
@@ -170,12 +162,12 @@ public void testAggregatorWithParameterForIterate() 
throws Exception {
new NegativeElementsConvergenceCriterion());
 
DataSet<Integer> updatedDs = iteration.map(new SubtractOneMapWithParam());
-   iteration.closeWith(updatedDs).writeAsText(resultPath);
-   env.execute();
+   List<Integer> result = iteration.closeWith(updatedDs).collect();
+   Collections.sort(result);
--- End diff --

@zentol the result needs to be sorted; otherwise the test reports an error 
because the result is not ordered, printing this: 

```
java.lang.AssertionError: 
Expected :[1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5]
Actual   :[4, 2, 3, 5, 2, 5, 5, 5, 1, 3, 4, 4, 3, 5, 4]
```


> Migrate integration tests for iterative aggregators
> ---
>
> Key: FLINK-9503
> URL: https://issues.apache.org/jira/browse/FLINK-9503
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Deepak Sharma
>Assignee: vinoyang
>Priority: Minor
>
> Migrate integration tests in org.apache.flink.test.iterative.aggregators to 
> use collect() instead of temp files. Related to parent jira.
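
The migration pattern in a nutshell (a sketch based on the diffs in this PR; the 
variable names are illustrative):

{code:java}
// before: write to a temp file, run the job, then diff the file
// iteration.closeWith(updatedDs).writeAsText(resultPath);
// env.execute();
// compareResultsByLinesInMemory(expected, resultPath);

// after: collect() runs the job and returns the results in memory
List<Integer> result = iteration.closeWith(updatedDs).collect();
Collections.sort(result);  // collect() gives no ordering guarantee
assertEquals(expected, result);
{code}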



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6129: [FLINK-9503] Migrate integration tests for iterati...

2018-06-07 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6129#discussion_r193707442
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
 ---
@@ -170,12 +162,12 @@ public void testAggregatorWithParameterForIterate() 
throws Exception {
new NegativeElementsConvergenceCriterion());
 
DataSet<Integer> updatedDs = iteration.map(new SubtractOneMapWithParam());
-   iteration.closeWith(updatedDs).writeAsText(resultPath);
-   env.execute();
+   List<Integer> result = iteration.closeWith(updatedDs).collect();
+   Collections.sort(result);
--- End diff --

@zentol the result needs to be sorted; otherwise the test reports an error 
because the result is not ordered, printing this: 

```
java.lang.AssertionError: 
Expected :[1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5]
Actual   :[4, 2, 3, 5, 2, 5, 5, 5, 1, 3, 4, 4, 3, 5, 4]
```


---


[jira] [Commented] (FLINK-9503) Migrate integration tests for iterative aggregators

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504518#comment-16504518
 ] 

ASF GitHub Bot commented on FLINK-9503:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6129#discussion_r193702401
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
 ---
@@ -170,12 +162,12 @@ public void testAggregatorWithParameterForIterate() 
throws Exception {
new NegativeElementsConvergenceCriterion());
 
DataSet<Integer> updatedDs = iteration.map(new SubtractOneMapWithParam());
-   iteration.closeWith(updatedDs).writeAsText(resultPath);
-   env.execute();
+   List<Integer> result = iteration.closeWith(updatedDs).collect();
+   Collections.sort(result);
--- End diff --

still some sort calls left.


> Migrate integration tests for iterative aggregators
> ---
>
> Key: FLINK-9503
> URL: https://issues.apache.org/jira/browse/FLINK-9503
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Deepak Sharma
>Assignee: vinoyang
>Priority: Minor
>
> Migrate integration tests in org.apache.flink.test.iterative.aggregators to 
> use collect() instead of temp files. Related to parent jira.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6129: [FLINK-9503] Migrate integration tests for iterati...

2018-06-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6129#discussion_r193702401
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
 ---
@@ -170,12 +162,12 @@ public void testAggregatorWithParameterForIterate() 
throws Exception {
new NegativeElementsConvergenceCriterion());
 
DataSet<Integer> updatedDs = iteration.map(new SubtractOneMapWithParam());
-   iteration.closeWith(updatedDs).writeAsText(resultPath);
-   env.execute();
+   List<Integer> result = iteration.closeWith(updatedDs).collect();
+   Collections.sort(result);
--- End diff --

still some sort calls left.


---


[jira] [Commented] (FLINK-9503) Migrate integration tests for iterative aggregators

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504515#comment-16504515
 ] 

ASF GitHub Bot commented on FLINK-9503:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6129#discussion_r193701993
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
 ---
@@ -117,7 +107,9 @@ public boolean filter(Long value) throws Exception {
}
}).withBroadcastSet(solution, "SOLUTION")).output(new 
DiscardingOutputFormat());
env.execute();
-   expected = testString; // this will be a useless verification 
now.
+   String expected = testString; // this will be a useless 
verification now.
+
+   compareResultsByLinesInMemory(expected, resultPath);
--- End diff --

this is unnecessary and can be removed as the actual check is done in the 
function.


> Migrate integration tests for iterative aggregators
> ---
>
> Key: FLINK-9503
> URL: https://issues.apache.org/jira/browse/FLINK-9503
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Deepak Sharma
>Assignee: vinoyang
>Priority: Minor
>
> Migrate integration tests in org.apache.flink.test.iterative.aggregators to 
> use collect() instead of temp files. Related to parent jira.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6129: [FLINK-9503] Migrate integration tests for iterati...

2018-06-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6129#discussion_r193701993
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
 ---
@@ -117,7 +107,9 @@ public boolean filter(Long value) throws Exception {
}
}).withBroadcastSet(solution, "SOLUTION")).output(new 
DiscardingOutputFormat());
env.execute();
-   expected = testString; // this will be a useless verification 
now.
+   String expected = testString; // this will be a useless 
verification now.
+
+   compareResultsByLinesInMemory(expected, resultPath);
--- End diff --

this is unnecessary and can be removed as the actual check is done in the 
function.


---


[jira] [Commented] (FLINK-9538) Make KeyedStateFunction an interface

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504512#comment-16504512
 ] 

ASF GitHub Bot commented on FLINK-9538:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6134
  
cc @dawidwys 


> Make KeyedStateFunction an interface
> 
>
> Key: FLINK-9538
> URL: https://issues.apache.org/jira/browse/FLINK-9538
> Project: Flink
>  Issue Type: Improvement
>Reporter: Dawid Wysakowicz
>Assignee: vinoyang
>Priority: Major
>
> I suggest changing the KeyedStateFunction from an abstract class to an interface 
> (a FunctionalInterface in particular) to enable passing lambdas.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6134: [FLINK-9538] Make KeyedStateFunction an interface

2018-06-07 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6134
  
cc @dawidwys 


---


[jira] [Commented] (FLINK-9528) Incorrect results: Filter does not treat Upsert messages correctly.

2018-06-07 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504489#comment-16504489
 ] 

Fabian Hueske commented on FLINK-9528:
--

True, highly-selective filters would definitely be an issue. However, most 
operators could handle duplicates quite well and would not need to have them 
filtered because they have the necessary state anyway. Only sinks are affected 
by this. Why not add a duplicate filter for upsert sinks? This would keep the 
filter stateless and efficient.

The duplicate filter for the upsert sink would need a boolean flag per key, and 
the number of keys depends on the selectivity of the filter, i.e., the size of 
the state is proportional to the gains. Later we could improve it by using a 
probabilistic filter (proposed in FLINK-8601).
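
A minimal sketch of the per-key flag idea (illustrative only, not Flink-internal 
code; assume the upsert messages arrive as {{Tuple2<Boolean, Row>}} where f0 
marks upsert vs. delete, which is how upsert sinks receive their input):

{code:java}
public class UpsertSinkDedupFilter
        extends RichFlatMapFunction<Tuple2<Boolean, Row>, Tuple2<Boolean, Row>> {

    private transient ValueState<Boolean> active;  // one boolean flag per key

    @Override
    public void open(Configuration parameters) {
        active = getRuntimeContext().getState(
            new ValueStateDescriptor<>("active", Boolean.class));
    }

    @Override
    public void flatMap(Tuple2<Boolean, Row> msg,
                        Collector<Tuple2<Boolean, Row>> out) throws Exception {
        boolean isUpsert = msg.f0;
        boolean wasActive = Boolean.TRUE.equals(active.value());
        if (isUpsert) {
            active.update(true);
            out.collect(msg);   // always forward upserts
        } else if (wasActive) {
            active.update(false);
            out.collect(msg);   // forward a delete only if the key is in the sink
        }                       // drop deletes for keys that were never emitted
    }
}
{code}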

> Incorrect results: Filter does not treat Upsert messages correctly.
> ---
>
> Key: FLINK-9528
> URL: https://issues.apache.org/jira/browse/FLINK-9528
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.3.3, 1.5.0, 1.4.2
>Reporter: Fabian Hueske
>Assignee: Hequn Cheng
>Priority: Critical
>
> Currently, Filters (i.e., Calcs with predicates) do not distinguish between 
> retraction and upsert mode. A Calc looks at a record (regardless of its update 
> semantics) and either discards it (predicate evaluates to false) or passes it on 
> (predicate evaluates to true).
> This works fine for messages with retraction semantics but is not correct for 
> upsert messages.
> The following test case (can be pasted into {{TableSinkITCase}}) shows the 
> problem:
> {code:java}
>   @Test
>   def testUpsertsWithFilter(): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.getConfig.enableObjectReuse()
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val t = StreamTestData.get3TupleDataStream(env)
>   .assignAscendingTimestamps(_._1.toLong)
>   .toTable(tEnv, 'id, 'num, 'text)
> t.select('text.charLength() as 'len)
>   .groupBy('len)
>   .select('len, 'len.count as 'cnt)
>   // .where('cnt < 7)
>   .writeToSink(new TestUpsertSink(Array("len"), false))
> env.execute()
> val results = RowCollector.getAndClearValues
> val retracted = RowCollector.upsertResults(results, Array(0)).sorted
> val expectedWithoutFilter = List(
>   "2,1", "5,1", "9,9", "10,7", "11,1", "14,1", "25,1").sorted
> val expectedWithFilter = List(
> "2,1", "5,1", "11,1", "14,1", "25,1").sorted
> assertEquals(expectedWithoutFilter, retracted)
> // assertEquals(expectedWithFilter, retracted)
>   }
> {code}
> When we add a filter on the aggregation result, we would expect that all rows 
> that do not fulfill the condition are removed from the result. However, the 
> filter only removes the upsert message such that the previous version remains 
> in the result.
> One solution could be to make a filter aware of the update semantics (retract 
> or upsert) and convert the upsert message into a delete message if the 
> predicate evaluates to false.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9528) Incorrect results: Filter does not treat Upsert messages correctly.

2018-06-07 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504471#comment-16504471
 ] 

Hequn Cheng commented on FLINK-9528:


Hi, [~fhueske]. Considering the anti-spamming scenario, users probably just 
want to get the top 1% of the data from the result of the group by; the remaining 
99% of the data will all become delete messages after the {{Filter}}. This would 
be a disaster for the storage, especially when most of the incoming keys are new 
ones and cannot be absorbed by a cache.

Adding a GroupByHaving operator is a good way, but it only covers the case where 
the filter can be pushed down into the group by. If a filter cannot be pushed 
down, we can turn the upsert stream into a retract stream to ensure correctness 
(or throw an exception and inform the user to use {{RetractTableSink}}). What do 
you think?

> Incorrect results: Filter does not treat Upsert messages correctly.
> ---
>
> Key: FLINK-9528
> URL: https://issues.apache.org/jira/browse/FLINK-9528
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.3.3, 1.5.0, 1.4.2
>Reporter: Fabian Hueske
>Assignee: Hequn Cheng
>Priority: Critical
>
> Currently, Filters (i.e., Calcs with predicates) do not distinguish between 
> retraction and upsert mode. A Calc looks at a record (regardless of its update 
> semantics) and either discards it (predicate evaluates to false) or passes it on 
> (predicate evaluates to true).
> This works fine for messages with retraction semantics but is not correct for 
> upsert messages.
> The following test case (can be pasted into {{TableSinkITCase}}) shows the 
> problem:
> {code:java}
>   @Test
>   def testUpsertsWithFilter(): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.getConfig.enableObjectReuse()
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val t = StreamTestData.get3TupleDataStream(env)
>   .assignAscendingTimestamps(_._1.toLong)
>   .toTable(tEnv, 'id, 'num, 'text)
> t.select('text.charLength() as 'len)
>   .groupBy('len)
>   .select('len, 'len.count as 'cnt)
>   // .where('cnt < 7)
>   .writeToSink(new TestUpsertSink(Array("len"), false))
> env.execute()
> val results = RowCollector.getAndClearValues
> val retracted = RowCollector.upsertResults(results, Array(0)).sorted
> val expectedWithoutFilter = List(
>   "2,1", "5,1", "9,9", "10,7", "11,1", "14,1", "25,1").sorted
> val expectedWithFilter = List(
> "2,1", "5,1", "11,1", "14,1", "25,1").sorted
> assertEquals(expectedWithoutFilter, retracted)
> // assertEquals(expectedWithFilter, retracted)
>   }
> {code}
> When we add a filter on the aggregation result, we would expect that all rows 
> that do not fulfill the condition are removed from the result. However, the 
> filter only removes the upsert message such that the previous version remains 
> in the result.
> One solution could be to make a filter aware of the update semantics (retract 
> or upsert) and convert the upsert message into a delete message if the 
> predicate evaluates to false.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9545) Support read a file multiple times in Flink DataStream

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504452#comment-16504452
 ] 

ASF GitHub Bot commented on FLINK-9545:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/6130
  
I posted on the Jira issue: 
https://issues.apache.org/jira/browse/FLINK-9545?focusedCommentId=16504451=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16504451:
 What's the motivation for this feature?


> Support read a file multiple times in Flink DataStream 
> ---
>
> Key: FLINK-9545
> URL: https://issues.apache.org/jira/browse/FLINK-9545
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.6.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.6.0
>
>
> we need {{StreamExecutionEnvironment.readFile/readTextFile}} to read each 
> file N times, but currently it only supports reading a file once.
> add support for the feature.
> Plan:
> add a new processing mode, PROCESSING_N_TIMES, and an additional parameter 
> {{numTimes}} for {{StreamExecutionEnvironment.readFile/readTextFile}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-07 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503505#comment-16503505
 ] 

Sihua Zhou edited comment on FLINK-9506 at 6/7/18 9:25 AM:
---

[~yow] Maybe there is one more optimization you could try. I see you 
are using the ReducingState in your code just to accumulate 
`record.getInt("I_PRIMARY_UNITS")` and collect the result in `onTimer()`. The 
ReducingState works as follows:

- get the "old result" from RocksDB.
- reduce the "old result" with the input, and put the "new result" back to 
RocksDB.

That means for every input record in processElement(), it needs to do a `get` and 
a `put` to RocksDB, and the `get` costs much more than the `put`. I would suggest 
using the ListState instead. With ListState, what you need to do is:

- Perform {{ListState.add(record)}} in {{processElement()}}, since 
`ListState.add()` is cheap as it only puts the record into RocksDB.
- Perform the reducing in {{onTimer()}}; the reducing might look as follows:
{code:java}
Iterable<JSONObject> records = listState.get();
for (JSONObject jsonObj : records) {
// do accumulation
}
out.collect(result);
{code}

In this way, for every key every second, you only need to do one read operation 
of RocksDB.
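
A fuller sketch of the suggested ListState + timer pattern (an illustration only; 
{{Record}}, the timer cadence, and the accumulation are placeholders taken from 
this thread, not the user's actual job):

{code:java}
public class ProcessAggregation extends ProcessFunction<Record, Long> {

    private transient ListState<Record> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
            new ListStateDescriptor<>("buffer", Record.class));
    }

    @Override
    public void processElement(Record r, Context ctx, Collector<Long> out) throws Exception {
        buffer.add(r);  // cheap: one RocksDB put, no read
        long now = ctx.timerService().currentProcessingTime();
        // timers are de-duplicated per key and timestamp, so aligning to the
        // second registers at most one timer per key per second
        ctx.timerService().registerProcessingTimeTimer(now - (now % 1000) + 1000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        long sum = 0;
        for (Record r : buffer.get()) {  // one RocksDB read per key per second
            sum += r.getInt("I_PRIMARY_UNITS");
        }
        buffer.clear();
        out.collect(sum);
    }
}
{code}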





was (Author: sihuazhou):
[~yow] Maybe there is one more optimization you could try. I see you 
are using the ReducingState in your code just to accumulate 
`record.getInt("I_PRIMARY_UNITS")` and collect the result in `onTimer()`. The 
ReducingState works as follows:

- get the "old result" from RocksDB.
- reduce the "old result" with the input, and put the "new result" back to 
RocksDB.

That means for every input record in processElement(), it needs to do a `get` and 
a `put` to RocksDB, and the `get` costs much more than the `put`. I would suggest 
using the ListState instead. With ListState, what you need to do is:

- Perform {{ListState.add(record)}} in {{processElement()}}, since 
`ListState.add()` is cheap as it only puts the record into RocksDB.
- Perform the reducing in {{onTimer()}}; the reducing might look as follows:
{code:java}
Iterable<JSONObject> records = listState.get();
for (JSONObject jsonObj : records) {
// do accumulation
}
out.collect(result);
{code}

In this way, for every key every second, you only need to do one read operation 
of RocksDB.




> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found that application performance drops more than 100% when 
> ReducingState.add is used in the source code. In the test, checkpointing is 
> disabled, and filesystem (hdfs) is used as the state backend.
> It can easily be reproduced with a simple app, without checkpointing, that just 
> keeps storing records, with a simple reduce function (in fact, an 
> empty function shows the same result). Any idea would be appreciated. 
> What an unbelievably obvious issue.
> Basically the app just keeps storing records into the state, and we measure how 
> many records per second pass through "JsonTranslator", which is shown in the 
> graph. The difference between the two runs is just one line, 
> commenting/un-commenting "recStore.add(r)".
> {code}
> DataStream<String> stream = env.addSource(new GeneratorSource(loop));
> DataStream<Record> convert = stream.map(new JsonTranslator())
>     .keyBy(r -> r.getUNIQUE_KEY())
>     .process(new ProcessAggregation())
>     .map(new PassthruFunction());
>
> public class ProcessAggregation extends ProcessFunction<Record, Record> {
>     private ReducingState<Record> recStore;
>     public void processElement(Record r, Context ctx, Collector<Record> out) {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 private String members.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6130: [FLINK-9545] Support read a file multiple times in Flink ...

2018-06-07 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/6130
  
I posted on the Jira issue: 
https://issues.apache.org/jira/browse/FLINK-9545?focusedCommentId=16504451&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16504451:
 What's the motivation for this feature?


---


[jira] [Commented] (FLINK-9545) Support read a file multiple times in Flink DataStream

2018-06-07 Thread Aljoscha Krettek (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504451#comment-16504451
 ] 

Aljoscha Krettek commented on FLINK-9545:
-

[~phoenixjiangnan] What is the motivation for this feature?

> Support read a file multiple times in Flink DataStream 
> ---
>
> Key: FLINK-9545
> URL: https://issues.apache.org/jira/browse/FLINK-9545
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.6.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.6.0
>
>
> We need {{StreamExecutionEnvironment.readFile/readTextFile}} to read each 
> file N times, but currently it only supports reading a file once.
> Add support for this feature.
> Plan:
> Add a new processing mode, PROCESSING_N_TIMES, and an additional parameter 
> {{numTimes}} to {{StreamExecutionEnvironment.readFile/readTextFile}} (sketched below).
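
A hypothetical sketch of what the proposed API could look like. Note that 
PROCESSING_N_TIMES and the {{numTimes}} argument are the proposed additions 
from the plan above; they do not exist in Flink today:
{code:java}
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

String path = "hdfs:///data/input.txt";
DataStream<String> lines = env.readFile(
        new TextInputFormat(new Path(path)),
        path,
        FileProcessingMode.PROCESSING_N_TIMES, // proposed mode, does not exist yet
        0L,                                    // scan interval, unused for this mode
        3);                                    // proposed numTimes: read the file 3 times
{code}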



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8067) User code ClassLoader not set before calling ProcessingTimeCallback

2018-06-07 Thread Aljoscha Krettek (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8067:

Fix Version/s: 1.6.0

> User code ClassLoader not set before calling ProcessingTimeCallback
> ---
>
> Key: FLINK-8067
> URL: https://issues.apache.org/jira/browse/FLINK-8067
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: vinoyang
>Priority: Minor
> Fix For: 1.6.0, 1.5.1
>
>
> The user code ClassLoader is not set as the context ClassLoader for the 
> thread invoking {{ProcessingTimeCallback#onProcessingTime(long timestamp)}}:
> https://github.com/apache/flink/blob/84a07a34ac22af14f2dd0319447ca5f45de6d0bb/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L222
> This is problematic, for example, if user code dynamically loads classes in 
> {{ProcessFunction#onTimer(long timestamp, OnTimerContext ctx, Collector<O> 
> out)}} using the current thread's context ClassLoader (also see FLINK-8005).
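
A minimal sketch of the usual remedy for this class of problem; 
`userCodeClassLoader` and `callback` are assumed names for illustration, not 
Flink's actual code:
{code:java}
// Sketch only: install the user-code ClassLoader as the thread's context
// ClassLoader around the callback invocation, and restore the previous one
// afterwards. 'userCodeClassLoader' and 'callback' are assumed variables.
final Thread thread = Thread.currentThread();
final ClassLoader previous = thread.getContextClassLoader();
try {
    thread.setContextClassLoader(userCodeClassLoader);
    callback.onProcessingTime(timestamp);
} finally {
    thread.setContextClassLoader(previous);
}
{code}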



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9482) Not applicable functions for TIME

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504439#comment-16504439
 ] 

ASF GitHub Bot commented on FLINK-9482:
---

Github user bioker commented on a diff in the pull request:

https://github.com/apache/flink/pull/6121#discussion_r193677135
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala ---
@@ -98,16 +98,45 @@ class ScalarFunctionsValidationTest extends ScalarTypesTestBase {
     testSqlApi("TIMESTAMPADD(YEAR, 1.0, timestamp '2016-02-24 12:42:25')", "2016-06-16")
   }
 
-  @Test(expected = classOf[CodeGenException])
+  @Test(expected = classOf[ValidationException])
   def testDOWWithTimeWhichIsUnsupported(): Unit = {
     testSqlApi("EXTRACT(DOW FROM TIME '12:42:25')", "0")
   }
 
-  @Test(expected = classOf[CodeGenException])
+  @Test(expected = classOf[ValidationException])
   def testDOYWithTimeWhichIsUnsupported(): Unit = {
     testSqlApi("EXTRACT(DOY FROM TIME '12:42:25')", "0")
   }
 
+  def testExtractFromTimeZeroResult(unit: TimeUnit): Unit = {
--- End diff --

Fixed


> Not applicable functions for TIME
> -
>
> Key: FLINK-9482
> URL: https://issues.apache.org/jira/browse/FLINK-9482
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Viktor Vlasov
>Assignee: Viktor Vlasov
>Priority: Minor
>
> While working on https://issues.apache.org/jira/browse/FLINK-9432 I faced 
> the question of how to check the DECADE function with tests in
> _org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala._
>  
> Because I used the CENTURY function as an example, I checked it first. 
> During the test I noticed that when we use it with TIME it returns 0.
> I suppose the arguments of such functions (the same holds for YEAR, MONTH, 
> MILLENNIUM, etc.) need to be checked, and an exception should be thrown if 
> the type is not suitable.
> As an example, in the Apache Calcite project (checked in the sqlline shell), 
> when I try to use CENTURY with TIME it throws:
> {code:java}
> java.lang.AssertionError: unexpected TIME
> {code}
> We need to determine why such a check does not exist and add it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6121: [FLINK-9482] [table] EXTRACT function argument val...

2018-06-07 Thread bioker
Github user bioker commented on a diff in the pull request:

https://github.com/apache/flink/pull/6121#discussion_r193677135
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala ---
@@ -98,16 +98,45 @@ class ScalarFunctionsValidationTest extends ScalarTypesTestBase {
     testSqlApi("TIMESTAMPADD(YEAR, 1.0, timestamp '2016-02-24 12:42:25')", "2016-06-16")
   }
 
-  @Test(expected = classOf[CodeGenException])
+  @Test(expected = classOf[ValidationException])
   def testDOWWithTimeWhichIsUnsupported(): Unit = {
     testSqlApi("EXTRACT(DOW FROM TIME '12:42:25')", "0")
   }
 
-  @Test(expected = classOf[CodeGenException])
+  @Test(expected = classOf[ValidationException])
   def testDOYWithTimeWhichIsUnsupported(): Unit = {
     testSqlApi("EXTRACT(DOY FROM TIME '12:42:25')", "0")
   }
 
+  def testExtractFromTimeZeroResult(unit: TimeUnit): Unit = {
--- End diff --

Fixed


---


[jira] [Commented] (FLINK-9546) The heartbeatTimeoutIntervalMs of HeartbeatMonitor should be larger than 0

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504408#comment-16504408
 ] 

ASF GitHub Bot commented on FLINK-9546:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6135
  
cc @tillrohrmann 


> The heartbeatTimeoutIntervalMs of HeartbeatMonitor should be larger than 0
> --
>
> Key: FLINK-9546
> URL: https://issues.apache.org/jira/browse/FLINK-9546
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Minor
>
> The heartbeatTimeoutIntervalMs of HeartbeatMonitor should be larger than 0, 
> currently the arg check looks like
> {code:java}
> Preconditions.checkArgument(heartbeatTimeoutIntervalMs >= 0L, "The heartbeat 
> timeout interval has to be larger than 0.");
> {code}
> it should be
> {code:java}
> Preconditions.checkArgument(heartbeatTimeoutIntervalMs > 0L, "The heartbeat 
> timeout interval has to be larger than 0.");
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6135: [FLINK-9546][core] Fix the checking of heartbeatTi...

2018-06-07 Thread sihuazhou
GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/6135

[FLINK-9546][core] Fix the checking of heartbeatTimeoutIntervalMs in 
HeartbeatMonitor.

## What is the purpose of the change

The heartbeatTimeoutIntervalMs of HeartbeatMonitor should be larger than 0, 
currently the arg check looks like

```java
Preconditions.checkArgument(heartbeatTimeoutIntervalMs >= 0L, "The 
heartbeat timeout interval has to be larger than 0.");
```

it should be

```java
Preconditions.checkArgument(heartbeatTimeoutIntervalMs > 0L, "The heartbeat 
timeout interval has to be larger than 0.");
```

## Brief change log

  - *Fix the checking of heartbeatTimeoutIntervalMs in HeartbeatMonitor.*


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

No

## Documentation

No

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink FLINK-9546

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6135.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6135


commit 5b7fab68afa434014308f2994a4ece89681c596d
Author: sihuazhou 
Date:   2018-06-07T08:26:20Z

fix the checking of heartbeatTimeoutIntervalMs in HeartbeatMonitor.




---


[jira] [Commented] (FLINK-9546) The heartbeatTimeoutIntervalMs of HeartbeatMonitor should be larger than 0

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504407#comment-16504407
 ] 

ASF GitHub Bot commented on FLINK-9546:
---

GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/6135

[FLINK-9546][core] Fix the checking of heartbeatTimeoutIntervalMs in 
HeartbeatMonitor.

## What is the purpose of the change

The heartbeatTimeoutIntervalMs of HeartbeatMonitor should be larger than 0, 
currently the arg check looks like

```java
Preconditions.checkArgument(heartbeatTimeoutIntervalMs >= 0L, "The 
heartbeat timeout interval has to be larger than 0.");
```

it should be

```java
Preconditions.checkArgument(heartbeatTimeoutIntervalMs > 0L, "The heartbeat 
timeout interval has to be larger than 0.");
```

## Brief change log

  - *Fix the checking of heartbeatTimeoutIntervalMs in HeartbeatMonitor.*


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

No

## Documentation

No

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink FLINK-9546

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6135.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6135


commit 5b7fab68afa434014308f2994a4ece89681c596d
Author: sihuazhou 
Date:   2018-06-07T08:26:20Z

fix the checking of heartbeatTimeoutIntervalMs in HeartbeatMonitor.




> The heartbeatTimeoutIntervalMs of HeartbeatMonitor should be larger than 0
> --
>
> Key: FLINK-9546
> URL: https://issues.apache.org/jira/browse/FLINK-9546
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Minor
>
> The heartbeatTimeoutIntervalMs of HeartbeatMonitor should be larger than 0, 
> currently the arg check looks like
> {code:java}
> Preconditions.checkArgument(heartbeatTimeoutIntervalMs >= 0L, "The heartbeat 
> timeout interval has to be larger than 0.");
> {code}
> it should be
> {code:java}
> Preconditions.checkArgument(heartbeatTimeoutIntervalMs > 0L, "The heartbeat 
> timeout interval has to be larger than 0.");
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6135: [FLINK-9546][core] Fix the checking of heartbeatTimeoutIn...

2018-06-07 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6135
  
cc @tillrohrmann 


---


[jira] [Commented] (FLINK-9498) Disable dependency convergence for "flink-end-to-end-tests"

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504406#comment-16504406
 ] 

ASF GitHub Bot commented on FLINK-9498:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/6116
  
I think the problem here is not that some of your versions conflict with 
Flink, but that your dependencies conflict among themselves. When I check:

> mvn -Dhadoop.version=2.7.0 dependency:tree -pl 
flink-shaded-hadoop/flink-shaded-hadoop2

I do not see any `libthrift` references, and only `commons-lang` comes from 
Hadoop itself. You should probably ask the people maintaining your Hadoop 
`2.7.0-xxx` to converge those dependencies (for example, pin `commons-lang` 
to 2.6 and `libthrift` to 0.9); otherwise bad things can happen randomly.

I'm not sure what a solution on the Flink side could look like. @zentol, 
maybe, as we discussed previously, we could provide build profiles for the 
set of Hadoop versions we support. But is there a mechanism for a user to 
provide a custom build profile for their own custom Hadoop version that 
solves the convergence in `flink-shaded-hadoop2`?


> Disable dependency convergence for "flink-end-to-end-tests"
> ---
>
> Key: FLINK-9498
> URL: https://issues.apache.org/jira/browse/FLINK-9498
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Reporter: Hai Zhou
>Assignee: Hai Zhou
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6116: [FLINK-9498][build] Disable dependency convergence for fl...

2018-06-07 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/6116
  
I think the problem here is not that some of your versions conflict with 
Flink, but that your dependencies conflict among themselves. When I check:

> mvn -Dhadoop.version=2.7.0 dependency:tree -pl 
flink-shaded-hadoop/flink-shaded-hadoop2

I do not see any `libthrift` references, and only `commons-lang` comes from 
Hadoop itself. You should probably ask the people maintaining your Hadoop 
`2.7.0-xxx` to converge those dependencies (for example, pin `commons-lang` 
to 2.6 and `libthrift` to 0.9); otherwise bad things can happen randomly.

I'm not sure what a solution on the Flink side could look like. @zentol, 
maybe, as we discussed previously, we could provide build profiles for the 
set of Hadoop versions we support. But is there a mechanism for a user to 
provide a custom build profile for their own custom Hadoop version that 
solves the convergence in `flink-shaded-hadoop2`?
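
A minimal sketch of what such pinning could look like in the POM of the 
custom Hadoop build, using Maven dependencyManagement (the coordinates and 
exact patch versions are assumptions following the suggestion above):

```xml
<!-- Sketch only: pin the converged versions so every transitive path
     resolves to the same artifact version. Exact versions are assumed. -->
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>commons-lang</groupId>
      <artifactId>commons-lang</artifactId>
      <version>2.6</version>
    </dependency>
    <dependency>
      <groupId>org.apache.thrift</groupId>
      <artifactId>libthrift</artifactId>
      <version>0.9.0</version>
    </dependency>
  </dependencies>
</dependencyManagement>
```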


---


[jira] [Created] (FLINK-9546) The heartbeatTimeoutIntervalMs of HeartbeatMonitor should be larger than 0

2018-06-07 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9546:
-

 Summary: The heartbeatTimeoutIntervalMs of HeartbeatMonitor should 
be larger than 0
 Key: FLINK-9546
 URL: https://issues.apache.org/jira/browse/FLINK-9546
 Project: Flink
  Issue Type: Bug
  Components: Core
Reporter: Sihua Zhou
Assignee: Sihua Zhou


The heartbeatTimeoutIntervalMs of HeartbeatMonitor should be larger than 0, 
currently the arg check looks like
{code:java}
Preconditions.checkArgument(heartbeatTimeoutIntervalMs >= 0L, "The heartbeat 
timeout interval has to be larger than 0.");
{code}

it should be
{code:java}
Preconditions.checkArgument(heartbeatTimeoutIntervalMs > 0L, "The heartbeat 
timeout interval has to be larger than 0.");
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9482) Not applicable functions for TIME

2018-06-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504403#comment-16504403
 ] 

ASF GitHub Bot commented on FLINK-9482:
---

Github user bioker commented on a diff in the pull request:

https://github.com/apache/flink/pull/6121#discussion_r193663108
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala ---
@@ -98,16 +98,45 @@ class ScalarFunctionsValidationTest extends ScalarTypesTestBase {
     testSqlApi("TIMESTAMPADD(YEAR, 1.0, timestamp '2016-02-24 12:42:25')", "2016-06-16")
   }
 
-  @Test(expected = classOf[CodeGenException])
+  @Test(expected = classOf[ValidationException])
   def testDOWWithTimeWhichIsUnsupported(): Unit = {
     testSqlApi("EXTRACT(DOW FROM TIME '12:42:25')", "0")
   }
 
-  @Test(expected = classOf[CodeGenException])
+  @Test(expected = classOf[ValidationException])
   def testDOYWithTimeWhichIsUnsupported(): Unit = {
     testSqlApi("EXTRACT(DOY FROM TIME '12:42:25')", "0")
   }
 
+  def testExtractFromTimeZeroResult(unit: TimeUnit): Unit = {
--- End diff --

yes, sure, I hadn't noticed it at the beginning


> Not applicable functions for TIME
> -
>
> Key: FLINK-9482
> URL: https://issues.apache.org/jira/browse/FLINK-9482
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Viktor Vlasov
>Assignee: Viktor Vlasov
>Priority: Minor
>
> While working on https://issues.apache.org/jira/browse/FLINK-9432 I faced 
> the question of how to check the DECADE function with tests in
> _org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala._
>  
> Because I used the CENTURY function as an example, I checked it first. 
> During the test I noticed that when we use it with TIME it returns 0.
> I suppose the arguments of such functions (the same holds for YEAR, MONTH, 
> MILLENNIUM, etc.) need to be checked, and an exception should be thrown if 
> the type is not suitable.
> As an example, in the Apache Calcite project (checked in the sqlline shell), 
> when I try to use CENTURY with TIME it throws:
> {code:java}
> java.lang.AssertionError: unexpected TIME
> {code}
> We need to determine why such a check does not exist and add it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6121: [FLINK-9482] [table] EXTRACT function argument val...

2018-06-07 Thread bioker
Github user bioker commented on a diff in the pull request:

https://github.com/apache/flink/pull/6121#discussion_r193663108
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala ---
@@ -98,16 +98,45 @@ class ScalarFunctionsValidationTest extends ScalarTypesTestBase {
     testSqlApi("TIMESTAMPADD(YEAR, 1.0, timestamp '2016-02-24 12:42:25')", "2016-06-16")
   }
 
-  @Test(expected = classOf[CodeGenException])
+  @Test(expected = classOf[ValidationException])
   def testDOWWithTimeWhichIsUnsupported(): Unit = {
     testSqlApi("EXTRACT(DOW FROM TIME '12:42:25')", "0")
   }
 
-  @Test(expected = classOf[CodeGenException])
+  @Test(expected = classOf[ValidationException])
   def testDOYWithTimeWhichIsUnsupported(): Unit = {
     testSqlApi("EXTRACT(DOY FROM TIME '12:42:25')", "0")
   }
 
+  def testExtractFromTimeZeroResult(unit: TimeUnit): Unit = {
--- End diff --

yes, sure, I hadn't noticed it at the beginning


---


[GitHub] flink issue #6131: [hotfix][docs] Fix Table API scala example code

2018-06-07 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6131
  
Hi @zjffdu, thanks for the fix!
+1 to merge


---

