[GitHub] flink issue #6136: [FLINK-4303] [CEP] Add CEP examples
Github user kisimple commented on the issue: https://github.com/apache/flink/pull/6136 cc @twalthr ---
[jira] [Commented] (FLINK-4303) Add CEP examples
[ https://issues.apache.org/jira/browse/FLINK-4303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505698#comment-16505698 ] ASF GitHub Bot commented on FLINK-4303: --- Github user kisimple commented on the issue: https://github.com/apache/flink/pull/6136 cc @twalthr > Add CEP examples > > > Key: FLINK-4303 > URL: https://issues.apache.org/jira/browse/FLINK-4303 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.1.0 >Reporter: Timo Walther >Assignee: blues zheng >Priority: Major > > Neither CEP Java nor CEP Scala contain a runnable example. The example on the > website is also not runnable without adding some additional code. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-4303) Add CEP examples
[ https://issues.apache.org/jira/browse/FLINK-4303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505696#comment-16505696 ] ASF GitHub Bot commented on FLINK-4303: --- Github user kisimple commented on the issue: https://github.com/apache/flink/pull/6136 @medcv Thanks for your review :) Updated as per your suggestions. > Add CEP examples > > > Key: FLINK-4303 > URL: https://issues.apache.org/jira/browse/FLINK-4303 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.1.0 >Reporter: Timo Walther >Assignee: blues zheng >Priority: Major > > Neither CEP Java nor CEP Scala contain a runnable example. The example on the > website is also not runnable without adding some additional code. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6136: [FLINK-4303] [CEP] Add CEP examples
Github user kisimple commented on the issue: https://github.com/apache/flink/pull/6136 @medcv Thanks for your review :) Updated as per your suggestions. ---
[GitHub] flink pull request #6141: flink-metrics-datadog: beautify metric name by exc...
GitHub user DmitryBe opened a pull request: https://github.com/apache/flink/pull/6141 flink-metrics-datadog: beautify metric name by excluding host_name, tm_id, and task|job_manager values from metric name and providing them as tags. This allows aggregating values by using tags in Datadog. You can merge this pull request into a Git repository by running: $ git pull https://github.com/DmitryBe/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6141.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6141 commit 72adcfc0be04dc98863916676cc964e3bce17372 Author: Dmitry B Date: 2018-06-08T03:58:32Z exclude host_name, job|task_manager name and tm_id from metric name; this values are provided as a tags ---
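To make the renaming idea concrete, here is a minimal, purely illustrative Java sketch of the before/after shape of a reported metric. The names and the helper class are hypothetical and are not taken from the flink-metrics-datadog code.

{code:java}
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of moving variable parts out of the metric name
// and into Datadog tags, so dashboards can aggregate across hosts and task managers.
public class MetricNamingSketch {
    public static void main(String[] args) {
        // Before: host, task manager id, etc. are baked into the metric name,
        // so every host produces a differently named metric series.
        String before = "myhost.taskmanager.tm-1234.myjob.map.numRecordsIn";

        // After: a stable metric name plus tags carrying the variable parts.
        String after = "taskmanager.job.task.numRecordsIn";
        List<String> tags = Arrays.asList("host:myhost", "tm_id:tm-1234", "job_name:myjob");

        System.out.println(before);
        System.out.println(after + " tags=" + tags);
    }
}
{code}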
[jira] [Assigned] (FLINK-9552) NPE in SpanningRecordSerializer during checkpoint
[ https://issues.apache.org/jira/browse/FLINK-9552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-9552: --- Assignee: vinoyang > NPE in SpanningRecordSerializer during checkpoint > - > > Key: FLINK-9552 > URL: https://issues.apache.org/jira/browse/FLINK-9552 > Project: Flink > Issue Type: Bug > Components: Type Serialization System >Affects Versions: 1.5.0 >Reporter: Truong Duc Kien >Assignee: vinoyang >Priority: Major > > We're encountering NPE intermittently inside SpanningRecordSerializer during > checkpoint. > > {code:java} > 2018-06-08 08:31:35,741 [ka.actor.default-dispatcher-83] INFO > o.a.f.r.e.ExecutionGraph IterationSource-22 (44/120) > (c1b94ef849db0e5fb9fb7b85c17073ce) switched from RUNNING to FAILED. > java.lang.RuntimeException > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) > at > org.apache.flink.streaming.runtime.tasks.StreamIterationHead.run(StreamIterationHead.java:91) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.NullPointerException > at > org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:98) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105) > at > org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) > ... 5 more > 2018-06-08 08:31:35,746 [ka.actor.default-dispatcher-83] INFO > o.a.f.r.e.ExecutionGraph Job xxx (8a4eaf02c46dc21c7d6f3f70657dbb17) switched > from state RUNNING to FAILING. > java.lang.RuntimeException > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) > at > org.apache.flink.streaming.runtime.tasks.StreamIterationHead.run(StreamIterationHead.java:91) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.NullPointerException > at > org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:98) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105) > at > org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) > ... 
5 more > {code} > This issue is probably concurrency related, because the relevant Flink code > seems to have proper null checking > https://github.com/apache/flink/blob/fa024726bb801fc71cec5cc303cac1d4a03f555e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java#L98 > {code:java} > // Copy from intermediate buffers to current target memory segment > if (targetBuffer != null) { > targetBuffer.append(lengthBuffer); > targetBuffer.append(dataBuffer); > targetBuffer.commit(); > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
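Since the null check is present in the serializer, the report effectively describes a check-then-act race: another thread clears the buffer between the null check and the append. A minimal, self-contained Java sketch of that failure mode follows; the class and field names are illustrative only and are not Flink's actual SpanningRecordSerializer internals.

{code:java}
// Illustration of a check-then-act race that can still produce an NPE
// even though a null check exists. Names are hypothetical.
public class CheckThenActRace {

    private volatile StringBuilder targetBuffer = new StringBuilder();

    // Called on the task thread.
    public void addRecord(String data) {
        if (targetBuffer != null) {        // check passes...
            // ...another thread may run clearBuffers() right here...
            targetBuffer.append(data);     // ...so this re-read of the field can see null and throw NPE.
        }
    }

    // Called concurrently, e.g. while the task is being cancelled.
    public void clearBuffers() {
        targetBuffer = null;
    }
}
{code}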
[jira] [Commented] (FLINK-4303) Add CEP examples
[ https://issues.apache.org/jira/browse/FLINK-4303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505638#comment-16505638 ] ASF GitHub Bot commented on FLINK-4303: --- Github user kisimple commented on a diff in the pull request: https://github.com/apache/flink/pull/6136#discussion_r193941724 --- Diff: flink-examples/flink-examples-cep/src/main/java/org/apache/flink/cep/examples/java/monitoring/TemperatureMonitoring.java --- @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.examples.java.monitoring; + +import org.apache.flink.cep.CEP; +import org.apache.flink.cep.PatternFlatSelectFunction; +import org.apache.flink.cep.PatternSelectFunction; +import org.apache.flink.cep.PatternStream; +import org.apache.flink.cep.examples.java.monitoring.events.MonitoringEvent; +import org.apache.flink.cep.examples.java.monitoring.events.TemperatureAlert; +import org.apache.flink.cep.examples.java.monitoring.events.TemperatureEvent; +import org.apache.flink.cep.examples.java.monitoring.events.TemperatureWarning; +import org.apache.flink.cep.examples.java.monitoring.sources.MonitoringEventSource; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.IngestionTimeExtractor; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.util.Collector; + +import java.util.List; +import java.util.Map; + +/** + * CEP example monitoring program. + * This example program generates a stream of monitoring events which are analyzed using + * Flink's CEP library. The input event stream consists of temperature and power events + * from a set of racks. The goal is to detect when a rack is about to overheat. + * In order to do that, we create a CEP pattern which generates a TemperatureWarning + * whenever it sees two consecutive temperature events in a given time interval whose temperatures + * are higher than a given threshold value. A warning itself is not critical but if we see + * two warning for the same rack whose temperatures are rising, we want to generate an alert. + * This is achieved by defining another CEP pattern which analyzes the stream of generated + * temperature warnings. 
+ */ +public class TemperatureMonitoring { + + private static final double TEMPERATURE_THRESHOLD = 100; + + public static void main(String[] args) throws Exception { + System.out.println("Executing temperature monitoring Java example."); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // Use ingestion time => TimeCharacteristic == EventTime + IngestionTimeExtractor + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + // Input stream of monitoring events + DataStream inputEventStream = env.addSource(new MonitoringEventSource()) + .assignTimestampsAndWatermarks(new IngestionTimeExtractor<>()); + + // Warning pattern: Two consecutive temperature events whose temperature is higher + // than the given threshold appearing within a time interval of 10 seconds + Pattern warningPattern = Pattern + .begin("first") + .subtype(TemperatureEvent.class) + .where(new SimpleCondition() { + @Override + public boolean filter(TemperatureEvent event) throws Exception { + return
[GitHub] flink pull request #6136: [FLINK-4303] [CEP] Add CEP examples
Github user kisimple commented on a diff in the pull request: https://github.com/apache/flink/pull/6136#discussion_r193941724 --- Diff: flink-examples/flink-examples-cep/src/main/java/org/apache/flink/cep/examples/java/monitoring/TemperatureMonitoring.java --- @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.examples.java.monitoring; + +import org.apache.flink.cep.CEP; +import org.apache.flink.cep.PatternFlatSelectFunction; +import org.apache.flink.cep.PatternSelectFunction; +import org.apache.flink.cep.PatternStream; +import org.apache.flink.cep.examples.java.monitoring.events.MonitoringEvent; +import org.apache.flink.cep.examples.java.monitoring.events.TemperatureAlert; +import org.apache.flink.cep.examples.java.monitoring.events.TemperatureEvent; +import org.apache.flink.cep.examples.java.monitoring.events.TemperatureWarning; +import org.apache.flink.cep.examples.java.monitoring.sources.MonitoringEventSource; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.IngestionTimeExtractor; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.util.Collector; + +import java.util.List; +import java.util.Map; + +/** + * CEP example monitoring program. + * This example program generates a stream of monitoring events which are analyzed using + * Flink's CEP library. The input event stream consists of temperature and power events + * from a set of racks. The goal is to detect when a rack is about to overheat. + * In order to do that, we create a CEP pattern which generates a TemperatureWarning + * whenever it sees two consecutive temperature events in a given time interval whose temperatures + * are higher than a given threshold value. A warning itself is not critical but if we see + * two warning for the same rack whose temperatures are rising, we want to generate an alert. + * This is achieved by defining another CEP pattern which analyzes the stream of generated + * temperature warnings. 
+ */ +public class TemperatureMonitoring { + + private static final double TEMPERATURE_THRESHOLD = 100; + + public static void main(String[] args) throws Exception { + System.out.println("Executing temperature monitoring Java example."); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // Use ingestion time => TimeCharacteristic == EventTime + IngestionTimeExtractor + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + // Input stream of monitoring events + DataStream inputEventStream = env.addSource(new MonitoringEventSource()) + .assignTimestampsAndWatermarks(new IngestionTimeExtractor<>()); + + // Warning pattern: Two consecutive temperature events whose temperature is higher + // than the given threshold appearing within a time interval of 10 seconds + Pattern warningPattern = Pattern + .begin("first") + .subtype(TemperatureEvent.class) + .where(new SimpleCondition() { + @Override + public boolean filter(TemperatureEvent event) throws Exception { + return event.getTemperature() > TEMPERATURE_THRESHOLD; + } + }) + .next("second") + .subtype(TemperatureEvent.class) +
[jira] [Commented] (FLINK-9554) flink scala shell doesn't work in yarn mode
[ https://issues.apache.org/jira/browse/FLINK-9554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505637#comment-16505637 ] ASF GitHub Bot commented on FLINK-9554: --- Github user zjffdu commented on the issue: https://github.com/apache/flink/pull/6140 @tillrohrmann Could you help review it ? Thanks > flink scala shell doesn't work in yarn mode > --- > > Key: FLINK-9554 > URL: https://issues.apache.org/jira/browse/FLINK-9554 > Project: Flink > Issue Type: Bug > Components: Scala Shell >Affects Versions: 1.5.0 >Reporter: Jeff Zhang >Priority: Major > > It still try to use StandaloneCluster even I specify it using yarn mode. > > Command I Use: bin/start-scala-shell.sh yarn -n 1 > > {code:java} > Starting Flink Shell: > 2018-06-06 12:30:02,672 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: jobmanager.rpc.address, localhost > 2018-06-06 12:30:02,673 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: jobmanager.rpc.port, 6123 > 2018-06-06 12:30:02,674 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: jobmanager.heap.mb, 1024 > 2018-06-06 12:30:02,674 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: taskmanager.heap.mb, 1024 > 2018-06-06 12:30:02,674 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: taskmanager.numberOfTaskSlots, 1 > 2018-06-06 12:30:02,674 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: parallelism.default, 1 > 2018-06-06 12:30:02,675 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: rest.port, 8081 > Exception in thread "main" java.lang.UnsupportedOperationException: Can't > deploy a standalone cluster. > at > org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:57) > at > org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:31) > at > org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:272) > at > org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:164) > at org.apache.flink.api.scala.FlinkShell$.liftedTree1$1(FlinkShell.scala:194) > at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:193) > at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:135) > at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6140: [FLINK-9554] flink scala shell doesn't work in yarn mode
Github user zjffdu commented on the issue: https://github.com/apache/flink/pull/6140 @tillrohrmann Could you help review it? Thanks ---
[jira] [Commented] (FLINK-9554) flink scala shell doesn't work in yarn mode
[ https://issues.apache.org/jira/browse/FLINK-9554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505627#comment-16505627 ] ASF GitHub Bot commented on FLINK-9554: --- GitHub user zjffdu opened a pull request: https://github.com/apache/flink/pull/6140 [FLINK-9554] flink scala shell doesn't work in yarn mode ## What is the purpose of the change This PR fixes the issue of the scala-shell being unable to run in yarn mode. ## Brief change log The root cause is that the options of the CustomCommandLine are missing, which causes "-m yarn-cluster" to not be parsed correctly. ## Verifying this change I verified it manually on a single-node hadoop cluster. I think it is better to add an integration test for this so that we can avoid regression issues in the future. If necessary, I can create a ticket for it and do it in another PR. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zjffdu/flink FLINK-9554 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6140.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6140 commit f3d37d4799e51323fbbcd52cf972ac1d874d2514 Author: Jeff Zhang Date: 2018-06-07T09:47:32Z save commit d65058133ce220e2d4212b906d521b38b9ef53dd Author: Jeff Zhang Date: 2018-06-08T02:39:27Z [FLINK-9554] flink scala shell doesn't work in yarn mode > flink scala shell doesn't work in yarn mode > --- > > Key: FLINK-9554 > URL: https://issues.apache.org/jira/browse/FLINK-9554 > Project: Flink > Issue Type: Bug > Components: Scala Shell >Affects Versions: 1.5.0 >Reporter: Jeff Zhang >Priority: Major > > It still try to use StandaloneCluster even I specify it using yarn mode. 
> > Command I Use: bin/start-scala-shell.sh yarn -n 1 > > {code:java} > Starting Flink Shell: > 2018-06-06 12:30:02,672 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: jobmanager.rpc.address, localhost > 2018-06-06 12:30:02,673 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: jobmanager.rpc.port, 6123 > 2018-06-06 12:30:02,674 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: jobmanager.heap.mb, 1024 > 2018-06-06 12:30:02,674 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: taskmanager.heap.mb, 1024 > 2018-06-06 12:30:02,674 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: taskmanager.numberOfTaskSlots, 1 > 2018-06-06 12:30:02,674 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: parallelism.default, 1 > 2018-06-06 12:30:02,675 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: rest.port, 8081 > Exception in thread "main" java.lang.UnsupportedOperationException: Can't > deploy a standalone cluster. > at > org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:57) > at > org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:31) > at > org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:272) > at > org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:164) > at org.apache.flink.api.scala.FlinkShell$.liftedTree1$1(FlinkShell.scala:194) > at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:193) > at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:135) > at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6140: [FLINK-9554] flink scala shell doesn't work in yar...
GitHub user zjffdu opened a pull request: https://github.com/apache/flink/pull/6140 [FLINK-9554] flink scala shell doesn't work in yarn mode ## What is the purpose of the change This PR fixes the issue of the scala-shell being unable to run in yarn mode. ## Brief change log The root cause is that the options of the CustomCommandLine are missing, which causes "-m yarn-cluster" to not be parsed correctly. ## Verifying this change I verified it manually on a single-node hadoop cluster. I think it is better to add an integration test for this so that we can avoid regression issues in the future. If necessary, I can create a ticket for it and do it in another PR. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zjffdu/flink FLINK-9554 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6140.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6140 commit f3d37d4799e51323fbbcd52cf972ac1d874d2514 Author: Jeff Zhang Date: 2018-06-07T09:47:32Z save commit d65058133ce220e2d4212b906d521b38b9ef53dd Author: Jeff Zhang Date: 2018-06-08T02:39:27Z [FLINK-9554] flink scala shell doesn't work in yarn mode ---
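The change log above boils down to command-line options that were never registered with the parser. The following standalone Java sketch, using Commons CLI directly with hypothetical option wiring rather than Flink's internal CustomCommandLine classes, shows why an unregistered "-m" cannot be parsed:

{code:java}
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;

// Hypothetical illustration: an option must be added to Options before parsing,
// otherwise "-m yarn-cluster" is rejected as unrecognized and the caller falls
// back to its default (standalone) behaviour.
public class OptionRegistrationSketch {
    public static void main(String[] args) throws Exception {
        Options options = new Options();
        // Without this registration the parser throws UnrecognizedOptionException for "-m".
        options.addOption("m", "jobmanager", true, "JobManager address, or 'yarn-cluster'");

        CommandLine cmd = new DefaultParser().parse(options, new String[] {"-m", "yarn-cluster"});
        System.out.println(cmd.getOptionValue("m")); // prints: yarn-cluster
    }
}
{code}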
[jira] [Closed] (FLINK-9329) hasRowtimeAttribute will throw NPE if user use setProctimeAttribute for table source
[ https://issues.apache.org/jira/browse/FLINK-9329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yuemeng closed FLINK-9329. -- Resolution: Invalid > hasRowtimeAttribute will throw NPE if user use setProctimeAttribute for table > source > > > Key: FLINK-9329 > URL: https://issues.apache.org/jira/browse/FLINK-9329 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Reporter: yuemeng >Assignee: yuemeng >Priority: Critical > > {code:java} > Examples: > KafkaTableSource source = Kafka010JsonTableSource.builder() > .withSchema(TableSchema.builder() > .field("sensorId", Types.LONG()) > .field("temp", Types.DOUBLE()) > .field("ptime", Types.SQL_TIMESTAMP()).build()) > .withProctimeAttribute("ptime") > .build(); tableEnv.registerTableSource("flights", source); {code} > > {code:java} > Kafka010JsonTableSource implements DefinedRowtimeAttributes, > so when TableSourceUtil calls hasRowtimeAttribute(source) to check, it will > call the following code: > /** Returns a list with all rowtime attribute names of the [[TableSource]]. */ > private def getRowtimeAttributes(tableSource: TableSource[_]): Array[String] > = { > tableSource match { > case r: DefinedRowtimeAttributes => > r.getRowtimeAttributeDescriptors.asScala.map(_.getAttributeName).toArray > case _ => > Array() > } > } > r.getRowtimeAttributeDescriptors will throw NPE because we use a > ProctimeAttribute here > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9554) flink scala shell doesn't work in yarn mode
[ https://issues.apache.org/jira/browse/FLINK-9554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505619#comment-16505619 ] Jeff Zhang commented on FLINK-9554: --- FLINK-8795 is about scala shell hangs when running flink job in local mode which is different from this ticket. So I will create this ticket to resolve the yarn issue separately > flink scala shell doesn't work in yarn mode > --- > > Key: FLINK-9554 > URL: https://issues.apache.org/jira/browse/FLINK-9554 > Project: Flink > Issue Type: Bug > Components: Scala Shell >Affects Versions: 1.5.0 >Reporter: Jeff Zhang >Priority: Major > > It still try to use StandaloneCluster even I specify it using yarn mode. > > Command I Use: bin/start-scala-shell.sh yarn -n 1 > > {code:java} > Starting Flink Shell: > 2018-06-06 12:30:02,672 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: jobmanager.rpc.address, localhost > 2018-06-06 12:30:02,673 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: jobmanager.rpc.port, 6123 > 2018-06-06 12:30:02,674 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: jobmanager.heap.mb, 1024 > 2018-06-06 12:30:02,674 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: taskmanager.heap.mb, 1024 > 2018-06-06 12:30:02,674 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: taskmanager.numberOfTaskSlots, 1 > 2018-06-06 12:30:02,674 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: parallelism.default, 1 > 2018-06-06 12:30:02,675 INFO > org.apache.flink.configuration.GlobalConfiguration - Loading > configuration property: rest.port, 8081 > Exception in thread "main" java.lang.UnsupportedOperationException: Can't > deploy a standalone cluster. > at > org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:57) > at > org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:31) > at > org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:272) > at > org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:164) > at org.apache.flink.api.scala.FlinkShell$.liftedTree1$1(FlinkShell.scala:194) > at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:193) > at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:135) > at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9554) flink scala shell doesn't work in yarn mode
Jeff Zhang created FLINK-9554: - Summary: flink scala shell doesn't work in yarn mode Key: FLINK-9554 URL: https://issues.apache.org/jira/browse/FLINK-9554 Project: Flink Issue Type: Bug Components: Scala Shell Affects Versions: 1.5.0 Reporter: Jeff Zhang It still tries to use StandaloneCluster even when I specify yarn mode. Command I use: bin/start-scala-shell.sh yarn -n 1 {code:java} Starting Flink Shell: 2018-06-06 12:30:02,672 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost 2018-06-06 12:30:02,673 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123 2018-06-06 12:30:02,674 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.mb, 1024 2018-06-06 12:30:02,674 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.mb, 1024 2018-06-06 12:30:02,674 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 1 2018-06-06 12:30:02,674 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1 2018-06-06 12:30:02,675 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: rest.port, 8081 Exception in thread "main" java.lang.UnsupportedOperationException: Can't deploy a standalone cluster. at org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:57) at org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:31) at org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:272) at org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:164) at org.apache.flink.api.scala.FlinkShell$.liftedTree1$1(FlinkShell.scala:194) at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:193) at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:135) at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9553) Migrate integration tests for DataSinkITCase
Deepak Sharma created FLINK-9553: Summary: Migrate integration tests for DataSinkITCase Key: FLINK-9553 URL: https://issues.apache.org/jira/browse/FLINK-9553 Project: Flink Issue Type: Sub-task Reporter: Deepak Sharma Assignee: Deepak Sharma -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9552) NPE in SpanningRecordSerializer during checkpoint
Truong Duc Kien created FLINK-9552: -- Summary: NPE in SpanningRecordSerializer during checkpoint Key: FLINK-9552 URL: https://issues.apache.org/jira/browse/FLINK-9552 Project: Flink Issue Type: Bug Components: Type Serialization System Affects Versions: 1.5.0 Reporter: Truong Duc Kien We're encountering NPE intermittently inside SpanningRecordSerializer during checkpoint. {code:java} 2018-06-08 08:31:35,741 [ka.actor.default-dispatcher-83] INFO o.a.f.r.e.ExecutionGraph IterationSource-22 (44/120) (c1b94ef849db0e5fb9fb7b85c17073ce) switched from RUNNING to FAILED. java.lang.RuntimeException at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) at org.apache.flink.streaming.runtime.tasks.StreamIterationHead.run(StreamIterationHead.java:91) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.NullPointerException at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:98) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105) at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) ... 5 more 2018-06-08 08:31:35,746 [ka.actor.default-dispatcher-83] INFO o.a.f.r.e.ExecutionGraph Job xxx (8a4eaf02c46dc21c7d6f3f70657dbb17) switched from state RUNNING to FAILING. java.lang.RuntimeException at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) at org.apache.flink.streaming.runtime.tasks.StreamIterationHead.run(StreamIterationHead.java:91) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.NullPointerException at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:98) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105) at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) ... 5 more {code} This issue is probably concurrency related, because the relevant Flink code seems to have proper null checking https://github.com/apache/flink/blob/fa024726bb801fc71cec5cc303cac1d4a03f555e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java#L98 {code:java} // Copy from intermediate buffers to current target memory segment if (targetBuffer != null) { targetBuffer.append(lengthBuffer); targetBuffer.append(dataBuffer); targetBuffer.commit(); } {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-4303) Add CEP examples
[ https://issues.apache.org/jira/browse/FLINK-4303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505602#comment-16505602 ] ASF GitHub Bot commented on FLINK-4303: --- Github user medcv commented on a diff in the pull request: https://github.com/apache/flink/pull/6136#discussion_r193934744 --- Diff: flink-examples/flink-examples-cep/src/main/java/org/apache/flink/cep/examples/java/monitoring/TemperatureMonitoring.java --- @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.examples.java.monitoring; + +import org.apache.flink.cep.CEP; +import org.apache.flink.cep.PatternFlatSelectFunction; +import org.apache.flink.cep.PatternSelectFunction; +import org.apache.flink.cep.PatternStream; +import org.apache.flink.cep.examples.java.monitoring.events.MonitoringEvent; +import org.apache.flink.cep.examples.java.monitoring.events.TemperatureAlert; +import org.apache.flink.cep.examples.java.monitoring.events.TemperatureEvent; +import org.apache.flink.cep.examples.java.monitoring.events.TemperatureWarning; +import org.apache.flink.cep.examples.java.monitoring.sources.MonitoringEventSource; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.IngestionTimeExtractor; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.util.Collector; + +import java.util.List; +import java.util.Map; + +/** + * CEP example monitoring program. + * This example program generates a stream of monitoring events which are analyzed using + * Flink's CEP library. The input event stream consists of temperature and power events + * from a set of racks. The goal is to detect when a rack is about to overheat. + * In order to do that, we create a CEP pattern which generates a TemperatureWarning + * whenever it sees two consecutive temperature events in a given time interval whose temperatures + * are higher than a given threshold value. A warning itself is not critical but if we see + * two warning for the same rack whose temperatures are rising, we want to generate an alert. + * This is achieved by defining another CEP pattern which analyzes the stream of generated + * temperature warnings. 
+ */ +public class TemperatureMonitoring { + + private static final double TEMPERATURE_THRESHOLD = 100; + + public static void main(String[] args) throws Exception { + System.out.println("Executing temperature monitoring Java example."); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // Use ingestion time => TimeCharacteristic == EventTime + IngestionTimeExtractor + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + // Input stream of monitoring events + DataStream inputEventStream = env.addSource(new MonitoringEventSource()) + .assignTimestampsAndWatermarks(new IngestionTimeExtractor<>()); + + // Warning pattern: Two consecutive temperature events whose temperature is higher + // than the given threshold appearing within a time interval of 10 seconds + Pattern warningPattern = Pattern + .begin("first") + .subtype(TemperatureEvent.class) + .where(new SimpleCondition() { + @Override + public boolean filter(TemperatureEvent event) throws Exception { + return
[GitHub] flink pull request #6136: [FLINK-4303] [CEP] Add CEP examples
Github user medcv commented on a diff in the pull request: https://github.com/apache/flink/pull/6136#discussion_r193934789 --- Diff: flink-examples/flink-examples-cep/src/main/scala/org/apache/flink/cep/examples/scala/monitoring/TemperatureMonitoring.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.examples.scala.monitoring + +import org.apache.flink.cep.examples.scala.monitoring.events.{MonitoringEvent, TemperatureAlert, TemperatureEvent, TemperatureWarning} +import org.apache.flink.cep.examples.scala.monitoring.sources.MonitoringEventSource +import org.apache.flink.cep.scala.CEP +import org.apache.flink.cep.scala.pattern.Pattern +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.IngestionTimeExtractor +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation} +import org.apache.flink.streaming.api.windowing.time.Time + +/** + * CEP example monitoring program. + * This example program generates a stream of monitoring events which are analyzed using + * Flink's CEP library. The input event stream consists of temperature and power events + * from a set of racks. The goal is to detect when a rack is about to overheat. + * In order to do that, we create a CEP pattern which generates a TemperatureWarning + * whenever it sees two consecutive temperature events in a given time interval whose temperatures + * are higher than a given threshold value. A warning itself is not critical but if we see + * two warning for the same rack whose temperatures are rising, we want to generate an alert. + * This is achieved by defining another CEP pattern which analyzes the stream of generated + * temperature warnings. 
+ */ +object TemperatureMonitoring { + + private val TEMPERATURE_THRESHOLD = 100 + + def main(args: Array[String]) { +println("Executing temperature monitoring Scala example.") +val env = StreamExecutionEnvironment.getExecutionEnvironment + +// Use ingestion time => TimeCharacteristic == EventTime + IngestionTimeExtractor +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + +// Input stream of monitoring events +val inputEventStream = env.addSource(new MonitoringEventSource()) + .assignTimestampsAndWatermarks(new IngestionTimeExtractor[MonitoringEvent]) + +// Warning pattern: Two consecutive temperature events whose temperature is higher +// than the given threshold appearing within a time interval of 10 seconds +val warningPattern = Pattern + .begin[MonitoringEvent]("first") +.subtype(classOf[TemperatureEvent]) +.where(_.temperature > TEMPERATURE_THRESHOLD) + .next("second") +.subtype(classOf[TemperatureEvent]) +.where(_.temperature > TEMPERATURE_THRESHOLD) + .within(Time.seconds(10)) + +// Create a pattern stream from our warning pattern +val tempPatternStream = CEP.pattern(inputEventStream.keyBy(_.rackID), warningPattern) + +// Generate temperature warnings for each matched warning pattern +val warnings: DataStream[TemperatureWarning] = tempPatternStream.select( pattern => { +val first = pattern("first").head.asInstanceOf[TemperatureEvent] +val second = pattern("second").head.asInstanceOf[TemperatureEvent] +new TemperatureWarning(first.rackID, (first.temperature + second.temperature) / 2) + } +) + +// Alert pattern: Two consecutive temperature warnings +// appearing within a time interval of 20 seconds +val alertPattern = Pattern + .begin[TemperatureWarning]("first") + .next("second") + .within(Time.seconds(20)) + +// Create a pattern stream from our alert pattern +val alertPatternStream =
[jira] [Commented] (FLINK-4303) Add CEP examples
[ https://issues.apache.org/jira/browse/FLINK-4303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505601#comment-16505601 ] ASF GitHub Bot commented on FLINK-4303: --- Github user medcv commented on a diff in the pull request: https://github.com/apache/flink/pull/6136#discussion_r193934789 --- Diff: flink-examples/flink-examples-cep/src/main/scala/org/apache/flink/cep/examples/scala/monitoring/TemperatureMonitoring.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.examples.scala.monitoring + +import org.apache.flink.cep.examples.scala.monitoring.events.{MonitoringEvent, TemperatureAlert, TemperatureEvent, TemperatureWarning} +import org.apache.flink.cep.examples.scala.monitoring.sources.MonitoringEventSource +import org.apache.flink.cep.scala.CEP +import org.apache.flink.cep.scala.pattern.Pattern +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.IngestionTimeExtractor +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation} +import org.apache.flink.streaming.api.windowing.time.Time + +/** + * CEP example monitoring program. + * This example program generates a stream of monitoring events which are analyzed using + * Flink's CEP library. The input event stream consists of temperature and power events + * from a set of racks. The goal is to detect when a rack is about to overheat. + * In order to do that, we create a CEP pattern which generates a TemperatureWarning + * whenever it sees two consecutive temperature events in a given time interval whose temperatures + * are higher than a given threshold value. A warning itself is not critical but if we see + * two warning for the same rack whose temperatures are rising, we want to generate an alert. + * This is achieved by defining another CEP pattern which analyzes the stream of generated + * temperature warnings. 
+ */ +object TemperatureMonitoring { + + private val TEMPERATURE_THRESHOLD = 100 + + def main(args: Array[String]) { +println("Executing temperature monitoring Scala example.") +val env = StreamExecutionEnvironment.getExecutionEnvironment + +// Use ingestion time => TimeCharacteristic == EventTime + IngestionTimeExtractor +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + +// Input stream of monitoring events +val inputEventStream = env.addSource(new MonitoringEventSource()) + .assignTimestampsAndWatermarks(new IngestionTimeExtractor[MonitoringEvent]) + +// Warning pattern: Two consecutive temperature events whose temperature is higher +// than the given threshold appearing within a time interval of 10 seconds +val warningPattern = Pattern + .begin[MonitoringEvent]("first") +.subtype(classOf[TemperatureEvent]) +.where(_.temperature > TEMPERATURE_THRESHOLD) + .next("second") +.subtype(classOf[TemperatureEvent]) +.where(_.temperature > TEMPERATURE_THRESHOLD) + .within(Time.seconds(10)) + +// Create a pattern stream from our warning pattern +val tempPatternStream = CEP.pattern(inputEventStream.keyBy(_.rackID), warningPattern) + +// Generate temperature warnings for each matched warning pattern +val warnings: DataStream[TemperatureWarning] = tempPatternStream.select( pattern => { +val first = pattern("first").head.asInstanceOf[TemperatureEvent] +val second = pattern("second").head.asInstanceOf[TemperatureEvent] +new TemperatureWarning(first.rackID, (first.temperature + second.temperature) / 2) + } +) + +// Alert pattern: Two consecutive temperature warnings +// appearing within a time interval of 20 seconds +val alertPattern =
[GitHub] flink pull request #6136: [FLINK-4303] [CEP] Add CEP examples
Github user medcv commented on a diff in the pull request: https://github.com/apache/flink/pull/6136#discussion_r193934744 --- Diff: flink-examples/flink-examples-cep/src/main/java/org/apache/flink/cep/examples/java/monitoring/TemperatureMonitoring.java --- @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.examples.java.monitoring; + +import org.apache.flink.cep.CEP; +import org.apache.flink.cep.PatternFlatSelectFunction; +import org.apache.flink.cep.PatternSelectFunction; +import org.apache.flink.cep.PatternStream; +import org.apache.flink.cep.examples.java.monitoring.events.MonitoringEvent; +import org.apache.flink.cep.examples.java.monitoring.events.TemperatureAlert; +import org.apache.flink.cep.examples.java.monitoring.events.TemperatureEvent; +import org.apache.flink.cep.examples.java.monitoring.events.TemperatureWarning; +import org.apache.flink.cep.examples.java.monitoring.sources.MonitoringEventSource; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.IngestionTimeExtractor; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.util.Collector; + +import java.util.List; +import java.util.Map; + +/** + * CEP example monitoring program. + * This example program generates a stream of monitoring events which are analyzed using + * Flink's CEP library. The input event stream consists of temperature and power events + * from a set of racks. The goal is to detect when a rack is about to overheat. + * In order to do that, we create a CEP pattern which generates a TemperatureWarning + * whenever it sees two consecutive temperature events in a given time interval whose temperatures + * are higher than a given threshold value. A warning itself is not critical but if we see + * two warning for the same rack whose temperatures are rising, we want to generate an alert. + * This is achieved by defining another CEP pattern which analyzes the stream of generated + * temperature warnings. 
+ */ +public class TemperatureMonitoring { + + private static final double TEMPERATURE_THRESHOLD = 100; + + public static void main(String[] args) throws Exception { + System.out.println("Executing temperature monitoring Java example."); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // Use ingestion time => TimeCharacteristic == EventTime + IngestionTimeExtractor + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + // Input stream of monitoring events + DataStream inputEventStream = env.addSource(new MonitoringEventSource()) + .assignTimestampsAndWatermarks(new IngestionTimeExtractor<>()); + + // Warning pattern: Two consecutive temperature events whose temperature is higher + // than the given threshold appearing within a time interval of 10 seconds + Pattern warningPattern = Pattern + .begin("first") + .subtype(TemperatureEvent.class) + .where(new SimpleCondition() { + @Override + public boolean filter(TemperatureEvent event) throws Exception { + return event.getTemperature() > TEMPERATURE_THRESHOLD; + } + }) + .next("second") + .subtype(TemperatureEvent.class) +
[jira] [Commented] (FLINK-4303) Add CEP examples
[ https://issues.apache.org/jira/browse/FLINK-4303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505598#comment-16505598 ] ASF GitHub Bot commented on FLINK-4303: --- Github user medcv commented on the issue: https://github.com/apache/flink/pull/6136 @kisimple Recently I started to work on the same example. I ran your code locally and it worked perfectly. I could see some checkstyle errors that I think Travis might pick up. I have a suggestion for the sake of the example scenario. For the `alerts` event you are using `LocalTime`, which indicates when the `alerts` occurred; it would be useful to also add the `warnings` timestamp to the `alerts` event in order to show when the rack's temperature passed the threshold, something like this on [line](https://github.com/kisimple/flink/blob/5cd3a374b84b2a7aaedb4c4184caded073e19295/flink-examples/flink-examples-cep/src/main/java/org/apache/flink/cep/examples/java/monitoring/TemperatureMonitoring.java#L133) `out.collect(new TemperatureAlert(first.getRackID(), second.getDatetime()));` Of course, this also requires updating the `TemperatureAlert` model. > Add CEP examples > > > Key: FLINK-4303 > URL: https://issues.apache.org/jira/browse/FLINK-4303 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.1.0 >Reporter: Timo Walther >Assignee: blues zheng >Priority: Major > > Neither CEP Java nor CEP Scala contain a runnable example. The example on the > website is also not runnable without adding some additional code. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6136: [FLINK-4303] [CEP] Add CEP examples
Github user medcv commented on the issue: https://github.com/apache/flink/pull/6136 @kisimple Recently I started to work on the same example. I ran your code locally and it worked perfectly. I could see some checkstyle errors that I think Travis might pick up. I have a suggestion for the sake of the example scenario. For the `alerts` event you are using `LocalTime`, which indicates when the `alerts` occurred; it would be useful to also add the `warnings` timestamp to the `alerts` event in order to show when the rack's temperature passed the threshold, something like this on [line](https://github.com/kisimple/flink/blob/5cd3a374b84b2a7aaedb4c4184caded073e19295/flink-examples/flink-examples-cep/src/main/java/org/apache/flink/cep/examples/java/monitoring/TemperatureMonitoring.java#L133) `out.collect(new TemperatureAlert(first.getRackID(), second.getDatetime()));` Of course, this also requires updating the `TemperatureAlert` model. ---
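If the suggestion above is adopted, the alert type needs a timestamp field. A possible shape of the updated POJO is sketched below; this is hypothetical and the actual TemperatureAlert class in the PR may look different.

{code:java}
// Hypothetical extension of the example's TemperatureAlert so that an alert
// also carries the timestamp of the warning that triggered it.
public class TemperatureAlert {

    private int rackID;
    private long warningTimestamp;

    public TemperatureAlert() {
        // No-arg constructor so Flink can treat this class as a POJO.
    }

    public TemperatureAlert(int rackID, long warningTimestamp) {
        this.rackID = rackID;
        this.warningTimestamp = warningTimestamp;
    }

    public int getRackID() {
        return rackID;
    }

    public void setRackID(int rackID) {
        this.rackID = rackID;
    }

    public long getWarningTimestamp() {
        return warningTimestamp;
    }

    public void setWarningTimestamp(long warningTimestamp) {
        this.warningTimestamp = warningTimestamp;
    }
}
{code}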
[jira] [Closed] (FLINK-9482) Not applicable functions for TIME
[ https://issues.apache.org/jira/browse/FLINK-9482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuyi Chen closed FLINK-9482. - Resolution: Fixed > Not applicable functions for TIME > - > > Key: FLINK-9482 > URL: https://issues.apache.org/jira/browse/FLINK-9482 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Reporter: Viktor Vlasov >Assignee: Viktor Vlasov >Priority: Minor > > While working on https://issues.apache.org/jira/browse/FLINK-9432 I faced the > question of how to check the DECADE function with tests in > _org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala._ > > Because I used the CENTURY function as an example, I checked it first. > During the test I figured out that when we use it with TIME it returns 0. > I suppose arguments for such functions (this also applies to YEAR, MONTH, > MILLENNIUM, etc.) need to be checked, and some exception should be thrown if the type is not > suitable. > As an example, in the Apache Calcite project (checked in the sqlline shell), when I > try to use CENTURY with TIME it throws: > {code:java} > java.lang.AssertionError: unexpected TIME > {code} > We need to determine why such a check does not exist and add it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
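The validation being asked for amounts to checking that the extraction unit is applicable to the argument's type. A small, self-contained Java sketch of that idea follows; the names are hypothetical and this is not the Table API's actual validation code.

{code:java}
import java.util.EnumSet;

// Illustrative unit-applicability check: date-based units such as CENTURY or
// YEAR cannot be extracted from a time-of-day value, mirroring Calcite's
// "unexpected TIME" assertion mentioned above.
public class ExtractValidationSketch {

    enum TimeUnit { MILLENNIUM, CENTURY, DECADE, YEAR, MONTH, DAY, HOUR, MINUTE, SECOND }

    private static final EnumSet<TimeUnit> APPLICABLE_TO_TIME =
        EnumSet.of(TimeUnit.HOUR, TimeUnit.MINUTE, TimeUnit.SECOND);

    static void validateExtractFromTime(TimeUnit unit) {
        if (!APPLICABLE_TO_TIME.contains(unit)) {
            throw new IllegalArgumentException(
                "EXTRACT(" + unit + " FROM <TIME>) is not applicable");
        }
    }

    public static void main(String[] args) {
        validateExtractFromTime(TimeUnit.HOUR);    // passes
        validateExtractFromTime(TimeUnit.CENTURY); // throws IllegalArgumentException
    }
}
{code}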
[jira] [Commented] (FLINK-9482) Not applicable functions for TIME
[ https://issues.apache.org/jira/browse/FLINK-9482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505575#comment-16505575 ] ASF GitHub Bot commented on FLINK-9482: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6121 > Not applicable functions for TIME > - > > Key: FLINK-9482 > URL: https://issues.apache.org/jira/browse/FLINK-9482 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Reporter: Viktor Vlasov >Assignee: Viktor Vlasov >Priority: Minor > > Due to work on https://issues.apache.org/jira/browse/FLINK-9432 I have faced > with question how to check DECADE function with tests in > _org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala._ > > Because I have used CENTURY function as an example, first of all I have check > it. During the test I figured out that when we use it with TIME it returns 0. > I suppose arguments for such functions (also it works for YEAR, MONTH, > MILLENNIUM, etc) need to be checked and throw some exception if type is not > suitable. > As an example, in Apache Calcite project (checked in sqlline shell), when I > am trying to use CENTURY with TIME it throw: > {code:java} > java.lang.AssertionError: unexpected TIME > {code} > Need to determine, why such check is not exists and add it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
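For context, a minimal sketch of the kind of query the added argument validation targets; `tableEnv`, the registered table `MyTable`, and its TIME column `t` are assumptions and not part of the original report.

{code:java}
import org.apache.flink.table.api.Table;

// Hypothetical illustration of the gap described above: before the fix,
// extracting CENTURY from a TIME value silently evaluated to 0. With the
// added argument validation, planning a query like this should fail with a
// validation error instead.
Table result = tableEnv.sqlQuery("SELECT EXTRACT(CENTURY FROM t) FROM MyTable");
{code}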
[GitHub] flink pull request #6121: [FLINK-9482] [table] EXTRACT function argument val...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6121 ---
[jira] [Commented] (FLINK-9061) add entropy to s3 path for better scalability
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505512#comment-16505512 ] Indrajit Roychoudhury commented on FLINK-9061: -- [~neoeahit] [~jgrier] I'll pick this up next week, basic structure exists so it won't take much time to alter it. So i'm targeting 2 weeks from now, worst case end of month. > add entropy to s3 path for better scalability > - > > Key: FLINK-9061 > URL: https://issues.apache.org/jira/browse/FLINK-9061 > Project: Flink > Issue Type: Bug > Components: FileSystem, State Backends, Checkpointing >Affects Versions: 1.5.0, 1.4.2 >Reporter: Jamie Grier >Assignee: Indrajit Roychoudhury >Priority: Critical > > I think we need to modify the way we write checkpoints to S3 for high-scale > jobs (those with many total tasks). The issue is that we are writing all the > checkpoint data under a common key prefix. This is the worst case scenario > for S3 performance since the key is used as a partition key. > > In the worst case checkpoints fail with a 500 status code coming back from S3 > and an internal error type of TooBusyException. > > One possible solution would be to add a hook in the Flink filesystem code > that allows me to "rewrite" paths. For example say I have the checkpoint > directory set to: > > s3://bucket/flink/checkpoints > > I would hook that and rewrite that path to: > > s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original > path > > This would distribute the checkpoint write load around the S3 cluster evenly. > > For reference: > https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/ > > Any other people hit this issue? Any other ideas for solutions? This is a > pretty serious problem for people trying to checkpoint to S3. > > -Jamie > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
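As a rough illustration of the rewrite described above (not the eventual implementation), the entropy segment could be derived from a hash of the configured checkpoint path; the helper name and the choice of hash are assumptions.

{code:java}
import java.net.URI;

// Hypothetical sketch: inject an entropy segment, derived from the configured
// checkpoint path, in front of that path so writes spread across S3 partitions,
// e.g. "s3://bucket/flink/checkpoints" -> "s3://bucket/1a2b3c4d/flink/checkpoints".
public static String injectEntropy(String checkpointPath) {
    URI uri = URI.create(checkpointPath);
    // Any stable hash works for the sketch; hashCode() is used only for brevity.
    String hash = Integer.toHexString(uri.getPath().hashCode());
    return uri.getScheme() + "://" + uri.getAuthority() + "/" + hash + uri.getPath();
}
{code}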
[jira] [Commented] (FLINK-9061) add entropy to s3 path for better scalability
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505461#comment-16505461 ] Jamie Grier commented on FLINK-9061: [~neoeahit] This will affect all versions. > add entropy to s3 path for better scalability > - > > Key: FLINK-9061 > URL: https://issues.apache.org/jira/browse/FLINK-9061 > Project: Flink > Issue Type: Bug > Components: FileSystem, State Backends, Checkpointing >Affects Versions: 1.5.0, 1.4.2 >Reporter: Jamie Grier >Assignee: Indrajit Roychoudhury >Priority: Critical > > I think we need to modify the way we write checkpoints to S3 for high-scale > jobs (those with many total tasks). The issue is that we are writing all the > checkpoint data under a common key prefix. This is the worst case scenario > for S3 performance since the key is used as a partition key. > > In the worst case checkpoints fail with a 500 status code coming back from S3 > and an internal error type of TooBusyException. > > One possible solution would be to add a hook in the Flink filesystem code > that allows me to "rewrite" paths. For example say I have the checkpoint > directory set to: > > s3://bucket/flink/checkpoints > > I would hook that and rewrite that path to: > > s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original > path > > This would distribute the checkpoint write load around the S3 cluster evenly. > > For reference: > https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/ > > Any other people hit this issue? Any other ideas for solutions? This is a > pretty serious problem for people trying to checkpoint to S3. > > -Jamie > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9061) add entropy to s3 path for better scalability
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505447#comment-16505447 ] Monal Daxini commented on FLINK-9061: - In addition to what [~stevenz3wu] and [~jgrier] suggest, it will be good to make the entropy generation pluggable. This way users can override the default entropy generation, if they need to. {code:java} // code placeholder state.backend.fs.checkpointdir.injectEntropy.strategy=com.foo.MyStaticEntropyGenerator {code} > add entropy to s3 path for better scalability > - > > Key: FLINK-9061 > URL: https://issues.apache.org/jira/browse/FLINK-9061 > Project: Flink > Issue Type: Bug > Components: FileSystem, State Backends, Checkpointing >Affects Versions: 1.5.0, 1.4.2 >Reporter: Jamie Grier >Assignee: Indrajit Roychoudhury >Priority: Critical > > I think we need to modify the way we write checkpoints to S3 for high-scale > jobs (those with many total tasks). The issue is that we are writing all the > checkpoint data under a common key prefix. This is the worst case scenario > for S3 performance since the key is used as a partition key. > > In the worst case checkpoints fail with a 500 status code coming back from S3 > and an internal error type of TooBusyException. > > One possible solution would be to add a hook in the Flink filesystem code > that allows me to "rewrite" paths. For example say I have the checkpoint > directory set to: > > s3://bucket/flink/checkpoints > > I would hook that and rewrite that path to: > > s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original > path > > This would distribute the checkpoint write load around the S3 cluster evenly. > > For reference: > https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/ > > Any other people hit this issue? Any other ideas for solutions? This is a > pretty serious problem for people trying to checkpoint to S3. > > -Jamie > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
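A minimal sketch of what such a pluggable strategy might look like; the interface name and method are assumptions, and the configuration key is the placeholder from the comment above, not an existing Flink option.

{code:java}
import java.io.Serializable;

// Hypothetical plug-in point (not an existing Flink API): users implement this
// interface and reference their class via the proposed configuration key above.
public interface EntropyGenerator extends Serializable {

    /** Returns the entropy segment to prepend to the given checkpoint path. */
    String entropyFor(String checkpointPath);
}

// Example user implementation returning a fixed segment (shown in the same
// snippet for brevity; in a real project this would live in its own file).
class MyStaticEntropyGenerator implements EntropyGenerator {
    @Override
    public String entropyFor(String checkpointPath) {
        return "static-entropy";
    }
}
{code}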
[GitHub] flink pull request #6138: [FLINK-9550][DOC]FlinkCEP snippet example has some...
GitHub user medcv opened a pull request: https://github.com/apache/flink/pull/6138 [FLINK-9550][DOC]FlinkCEP snippet example has some syntax errors ## What is the purpose of the change Fixing FlinkCEP snippet code syntax errors and data type mismatches ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/medcv/flink FLINK-9550 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6138.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6138 commit 4a5bc91bc2c326c1a38fd231d24b300b491b75cc Author: Yadan.JS Date: 2018-06-07T21:52:27Z [FLINK-9550][DOC]FlinkCEP snippet example has some syntax errors ---
[jira] [Commented] (FLINK-9539) Integrate flink-shaded 4.0
[ https://issues.apache.org/jira/browse/FLINK-9539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505396#comment-16505396 ] ASF GitHub Bot commented on FLINK-9539: --- Github user medcv commented on the issue: https://github.com/apache/flink/pull/6128 +1 > Integrate flink-shaded 4.0 > -- > > Key: FLINK-9539 > URL: https://issues.apache.org/jira/browse/FLINK-9539 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.0 > > > With the recent release of flink-shaded 4.0 we should bump the versions for > all dependencies (except netty which is handled in FLINK-3952). > We can now remove the exclusions from the jackson dependencies as they are > now properly hidden. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6128: [FLINK-9539][build] Integrate flink-shaded 4.0
Github user medcv commented on the issue: https://github.com/apache/flink/pull/6128 +1 ---
[jira] [Commented] (FLINK-9551) FlinkCEP Scala Combining Patterns table has a missing pattern
[ https://issues.apache.org/jira/browse/FLINK-9551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505380#comment-16505380 ] ASF GitHub Bot commented on FLINK-9551: --- GitHub user medcv opened a pull request: https://github.com/apache/flink/pull/6139 [FLINK-9551][DOCS]FlinkCEP Scala Combining Patterns table has a missing pattern ## What is the purpose of the change in FlinkCEP documentation section related to Combining Patterns Scala Table has a missing patterns compare to Java table `begin(#pattern_sequence)` and also `begin()` pattern has missing `#name` params ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/medcv/flink FLINK-9551 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6139.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6139 commit 83d709f3ce9c73435d8a2c423f79c3d323a74604 Author: Yadan.JS Date: 2018-06-07T22:07:57Z [FLINK-9551][DOCS]FlinkCEP Scala Combining Patterns table has a missing pattern > FlinkCEP Scala Combining Patterns table has a missing pattern > - > > Key: FLINK-9551 > URL: https://issues.apache.org/jira/browse/FLINK-9551 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Yazdan Shirvany >Assignee: Yazdan Shirvany >Priority: Minor > > in FlinkCEP documentation section related to {{Combining Patterns}} Scala > Table has a missing patterns compare to Java table > {{begin(#pattern_sequence)}} > and also > {{begin()}} pattern has missing {{#name}} params -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6139: [FLINK-9551][DOCS]FlinkCEP Scala Combining Pattern...
GitHub user medcv opened a pull request: https://github.com/apache/flink/pull/6139 [FLINK-9551][DOCS]FlinkCEP Scala Combining Patterns table has a missing pattern ## What is the purpose of the change in FlinkCEP documentation section related to Combining Patterns Scala Table has a missing patterns compare to Java table `begin(#pattern_sequence)` and also `begin()` pattern has missing `#name` params ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/medcv/flink FLINK-9551 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6139.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6139 commit 83d709f3ce9c73435d8a2c423f79c3d323a74604 Author: Yadan.JS Date: 2018-06-07T22:07:57Z [FLINK-9551][DOCS]FlinkCEP Scala Combining Patterns table has a missing pattern ---
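For reference, a small Java sketch of the two `begin` variants the Scala table should mirror; `Event` is a placeholder event type and the pattern names are arbitrary.

{code:java}
import org.apache.flink.cep.pattern.Pattern;

// "Event" stands in for the user's event type.
class Event {}

// begin(#name): start an individual pattern under a name.
Pattern<Event, ?> start = Pattern.<Event>begin("start");

// begin(#pattern_sequence): start from an existing pattern sequence (group pattern).
Pattern<Event, Event> sequence = Pattern.<Event>begin("first").next("second");
Pattern<Event, ?> group = Pattern.begin(sequence);
{code}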
[jira] [Commented] (FLINK-9550) FlinkCEP snippet example has some syntax errors
[ https://issues.apache.org/jira/browse/FLINK-9550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505367#comment-16505367 ] ASF GitHub Bot commented on FLINK-9550: --- GitHub user medcv opened a pull request: https://github.com/apache/flink/pull/6138 [FLINK-9550][DOC]FlinkCEP snippet example has some syntax errors ## What is the purpose of the change Fixing FlinkCEP snippet code syntax errors and data type mismatches ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/medcv/flink FLINK-9550 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6138.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6138 commit 4a5bc91bc2c326c1a38fd231d24b300b491b75cc Author: Yadan.JS Date: 2018-06-07T21:52:27Z [FLINK-9550][DOC]FlinkCEP snippet example has some syntax errors > FlinkCEP snippet example has some syntax errors > --- > > Key: FLINK-9550 > URL: https://issues.apache.org/jira/browse/FLINK-9550 > Project: Flink > Issue Type: Improvement >Reporter: Yazdan Shirvany >Assignee: Yazdan Shirvany >Priority: Minor > > I tried to work with FlinckCEP documentarians and my assumption was the > snippet example should work but I got some syntax errors that needed to be > fix. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9551) FlinkCEP Scala Combining Patterns table has a missing pattern
[ https://issues.apache.org/jira/browse/FLINK-9551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yazdan Shirvany updated FLINK-9551: --- Summary: FlinkCEP Scala Combining Patterns table has a missing pattern (was: FlinkCEP Scala Combining Patterns table has two missing pattern) > FlinkCEP Scala Combining Patterns table has a missing pattern > - > > Key: FLINK-9551 > URL: https://issues.apache.org/jira/browse/FLINK-9551 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Yazdan Shirvany >Assignee: Yazdan Shirvany >Priority: Minor > > in FlinkCEP documentation section related to {{Combining Patterns}} Scala > Table has a missing patterns compare to Java table > {{begin(#pattern_sequence)}} > and also > {{begin()}} pattern has missing {{#name}} params -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9551) FlinkCEP Scala Combining Patterns table has two missing pattern
Yazdan Shirvany created FLINK-9551: -- Summary: FlinkCEP Scala Combining Patterns table has two missing pattern Key: FLINK-9551 URL: https://issues.apache.org/jira/browse/FLINK-9551 Project: Flink Issue Type: Improvement Components: Documentation Reporter: Yazdan Shirvany Assignee: Yazdan Shirvany in FlinkCEP documentation section related to {{Combining Patterns}} Scala Table has a missing patterns compare to Java table {{begin(#pattern_sequence)}} and also {{begin()}} pattern has missing {{#name}} params -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9550) FlinkCEP snippet example has some syntax errors
Yazdan Shirvany created FLINK-9550: -- Summary: FlinkCEP snippet example has some syntax errors Key: FLINK-9550 URL: https://issues.apache.org/jira/browse/FLINK-9550 Project: Flink Issue Type: Improvement Reporter: Yazdan Shirvany Assignee: Yazdan Shirvany I tried to work with the FlinkCEP documentation and my assumption was that the snippet examples should work, but I got some syntax errors that needed to be fixed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9549) Fix FlickCEP Docs broken link and minor style changes
[ https://issues.apache.org/jira/browse/FLINK-9549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505345#comment-16505345 ] ASF GitHub Bot commented on FLINK-9549: --- GitHub user medcv opened a pull request: https://github.com/apache/flink/pull/6137 [FLINK-9549][DOC]Fix FlickCEP Docs broken link and minor style changes ## What is the purpose of the change Fixing FlickCEP broken link and minor style changes ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/medcv/flink FLINK-9549 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6137.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6137 commit 2a9b6ff7b4dc4e2ec624ed18ba37b46d82f93e6d Author: Yadan.JS Date: 2018-06-07T21:33:49Z [FLINK-9549][DOC]Fix FlickCEP Docs broken link and minor style changes > Fix FlickCEP Docs broken link and minor style changes > - > > Key: FLINK-9549 > URL: https://issues.apache.org/jira/browse/FLINK-9549 > Project: Flink > Issue Type: Improvement >Reporter: Yazdan Shirvany >Assignee: Yazdan Shirvany >Priority: Trivial > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6137: [FLINK-9549][DOC]Fix FlickCEP Docs broken link and...
GitHub user medcv opened a pull request: https://github.com/apache/flink/pull/6137 [FLINK-9549][DOC]Fix FlickCEP Docs broken link and minor style changes ## What is the purpose of the change Fixing FlickCEP broken link and minor style changes ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/medcv/flink FLINK-9549 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6137.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6137 commit 2a9b6ff7b4dc4e2ec624ed18ba37b46d82f93e6d Author: Yadan.JS Date: 2018-06-07T21:33:49Z [FLINK-9549][DOC]Fix FlickCEP Docs broken link and minor style changes ---
[jira] [Created] (FLINK-9549) Fix FlickCEP Docs broken link and minor style changes
Yazdan Shirvany created FLINK-9549: -- Summary: Fix FlickCEP Docs broken link and minor style changes Key: FLINK-9549 URL: https://issues.apache.org/jira/browse/FLINK-9549 Project: Flink Issue Type: Improvement Reporter: Yazdan Shirvany Assignee: Yazdan Shirvany -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6131: [hotfix][docs] Fix Table API scala example code
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6131 ---
[jira] [Created] (FLINK-9548) Flink Apache Kudu Connector
Sandish Kumar HN created FLINK-9548: --- Summary: Flink Apache Kudu Connector Key: FLINK-9548 URL: https://issues.apache.org/jira/browse/FLINK-9548 Project: Flink Issue Type: New Feature Reporter: Sandish Kumar HN A Flink Apache Kudu connector would be a good addition. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-4303) Add CEP examples
[ https://issues.apache.org/jira/browse/FLINK-4303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505047#comment-16505047 ] ASF GitHub Bot commented on FLINK-4303: --- Github user kisimple commented on the issue: https://github.com/apache/flink/pull/6136 cc @kl0u > Add CEP examples > > > Key: FLINK-4303 > URL: https://issues.apache.org/jira/browse/FLINK-4303 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.1.0 >Reporter: Timo Walther >Assignee: blues zheng >Priority: Major > > Neither CEP Java nor CEP Scala contain a runnable example. The example on the > website is also not runnable without adding some additional code. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6136: [FLINK-4303] [CEP] Add CEP examples
Github user kisimple commented on the issue: https://github.com/apache/flink/pull/6136 cc @kl0u ---
[jira] [Commented] (FLINK-4303) Add CEP examples
[ https://issues.apache.org/jira/browse/FLINK-4303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505046#comment-16505046 ] ASF GitHub Bot commented on FLINK-4303: --- GitHub user kisimple opened a pull request: https://github.com/apache/flink/pull/6136 FLINK-4303] [CEP] Add CEP examples ## What is the purpose of the change Currently neither CEP Java nor CEP Scala contain a runnable example. This PR fixes the problem by adding a `flink-examples-cep` module. The change is based on #2937 ## Brief change log - Add a `flink-examples-cep` module - Add a `TemperatureMonitoring.java` example - Add a `TemperatureMonitoring.scala` example ## Verifying this change - Build the project and run the example by the following commands: `./bin/flink run -c org.apache.flink.cep.examples.java.monitoring.TemperatureMonitoring ./examples/cep/flink-examples-cep-with-dependencies.jar` and `./bin/flink run -c org.apache.flink.cep.examples.scala.monitoring.TemperatureMonitoring ./examples/cep/flink-examples-cep-with-dependencies.jar` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) You can merge this pull request into a Git repository by running: $ git pull https://github.com/kisimple/flink FLINK-4303 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6136.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6136 commit f9f382780e0975c83f2a957132c1c09c769cf3a3 Author: Aleksandr Chermenin Date: 2016-12-05T09:44:39Z [FLINK-4303] Added examples for CEP library. commit 7b96b9d5ffd221c0ff4169ac84c601eb34f95876 Author: Aleksandr Chermenin Date: 2016-12-05T10:09:50Z [FLINK-4303] Fixed code style. commit f26e7f403b7d7a74cd9a0d129086468789aa2747 Author: Aleksandr Chermenin Date: 2016-12-05T10:28:59Z [FLINK-4303] Another small code style fix. commit 3a214a38f4cda5a3e965ce4512b960ac5a7fe59f Author: Aleksandr Chermenin Date: 2016-12-05T11:24:14Z [FLINK-4303] Fixed Scala code style. commit 134492dc23a5eaa2173669ed213c3f933dcc4391 Author: Aleksandr Chermenin Date: 2017-02-22T06:59:28Z [FLINK-4303] Fixed hashCode methods for events. commit ccf26763acf363134ef4bfa5127c36af226a3ed2 Author: blueszheng Date: 2018-06-07T17:35:18Z [FLINK-4303] Add CEP examples > Add CEP examples > > > Key: FLINK-4303 > URL: https://issues.apache.org/jira/browse/FLINK-4303 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.1.0 >Reporter: Timo Walther >Assignee: blues zheng >Priority: Major > > Neither CEP Java nor CEP Scala contain a runnable example. The example on the > website is also not runnable without adding some additional code. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6136: [FLINK-4303] [CEP] Add CEP examples
GitHub user kisimple opened a pull request: https://github.com/apache/flink/pull/6136 FLINK-4303] [CEP] Add CEP examples ## What is the purpose of the change Currently neither CEP Java nor CEP Scala contain a runnable example. This PR fixes the problem by adding a `flink-examples-cep` module. The change is based on #2937 ## Brief change log - Add a `flink-examples-cep` module - Add a `TemperatureMonitoring.java` example - Add a `TemperatureMonitoring.scala` example ## Verifying this change - Build the project and run the example by the following commands: `./bin/flink run -c org.apache.flink.cep.examples.java.monitoring.TemperatureMonitoring ./examples/cep/flink-examples-cep-with-dependencies.jar` and `./bin/flink run -c org.apache.flink.cep.examples.scala.monitoring.TemperatureMonitoring ./examples/cep/flink-examples-cep-with-dependencies.jar` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) You can merge this pull request into a Git repository by running: $ git pull https://github.com/kisimple/flink FLINK-4303 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6136.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6136 commit f9f382780e0975c83f2a957132c1c09c769cf3a3 Author: Aleksandr Chermenin Date: 2016-12-05T09:44:39Z [FLINK-4303] Added examples for CEP library. commit 7b96b9d5ffd221c0ff4169ac84c601eb34f95876 Author: Aleksandr Chermenin Date: 2016-12-05T10:09:50Z [FLINK-4303] Fixed code style. commit f26e7f403b7d7a74cd9a0d129086468789aa2747 Author: Aleksandr Chermenin Date: 2016-12-05T10:28:59Z [FLINK-4303] Another small code style fix. commit 3a214a38f4cda5a3e965ce4512b960ac5a7fe59f Author: Aleksandr Chermenin Date: 2016-12-05T11:24:14Z [FLINK-4303] Fixed Scala code style. commit 134492dc23a5eaa2173669ed213c3f933dcc4391 Author: Aleksandr Chermenin Date: 2017-02-22T06:59:28Z [FLINK-4303] Fixed hashCode methods for events. commit ccf26763acf363134ef4bfa5127c36af226a3ed2 Author: blueszheng Date: 2018-06-07T17:35:18Z [FLINK-4303] Add CEP examples ---
[jira] [Commented] (FLINK-9061) add entropy to s3 path for better scalability
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505042#comment-16505042 ] Vipul Singh commented on FLINK-9061: We are also seem to be affected by this. I wanted to check regarding the timelines on this feature. [~jgrier] qq wrt to affects versions: Does this only affect >=1.4 ? we seem to be seeing a similar issue for flink 1.3 > add entropy to s3 path for better scalability > - > > Key: FLINK-9061 > URL: https://issues.apache.org/jira/browse/FLINK-9061 > Project: Flink > Issue Type: Bug > Components: FileSystem, State Backends, Checkpointing >Affects Versions: 1.5.0, 1.4.2 >Reporter: Jamie Grier >Assignee: Indrajit Roychoudhury >Priority: Critical > > I think we need to modify the way we write checkpoints to S3 for high-scale > jobs (those with many total tasks). The issue is that we are writing all the > checkpoint data under a common key prefix. This is the worst case scenario > for S3 performance since the key is used as a partition key. > > In the worst case checkpoints fail with a 500 status code coming back from S3 > and an internal error type of TooBusyException. > > One possible solution would be to add a hook in the Flink filesystem code > that allows me to "rewrite" paths. For example say I have the checkpoint > directory set to: > > s3://bucket/flink/checkpoints > > I would hook that and rewrite that path to: > > s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original > path > > This would distribute the checkpoint write load around the S3 cluster evenly. > > For reference: > https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/ > > Any other people hit this issue? Any other ideas for solutions? This is a > pretty serious problem for people trying to checkpoint to S3. > > -Jamie > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-9547) CEP pattern not called on windowed stream
[ https://issues.apache.org/jira/browse/FLINK-9547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505021#comment-16505021 ] Lucas Resch edited comment on FLINK-9547 at 6/7/18 6:07 PM: [~dawidwys] I created a small example that does something similar. Somehow the behavior is different though. Now it doesn't call the initial pattern but the one on the windowed stream is called. Something is definitely wrong. {code:java} StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStreamSource objectDataStreamSource = env.fromElements( 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ); SingleOutputStreamOperator forces = objectDataStreamSource .filter((FilterFunction) Objects::nonNull) .process(new ProcessFunction() { @Override public void processElement(Integer value, Context ctx, Collector out) throws Exception { out.collect(value.longValue()); } }); Pattern forcesMock = Pattern.begin("start").where(new SimpleCondition() { @Override public boolean filter(Long value) { return true; } }); CEP.pattern(forces, forcesMock) .select(new PatternSelectFunction() { @Override public String select(Map> pattern) throws Exception { return String.format("Prints %d as expected", pattern.get("start").get(0)); } }).print(); SingleOutputStreamOperator intervals = forces .countWindowAll(2, 1) .process(new ProcessAllWindowFunction() { @Override public void process(Context context, Iterable elements, Collector out) throws Exception { List items = new ArrayList<>(); elements.forEach(items::add); if (items.size() == 2) { out.collect(items.get(0)); } } }); Pattern intervalMock = Pattern.begin("start").where(new SimpleCondition() { @Override public boolean filter(Long value) throws Exception { return true; } }); CEP.pattern(intervals, intervalMock) .select(new PatternSelectFunction() { @Override public String select(Map> pattern) throws Exception { return String.format("Doesn't print %d", pattern.get("start").get(0)); } }).print(); env.execute(); {code} was (Author: mlnotw): [~dawidwys] I created a small example that does something similar. Somehow the behavior is different though. Now it doesn't call the initial pattern but the one on the windowed stream is called. Something is definitely wrong. 
{code:java} StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStreamSource objectDataStreamSource = env.fromElements( 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ); SingleOutputStreamOperator forces = objectDataStreamSource .filter((FilterFunction) Objects::nonNull) .process(new ProcessFunction() { @Override public void processElement(Integer value, Context ctx, Collector out) throws Exception { out.collect(value.longValue()); } }); Pattern forcesMock = Pattern.begin("start").where(new SimpleCondition() { @Override public boolean filter(Long value) { return true; } }); CEP.pattern(forces, forcesMock) .select(new PatternSelectFunction() { @Override public String select(Map> pattern) throws Exception { return String.format("Prints %d as expected", pattern.get("start").get(0)); } }).print(); // Create another stream based on a sliding window over the input stream SingleOutputStreamOperator intervals = forces .countWindowAll(2, 1) .process(new ProcessAllWindowFunction() { @Override public void process(Context context, Iterable elements, Collector out) throws Exception { List items = new ArrayList<>(); elements.forEach(items::add); if (items.size() == 2) { out.collect(items.get(0)); } } }); Pattern intervalMock = Pattern.begin("start").where(new SimpleCondition() { @Override public boolean filter(Long value) throws Exception { return true; } }); CEP.pattern(intervals, intervalMock) .select(new PatternSelectFunction() { @Override public String select(Map> pattern) throws Exception { return String.format("Doesn't print %d", pattern.get("start").get(0)); } }).print(); env.execute(); {code} > CEP pattern not called on windowed stream >
[jira] [Comment Edited] (FLINK-9547) CEP pattern not called on windowed stream
[ https://issues.apache.org/jira/browse/FLINK-9547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505021#comment-16505021 ] Lucas Resch edited comment on FLINK-9547 at 6/7/18 6:06 PM: [~dawidwys] I created a small example that does something similar. Somehow the behavior is different though. Now it doesn't call the initial pattern but the one on the windowed stream is called. Something is definitely wrong. {code:java} StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStreamSource objectDataStreamSource = env.fromElements( 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ); SingleOutputStreamOperator forces = objectDataStreamSource .filter((FilterFunction) Objects::nonNull) .process(new ProcessFunction() { @Override public void processElement(Integer value, Context ctx, Collector out) throws Exception { out.collect(value.longValue()); } }); Pattern forcesMock = Pattern.begin("start").where(new SimpleCondition() { @Override public boolean filter(Long value) { return true; } }); CEP.pattern(forces, forcesMock) .select(new PatternSelectFunction() { @Override public String select(Map> pattern) throws Exception { return String.format("Prints %d as expected", pattern.get("start").get(0)); } }).print(); // Create another stream based on a sliding window over the input stream SingleOutputStreamOperator intervals = forces .countWindowAll(2, 1) .process(new ProcessAllWindowFunction() { @Override public void process(Context context, Iterable elements, Collector out) throws Exception { List items = new ArrayList<>(); elements.forEach(items::add); if (items.size() == 2) { out.collect(items.get(0)); } } }); Pattern intervalMock = Pattern.begin("start").where(new SimpleCondition() { @Override public boolean filter(Long value) throws Exception { return true; } }); CEP.pattern(intervals, intervalMock) .select(new PatternSelectFunction() { @Override public String select(Map> pattern) throws Exception { return String.format("Doesn't print %d", pattern.get("start").get(0)); } }).print(); env.execute(); {code} was (Author: mlnotw): I created a small example that does something similar. Somehow the behavior is different though. Now it doesn't call the initial pattern but the one on the windowed stream is called. Something is definitely wrong. 
{code:java} StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStreamSource objectDataStreamSource = env.fromElements( 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ); SingleOutputStreamOperator forces = objectDataStreamSource .filter((FilterFunction) Objects::nonNull) .process(new ProcessFunction() { @Override public void processElement(Integer value, Context ctx, Collector out) throws Exception { out.collect(value.longValue()); } }); Pattern forcesMock = Pattern.begin("start").where(new SimpleCondition() { @Override public boolean filter(Long value) { return true; } }); CEP.pattern(forces, forcesMock) .select(new PatternSelectFunction() { @Override public String select(Map> pattern) throws Exception { return String.format("Prints %d as expected", pattern.get("start").get(0)); } }).print(); // Create another stream based on a sliding window over the input stream SingleOutputStreamOperator intervals = forces .countWindowAll(2, 1) .process(new ProcessAllWindowFunction() { @Override public void process(Context context, Iterable elements, Collector out) throws Exception { List items = new ArrayList<>(); elements.forEach(items::add); if (items.size() == 2) { out.collect(items.get(0)); } } }); Pattern intervalMock = Pattern.begin("start").where(new SimpleCondition() { @Override public boolean filter(Long value) throws Exception { return true; } }); CEP.pattern(intervals, intervalMock) .select(new PatternSelectFunction() { @Override public String select(Map> pattern) throws Exception { return String.format("Doesn't print %d", pattern.get("start").get(0)); } }).print(); env.execute(); {code} >
[jira] [Commented] (FLINK-9547) CEP pattern not called on windowed stream
[ https://issues.apache.org/jira/browse/FLINK-9547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505021#comment-16505021 ] Lucas Resch commented on FLINK-9547: I created a small example that does something similar. Somehow the behavior is different though. Now it doesn't call the initial pattern but the one on the windowed stream is called. Something is definitely wrong. {code:java} StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStreamSource objectDataStreamSource = env.fromElements( 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ); SingleOutputStreamOperator forces = objectDataStreamSource .filter((FilterFunction) Objects::nonNull) .process(new ProcessFunction() { @Override public void processElement(Integer value, Context ctx, Collector out) throws Exception { out.collect(value.longValue()); } }); Pattern forcesMock = Pattern.begin("start").where(new SimpleCondition() { @Override public boolean filter(Long value) { return true; } }); CEP.pattern(forces, forcesMock) .select(new PatternSelectFunction() { @Override public String select(Map> pattern) throws Exception { return String.format("Prints %d as expected", pattern.get("start").get(0)); } }).print(); // Create another stream based on a sliding window over the input stream SingleOutputStreamOperator intervals = forces .countWindowAll(2, 1) .process(new ProcessAllWindowFunction() { @Override public void process(Context context, Iterable elements, Collector out) throws Exception { List items = new ArrayList<>(); elements.forEach(items::add); if (items.size() == 2) { out.collect(items.get(0)); } } }); Pattern intervalMock = Pattern.begin("start").where(new SimpleCondition() { @Override public boolean filter(Long value) throws Exception { return true; } }); CEP.pattern(intervals, intervalMock) .select(new PatternSelectFunction() { @Override public String select(Map> pattern) throws Exception { return String.format("Doesn't print %d", pattern.get("start").get(0)); } }).print(); env.execute(); {code} > CEP pattern not called on windowed stream > - > > Key: FLINK-9547 > URL: https://issues.apache.org/jira/browse/FLINK-9547 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.2, 1.5.0 >Reporter: Lucas Resch >Priority: Major > > When trying to match a pattern on a stream that was windowed the pattern will > not be called. The following shows example code where the issue was noticed: > {code:java} > // Set up stream > SingleOutputStreamOperator forces = ... 
> .filter(new FilterForcesFunction()) > .process(new ProcessForcesFunction()); > // Define mock pattern > Pattern forcesMock = Pattern.begin("start").where(new > SimpleCondition() { > @Override > public boolean filter(ForceZ value) { > // This is called as expected > return true; > } > }); > // Print pattern results > // This actually prints all incoming events as expected > CEP.pattern(forcesMock, mock) > .select(new PatternSelectFunction() { > @Override > public ForceZ select(Map> pattern){ > return pattern.get("start").get(0); > } > }).print(); > // Create another stream based on a sliding window over the input stream > SingleOutputStreamOperator intervals = forces > .countWindowAll(2, 1) > .process(new ForceWindowFunction()); > // Define mock pattern > Pattern intervalMock = > Pattern.begin("start").where(new SimpleCondition() { > @Override > public boolean filter(Interval value) throws Exception { > // This is never called > return true; > } > }); > // Print pattern results > // Doesn't print anything since the mock condition is never called > CEP.pattern(intervals, intervalMock) > .select(new PatternSelectFunction() { > @Override > public Interval select(Map> pattern) > throws Exception { > return pattern.get("start").get(0); > } > }).print(); > {code} > Either I'm doing something wrong or this is a major bug. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6130: [FLINK-9545] Support read a file multiple times in Flink ...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6130 You're effectively only explaining what this feature is, but not why it is actually needed. We have to gauge whether this feature is useful for other users as well before we decide to maintain it. ---
[jira] [Commented] (FLINK-9545) Support read a file multiple times in Flink DataStream
[ https://issues.apache.org/jira/browse/FLINK-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505017#comment-16505017 ] ASF GitHub Bot commented on FLINK-9545: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/6130 You're effectively only explaining what this feature is, but not why it is actually needed. We have to gauge whether this feature is useful for other users as well before we decide to maintain it. > Support read a file multiple times in Flink DataStream > --- > > Key: FLINK-9545 > URL: https://issues.apache.org/jira/browse/FLINK-9545 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.6.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.6.0 > > > Motivation: We have the requirements to read a bunch files, each file to read > multiple times, to feed our streams > Specifically we need {{StreamExecutionEnvironment.readFile/readTextFile}} to > be able to read a file for a specified {{N}} times, but currently it only > supports reading file once. > We've implemented this internally. Would be good to get it back to the > community version. This jira is to add support for the feature. > Plan: > add a new processing mode as PROCESSING_N_TIMES, and add additional parameter > {{numTimes}} for {{StreamExecutionEnvironment.readFile/readTextFile}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6130: [FLINK-9545] Support read a file multiple times in Flink ...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/6130 @aljoscha Motivation: We have the requirements to read a bunch files, each file to read multiple times, to feed our streams Specifically we need `StreamExecutionEnvironment.readFile/readTextFile` to be able to read a file for a specified `N` times, but currently it only supports reading file once. We've implemented this internally. Would be good to get it back to the community version. This jira is to add support for the feature. ---
[jira] [Commented] (FLINK-9545) Support read a file multiple times in Flink DataStream
[ https://issues.apache.org/jira/browse/FLINK-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504916#comment-16504916 ] ASF GitHub Bot commented on FLINK-9545: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/6130 @aljoscha Motivation: We have the requirements to read a bunch files, each file to read multiple times, to feed our streams Specifically we need `StreamExecutionEnvironment.readFile/readTextFile` to be able to read a file for a specified `N` times, but currently it only supports reading file once. We've implemented this internally. Would be good to get it back to the community version. This jira is to add support for the feature. > Support read a file multiple times in Flink DataStream > --- > > Key: FLINK-9545 > URL: https://issues.apache.org/jira/browse/FLINK-9545 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.6.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.6.0 > > > Motivation: We have the requirements to read a bunch files, each file to read > multiple times, to feed our streams > Specifically we need {{StreamExecutionEnvironment.readFile/readTextFile}} to > be able to read a file for a specified {{N}} times, but currently it only > supports reading file once. > We've implemented this internally. Would be good to get it back to the > community version. This jira is to add support for the feature. > Plan: > add a new processing mode as PROCESSING_N_TIMES, and add additional parameter > {{numTimes}} for {{StreamExecutionEnvironment.readFile/readTextFile}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9545) Support read a file multiple times in Flink DataStream
[ https://issues.apache.org/jira/browse/FLINK-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-9545: Description: Motivation: We have the requirements to read a bunch files, each file to read multiple times, to feed our streams Specifically we need {{StreamExecutionEnvironment.readFile/readTextFile}} to be able to read a file for a specified {{N}} times, but currently it only supports reading file once. We've implemented this internally. Would be good to get it back to the community version. This jira is to add support for the feature. Plan: add a new processing mode as PROCESSING_N_TIMES, and add additional parameter {{numTimes}} for {{StreamExecutionEnvironment.readFile/readTextFile}} was: Motivation: We have the requirements to read a bunch files, each file to read multiple times, to feed our streams Specifically we need {{StreamExecutionEnvironment.readFile/readTextFile}} to be able to read a file for a specified {{N}} times, but currently it only supports reading file once. We've implemented this internally. Would be good to back it back to the community version. Feature: add support for the feature. Plan: add a new processing mode as PROCESSING_N_TIMES, and add additional parameter {{numTimes}} for {{StreamExecutionEnvironment.readFile/readTextFile}} > Support read a file multiple times in Flink DataStream > --- > > Key: FLINK-9545 > URL: https://issues.apache.org/jira/browse/FLINK-9545 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.6.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.6.0 > > > Motivation: We have the requirements to read a bunch files, each file to read > multiple times, to feed our streams > Specifically we need {{StreamExecutionEnvironment.readFile/readTextFile}} to > be able to read a file for a specified {{N}} times, but currently it only > supports reading file once. > We've implemented this internally. Would be good to get it back to the > community version. This jira is to add support for the feature. > Plan: > add a new processing mode as PROCESSING_N_TIMES, and add additional parameter > {{numTimes}} for {{StreamExecutionEnvironment.readFile/readTextFile}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9545) Support read a file multiple times in Flink DataStream
[ https://issues.apache.org/jira/browse/FLINK-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504913#comment-16504913 ] Bowen Li commented on FLINK-9545: - [~aljoscha] I updated the description > Support read a file multiple times in Flink DataStream > --- > > Key: FLINK-9545 > URL: https://issues.apache.org/jira/browse/FLINK-9545 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.6.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.6.0 > > > Motivation: We have the requirements to read a bunch files, each file to read > multiple times, to feed our streams > Specifically we need {{StreamExecutionEnvironment.readFile/readTextFile}} to > be able to read a file for a specified {{N}} times, but currently it only > supports reading file once. > We've implemented this internally. Would be good to get it back to the > community version. This jira is to add support for the feature. > Plan: > add a new processing mode as PROCESSING_N_TIMES, and add additional parameter > {{numTimes}} for {{StreamExecutionEnvironment.readFile/readTextFile}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9545) Support read a file multiple times in Flink DataStream
[ https://issues.apache.org/jira/browse/FLINK-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-9545: Description: Motivation: We have the requirements to read a bunch files, each file to read multiple times, to feed our streams Specifically we need {{StreamExecutionEnvironment.readFile/readTextFile}} to be able to read a file for a specified {{N}} times, but currently it only supports reading file once. We've implemented this internally. Would be good to back it back to the community version. Feature: add support for the feature. Plan: add a new processing mode as PROCESSING_N_TIMES, and add additional parameter {{numTimes}} for {{StreamExecutionEnvironment.readFile/readTextFile}} was: we need {{StreamExecutionEnvironment.readFile/readTextFile}} to read each file for N times, but currently it only supports reading file once. add support for the feature. Plan: add a new processing mode as PROCESSING_N_TIMES, and add additional parameter {{numTimes}} for {{StreamExecutionEnvironment.readFile/readTextFile}} > Support read a file multiple times in Flink DataStream > --- > > Key: FLINK-9545 > URL: https://issues.apache.org/jira/browse/FLINK-9545 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.6.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.6.0 > > > Motivation: We have the requirements to read a bunch files, each file to read > multiple times, to feed our streams > Specifically we need {{StreamExecutionEnvironment.readFile/readTextFile}} to > be able to read a file for a specified {{N}} times, but currently it only > supports reading file once. > We've implemented this internally. Would be good to back it back to the > community version. > Feature: add support for the feature. > Plan: > add a new processing mode as PROCESSING_N_TIMES, and add additional parameter > {{numTimes}} for {{StreamExecutionEnvironment.readFile/readTextFile}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
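For context, a rough sketch of how the effect can be approximated with the existing API today, by unioning N identical file sources; the proposed PROCESSING_N_TIMES mode and numTimes parameter do not exist yet, and the file path here is a placeholder.

{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical workaround with the current API: read the same file N times by
// unioning N identical sources. Ordering across the copies is not guaranteed.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

int numTimes = 3; // the N the proposal would expose as a parameter
DataStream<String> lines = env.readTextFile("file:///tmp/input.txt");
for (int i = 1; i < numTimes; i++) {
    lines = lines.union(env.readTextFile("file:///tmp/input.txt"));
}
lines.print();

env.execute("read file N times");
{code}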
[jira] [Commented] (FLINK-9538) Make KeyedStateFunction an interface
[ https://issues.apache.org/jira/browse/FLINK-9538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504863#comment-16504863 ] ASF GitHub Bot commented on FLINK-9538: --- Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/6134 Sorry @yanghua , will have a look tomorrow. You really don't need to ping me every hour, will get to it as soon as I have some time. > Make KeyedStateFunction an interface > > > Key: FLINK-9538 > URL: https://issues.apache.org/jira/browse/FLINK-9538 > Project: Flink > Issue Type: Improvement >Reporter: Dawid Wysakowicz >Assignee: vinoyang >Priority: Major > > I suggest to change the KeyedStateFunction from abstract class to interface > (FunctionalInterface in particular) to enable passing lambdas. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6134: [FLINK-9538] Make KeyedStateFunction an interface
Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/6134 Sorry @yanghua , will have a look tomorrow. You really don't need to ping me every hour, will get to it as soon as I have some time. ---
[GitHub] flink issue #6134: [FLINK-9538] Make KeyedStateFunction an interface
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6134 cc @dawidwys refactored code, please review again~ ---
[jira] [Commented] (FLINK-9538) Make KeyedStateFunction an interface
[ https://issues.apache.org/jira/browse/FLINK-9538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504859#comment-16504859 ] ASF GitHub Bot commented on FLINK-9538: --- Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6134 cc @dawidwys refactored code, please review again~ > Make KeyedStateFunction an interface > > > Key: FLINK-9538 > URL: https://issues.apache.org/jira/browse/FLINK-9538 > Project: Flink > Issue Type: Improvement >Reporter: Dawid Wysakowicz >Assignee: vinoyang >Priority: Major > > I suggest to change the KeyedStateFunction from abstract class to interface > (FunctionalInterface in particular) to enable passing lambdas. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
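A minimal sketch of the suggested shape, shown as an illustration rather than the merged change; the method follows the existing process(key, state) signature of the abstract class.

{code:java}
import org.apache.flink.api.common.state.State;

// Hypothetical interface form of KeyedStateFunction: as a @FunctionalInterface
// it can be implemented with a lambda at applyToAllKeys-style call sites,
// e.g. (key, state) -> state.clear().
@FunctionalInterface
public interface KeyedStateFunction<K, S extends State> {

    /** Called once for every key, with the state bound to that key. */
    void process(K key, S state) throws Exception;
}
{code}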
[jira] [Commented] (FLINK-9547) CEP pattern not called on windowed stream
[ https://issues.apache.org/jira/browse/FLINK-9547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504834#comment-16504834 ] Dawid Wysakowicz commented on FLINK-9547: - Hi [~MLNotW]. Could you provide a runnable example that we could use to reproduce this bug? With inputs and ProcessFunctions? > CEP pattern not called on windowed stream > - > > Key: FLINK-9547 > URL: https://issues.apache.org/jira/browse/FLINK-9547 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.3.2, 1.5.0 >Reporter: Lucas Resch >Priority: Major > > When trying to match a pattern on a stream that was windowed the pattern will > not be called. The following shows example code where the issue was noticed: > {code:java} > // Set up stream > SingleOutputStreamOperator forces = ... > .filter(new FilterForcesFunction()) > .process(new ProcessForcesFunction()); > // Define mock pattern > Pattern forcesMock = Pattern.begin("start").where(new > SimpleCondition() { > @Override > public boolean filter(ForceZ value) { > // This is called as expected > return true; > } > }); > // Print pattern results > // This actually prints all incoming events as expected > CEP.pattern(forcesMock, mock) > .select(new PatternSelectFunction() { > @Override > public ForceZ select(Map> pattern){ > return pattern.get("start").get(0); > } > }).print(); > // Create another stream based on a sliding window over the input stream > SingleOutputStreamOperator intervals = forces > .countWindowAll(2, 1) > .process(new ForceWindowFunction()); > // Define mock pattern > Pattern intervalMock = > Pattern.begin("start").where(new SimpleCondition() { > @Override > public boolean filter(Interval value) throws Exception { > // This is never called > return true; > } > }); > // Print pattern results > // Doesn't print anything since the mock condition is never called > CEP.pattern(intervals, intervalMock) > .select(new PatternSelectFunction() { > @Override > public Interval select(Map> pattern) > throws Exception { > return pattern.get("start").get(0); > } > }).print(); > {code} > Either I'm doing something wrong or this is a major bug. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9168) Pulsar Sink Connector
[ https://issues.apache.org/jira/browse/FLINK-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504826#comment-16504826 ] ASF GitHub Bot commented on FLINK-9168: --- Github user surryr commented on a diff in the pull request: https://github.com/apache/flink/pull/5845#discussion_r193793061 --- Diff: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java --- @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pulsar; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor; +import org.apache.flink.util.SerializableObject; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageBuilder; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConfiguration; +import org.apache.pulsar.client.api.PulsarClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.function.Function; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Flink Sink to produce data into a Pulsar topic. + */ +public class FlinkPulsarProducer + extends RichSinkFunction + implements CheckpointedFunction { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkPulsarProducer.class); + + /** +* The pulsar service url. +*/ + protected final String serviceUrl; + + /** +* User defined configuration for the producer. +*/ + protected final ProducerConfiguration producerConfig; + + /** +* The name of the default topic this producer is writing data to. +*/ + protected final String defaultTopicName; + + /** +* (Serializable) SerializationSchema for turning objects used with Flink into. +* byte[] for Pulsar. +*/ + protected final SerializationSchema schema; + + /** +* User-provided key extractor for assigning a key to a pulsar message. +*/ + protected final PulsarKeyExtractor flinkPulsarKeyExtractor; + + /** +* Produce Mode. 
+*/ + protected PulsarProduceMode produceMode = PulsarProduceMode.AT_LEAST_ONE; + + /** +* If true, the producer will wait until all outstanding records have been send to the broker. +*/ + protected boolean flushOnCheckpoint; + + // Runtime fields -- + + /** Pulsar Producer instance. */ + protected transient Producer producer; + + /** The callback than handles error propagation or logging callbacks. */ + protected transient Function successCallback = msgId -> { + acknowledgeMessage(); + return msgId; + }; + + protected transient Function failureCallback; + + /** Errors encountered in the async producer are stored here. */ + protected transient volatile Exception asyncException; + + /** Lock for accessing the pending records. */ + protected final SerializableObject pendingRecordsLock = new SerializableObject(); +
[GitHub] flink pull request #5845: [FLINK-9168][flink-connectors]Pulsar Sink connecto...
Github user surryr commented on a diff in the pull request: https://github.com/apache/flink/pull/5845#discussion_r193793061 --- Diff: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java --- @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pulsar; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor; +import org.apache.flink.util.SerializableObject; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageBuilder; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConfiguration; +import org.apache.pulsar.client.api.PulsarClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.function.Function; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Flink Sink to produce data into a Pulsar topic. + */ +public class FlinkPulsarProducer + extends RichSinkFunction + implements CheckpointedFunction { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkPulsarProducer.class); + + /** +* The pulsar service url. +*/ + protected final String serviceUrl; + + /** +* User defined configuration for the producer. +*/ + protected final ProducerConfiguration producerConfig; + + /** +* The name of the default topic this producer is writing data to. +*/ + protected final String defaultTopicName; + + /** +* (Serializable) SerializationSchema for turning objects used with Flink into. +* byte[] for Pulsar. +*/ + protected final SerializationSchema schema; + + /** +* User-provided key extractor for assigning a key to a pulsar message. +*/ + protected final PulsarKeyExtractor flinkPulsarKeyExtractor; + + /** +* Produce Mode. +*/ + protected PulsarProduceMode produceMode = PulsarProduceMode.AT_LEAST_ONE; + + /** +* If true, the producer will wait until all outstanding records have been send to the broker. +*/ + protected boolean flushOnCheckpoint; + + // Runtime fields -- + + /** Pulsar Producer instance. 
*/ + protected transient Producer producer; + + /** The callback than handles error propagation or logging callbacks. */ + protected transient Function successCallback = msgId -> { + acknowledgeMessage(); + return msgId; + }; + + protected transient Function failureCallback; + + /** Errors encountered in the async producer are stored here. */ + protected transient volatile Exception asyncException; + + /** Lock for accessing the pending records. */ + protected final SerializableObject pendingRecordsLock = new SerializableObject(); + + /** Number of unacknowledged records. */ + protected long pendingRecords; + + public FlinkPulsarProducer(String serviceUrl, + String defaultTopicName, +
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504820#comment-16504820 ] swy commented on FLINK-9506: thank for response [~sihuazhou], the key length is around 50 chars. We will change to hashCode as suggested and test again :) > Flink ReducingState.add causing more than 100% performance drop > --- > > Key: FLINK-9506 > URL: https://issues.apache.org/jira/browse/FLINK-9506 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.4.2 >Reporter: swy >Priority: Major > Attachments: KeyNoHash_VS_KeyHash.png, flink.png > > > Hi, we found out application performance drop more than 100% when > ReducingState.add is used in the source code. In the test checkpoint is > disable. And filesystem(hdfs) as statebackend. > It could be easyly reproduce with a simple app, without checkpoint, just > simply keep storing record, also with simple reduction function(in fact with > empty function would see the same result). Any idea would be appreciated. > What an unbelievable obvious issue. > Basically the app just keep storing record into the state, and we measure how > many record per second in "JsonTranslator", which is shown in the graph. The > difference between is just 1 line, comment/un-comment "recStore.add(r)". > {code} > DataStream stream = env.addSource(new GeneratorSource(loop); > DataStream convert = stream.map(new JsonTranslator()) >.keyBy() >.process(new ProcessAggregation()) >.map(new PassthruFunction()); > public class ProcessAggregation extends ProcessFunction { > private ReducingState recStore; > public void processElement(Recordr, Context ctx, Collector out) { > recStore.add(r); //this line make the difference > } > {code} > Record is POJO class contain 50 String private member. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9547) CEP pattern not called on windowed stream
Lucas Resch created FLINK-9547: -- Summary: CEP pattern not called on windowed stream Key: FLINK-9547 URL: https://issues.apache.org/jira/browse/FLINK-9547 Project: Flink Issue Type: Bug Components: CEP Affects Versions: 1.5.0, 1.3.2 Reporter: Lucas Resch When trying to match a pattern on a stream that was windowed, the pattern condition is never called. The following shows example code where the issue was noticed:
{code:java}
// Set up stream
SingleOutputStreamOperator<ForceZ> forces = ...
    .filter(new FilterForcesFunction())
    .process(new ProcessForcesFunction());

// Define mock pattern
Pattern<ForceZ, ?> forcesMock = Pattern.<ForceZ>begin("start").where(new SimpleCondition<ForceZ>() {
    @Override
    public boolean filter(ForceZ value) {
        // This is called as expected
        return true;
    }
});

// Print pattern results
// This actually prints all incoming events as expected
CEP.pattern(forces, forcesMock)
    .select(new PatternSelectFunction<ForceZ, ForceZ>() {
        @Override
        public ForceZ select(Map<String, List<ForceZ>> pattern) {
            return pattern.get("start").get(0);
        }
    }).print();

// Create another stream based on a sliding window over the input stream
SingleOutputStreamOperator<Interval> intervals = forces
    .countWindowAll(2, 1)
    .process(new ForceWindowFunction());

// Define mock pattern
Pattern<Interval, ?> intervalMock = Pattern.<Interval>begin("start").where(new SimpleCondition<Interval>() {
    @Override
    public boolean filter(Interval value) throws Exception {
        // This is never called
        return true;
    }
});

// Print pattern results
// Doesn't print anything since the mock condition is never called
CEP.pattern(intervals, intervalMock)
    .select(new PatternSelectFunction<Interval, Interval>() {
        @Override
        public Interval select(Map<String, List<Interval>> pattern) throws Exception {
            return pattern.get("start").get(0);
        }
    }).print();
{code}
Either I'm doing something wrong or this is a major bug. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504729#comment-16504729 ] Sihua Zhou commented on FLINK-9506: --- Hi [~yow], off the top of my head, here are my answers: - >> 1. Just to confirm, RocksDB is needed to setup in every TM machine? Any other option? A RocksDB instance is created in every sub-task that uses keyed state if you are using the RocksDB backend. - >> 2. What is the recommendation for RocksDB's statebackend? We are using tmpfs with checkpoint now with savepoint persists to hdfs. Q1. I think the default configuration of the RocksDB backend is quite good for most jobs. Q2. I'm not sure whether I got you correctly: savepoints are triggered manually and checkpoints are triggered automatically. Do you mean that you trigger savepoints manually on a periodic basis? - >> 3. By source code, rocksdb options like parallelism and certain predefined option could be configured, any corresponding parameter in flink_config.yaml? AFAIK, RocksDB's options need to be set in source code if you want to customize them. The default parallelism of an operator can be configured in flink-conf.yaml. - >> 4. related to your RocksDB config. I see you are using "file:///tmp/rocksdb_simple_example/checkpoints" as the checkpoint directory; I'm not sure it's accessible to all TMs. If yes, I think that is ok. Also, I didn't see your checkpoint interval... BTW, you said you are using {{r.getUNIQUE_KEY();}} as the key; I'm a bit curious about its length in general. If it's too long and you don't need an exact result, you could use {{r.getUNIQUE_KEY().hashCode();}} instead, which may also help to improve the performance. And in fact, I also agree with [~kkrugler] that this type of question is best asked on the user mailing list; that way more people can take part and you might also get more ideas from them. ;) > Flink ReducingState.add causing more than 100% performance drop > --- > > Key: FLINK-9506 > URL: https://issues.apache.org/jira/browse/FLINK-9506 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.4.2 >Reporter: swy >Priority: Major > Attachments: KeyNoHash_VS_KeyHash.png, flink.png > > > Hi, we found out application performance drop more than 100% when > ReducingState.add is used in the source code. In the test checkpoint is > disable. And filesystem(hdfs) as statebackend. > It could be easyly reproduce with a simple app, without checkpoint, just > simply keep storing record, also with simple reduction function(in fact with > empty function would see the same result). Any idea would be appreciated. > What an unbelievable obvious issue. > Basically the app just keep storing record into the state, and we measure how > many record per second in "JsonTranslator", which is shown in the graph. The > difference between is just 1 line, comment/un-comment "recStore.add(r)". > {code} > DataStream stream = env.addSource(new GeneratorSource(loop); > DataStream convert = stream.map(new JsonTranslator()) >.keyBy() >.process(new ProcessAggregation()) >.map(new PassthruFunction()); > public class ProcessAggregation extends ProcessFunction { > private ReducingState recStore; > public void processElement(Recordr, Context ctx, Collector out) { > recStore.add(r); //this line make the difference > } > {code} > Record is POJO class contain 50 String private member. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
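To make the keying suggestion above concrete, here is a minimal sketch; the Record POJO and getUNIQUE_KEY() are stand-ins for the reporter's class, which is not shown in full in this thread. Keying by the int hash keeps the RocksDB key short and cheap to serialize and compare, at the cost of possible hash collisions, so it only applies when an approximate result is acceptable.
{code:java}
import org.apache.flink.api.java.functions.KeySelector;

public class HashedKeySketch {

    /** Minimal stand-in for the reporter's POJO (assumption; the real class has ~50 String fields). */
    public static class Record {
        private String uniqueKey;
        public String getUNIQUE_KEY() { return uniqueKey; }
    }

    /** Keys by the full ~50-character string: every state access carries the long key. */
    public static class StringKey implements KeySelector<Record, String> {
        @Override
        public String getKey(Record r) {
            return r.getUNIQUE_KEY();
        }
    }

    /** Keys by the int hash of the string, as suggested above. Distinct keys may collide
     *  and then share state, so the result is only approximate. */
    public static class HashedKey implements KeySelector<Record, Integer> {
        @Override
        public Integer getKey(Record r) {
            return r.getUNIQUE_KEY().hashCode();
        }
    }

    // Usage: stream.keyBy(new HashedKey()).process(new ProcessAggregation())
    //        instead of stream.keyBy(new StringKey()).process(new ProcessAggregation())
}
{code}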
[jira] [Commented] (FLINK-9262) KeyedOneInputStreamOperatorTestHarness throws NPE creating snapshot
[ https://issues.apache.org/jira/browse/FLINK-9262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504725#comment-16504725 ] Aljoscha Krettek commented on FLINK-9262: - What dependencies do you have set in your pom? I think you need
{code}
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils-junit</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.mockito</groupId>
    <artifactId>mockito-all</artifactId>
    <version>1.10.19</version>
    <type>jar</type>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>test</scope>
    <type>test-jar</type>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>test</scope>
    <type>test-jar</type>
</dependency>
{code}
> KeyedOneInputStreamOperatorTestHarness throws NPE creating snapshot > --- > > Key: FLINK-9262 > URL: https://issues.apache.org/jira/browse/FLINK-9262 > Project: Flink > Issue Type: Bug > Components: Streaming, Tests >Affects Versions: 1.4.0 > Environment: macOS X High Sierra 10.13.4 > (ancient) Eclipse Luna v.4.4.1 > JRE System Library [Java SE 8 [1.8.0_131]] > Java 8 Update 171 build 11 >Reporter: Chris Schneider >Priority: Blocker > > Although KeyedOneInputStreamOperatorTestHarness and other > AbstractStreamOperatorTestHarness subclasses are not yet part of the public > Flink API, we have been trying to make use of them for unit testing our map > functions. The following code throws NPE from the attempt to collect a > snapshot on Flink 1.4.0 (even after applying [the > fix|https://github.com/apache/flink/pull/5193/commits/ba676d7de5536e32e0c48c3db511bec1758f4e80] > for FLINK-8268), but appears to work properly on Flink 1.5-SNAPSHOT: > {code:java} > package com.scaleunlimited.flinkcrawler.functions; > import org.apache.flink.api.common.functions.RichFlatMapFunction; > import org.apache.flink.api.common.typeinfo.BasicTypeInfo; > import org.apache.flink.api.java.functions.KeySelector; > import org.apache.flink.streaming.api.operators.StreamFlatMap; > import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; > import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; > import org.apache.flink.util.Collector; > import org.junit.Test; > public class FlinkIssueTest { > > @SuppressWarnings("serial") > private static class MyProcessFunction extends > RichFlatMapFunction<String, String> { > @Override > public void flatMap(String input, Collector<String> collector) throws > Exception { > collector.collect(input); > } > } > > @SuppressWarnings({ > "serial", "hiding" > }) > private static class MyKeySelector implements KeySelector<String, String> { > @Override > public String getKey(String input) throws Exception { > return input; > } > } > @Test > public void test() throws Throwable { > KeyedOneInputStreamOperatorTestHarness<String, String, String> > testHarness = > new KeyedOneInputStreamOperatorTestHarness<String, String, String>( > new StreamFlatMap<>(new MyProcessFunction()), > new MyKeySelector(), > BasicTypeInfo.STRING_TYPE_INFO, > 1, > 1, > 0); > testHarness.setup(); > testHarness.open(); > > for (int i = 0; i < 10; i++) { > String urlString = String.format("https://domain-%d.com/page1", > i); > testHarness.processElement(new StreamRecord<>(urlString)); > } > testHarness.snapshot(0L, 0L); > } > } > {code} > Output: > {noformat} > java.lang.Exception: Could not complete snapshot 0 for operator MockTask > (1/1).
> at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379) > at > org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459) > at > com.scaleunlimited.flinkcrawler.functions.FlinkIssueTest.test(FlinkIssueTest.java:51) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at >
[jira] [Commented] (FLINK-9540) Apache Flink 1.4.2 S3 Hadoop library for Hadoop 2.7 is built for Hadoop 2.8 and fails
[ https://issues.apache.org/jira/browse/FLINK-9540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504723#comment-16504723 ] Razvan commented on FLINK-9540: --- Hi [~aljoscha], Not sure I understand the question :) So what I'm suggesting is flink-s3-fs-hadoop-1.4.2.jar delivered as built for Hadoop 2.7 is actually built for Hadoop 2.8 and Apache Flink cluster won't start with error mentioned above. These jars require a lot of other jars (which list is not accurate in documentation btw) to enable Apache Flink to work with S3. > Apache Flink 1.4.2 S3 Hadoop library for Hadoop 2.7 is built for Hadoop 2.8 > and fails > - > > Key: FLINK-9540 > URL: https://issues.apache.org/jira/browse/FLINK-9540 > Project: Flink > Issue Type: Bug >Reporter: Razvan >Priority: Blocker > > Download Apache Flink 1.4.2 for Hadoop 2.7 or earlier. Go to opt/ inside > flink-s3-fs-hadoop-1.4.2.jar, open common version info .properties inside you > get: > > # > # Licensed to the Apache Software Foundation (ASF) under one > # or more contributor license agreements. See the NOTICE file > # distributed with this work for additional information > # regarding copyright ownership. The ASF licenses this file > # to you under the Apache License, Version 2.0 (the > # "License"); you may not use this file except in compliance > # with the License. You may obtain a copy of the License at > # > # [http://www.apache.org/licenses/LICENSE-2.0] > # > # Unless required by applicable law or agreed to in writing, software > # distributed under the License is distributed on an "AS IS" BASIS, > # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > # See the License for the specific language governing permissions and > # limitations under the License. > # > *version=2.8.1* > revision=1e6296df38f9cd3d9581c8af58a2a03a6e4312be > *branch=branch-2.8.1-private* > user=vinodkv > date=2017-06-07T21:22Z > *url=[https://git-wip-us.apache.org/repos/asf/hadoop.git*] > srcChecksum=60125541c2b3e266cbf3becc5bda666 > protocVersion=2.5.0 > > This will fail to work with dependencies described in > ([https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html)] > > "Depending on which file system you use, please add the following > dependencies. You can find these as part of the Hadoop binaries in > {{hadoop-2.7/share/hadoop/tools/lib}}: > * {{S3AFileSystem}}: > ** {{hadoop-aws-2.7.3.jar}} > ** {{aws-java-sdk-s3-1.11.183.jar}} and its dependencies: > *** {{aws-java-sdk-core-1.11.183.jar}} > *** {{aws-java-sdk-kms-1.11.183.jar}} > *** {{jackson-annotations-2.6.7.jar}} > *** {{jackson-core-2.6.7.jar}} > *** {{jackson-databind-2.6.7.jar}} > *** {{joda-time-2.8.1.jar}} > *** {{httpcore-4.4.4.jar}} > *** {{httpclient-4.5.3.jar}} > * {{NativeS3FileSystem}}: > ** {{hadoop-aws-2.7.3.jar}} > ** {{guava-11.0.2.jar"}}{{}} > > {{ I presume the build task is flawed, it should be built for Apache Hadoop > 2.7 can someone please check it?}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields
[ https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504685#comment-16504685 ] ASF GitHub Bot commented on FLINK-9444: --- Github user tragicjun commented on the issue: https://github.com/apache/flink/pull/6082 The latest commit uses **Types.OBJECT_ARRAY** to map Avro array type. Hence, Avro **GenericData.Array** has to be converted into regular java arrays back (see **AvroRowSerializationSchema**) and forth(see **AvroRowDeserializationSchema**). Moreover, nested record within Avro map/array is also supported. The unit tests and my local integration tests have passed. Would you please review? @fhueske @twalthr @suez1224 > KafkaAvroTableSource failed to work for map and array fields > > > Key: FLINK-9444 > URL: https://issues.apache.org/jira/browse/FLINK-9444 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Table API SQL >Affects Versions: 1.6.0 >Reporter: Jun Zhang >Priority: Blocker > Labels: patch > Fix For: 1.6.0 > > Attachments: flink-9444.patch > > > When some Avro schema has map/array fields and the corresponding TableSchema > declares *MapTypeInfo/ListTypeInfo* for these fields, an exception will be > thrown when registering the *KafkaAvroTableSource*, complaining like: > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Type Map of table field 'event' does not match with type > GenericType of the field 'event' of the TableSource return > type. > at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:74) > at > org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:92) > at > org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:71) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) > at > org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:71) > at > org.apache.flink.table.plan.schema.StreamTableSourceTable.(StreamTableSourceTable.scala:33) > at > org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:124) > at > org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:438) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...
Github user tragicjun commented on the issue: https://github.com/apache/flink/pull/6082 The latest commit uses **Types.OBJECT_ARRAY** to map Avro array type. Hence, Avro **GenericData.Array** has to be converted into regular java arrays back (see **AvroRowSerializationSchema**) and forth(see **AvroRowDeserializationSchema**). Moreover, nested record within Avro map/array is also supported. The unit tests and my local integration tests have passed. Would you please review? @fhueske @twalthr @suez1224 ---
[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504675#comment-16504675 ] swy commented on FLINK-9506: [~sihuazhou] your idea is brilliant, but surprisingly the first test result does not show much change. Let us run more tests to confirm. But thank you! Would you mind answering my questions above regarding the RocksDB setup? I believe it is crucial for this performance test. > Flink ReducingState.add causing more than 100% performance drop > --- > > Key: FLINK-9506 > URL: https://issues.apache.org/jira/browse/FLINK-9506 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.4.2 >Reporter: swy >Priority: Major > Attachments: KeyNoHash_VS_KeyHash.png, flink.png > > > Hi, we found out application performance drop more than 100% when > ReducingState.add is used in the source code. In the test checkpoint is > disable. And filesystem(hdfs) as statebackend. > It could be easyly reproduce with a simple app, without checkpoint, just > simply keep storing record, also with simple reduction function(in fact with > empty function would see the same result). Any idea would be appreciated. > What an unbelievable obvious issue. > Basically the app just keep storing record into the state, and we measure how > many record per second in "JsonTranslator", which is shown in the graph. The > difference between is just 1 line, comment/un-comment "recStore.add(r)". > {code} > DataStream stream = env.addSource(new GeneratorSource(loop); > DataStream convert = stream.map(new JsonTranslator()) >.keyBy() >.process(new ProcessAggregation()) >.map(new PassthruFunction()); > public class ProcessAggregation extends ProcessFunction { > private ReducingState recStore; > public void processElement(Recordr, Context ctx, Collector out) { > recStore.add(r); //this line make the difference > } > {code} > Record is POJO class contain 50 String private member. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9503) Migrate integration tests for iterative aggregators
[ https://issues.apache.org/jira/browse/FLINK-9503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504535#comment-16504535 ] ASF GitHub Bot commented on FLINK-9503: --- Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6129#discussion_r193707442 --- Diff: flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java --- @@ -170,12 +162,12 @@ public void testAggregatorWithParameterForIterate() throws Exception { new NegativeElementsConvergenceCriterion()); DataSet updatedDs = iteration.map(new SubtractOneMapWithParam()); - iteration.closeWith(updatedDs).writeAsText(resultPath); - env.execute(); + List result = iteration.closeWith(updatedDs).collect(); + Collections.sort(result); --- End diff -- @zentol the result needs to be sorted, otherwise the test reports an error because the result is not ordered; it prints this: ``` java.lang.AssertionError: Expected :[1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5] Actual :[4, 2, 3, 5, 2, 5, 5, 5, 1, 3, 4, 4, 3, 5, 4] ``` > Migrate integration tests for iterative aggregators > --- > > Key: FLINK-9503 > URL: https://issues.apache.org/jira/browse/FLINK-9503 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Deepak Sharma >Assignee: vinoyang >Priority: Minor > > Migrate integration tests in org.apache.flink.test.iterative.aggregators to > use collect() instead of temp files. Related to parent jira. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6129: [FLINK-9503] Migrate integration tests for iterati...
Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6129#discussion_r193707442 --- Diff: flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java --- @@ -170,12 +162,12 @@ public void testAggregatorWithParameterForIterate() throws Exception { new NegativeElementsConvergenceCriterion()); DataSet updatedDs = iteration.map(new SubtractOneMapWithParam()); - iteration.closeWith(updatedDs).writeAsText(resultPath); - env.execute(); + List result = iteration.closeWith(updatedDs).collect(); + Collections.sort(result); --- End diff -- @zentol the result needs to be sorted, otherwise the test reports an error because the result is not ordered; it prints this: ``` java.lang.AssertionError: Expected :[1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5] Actual :[4, 2, 3, 5, 2, 5, 5, 5, 1, 3, 4, 4, 3, 5, 4] ``` ---
[jira] [Commented] (FLINK-9503) Migrate integration tests for iterative aggregators
[ https://issues.apache.org/jira/browse/FLINK-9503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504518#comment-16504518 ] ASF GitHub Bot commented on FLINK-9503: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6129#discussion_r193702401 --- Diff: flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java --- @@ -170,12 +162,12 @@ public void testAggregatorWithParameterForIterate() throws Exception { new NegativeElementsConvergenceCriterion()); DataSet updatedDs = iteration.map(new SubtractOneMapWithParam()); - iteration.closeWith(updatedDs).writeAsText(resultPath); - env.execute(); + List result = iteration.closeWith(updatedDs).collect(); + Collections.sort(result); --- End diff -- still some sort calls left. > Migrate integration tests for iterative aggregators > --- > > Key: FLINK-9503 > URL: https://issues.apache.org/jira/browse/FLINK-9503 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Deepak Sharma >Assignee: vinoyang >Priority: Minor > > Migrate integration tests in org.apache.flink.test.iterative.aggregators to > use collect() instead of temp files. Related to parent jira. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6129: [FLINK-9503] Migrate integration tests for iterati...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6129#discussion_r193702401 --- Diff: flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java --- @@ -170,12 +162,12 @@ public void testAggregatorWithParameterForIterate() throws Exception { new NegativeElementsConvergenceCriterion()); DataSet updatedDs = iteration.map(new SubtractOneMapWithParam()); - iteration.closeWith(updatedDs).writeAsText(resultPath); - env.execute(); + List result = iteration.closeWith(updatedDs).collect(); + Collections.sort(result); --- End diff -- still some sort calls left. ---
[jira] [Commented] (FLINK-9503) Migrate integration tests for iterative aggregators
[ https://issues.apache.org/jira/browse/FLINK-9503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504515#comment-16504515 ] ASF GitHub Bot commented on FLINK-9503: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6129#discussion_r193701993 --- Diff: flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java --- @@ -117,7 +107,9 @@ public boolean filter(Long value) throws Exception { } }).withBroadcastSet(solution, "SOLUTION")).output(new DiscardingOutputFormat()); env.execute(); - expected = testString; // this will be a useless verification now. + String expected = testString; // this will be a useless verification now. + + compareResultsByLinesInMemory(expected, resultPath); --- End diff -- this is unnecessary and can be removed as the actual check is done in the function. > Migrate integration tests for iterative aggregators > --- > > Key: FLINK-9503 > URL: https://issues.apache.org/jira/browse/FLINK-9503 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Deepak Sharma >Assignee: vinoyang >Priority: Minor > > Migrate integration tests in org.apache.flink.test.iterative.aggregators to > use collect() instead of temp files. Related to parent jira. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6129: [FLINK-9503] Migrate integration tests for iterati...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6129#discussion_r193701993 --- Diff: flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java --- @@ -117,7 +107,9 @@ public boolean filter(Long value) throws Exception { } }).withBroadcastSet(solution, "SOLUTION")).output(new DiscardingOutputFormat()); env.execute(); - expected = testString; // this will be a useless verification now. + String expected = testString; // this will be a useless verification now. + + compareResultsByLinesInMemory(expected, resultPath); --- End diff -- this is unnecessary and can be removed as the actual check is done in the function. ---
[jira] [Commented] (FLINK-9538) Make KeyedStateFunction an interface
[ https://issues.apache.org/jira/browse/FLINK-9538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504512#comment-16504512 ] ASF GitHub Bot commented on FLINK-9538: --- Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6134 cc @dawidwys > Make KeyedStateFunction an interface > > > Key: FLINK-9538 > URL: https://issues.apache.org/jira/browse/FLINK-9538 > Project: Flink > Issue Type: Improvement >Reporter: Dawid Wysakowicz >Assignee: vinoyang >Priority: Major > > I suggest to change the KeyedStateFunction from abstract class to interface > (FunctionalInterface in particular) to enable passing lambdas. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6134: [FLINK-9538] Make KeyedStateFunction an interface
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6134 cc @dawidwys ---
[jira] [Commented] (FLINK-9528) Incorrect results: Filter does not treat Upsert messages correctly.
[ https://issues.apache.org/jira/browse/FLINK-9528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504489#comment-16504489 ] Fabian Hueske commented on FLINK-9528: -- True, highly-selective filters would definitely be an issue. However, most operators could handle duplicates quite well and would not need to have them filtered because they have the necessary state anyway. Only sinks are affected by this. Why not add a duplicate filter for upsert sinks? This would keep the filter stateless and efficient. The duplicate filter for the upsert sink would need a boolean flag per key, and the number of keys depends on the selectivity of the filter, i.e., the size of the state is proportional to the gains. Later we could improve it by using a probabilistic filter (proposed in FLINK-8601). > Incorrect results: Filter does not treat Upsert messages correctly. > --- > > Key: FLINK-9528 > URL: https://issues.apache.org/jira/browse/FLINK-9528 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.3.3, 1.5.0, 1.4.2 >Reporter: Fabian Hueske >Assignee: Hequn Cheng >Priority: Critical > > Currently, Filters (i.e., Calcs with predicates) do not distinguish between > retraction and upsert mode. A Calc looks at record (regardless of its update > semantics) and either discard it (predicate evaluates to false) or pass it on > (predicate evaluates to true). > This works fine for messages with retraction semantics but is not correct for > upsert messages. > The following test case (can be pasted into {{TableSinkITCase}}) shows the > problem: > {code:java} > @Test > def testUpsertsWithFilter(): Unit = { > val env = StreamExecutionEnvironment.getExecutionEnvironment > env.getConfig.enableObjectReuse() > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > val tEnv = TableEnvironment.getTableEnvironment(env) > val t = StreamTestData.get3TupleDataStream(env) > .assignAscendingTimestamps(_._1.toLong) > .toTable(tEnv, 'id, 'num, 'text) > t.select('text.charLength() as 'len) > .groupBy('len) > .select('len, 'len.count as 'cnt) > // .where('cnt < 7) > .writeToSink(new TestUpsertSink(Array("len"), false)) > env.execute() > val results = RowCollector.getAndClearValues > val retracted = RowCollector.upsertResults(results, Array(0)).sorted > val expectedWithoutFilter = List( > "2,1", "5,1", "9,9", "10,7", "11,1", "14,1", "25,1").sorted > val expectedWithFilter = List( > "2,1", "5,1", "11,1", "14,1", "25,1").sorted > assertEquals(expectedWithoutFilter, retracted) > // assertEquals(expectedWithFilter, retracted) > } > {code} > When we add a filter on the aggregation result, we would expect that all rows > that do not fulfill the condition are removed from the result. However, the > filter only removes the upsert message such that the previous version remains > in the result. > One solution could be to make a filter aware of the update semantics (retract > or upsert) and convert the upsert message into a delete message if the > predicate evaluates to false. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
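A minimal sketch of the per-key duplicate filter described above, under the assumption that change messages reach the sink as (isUpsert, payload) pairs and that the function runs on a stream keyed by the sink's key fields; it is an illustration of the idea, not the actual Table API implementation.
{code:java}
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/** Per-key duplicate filter in front of an upsert sink: one boolean flag per key. */
public class UpsertDedupFilter<T> extends RichFlatMapFunction<Tuple2<Boolean, T>, Tuple2<Boolean, T>> {

    // true while the key is currently present downstream
    private transient ValueState<Boolean> present;

    @Override
    public void open(Configuration parameters) {
        present = getRuntimeContext().getState(new ValueStateDescriptor<>("present", Types.BOOLEAN));
    }

    @Override
    public void flatMap(Tuple2<Boolean, T> msg, Collector<Tuple2<Boolean, T>> out) throws Exception {
        boolean isUpsert = msg.f0;
        boolean wasPresent = Boolean.TRUE.equals(present.value());
        if (isUpsert) {
            present.update(true);
            out.collect(msg);          // forward inserts/updates unchanged
        } else if (wasPresent) {
            present.clear();
            out.collect(msg);          // forward the first delete for a key that was emitted before
        }
        // deletes for keys that were never emitted (or were already deleted) are swallowed
    }
}
{code}
Applied after a keyBy on the sink's key fields, the state kept is roughly one boolean per key currently present downstream, which matches the "size of the state is proportional to the gains" argument above.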
[jira] [Commented] (FLINK-9528) Incorrect results: Filter does not treat Upsert messages correctly.
[ https://issues.apache.org/jira/browse/FLINK-9528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504471#comment-16504471 ] Hequn Cheng commented on FLINK-9528: Hi [~fhueske]. Considering the anti-spamming scenario, users probably just want the top 1% of the data from the result of the group by, so the remaining 99% of the data will all become delete messages after the {{Filter}}. This would be a disaster for the storage, especially when most of the incoming keys are new ones and cannot be absorbed by a cache. Adding a GroupByHaving operator is a good way, but it only covers the case where the filter can be pushed down into the group by. If a filter cannot be pushed down, we can turn the upsert stream into a retract stream to ensure correctness (or throw an exception and inform the user to use a {{RetractTableSink}}). What do you think? > Incorrect results: Filter does not treat Upsert messages correctly. > --- > > Key: FLINK-9528 > URL: https://issues.apache.org/jira/browse/FLINK-9528 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Affects Versions: 1.3.3, 1.5.0, 1.4.2 >Reporter: Fabian Hueske >Assignee: Hequn Cheng >Priority: Critical > > Currently, Filters (i.e., Calcs with predicates) do not distinguish between > retraction and upsert mode. A Calc looks at record (regardless of its update > semantics) and either discard it (predicate evaluates to false) or pass it on > (predicate evaluates to true). > This works fine for messages with retraction semantics but is not correct for > upsert messages. > The following test case (can be pasted into {{TableSinkITCase}}) shows the > problem: > {code:java} > @Test > def testUpsertsWithFilter(): Unit = { > val env = StreamExecutionEnvironment.getExecutionEnvironment > env.getConfig.enableObjectReuse() > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > val tEnv = TableEnvironment.getTableEnvironment(env) > val t = StreamTestData.get3TupleDataStream(env) > .assignAscendingTimestamps(_._1.toLong) > .toTable(tEnv, 'id, 'num, 'text) > t.select('text.charLength() as 'len) > .groupBy('len) > .select('len, 'len.count as 'cnt) > // .where('cnt < 7) > .writeToSink(new TestUpsertSink(Array("len"), false)) > env.execute() > val results = RowCollector.getAndClearValues > val retracted = RowCollector.upsertResults(results, Array(0)).sorted > val expectedWithoutFilter = List( > "2,1", "5,1", "9,9", "10,7", "11,1", "14,1", "25,1").sorted > val expectedWithFilter = List( > "2,1", "5,1", "11,1", "14,1", "25,1").sorted > assertEquals(expectedWithoutFilter, retracted) > // assertEquals(expectedWithFilter, retracted) > } > {code} > When we add a filter on the aggregation result, we would expect that all rows > that do not fulfill the condition are removed from the result. However, the > filter only removes the upsert message such that the previous version remains > in the result. > One solution could be to make a filter aware of the update semantics (retract > or upsert) and convert the upsert message into a delete message if the > predicate evaluates to false. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9545) Support read a file multiple times in Flink DataStream
[ https://issues.apache.org/jira/browse/FLINK-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504452#comment-16504452 ] ASF GitHub Bot commented on FLINK-9545: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/6130 I posted on the Jira issue: https://issues.apache.org/jira/browse/FLINK-9545?focusedCommentId=16504451=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16504451: What's the motivation for this feature? > Support read a file multiple times in Flink DataStream > --- > > Key: FLINK-9545 > URL: https://issues.apache.org/jira/browse/FLINK-9545 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.6.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.6.0 > > > we need {{StreamExecutionEnvironment.readFile/readTextFile}} to read each > file for N times, but currently it only supports reading file once. > add support for the feature. > Plan: > add a new processing mode as PROCESSING_N_TIMES, and add additional parameter > {{numTimes}} for {{StreamExecutionEnvironment.readFile/readTextFile}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop
[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503505#comment-16503505 ] Sihua Zhou edited comment on FLINK-9506 at 6/7/18 9:25 AM: --- [~yow] Maybe there is one more optimization that could have a try, I see you are using the ReduceState in your code just to accumulate the `record.getInt("I_PRIMARY_UNITS")` and collect the result in `onTimer()`. For the ReduceState it works as follows: - get the "old result" from RocksDB. - reduce the "old result" with the input, and put the "new result" back to RocksDB. that means for input record in processElement(), it needs to do a `get` and a `put` to RocksDB. And the `get` cost much more then `put`. I would suggest to use the ListState instead. With using ListState, what you need to do are: - Performing {{ListState.add(record)}} in {{processElement()}}, since the `ListState.add()` is cheap as it only put the record into Rocks. - Performing reducing in {{OnTimer()}}, the reducing might look as follow: {code:java} List< JSONObject> records = listState.get(); for (JSonObject jsonObj : records) { // do accumulation } out.collect(result); {code} In this way, for every key every second, you only need to do one read operation of RocksDB. was (Author: sihuazhou): [~yow] Maybe there is one more optimization that could have a try, I see you are using the ReduceState in your code just to accumulate the `record.getInt("I_PRIMARY_UNITS")` and collect the result in `onTimer()`. For the ReduceState it works as follows: - get the "old result" from RocksDB. - reduce the "old result" with the input, and put the "new result" back to RocksDB. that means for input record in processElement(), it needs to do a `get` and a `put` to RocksDB. And the `get` cost much more then `put`. I would suggest to use the ListState instead. With using ListState, what you need to do are: - Performing {{ListState.add(record)}} in {{processElement()}}, since the `ListState.add()` is cheap as it not put the record into Rocks. - Performing reducing in {{OnTimer()}}, the reducing might look as follow: {code:java} List< JSONObject> records = listState.get(); for (JSonObject jsonObj : records) { // do accumulation } out.collect(result); {code} In this way, for every key very seconds, you only need to do one read operation of RocksDB. > Flink ReducingState.add causing more than 100% performance drop > --- > > Key: FLINK-9506 > URL: https://issues.apache.org/jira/browse/FLINK-9506 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.4.2 >Reporter: swy >Priority: Major > Attachments: KeyNoHash_VS_KeyHash.png, flink.png > > > Hi, we found out application performance drop more than 100% when > ReducingState.add is used in the source code. In the test checkpoint is > disable. And filesystem(hdfs) as statebackend. > It could be easyly reproduce with a simple app, without checkpoint, just > simply keep storing record, also with simple reduction function(in fact with > empty function would see the same result). Any idea would be appreciated. > What an unbelievable obvious issue. > Basically the app just keep storing record into the state, and we measure how > many record per second in "JsonTranslator", which is shown in the graph. The > difference between is just 1 line, comment/un-comment "recStore.add(r)". 
> {code} > DataStream stream = env.addSource(new GeneratorSource(loop); > DataStream convert = stream.map(new JsonTranslator()) >.keyBy() >.process(new ProcessAggregation()) >.map(new PassthruFunction()); > public class ProcessAggregation extends ProcessFunction { > private ReducingState recStore; > public void processElement(Recordr, Context ctx, Collector out) { > recStore.add(r); //this line make the difference > } > {code} > Record is POJO class contain 50 String private member. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
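A compact sketch of the ListState variant described in the edited comment above; the UnitEvent type and the one-second processing-time timer are assumptions for illustration, since the reporter's actual record type and timer logic are not shown in this thread.
{code:java}
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class ListStateAggregation extends ProcessFunction<ListStateAggregation.UnitEvent, Long> {

    private transient ListState<Long> buffered;

    @Override
    public void open(Configuration parameters) {
        buffered = getRuntimeContext().getListState(new ListStateDescriptor<>("buffered", Types.LONG));
    }

    @Override
    public void processElement(UnitEvent value, Context ctx, Collector<Long> out) throws Exception {
        // ListState.add() only appends to RocksDB; no read-modify-write as with ReducingState.add().
        buffered.add(value.units);
        // Fire once per second per key (processing time); duplicate registrations for the
        // same key and timestamp are deduplicated by the timer service.
        long nextSecond = (ctx.timerService().currentProcessingTime() / 1000 + 1) * 1000;
        ctx.timerService().registerProcessingTimeTimer(nextSecond);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        long sum = 0;
        Iterable<Long> values = buffered.get();   // the single RocksDB read per key and firing
        if (values != null) {
            for (Long units : values) {
                sum += units;
            }
        }
        buffered.clear();
        out.collect(sum);
    }

    /** Minimal stand-in for the reporter's record type (assumption). */
    public static class UnitEvent {
        public String key;
        public long units;
    }

    // Usage (state and timers require a keyed stream):
    // stream.keyBy(e -> e.key).process(new ListStateAggregation())
}
{code}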
[GitHub] flink issue #6130: [FLINK-9545] Support read a file multiple times in Flink ...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/6130 I posted on the Jira issue: https://issues.apache.org/jira/browse/FLINK-9545?focusedCommentId=16504451=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16504451: What's the motivation for this feature? ---
[jira] [Commented] (FLINK-9545) Support read a file multiple times in Flink DataStream
[ https://issues.apache.org/jira/browse/FLINK-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504451#comment-16504451 ] Aljoscha Krettek commented on FLINK-9545: - [~phoenixjiangnan] What is the motivation for this feature? > Support read a file multiple times in Flink DataStream > --- > > Key: FLINK-9545 > URL: https://issues.apache.org/jira/browse/FLINK-9545 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.6.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.6.0 > > > we need {{StreamExecutionEnvironment.readFile/readTextFile}} to read each > file for N times, but currently it only supports reading file once. > add support for the feature. > Plan: > add a new processing mode as PROCESSING_N_TIMES, and add additional parameter > {{numTimes}} for {{StreamExecutionEnvironment.readFile/readTextFile}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8067) User code ClassLoader not set before calling ProcessingTimeCallback
[ https://issues.apache.org/jira/browse/FLINK-8067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-8067: Fix Version/s: 1.6.0 > User code ClassLoader not set before calling ProcessingTimeCallback > --- > > Key: FLINK-8067 > URL: https://issues.apache.org/jira/browse/FLINK-8067 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.4.0 >Reporter: Gary Yao >Assignee: vinoyang >Priority: Minor > Fix For: 1.6.0, 1.5.1 > > > The user code ClassLoader is not set as the context ClassLoader for the > thread invoking {{ProcessingTimeCallback#onProcessingTime(long timestamp)}}: > https://github.com/apache/flink/blob/84a07a34ac22af14f2dd0319447ca5f45de6d0bb/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L222 > This is problematic, for example, if user code dynamically loads classes in > {{ProcessFunction#onTimer(long timestamp, OnTimerContext ctx, Collector > out)}} using the current thread's context ClassLoader (also see FLINK-8005). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9482) Not applicable functions for TIME
[ https://issues.apache.org/jira/browse/FLINK-9482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504439#comment-16504439 ] ASF GitHub Bot commented on FLINK-9482: --- Github user bioker commented on a diff in the pull request: https://github.com/apache/flink/pull/6121#discussion_r193677135 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala --- @@ -98,16 +98,45 @@ class ScalarFunctionsValidationTest extends ScalarTypesTestBase { testSqlApi("TIMESTAMPADD(YEAR, 1.0, timestamp '2016-02-24 12:42:25')", "2016-06-16") } - @Test(expected = classOf[CodeGenException]) + @Test(expected = classOf[ValidationException]) def testDOWWithTimeWhichIsUnsupported(): Unit = { testSqlApi("EXTRACT(DOW FROM TIME '12:42:25')", "0") } - @Test(expected = classOf[CodeGenException]) + @Test(expected = classOf[ValidationException]) def testDOYWithTimeWhichIsUnsupported(): Unit = { testSqlApi("EXTRACT(DOY FROM TIME '12:42:25')", "0") } + def testExtractFromTimeZeroResult(unit: TimeUnit): Unit = { --- End diff -- Fixed > Not applicable functions for TIME > - > > Key: FLINK-9482 > URL: https://issues.apache.org/jira/browse/FLINK-9482 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Reporter: Viktor Vlasov >Assignee: Viktor Vlasov >Priority: Minor > > Due to work on https://issues.apache.org/jira/browse/FLINK-9432 I have faced > with question how to check DECADE function with tests in > _org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala._ > > Because I have used CENTURY function as an example, first of all I have check > it. During the test I figured out that when we use it with TIME it returns 0. > I suppose arguments for such functions (also it works for YEAR, MONTH, > MILLENNIUM, etc) need to be checked and throw some exception if type is not > suitable. > As an example, in Apache Calcite project (checked in sqlline shell), when I > am trying to use CENTURY with TIME it throw: > {code:java} > java.lang.AssertionError: unexpected TIME > {code} > Need to determine, why such check is not exists and add it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6121: [FLINK-9482] [table] EXTRACT function argument val...
Github user bioker commented on a diff in the pull request: https://github.com/apache/flink/pull/6121#discussion_r193677135 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala --- @@ -98,16 +98,45 @@ class ScalarFunctionsValidationTest extends ScalarTypesTestBase { testSqlApi("TIMESTAMPADD(YEAR, 1.0, timestamp '2016-02-24 12:42:25')", "2016-06-16") } - @Test(expected = classOf[CodeGenException]) + @Test(expected = classOf[ValidationException]) def testDOWWithTimeWhichIsUnsupported(): Unit = { testSqlApi("EXTRACT(DOW FROM TIME '12:42:25')", "0") } - @Test(expected = classOf[CodeGenException]) + @Test(expected = classOf[ValidationException]) def testDOYWithTimeWhichIsUnsupported(): Unit = { testSqlApi("EXTRACT(DOY FROM TIME '12:42:25')", "0") } + def testExtractFromTimeZeroResult(unit: TimeUnit): Unit = { --- End diff -- Fixed ---
[jira] [Commented] (FLINK-9546) The heartbeatTimeoutIntervalMs of HeartbeatMonitor should be larger than 0
[ https://issues.apache.org/jira/browse/FLINK-9546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504408#comment-16504408 ] ASF GitHub Bot commented on FLINK-9546: --- Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6135 cc @tillrohrmann > The heartbeatTimeoutIntervalMs of HeartbeatMonitor should be larger than 0 > -- > > Key: FLINK-9546 > URL: https://issues.apache.org/jira/browse/FLINK-9546 > Project: Flink > Issue Type: Bug > Components: Core >Reporter: Sihua Zhou >Assignee: Sihua Zhou >Priority: Minor > > The heartbeatTimeoutIntervalMs of HeartbeatMonitor should be larger than 0, > currently the arg check looks like > {code:java} > Preconditions.checkArgument(heartbeatTimeoutIntervalMs >= 0L, "The heartbeat > timeout interval has to be larger than 0."); > {code} > it should be > {code:java} > Preconditions.checkArgument(heartbeatTimeoutIntervalMs > 0L, "The heartbeat > timeout interval has to be larger than 0."); > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6135: [FLINK-9546][core] Fix the checking of heartbeatTi...
GitHub user sihuazhou opened a pull request: https://github.com/apache/flink/pull/6135 [FLINK-9546][core] Fix the checking of heartbeatTimeoutIntervalMs in HeartbeatMonitor. ## What is the purpose of the change The heartbeatTimeoutIntervalMs of HeartbeatMonitor should be larger than 0, currently the arg check looks like ```java Preconditions.checkArgument(heartbeatTimeoutIntervalMs >= 0L, "The heartbeat timeout interval has to be larger than 0."); ``` it should be ```java Preconditions.checkArgument(heartbeatTimeoutIntervalMs > 0L, "The heartbeat timeout interval has to be larger than 0."); ``` ## Brief change log - *Fix the checking of heartbeatTimeoutIntervalMs in HeartbeatMonitor.* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: No ## Documentation No You can merge this pull request into a Git repository by running: $ git pull https://github.com/sihuazhou/flink FLINK-9546 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6135.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6135 commit 5b7fab68afa434014308f2994a4ece89681c596d Author: sihuazhou Date: 2018-06-07T08:26:20Z fix the checking of heartbeatTimeoutIntervalMs in HeartbeatMonitor. ---
[jira] [Commented] (FLINK-9546) The heartbeatTimeoutIntervalMs of HeartbeatMonitor should be larger than 0
[ https://issues.apache.org/jira/browse/FLINK-9546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504407#comment-16504407 ] ASF GitHub Bot commented on FLINK-9546: --- GitHub user sihuazhou opened a pull request: https://github.com/apache/flink/pull/6135 [FLINK-9546][core] Fix the checking of heartbeatTimeoutIntervalMs in HeartbeatMonitor. ## What is the purpose of the change The heartbeatTimeoutIntervalMs of HeartbeatMonitor should be larger than 0; currently the argument check is ```java Preconditions.checkArgument(heartbeatTimeoutIntervalMs >= 0L, "The heartbeat timeout interval has to be larger than 0."); ``` whereas it should be ```java Preconditions.checkArgument(heartbeatTimeoutIntervalMs > 0L, "The heartbeat timeout interval has to be larger than 0."); ``` ## Brief change log - *Fix the checking of heartbeatTimeoutIntervalMs in HeartbeatMonitor.* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: No ## Documentation No You can merge this pull request into a Git repository by running: $ git pull https://github.com/sihuazhou/flink FLINK-9546 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6135.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6135 commit 5b7fab68afa434014308f2994a4ece89681c596d Author: sihuazhou Date: 2018-06-07T08:26:20Z fix the checking of heartbeatTimeoutIntervalMs in HeartbeatMonitor. > The heartbeatTimeoutIntervalMs of HeartbeatMonitor should be larger than 0 > -- > > Key: FLINK-9546 > URL: https://issues.apache.org/jira/browse/FLINK-9546 > Project: Flink > Issue Type: Bug > Components: Core > Reporter: Sihua Zhou > Assignee: Sihua Zhou > Priority: Minor > > The heartbeatTimeoutIntervalMs of HeartbeatMonitor should be larger than 0; > currently the argument check is > {code:java} > Preconditions.checkArgument(heartbeatTimeoutIntervalMs >= 0L, "The heartbeat > timeout interval has to be larger than 0."); > {code} > whereas it should be > {code:java} > Preconditions.checkArgument(heartbeatTimeoutIntervalMs > 0L, "The heartbeat > timeout interval has to be larger than 0."); > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6135: [FLINK-9546][core] Fix the checking of heartbeatTimeoutIn...
Github user sihuazhou commented on the issue: https://github.com/apache/flink/pull/6135 cc @tillrohrmann ---
[jira] [Commented] (FLINK-9498) Disable dependency convergence for "flink-end-to-end-tests"
[ https://issues.apache.org/jira/browse/FLINK-9498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504406#comment-16504406 ] ASF GitHub Bot commented on FLINK-9498: --- Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/6116 I think the problem here is not that some of your versions conflict with Flink, but that your dependencies conflict with each other. When I check: > mvn -Dhadoop.version=2.7.0 dependency:tree -pl flink-shaded-hadoop/flink-shaded-hadoop2 I do not see any `libthrift` references, and only `commons-lang` comes from Hadoop itself. You should probably ask the maintainers of your Hadoop `2.7.0-xxx` distribution to converge those dependencies (for example, pin `commons-lang` to 2.6 and `libthrift` to 0.9); otherwise failures can surface at random. I'm not sure what a solution on the Flink side could look like. @zentol maybe, as we discussed previously, we could provide build profiles for the set of Hadoop versions we support. But is there a mechanism for a user to provide a custom build profile for their own custom Hadoop version that resolves the convergence in `flink-shaded-hadoop2`? > Disable dependency convergence for "flink-end-to-end-tests" > --- > > Key: FLINK-9498 > URL: https://issues.apache.org/jira/browse/FLINK-9498 > Project: Flink > Issue Type: Sub-task > Components: Build System > Reporter: Hai Zhou > Assignee: Hai Zhou > Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6116: [FLINK-9498][build] Disable dependency convergence for fl...
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/6116 I think the problem here is not that some of your versions conflict with Flink, but that your dependencies conflict with each other. When I check: > mvn -Dhadoop.version=2.7.0 dependency:tree -pl flink-shaded-hadoop/flink-shaded-hadoop2 I do not see any `libthrift` references, and only `commons-lang` comes from Hadoop itself. You should probably ask the maintainers of your Hadoop `2.7.0-xxx` distribution to converge those dependencies (for example, pin `commons-lang` to 2.6 and `libthrift` to 0.9); otherwise failures can surface at random. I'm not sure what a solution on the Flink side could look like. @zentol maybe, as we discussed previously, we could provide build profiles for the set of Hadoop versions we support. But is there a mechanism for a user to provide a custom build profile for their own custom Hadoop version that resolves the convergence in `flink-shaded-hadoop2`? ---
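As context for the pinning suggestion above: the standard Maven mechanism for forcing a single version of a transitive dependency is a `dependencyManagement` entry. The sketch below is only illustrative; the comment names "0.9" for `libthrift`, so `0.9.0` is used here as a concrete stand-in, and whether such a block belongs in the custom Hadoop build or in a downstream profile is exactly the open question in the comment.

```xml
<!-- Hedged sketch, not Flink's actual build: pins the two transitive
     dependencies named in the comment above to single versions so that the
     dependency-convergence check can pass. -->
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>commons-lang</groupId>
      <artifactId>commons-lang</artifactId>
      <version>2.6</version>
    </dependency>
    <dependency>
      <groupId>org.apache.thrift</groupId>
      <artifactId>libthrift</artifactId>
      <!-- "0.9" in the comment; 0.9.0 is used here purely as an illustration -->
      <version>0.9.0</version>
    </dependency>
  </dependencies>
</dependencyManagement>
```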
[jira] [Created] (FLINK-9546) The heartbeatTimeoutIntervalMs of HeartbeatMonitor should be larger than 0
Sihua Zhou created FLINK-9546: - Summary: The heartbeatTimeoutIntervalMs of HeartbeatMonitor should be larger than 0 Key: FLINK-9546 URL: https://issues.apache.org/jira/browse/FLINK-9546 Project: Flink Issue Type: Bug Components: Core Reporter: Sihua Zhou Assignee: Sihua Zhou The heartbeatTimeoutIntervalMs of HeartbeatMonitor should be larger than 0; currently the argument check is {code:java} Preconditions.checkArgument(heartbeatTimeoutIntervalMs >= 0L, "The heartbeat timeout interval has to be larger than 0."); {code} whereas it should be {code:java} Preconditions.checkArgument(heartbeatTimeoutIntervalMs > 0L, "The heartbeat timeout interval has to be larger than 0."); {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9482) Not applicable functions for TIME
[ https://issues.apache.org/jira/browse/FLINK-9482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504403#comment-16504403 ] ASF GitHub Bot commented on FLINK-9482: --- Github user bioker commented on a diff in the pull request: https://github.com/apache/flink/pull/6121#discussion_r193663108 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala --- @@ -98,16 +98,45 @@ class ScalarFunctionsValidationTest extends ScalarTypesTestBase { testSqlApi("TIMESTAMPADD(YEAR, 1.0, timestamp '2016-02-24 12:42:25')", "2016-06-16") } - @Test(expected = classOf[CodeGenException]) + @Test(expected = classOf[ValidationException]) def testDOWWithTimeWhichIsUnsupported(): Unit = { testSqlApi("EXTRACT(DOW FROM TIME '12:42:25')", "0") } - @Test(expected = classOf[CodeGenException]) + @Test(expected = classOf[ValidationException]) def testDOYWithTimeWhichIsUnsupported(): Unit = { testSqlApi("EXTRACT(DOY FROM TIME '12:42:25')", "0") } + def testExtractFromTimeZeroResult(unit: TimeUnit): Unit = { --- End diff -- Yes, sure, I hadn't noticed that at the beginning. > Not applicable functions for TIME > - > > Key: FLINK-9482 > URL: https://issues.apache.org/jira/browse/FLINK-9482 > Project: Flink > Issue Type: Bug > Components: Table API SQL > Reporter: Viktor Vlasov > Assignee: Viktor Vlasov > Priority: Minor > > While working on https://issues.apache.org/jira/browse/FLINK-9432 I faced the > question of how to test the DECADE function in > _org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala._ > > Because I used the CENTURY function as an example, I checked it first. During > the test I found that when it is used with TIME it returns 0. > I suppose the arguments of such functions (the same applies to YEAR, MONTH, > MILLENNIUM, etc.) need to be checked, and an exception should be thrown if the > type is not suitable. > As an example, in the Apache Calcite project (checked in the sqlline shell), when > I try to use CENTURY with TIME it throws: > {code:java} > java.lang.AssertionError: unexpected TIME > {code} > We need to determine why such a check does not exist and add it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6121: [FLINK-9482] [table] EXTRACT function argument val...
Github user bioker commented on a diff in the pull request: https://github.com/apache/flink/pull/6121#discussion_r193663108 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala --- @@ -98,16 +98,45 @@ class ScalarFunctionsValidationTest extends ScalarTypesTestBase { testSqlApi("TIMESTAMPADD(YEAR, 1.0, timestamp '2016-02-24 12:42:25')", "2016-06-16") } - @Test(expected = classOf[CodeGenException]) + @Test(expected = classOf[ValidationException]) def testDOWWithTimeWhichIsUnsupported(): Unit = { testSqlApi("EXTRACT(DOW FROM TIME '12:42:25')", "0") } - @Test(expected = classOf[CodeGenException]) + @Test(expected = classOf[ValidationException]) def testDOYWithTimeWhichIsUnsupported(): Unit = { testSqlApi("EXTRACT(DOY FROM TIME '12:42:25')", "0") } + def testExtractFromTimeZeroResult(unit: TimeUnit): Unit = { --- End diff -- Yes, sure, I hadn't noticed that at the beginning. ---
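To make the refactoring under discussion easier to follow: the diff only shows the signature of the shared helper, so the Scala sketch below fills in a plausible body and two call sites as an assumption, not as the code that was actually merged. It also assumes Calcite's `TimeUnit` enum, JUnit 4, and the `testSqlApi` helper inherited from `ScalarTypesTestBase`; the import for the base class is a guess at its package.

```scala
import org.apache.calcite.avatica.util.TimeUnit
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.expressions.utils.ScalarTypesTestBase // assumed package
import org.junit.Test

// Hypothetical sketch of the parameterized helper and its per-unit call sites.
class ExtractFromTimeValidationSketch extends ScalarTypesTestBase {

  // Extracting a date-based unit from a plain TIME value is not meaningful,
  // so each call is expected to fail validation rather than return "0".
  def testExtractFromTimeZeroResult(unit: TimeUnit): Unit = {
    testSqlApi(s"EXTRACT(${unit.name()} FROM TIME '12:42:25')", "0")
  }

  @Test(expected = classOf[ValidationException])
  def testYearWithTimeWhichIsUnsupported(): Unit = {
    testExtractFromTimeZeroResult(TimeUnit.YEAR)
  }

  @Test(expected = classOf[ValidationException])
  def testMonthWithTimeWhichIsUnsupported(): Unit = {
    testExtractFromTimeZeroResult(TimeUnit.MONTH)
  }
}
```

The point of the shared helper is that each additional unsupported unit (YEAR, MONTH, MILLENNIUM, and so on) needs only a one-line annotated test instead of a duplicated `testSqlApi` call.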
[GitHub] flink issue #6131: [hotfix][docs] Fix Table API scala example code
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/6131 Hi @zjffdu, thanks for the fix! +1 to merge ---