[jira] [Assigned] (FLINK-4207) WindowOperator becomes very slow with allowed lateness
[ https://issues.apache.org/jira/browse/FLINK-4207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kostas Kloudas reassigned FLINK-4207:
-------------------------------------

    Assignee: Kostas Kloudas

> WindowOperator becomes very slow with allowed lateness
> ------------------------------------------------------
>
> Key: FLINK-4207
> URL: https://issues.apache.org/jira/browse/FLINK-4207
> Project: Flink
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.1.0
> Reporter: Aljoscha Krettek
> Assignee: Kostas Kloudas
> Priority: Blocker
>
> In this simple example the throughput (as measured by the count the window
> emits) becomes very low when an allowed lateness is set:
> {code}
> public class WindowWordCount {
>
> 	public static void main(String[] args) throws Exception {
> 		final StreamExecutionEnvironment env =
> 			StreamExecutionEnvironment.getExecutionEnvironment();
>
> 		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> 		env.setParallelism(1);
>
> 		env.addSource(new InfiniteTupleSource(100_000))
> 			.keyBy(0)
> 			.timeWindow(Time.seconds(3))
> 			.allowedLateness(Time.seconds(1))
> 			.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
> 				@Override
> 				public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
> 						Tuple2<String, Integer> value2) throws Exception {
> 					return Tuple2.of(value1.f0, value1.f1 + value2.f1);
> 				}
> 			})
> 			.filter(new FilterFunction<Tuple2<String, Integer>>() {
> 				private static final long serialVersionUID = 1L;
>
> 				@Override
> 				public boolean filter(Tuple2<String, Integer> value) throws Exception {
> 					return value.f0.startsWith("Tuple 0");
> 				}
> 			})
> 			.print();
>
> 		// execute program
> 		env.execute("WindowWordCount");
> 	}
>
> 	public static class InfiniteTupleSource
> 			implements ParallelSourceFunction<Tuple2<String, Integer>> {
>
> 		private static final long serialVersionUID = 1L;
>
> 		private int numGroups;
>
> 		public InfiniteTupleSource(int numGroups) {
> 			this.numGroups = numGroups;
> 		}
>
> 		@Override
> 		public void run(SourceContext<Tuple2<String, Integer>> out) throws Exception {
> 			long index = 0;
> 			while (true) {
> 				Tuple2<String, Integer> tuple = new Tuple2<>("Tuple " + (index % numGroups), 1);
> 				out.collect(tuple);
> 				index++;
> 			}
> 		}
>
> 		@Override
> 		public void cancel() {
> 		}
> 	}
> }
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Assigned] (FLINK-4194) KinesisDeserializationSchema.isEndOfStream() is never called
[ https://issues.apache.org/jira/browse/FLINK-4194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tzu-Li (Gordon) Tai reassigned FLINK-4194:
------------------------------------------

    Assignee: Tzu-Li (Gordon) Tai

> KinesisDeserializationSchema.isEndOfStream() is never called
> ------------------------------------------------------------
>
> Key: FLINK-4194
> URL: https://issues.apache.org/jira/browse/FLINK-4194
> Project: Flink
> Issue Type: Sub-task
> Components: Kinesis Connector
> Affects Versions: 1.1.0
> Reporter: Robert Metzger
> Assignee: Tzu-Li (Gordon) Tai
>
> The Kinesis connector does not respect the
> {{KinesisDeserializationSchema.isEndOfStream()}} method.
> The purpose of this method is to stop consuming from a source, based on
> input data.
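The intended contract can be illustrated with a minimal, self-contained sketch. Note that `SimpleDeserializationSchema`, `EndOfStreamDemo`, and `consume` are simplified stand-ins invented for illustration; the real `KinesisDeserializationSchema` is generic in the produced type and its methods take additional arguments (partition key, sequence number, shard info):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for the deserialization-schema contract.
interface SimpleDeserializationSchema {
    String deserialize(byte[] record);

    // Should be checked by the consumer after every deserialized element;
    // returning true means "stop consuming from this source".
    boolean isEndOfStream(String nextElement);
}

public class EndOfStreamDemo {

    // What a fixed consumer loop would look like: it stops as soon as the
    // schema signals end-of-stream, instead of draining every record.
    static List<String> consume(List<byte[]> records, SimpleDeserializationSchema schema) {
        List<String> out = new ArrayList<>();
        for (byte[] record : records) {
            String element = schema.deserialize(record);
            if (schema.isEndOfStream(element)) {
                break;
            }
            out.add(element);
        }
        return out;
    }

    public static void main(String[] args) {
        SimpleDeserializationSchema schema = new SimpleDeserializationSchema() {
            public String deserialize(byte[] record) {
                return new String(record, StandardCharsets.UTF_8);
            }
            public boolean isEndOfStream(String nextElement) {
                return "END".equals(nextElement);
            }
        };
        List<byte[]> records = Arrays.asList(
            "a".getBytes(StandardCharsets.UTF_8),
            "b".getBytes(StandardCharsets.UTF_8),
            "END".getBytes(StandardCharsets.UTF_8),
            "c".getBytes(StandardCharsets.UTF_8));
        System.out.println(consume(records, schema)); // prints [a, b]
    }
}
```

The bug report above is that the connector's loop never performs the `isEndOfStream` check at all, so the source can never terminate based on input data.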
[jira] [Commented] (FLINK-4194) KinesisDeserializationSchema.isEndOfStream() is never called
[ https://issues.apache.org/jira/browse/FLINK-4194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378829#comment-15378829 ]

Tzu-Li (Gordon) Tai commented on FLINK-4194:
--------------------------------------------

Just curious, were there any obstacles for the {{FlinkKafkaConsumer08}} that required it to be implemented differently?

> KinesisDeserializationSchema.isEndOfStream() is never called
> ------------------------------------------------------------
>
> Key: FLINK-4194
> URL: https://issues.apache.org/jira/browse/FLINK-4194
> Project: Flink
> Issue Type: Sub-task
> Components: Kinesis Connector
> Affects Versions: 1.1.0
> Reporter: Robert Metzger
>
> The Kinesis connector does not respect the
> {{KinesisDeserializationSchema.isEndOfStream()}} method.
> The purpose of this method is to stop consuming from a source, based on
> input data.
[jira] [Commented] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.
[ https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378685#comment-15378685 ]

ASF GitHub Bot commented on FLINK-1502:
---------------------------------------

Github user sumitchawla commented on the issue:

    https://github.com/apache/flink/pull/1947

    Hi @zentol, thanks for the information. This is great, and I could see the
    metrics in JMX.

    I have one more question on the interoperability of Accumulators and Metrics.
    As per my understanding, currently Metrics are only available at the system
    level, while User Accumulators are available to job writers. Is there any plan
    for supporting Metrics for job writers? Metrics offer much more capability
    than the current Accumulators and would be a great way to track custom
    metrics at each operator level.

> Expose metrics to graphite, ganglia and JMX.
> --------------------------------------------
>
> Key: FLINK-1502
> URL: https://issues.apache.org/jira/browse/FLINK-1502
> Project: Flink
> Issue Type: Sub-task
> Components: JobManager, TaskManager
> Affects Versions: 0.9
> Reporter: Robert Metzger
> Assignee: Chesnay Schepler
> Priority: Minor
> Fix For: 1.1.0
>
> The metrics library allows to expose collected metrics easily to other
> systems such as graphite, ganglia or Java's JVM (VisualVM).
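As a purely hypothetical sketch of what the operator-level user metrics asked about above could look like (`ToyMetricGroup`, `UserMetricsSketch`, and their methods are invented for illustration and are not Flink's API), the idea is that the runtime would hand each user function a metric group on which it can register custom counters:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative only: a toy metric group of the kind a runtime context could
// hand to user functions, so each operator can track custom metrics.
class ToyMetricGroup {
    private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();

    // Registers the counter on first use and returns it afterwards.
    AtomicLong counter(String name) {
        return counters.computeIfAbsent(name, k -> new AtomicLong());
    }

    long value(String name) {
        return counter(name).get();
    }
}

public class UserMetricsSketch {
    public static void main(String[] args) {
        ToyMetricGroup group = new ToyMetricGroup();
        // Inside a user function, one would bump a counter per processed record.
        for (int i = 0; i < 5; i++) {
            group.counter("records-processed").incrementAndGet();
        }
        System.out.println(group.value("records-processed")); // prints 5
    }
}
```

Unlike accumulators, which are aggregated and reported at job completion, such metrics could be polled live per operator by a reporter (e.g. JMX).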
[jira] [Commented] (FLINK-3630) Little mistake in documentation
[ https://issues.apache.org/jira/browse/FLINK-3630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378657#comment-15378657 ]

ASF GitHub Bot commented on FLINK-3630:
---------------------------------------

GitHub user greghogan opened a pull request:

    https://github.com/apache/flink/pull/2254

    [FLINK-3630] [docs] Little mistake in documentation

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/greghogan/flink 3630_little_mistake_in_documentation

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2254.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2254

commit c659b51803545f067192de2760e0d7df958ef236
Author: Greg Hogan
Date:   2016-07-14T18:42:55Z

    [FLINK-3630] [docs] Little mistake in documentation

> Little mistake in documentation
> -------------------------------
>
> Key: FLINK-3630
> URL: https://issues.apache.org/jira/browse/FLINK-3630
> Project: Flink
> Issue Type: Bug
> Components: DataSet API, Documentation
> Affects Versions: 1.0.0
> Reporter: Riccardo Diomedi
> Assignee: Greg Hogan
> Priority: Minor
> Labels: documentation
>
> In the section "GroupCombine on a Grouped DataSet" of the following link:
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/dataset_transformations.html#groupreduce-on-grouped-dataset
> there is a little mistake in the Java code in both the combine and reduce
> methods (it's the same mistake). The variable "word" is declared in the scope
> of the for loop, so it cannot be used in the collect method.
> A possible solution would be to declare the variable before the loop and
> assign it inside the loop. Something like:
> {code}
> int count = 0;
> String word = null;
> for (String record : words) {
>     word = record;
>     count++;
> }
> out.collect(new Tuple2(word, count));
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-4219) Quote PDSH opts in start-cluster.sh
[ https://issues.apache.org/jira/browse/FLINK-4219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378654#comment-15378654 ]

ASF GitHub Bot commented on FLINK-4219:
---------------------------------------

GitHub user greghogan opened a pull request:

    https://github.com/apache/flink/pull/2253

    [FLINK-4219] [scripts] Quote PDSH opts in start-cluster.sh

    This prevents word splitting if the user configures multiple SSH options.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/greghogan/flink 4219_quote_pdsh_opts_in_startclustersh

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2253.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2253

commit 2a71c1cd12186f28ab2e17aebb2b6f8a5935a19e
Author: Greg Hogan
Date:   2016-07-14T17:40:35Z

    [FLINK-4219] [scripts] Quote PDSH opts in start-cluster.sh

    This prevents word splitting if the user configures multiple SSH options.

> Quote PDSH opts in start-cluster.sh
> -----------------------------------
>
> Key: FLINK-4219
> URL: https://issues.apache.org/jira/browse/FLINK-4219
> Project: Flink
> Issue Type: Bug
> Components: Startup Shell Scripts
> Affects Versions: 1.1.0
> Reporter: Greg Hogan
> Assignee: Greg Hogan
> Fix For: 1.1.0
>
> Quote {{PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS}} in {{start-cluster.sh}} to
> prevent word splitting if the user configures multiple SSH options.
[GitHub] flink pull request #2249: 4166 zookeeper namespaces
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2249#discussion_r70887209

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java ---
    @@ -72,26 +68,36 @@
      * This class is the executable entry point for the YARN application master.
      * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager}
      * and {@link YarnFlinkResourceManager}.
    - * 
    + *
    --- End diff --

    Sorry for that one, I accidentally hit the auto-format and noticed it too late.

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
[GitHub] flink pull request #2249: 4166 zookeeper namespaces
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2249#discussion_r70887016

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---
    @@ -545,6 +551,19 @@ protected YarnClusterClient deployInternal() throws Exception {
     		// Set-up ApplicationSubmissionContext for the application
     		ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
    +		final ApplicationId appId = appContext.getApplicationId();
    +
    +		// -- Add Zookeeper namespace to local flinkConfiguraton --
    +		String zkNamespace = getZookeeperNamespace();
    +		// no user specified cli argument for namespace?
    +		if(zkNamespace == null || zkNamespace.isEmpty()) {
    +			// namespace defined in config? else use applicationId as default.
    +			zkNamespace = flinkConfiguration.getString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY, String.valueOf(appId));
    +			setZookeeperNamespace(zkNamespace);
    +		}
    +
    +		flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY, zkNamespace);
    --- End diff --

    Simply moving this to the else block makes the code incorrect. I preferred the
    chance of overwriting a value with itself once in non-performance-critical code
    over bloating up the code.
[GitHub] flink pull request #2249: 4166 zookeeper namespaces
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2249#discussion_r70886483

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---
    @@ -545,6 +551,19 @@ protected YarnClusterClient deployInternal() throws Exception {
     		// Set-up ApplicationSubmissionContext for the application
     		ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
    +		final ApplicationId appId = appContext.getApplicationId();
    --- End diff --

    This line was only moved for my change. It is in fact used further down in the
    same method and cannot be moved inside the if.
[GitHub] flink issue #2249: 4166 zookeeper namespaces
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2249

    Please make sure to follow the naming conventions for pull requests, as
    described in the PR template. First entry under "General": "The pull request
    references the related JIRA issue ("[FLINK-XXX] Jira title text")".

    I can't speak for whether the implementation is correct, but the changes made
    don't appear to be covered by tests.
[GitHub] flink pull request #2249: 4166 zookeeper namespaces
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2249#discussion_r70885172

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---
    @@ -545,6 +551,19 @@ protected YarnClusterClient deployInternal() throws Exception {
     		// Set-up ApplicationSubmissionContext for the application
     		ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
    +		final ApplicationId appId = appContext.getApplicationId();
    --- End diff --

    Since this is only used in the `if` block it should be moved in there.
[GitHub] flink pull request #2249: 4166 zookeeper namespaces
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2249#discussion_r70884431

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java ---
    @@ -72,26 +68,36 @@
      * This class is the executable entry point for the YARN application master.
      * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager}
      * and {@link YarnFlinkResourceManager}.
    - * 
    + *
    --- End diff --

    Of the 96 additions made, a grand total of 6 are actually related to this PR.
    Please limit your auto-formatting. I'm not a fan of checking 15x the number of
    lines necessary.
[GitHub] flink pull request #2249: 4166 zookeeper namespaces
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2249#discussion_r70883754

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---
    @@ -545,6 +551,19 @@ protected YarnClusterClient deployInternal() throws Exception {
     		// Set-up ApplicationSubmissionContext for the application
     		ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
    +		final ApplicationId appId = appContext.getApplicationId();
    +
    +		// -- Add Zookeeper namespace to local flinkConfiguraton --
    +		String zkNamespace = getZookeeperNamespace();
    +		// no user specified cli argument for namespace?
    +		if(zkNamespace == null || zkNamespace.isEmpty()) {
    +			// namespace defined in config? else use applicationId as default.
    +			zkNamespace = flinkConfiguration.getString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY, String.valueOf(appId));
    +			setZookeeperNamespace(zkNamespace);
    +		}
    +
    +		flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_NAMESPACE_KEY, zkNamespace);
    --- End diff --

    This should be in an `else` block, otherwise we may set a property even though
    it is already set.
[GitHub] flink pull request #2249: 4166 zookeeper namespaces
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2249#discussion_r70883320

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---
    @@ -545,6 +551,19 @@ protected YarnClusterClient deployInternal() throws Exception {
     		// Set-up ApplicationSubmissionContext for the application
     		ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
    +		final ApplicationId appId = appContext.getApplicationId();
    +
    +		// -- Add Zookeeper namespace to local flinkConfiguraton --
    +		String zkNamespace = getZookeeperNamespace();
    +		// no user specified cli argument for namespace?
    +		if(zkNamespace == null || zkNamespace.isEmpty()) {
    --- End diff --

    Missing space after if.
[GitHub] flink pull request #2249: 4166 zookeeper namespaces
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2249#discussion_r70883081

    --- Diff: docs/setup/jobmanager_high_availability.md ---
    @@ -74,11 +74,15 @@ In order to start an HA-cluster add the following configuration keys to `conf/fl
       Each *addressX:port* refers to a ZooKeeper server, which is reachable by Flink at the given address and port.
     
    -- **ZooKeeper root** (recommended): The *root ZooKeeper node*, under which all required coordination data is placed.
    +- **ZooKeeper root** (recommended): The *root ZooKeeper node*, under which all cluster namespace nodes are placed.
     
    -      recovery.zookeeper.path.root: /flink # important: customize per cluster
    +      recovery.zookeeper.path.root: /flink
     
    -  **Important**: if you are running multiple Flink HA clusters, you have to manually configure separate root nodes for each cluster.
    +- **ZooKeeper namespace** (recommended): The *namespace ZooKeeper node*, under which all required coordination data for a cluster is placed.
    +
    +      recovery.zookeeper.path.namespace: /default_ns # important: customize per cluster
    +
    +  **Important**: if you are running multiple Flink HA clusters, you have to manually configure separate namespaces for each cluster. By default, Yarn cluster and Yarn session automatically generate namespaces based on Yarn application id. A manual configuration overrides this behaviour in Yarn. Specifying a namespace with the -z CLI option, in turn, overrides manual configuration.
    --- End diff --

    I believe you are missing a "the" before "Yarn cluster" and "Yarn application id".
[GitHub] flink pull request #2249: 4166 zookeeper namespaces
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2249#discussion_r70882042 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java --- @@ -428,6 +429,12 @@ private static Configuration createConfiguration(String baseDirectory, Map
[GitHub] flink pull request #2249: 4166 zookeeper namespaces
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2249#discussion_r70881905

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
    @@ -475,23 +484,23 @@ public YarnClusterClient retrieveCluster(
     			CommandLine cmdLine,
     			Configuration config) throws UnsupportedOperationException {
     
    -		// first check for an application id
    -		if (cmdLine.hasOption(APPLICATION_ID.getOpt())) {
    -			String applicationID = cmdLine.getOptionValue(APPLICATION_ID.getOpt());
    +		// first check for an application id, then try to load from yarn properties
    +		String applicationID = cmdLine.hasOption(APPLICATION_ID.getOpt()) ?
    +				cmdLine.getOptionValue(APPLICATION_ID.getOpt())
    +				: loadYarnPropertiesFile(cmdLine, config);
    +
    +		if(null != applicationID) {
    --- End diff --

    Missing space after if.
[jira] [Commented] (FLINK-4208) Support Running Flink processes in foreground mode
[ https://issues.apache.org/jira/browse/FLINK-4208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378354#comment-15378354 ]

ASF GitHub Bot commented on FLINK-4208:
---------------------------------------

Github user greghogan commented on the issue:

    https://github.com/apache/flink/pull/2239

    Using `wait $mypid` or just `wait` works for me: if I `./bin/jobmanager.sh start cluster`
    (the jobmanager starts in the foreground) and then, in another terminal,
    `./bin/jobmanager.sh stop`, both terminals are back at the command prompt.

> Support Running Flink processes in foreground mode
> --------------------------------------------------
>
> Key: FLINK-4208
> URL: https://issues.apache.org/jira/browse/FLINK-4208
> Project: Flink
> Issue Type: Improvement
> Reporter: Ismaël Mejía
> Priority: Minor
>
> Flink clusters are started automatically in daemon mode; this is definitely
> the default case. However, if we want to start containers based on Flink, the
> execution context gets lost. Running Flink as foreground processes can fix
> this.
[jira] [Commented] (FLINK-4210) Move close()/isClosed() out of MetricGroup interface
[ https://issues.apache.org/jira/browse/FLINK-4210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378352#comment-15378352 ]

Chesnay Schepler commented on FLINK-4210:
-----------------------------------------

This is actually wrong. Let's say a user calls close() on the MetricGroup
returned by the RuntimeContext. Then all operator metrics for that operator are
turned off, not just user metrics.

Besides that, I'm mostly concerned about usability. Having a close() method in
the interface implicitly conveys "hey, this object has to be closed". But users
don't have to close it, and it will just make using the groups unwieldy for
users who believe they have to: they would have to store the group in a field,
close it on exit, and check for null. All of this is unnecessary.

> Move close()/isClosed() out of MetricGroup interface
> ----------------------------------------------------
>
> Key: FLINK-4210
> URL: https://issues.apache.org/jira/browse/FLINK-4210
> Project: Flink
> Issue Type: Improvement
> Components: Metrics
> Affects Versions: 1.1.0
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Minor
> Fix For: 1.1.0
>
> The (user-facing) MetricGroup interface currently exposes a close() and
> isClosed() method which users generally shouldn't need to call. They are an
> internal thing, and thus should be moved into the AbstractMetricGroup class.
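The proposed split can be sketched as follows; `UserMetricGroup`, `InternalMetricGroup`, and `MetricGroupSplitDemo` are simplified illustrative names, not the actual Flink classes:

```java
// User-facing interface: metric registration only, no lifecycle methods,
// so nothing suggests to users that the object "has to be closed".
interface UserMetricGroup {
    void counter(String name);
}

// Internal base class: lifecycle stays here, invoked only by the runtime.
abstract class InternalMetricGroup implements UserMetricGroup {
    private boolean closed;

    void close() { closed = true; }

    boolean isClosed() { return closed; }
}

public class MetricGroupSplitDemo {
    public static void main(String[] args) {
        InternalMetricGroup group = new InternalMetricGroup() {
            public void counter(String name) { /* register metric */ }
        };
        UserMetricGroup userView = group; // what user code sees: no close() here
        userView.counter("my-metric");
        group.close();                    // only runtime-side code can do this
        System.out.println(group.isClosed()); // prints true
    }
}
```

User code compiled against the interface can no longer accidentally shut down an operator's metrics, while the runtime retains full lifecycle control through the concrete class.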
[jira] [Commented] (FLINK-3466) Job might get stuck in restoreState() from HDFS due to interrupt
[ https://issues.apache.org/jira/browse/FLINK-3466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378350#comment-15378350 ]

ASF GitHub Bot commented on FLINK-3466:
---------------------------------------

GitHub user StephanEwen opened a pull request:

    https://github.com/apache/flink/pull/2252

    [FLINK-3466] [runtime] Cancel state handles on state restore

    This pull request fixes the issue that state restore operations can get stuck
    when tasks are cancelled during state restore. That happens due to a bug in
    HDFS, which deadlocks (or livelocks) when the reading thread is interrupted.

    This introduces two things:

    1. All state handles and key/value snapshots are now `Closable`. This does
       not delete any checkpoint data, but simply closes pending streams and data
       fetch handles. Operations concurrently accessing the state handles' state
       should fail.

    2. The `StreamTask` holds a set of "Closables" that it closes upon
       cancellation. This is a cleaner way of stopping in-progress work than
       relying on `interrupt()` to interrupt that work. This mechanism should
       eventually be extended to also cancel operators and state handles pending
       asynchronous materialization.

    There is a test with an interrupt-sensitive state handle (mimicking HDFS's
    deadlock behavior) that stalls without this pull request and finishes cleanly
    with the changes in this pull request.
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink state_handle_cancellation

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2252.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2252

commit 224503b86c2864f604a7c519ea5f415c57f35ff3
Author: Stephan Ewen
Date:   2016-07-14T13:14:12Z

    [FLINK-3466] [tests] Add serialization validation for state handles

commit c411b379381ab1390e2166356232a33165c1abd9
Author: Stephan Ewen
Date:   2016-07-13T19:32:40Z

    [FLINK-3466] [runtime] Make state handles cancelable.

    State handles are cancelable, to make sure long-running checkpoint restore
    operations finish early on cancellation, even if the code does not properly
    react to interrupts. This is especially important since the HDFS client code
    is so buggy that it deadlocks when interrupted without closing.
> Job might get stuck in restoreState() from HDFS due to interrupt
> ----------------------------------------------------------------
>
> Key: FLINK-3466
> URL: https://issues.apache.org/jira/browse/FLINK-3466
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.0.0, 0.10.2
> Reporter: Robert Metzger
> Assignee: Stephan Ewen
>
> A user reported the following issue with a failing job:
> {code}
> 10:46:09,223 WARN  org.apache.flink.runtime.taskmanager.Task - Task 'XXX -> YYY (3/5)' did not react to cancelling signal, but is stuck in method:
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUninterruptibly(AbstractQueuedSynchronizer.java:1979)
> org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager$EndpointShmManager.allocSlot(DfsClientShmManager.java:255)
> org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.allocSlot(DfsClientShmManager.java:434)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.allocShmSlot(ShortCircuitCache.java:1016)
> org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:477)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.create(ShortCircuitCache.java:783)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:717)
> org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:421)
> org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:332)
> org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:576)
> org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:800)
> org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:848)
> java.io.DataInputStream.read(DataInputStream.java:149)
> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:69)
> java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310)
> java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2323)
> java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2794)
>
[GitHub] flink pull request #2252: [FLINK-3466] [runtime] Cancel state handled on sta...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/2252 [FLINK-3466] [runtime] Cancel state handles on state restore This pull request fixes the issue that state restore operations can get stuck when tasks are cancelled during state restore. That happens due to a bug in HDFS, which deadlocks (or livelocks) when the reading thread is interrupted. This introduces two things: 1. All state handles and key/value snapshots are now `Closeable`. This does not delete any checkpoint data, but simply closes pending streams and data fetch handles. Operations concurrently accessing the state handle's state should fail. 2. The `StreamTask` holds a set of "Closeables" that it closes upon cancellation. This is a cleaner way of stopping in-progress work than relying on "interrupt()" to interrupt that work. This mechanism should eventually be extended to also cancel operators and state handles pending asynchronous materialization. There is a test that has an interrupt-sensitive state handle (mimicking HDFS's deadlock behavior) that causes a stall without this pull request and cleanly finishes with the changes in this pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink state_handle_cancellation Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2252.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2252 commit 224503b86c2864f604a7c519ea5f415c57f35ff3 Author: Stephan Ewen Date: 2016-07-14T13:14:12Z [FLINK-3466] [tests] Add serialization validation for state handles commit c411b379381ab1390e2166356232a33165c1abd9 Author: Stephan Ewen Date: 2016-07-13T19:32:40Z [FLINK-3466] [runtime] Make state handles cancelable. 
State handles are cancelable, to make sure long-running checkpoint restore operations finish early on cancellation, even if the code does not properly react to interrupts. This is especially important since HDFS client code is so buggy that it deadlocks when interrupted without closing. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
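The mechanism the pull request describes can be sketched as follows. This is a hypothetical illustration, not Flink's actual classes: the task registers each closeable resource (e.g. a state restore stream), and cancellation closes them all instead of relying on `Thread.interrupt()`, which buggy I/O code such as the HDFS client may ignore or deadlock on.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the cancellation mechanism described in the PR.
public class CancelableRegistry {

    private final Set<Closeable> closeables = new HashSet<>();
    private boolean canceled;

    // Called by the task thread when it opens a stream or state handle.
    public synchronized void register(Closeable c) {
        if (canceled) {
            closeQuietly(c); // task already canceled: close immediately
        } else {
            closeables.add(c);
        }
    }

    public synchronized void unregister(Closeable c) {
        closeables.remove(c);
    }

    // Called by the canceler thread: closing the underlying streams makes
    // blocked readers fail fast even if they never check their interrupt flag.
    public synchronized void cancel() {
        canceled = true;
        for (Closeable c : closeables) {
            closeQuietly(c);
        }
        closeables.clear();
    }

    public synchronized boolean isCanceled() {
        return canceled;
    }

    private static void closeQuietly(Closeable c) {
        try {
            c.close();
        } catch (IOException ignored) {
            // best effort during cancellation
        }
    }
}
```

A resource registered after cancellation is closed immediately, which covers the race between the task thread opening streams and the canceler running.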
[GitHub] flink pull request #:
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/commit/2477161352e12e75e2f0f85b5833ad04dc6d31f2#commitcomment-18252310 In flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java on line 103: The change I made was to clear the heap references earlier. Less chance of redundant work when concurrent disposals happen.
[GitHub] flink pull request #:
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/commit/2477161352e12e75e2f0f85b5833ad04dc6d31f2#commitcomment-18252297 In flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java on line 103: The pattern is: - cache the reference on the stack (immutable against concurrent modifications) - set the heap reference to null - proceed based on the stack reference I think that should work. If the heap reference was initially non-null, the stack reference is non-null, and the condition is true.
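The three-step disposal pattern discussed above can be sketched like this. This is an illustrative reconstruction, not the actual `StreamTaskState` code; note that without extra synchronization this reduces, but does not fully eliminate, duplicate work between concurrent disposers.

```java
import java.io.Closeable;
import java.io.IOException;

// Illustrative sketch of the "cache on stack, null the field, act on the
// local copy" disposal pattern from the review discussion.
public class DisposeOnce {

    private Closeable resource;

    public DisposeOnce(Closeable resource) {
        this.resource = resource;
    }

    public void dispose() {
        // 1. cache the reference on the stack (immune to concurrent writes)
        Closeable local = resource;
        // 2. clear the heap reference early, so later disposers see null
        resource = null;
        // 3. proceed based on the stack reference
        if (local != null) {
            try {
                local.close();
            } catch (IOException ignored) {
                // sketch only: real code would log or propagate
            }
        }
    }
}
```

This answers the question in the earlier comment: the condition on the local copy is true exactly when the field was non-null at the time it was cached.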
[jira] [Resolved] (FLINK-4214) JobExceptionsHandler will return all exceptions
[ https://issues.apache.org/jira/browse/FLINK-4214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-4214. - Resolution: Fixed Fix Version/s: 1.1.0 Fixed via 91d5c63a717d8786506c2d791bb4683838f699d8 Thank you for the contribution! > JobExceptionsHandler will return all exceptions > --- > > Key: FLINK-4214 > URL: https://issues.apache.org/jira/browse/FLINK-4214 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Reporter: Sumit Chawla >Priority: Minor > Fix For: 1.1.0 > > > JobExceptionsHandler will return all exceptions and is not incrementing the > integer to track the exceptions being serialized -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-4216) WordWithCount example with Java has wrong generics type
[ https://issues.apache.org/jira/browse/FLINK-4216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-4216. --- > WordWithCount example with Java has wrong generics type > --- > > Key: FLINK-4216 > URL: https://issues.apache.org/jira/browse/FLINK-4216 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Serhiy Boychenko >Assignee: Matthias J. Sax >Priority: Trivial > Fix For: 1.1.0 > > Original Estimate: 10m > Remaining Estimate: 10m > > The Java example of the POJOs results in the: > {code} > Exception in thread "main" java.lang.Error: Unresolved compilation problem: > {code} > due to the wrong type of the generics of the DataStream. > Currently it is {code}DataStream>{code} > but should be {code}DataSource{code}.
[jira] [Resolved] (FLINK-4216) WordWithCount example with Java has wrong generics type
[ https://issues.apache.org/jira/browse/FLINK-4216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-4216. - Resolution: Fixed Assignee: Matthias J. Sax Fix Version/s: 1.1.0 Fixed via 2346468446414b2a14c2833be4d60288cd8d0550 > WordWithCount example with Java has wrong generics type > --- > > Key: FLINK-4216 > URL: https://issues.apache.org/jira/browse/FLINK-4216 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Serhiy Boychenko >Assignee: Matthias J. Sax >Priority: Trivial > Fix For: 1.1.0 > > Original Estimate: 10m > Remaining Estimate: 10m > > The Java example of the POJOs results in the: > {code} > Exception in thread "main" java.lang.Error: Unresolved compilation problem: > {code} > due to the wrong type of the generics of the DataStream. > Currently it is {code}DataStream>{code} > but should be {code}DataSource{code}.
[jira] [Commented] (FLINK-2125) String delimiter for SocketTextStream
[ https://issues.apache.org/jira/browse/FLINK-2125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378331#comment-15378331 ] ASF GitHub Bot commented on FLINK-2125: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/2233 @StephanEwen The socketTextStream methods are already marked as `@PublicEvolving`, I thought changing those was allowed? > String delimiter for SocketTextStream > - > > Key: FLINK-2125 > URL: https://issues.apache.org/jira/browse/FLINK-2125 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 0.9 >Reporter: Márton Balassi >Priority: Minor > Labels: starter > > The SocketTextStreamFunction uses a character delimiter, despite other parts > of the API using String delimiter.
[jira] [Closed] (FLINK-4214) JobExceptionsHandler will return all exceptions
[ https://issues.apache.org/jira/browse/FLINK-4214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-4214. --- > JobExceptionsHandler will return all exceptions > --- > > Key: FLINK-4214 > URL: https://issues.apache.org/jira/browse/FLINK-4214 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Reporter: Sumit Chawla >Priority: Minor > Fix For: 1.1.0 > > > JobExceptionsHandler will return all exceptions and is not incrementing the > integer to track the exceptions being serialized
[GitHub] flink issue #2233: [FLINK-2125][streaming] Delimiter change from char to str...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/2233 @StephanEwen The socketTextStream methods are already marked as `@PublicEvolving`, I thought changing those was allowed?
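FLINK-2125 generalizes the socket source's delimiter from a single char to a String. The buffering logic such a change needs can be sketched as below; this is illustrative only, and the class and method names are hypothetical, not Flink's actual `SocketTextStreamFunction` API.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: split socket input arriving in arbitrary chunks on a
// multi-character delimiter, emitting a record per complete delimiter.
public class StringDelimiterSplitter {

    private final StringBuilder buffer = new StringBuilder();
    private final String delimiter;

    public StringDelimiterSplitter(String delimiter) {
        if (delimiter.isEmpty()) {
            throw new IllegalArgumentException("delimiter must be non-empty");
        }
        this.delimiter = delimiter;
    }

    // Feed one chunk of input; returns the complete records it finished.
    public List<String> feed(String chunk) {
        buffer.append(chunk);
        List<String> records = new ArrayList<>();
        int pos;
        while ((pos = buffer.indexOf(delimiter)) >= 0) {
            records.add(buffer.substring(0, pos));
            buffer.delete(0, pos + delimiter.length());
        }
        return records; // any trailing partial record stays buffered
    }
}
```

The key difference from the char version is that a delimiter may itself be split across two reads, which the buffer handles naturally.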
[GitHub] flink pull request #2247: FLINK-4216
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2247
[GitHub] flink pull request #2246: [hotfix] [doc] fixed example
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2246
[GitHub] flink pull request #2242: [FLINK-4214] ExceptionHandler keep count of except...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2242
[jira] [Commented] (FLINK-4214) JobExceptionsHandler will return all exceptions
[ https://issues.apache.org/jira/browse/FLINK-4214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378320#comment-15378320 ] ASF GitHub Bot commented on FLINK-4214: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2242 > JobExceptionsHandler will return all exceptions > --- > > Key: FLINK-4214 > URL: https://issues.apache.org/jira/browse/FLINK-4214 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Reporter: Sumit Chawla >Priority: Minor > > JobExceptionsHandler will return all exceptions and is not incrementing the > integer to track the exceptions being serialized
[jira] [Commented] (FLINK-4216) WordWithCount example with Java has wrong generics type
[ https://issues.apache.org/jira/browse/FLINK-4216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378321#comment-15378321 ] ASF GitHub Bot commented on FLINK-4216: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2247 > WordWithCount example with Java has wrong generics type > --- > > Key: FLINK-4216 > URL: https://issues.apache.org/jira/browse/FLINK-4216 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Serhiy Boychenko >Priority: Trivial > Original Estimate: 10m > Remaining Estimate: 10m > > The Java example of the POJOs results in the: > {code} > Exception in thread "main" java.lang.Error: Unresolved compilation problem: > {code} > due to the wrong type of the generics of the DataStream. > Currently it is {code}DataStream>{code} > but should be {code}DataSource{code}.
[jira] [Commented] (FLINK-4208) Support Running Flink processes in foreground mode
[ https://issues.apache.org/jira/browse/FLINK-4208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378305#comment-15378305 ] ASF GitHub Bot commented on FLINK-4208: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2239#discussion_r70877433 --- Diff: flink-dist/src/main/flink-bin/bin/flink-daemon.sh --- @@ -77,31 +77,36 @@ if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then fi case $STARTSTOP in +(start|start-foreground) + # Rotate log files + rotateLogFile $log + rotateLogFile $out + + # Print a warning if daemons are already running on host + if [ -f $pid ]; then +active=() +while IFS='' read -r p || [[ -n "$p" ]]; do + kill -0 $p >/dev/null 2>&1 + if [ $? -eq 0 ]; then +active+=($p) + fi +done < "${pid}" -(start) -# Rotate log files -rotateLogFile $log -rotateLogFile $out - -# Print a warning if daemons are already running on host -if [ -f $pid ]; then - active=() - while IFS='' read -r p || [[ -n "$p" ]]; do -kill -0 $p >/dev/null 2>&1 -if [ $? -eq 0 ]; then - active+=($p) -fi - done < "${pid}" - - count="${#active[@]}" +count="${#active[@]}" - if [ ${count} -gt 0 ]; then -echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME." - fi +if [ ${count} -gt 0 ]; then + echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME." fi + fi + + if [[ $STARTSTOP == "start-foreground" ]]; then +echo "Starting $DAEMON as a foreground process on host $HOSTNAME." +$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null + fi + if [[ $STARTSTOP == "start" ]]; then echo "Starting $DAEMON daemon on host $HOSTNAME." 
-$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null & +nohup $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null & --- End diff -- This is excellent documentation. It looks like (from a `grep daemon` through the code) that Flink is daemonizing its threads internally. Perhaps @StephanEwen can look at this. > Support Running Flink processes in foreground mode > -- > > Key: FLINK-4208 > URL: https://issues.apache.org/jira/browse/FLINK-4208 > Project: Flink > Issue Type: Improvement >Reporter: Ismaël Mejía >Priority: Minor > > Flink clusters are started automatically in daemon mode, this is definitely > the default case, however if we want to start containers based on flinks, the > execution context gets lost. Running flink as foreground processes can fix > this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4208) Support Running Flink processes in foreground mode
[ https://issues.apache.org/jira/browse/FLINK-4208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378301#comment-15378301 ] ASF GitHub Bot commented on FLINK-4208: --- Github user iemejia commented on the issue: https://github.com/apache/flink/pull/2239 I tried not to change the current daemon behavior, that's the reason why I took the decision to add an additional option. I am not sure if using wait may work for what I want but if it does, perfect, can you give me hints of how to test this ? I naively did this and it does not seem to work. ``` if [[ ${mypid} =~ ${IS_NUMBER} ]] && kill -0 $mypid > /dev/null 2>&1 ; then echo $mypid >> $pid + wait $mypid # I also tried with $pid and it does not work either else ``` > Support Running Flink processes in foreground mode > -- > > Key: FLINK-4208 > URL: https://issues.apache.org/jira/browse/FLINK-4208 > Project: Flink > Issue Type: Improvement >Reporter: Ismaël Mejía >Priority: Minor > > Flink clusters are started automatically in daemon mode, this is definitely > the default case, however if we want to start containers based on flinks, the > execution context gets lost. Running flink as foreground processes can fix > this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2239: [FLINK-4208] Support Running Flink processes in fo...
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2239#discussion_r70877433 --- Diff: flink-dist/src/main/flink-bin/bin/flink-daemon.sh --- @@ -77,31 +77,36 @@ if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then fi case $STARTSTOP in +(start|start-foreground) + # Rotate log files + rotateLogFile $log + rotateLogFile $out + + # Print a warning if daemons are already running on host + if [ -f $pid ]; then +active=() +while IFS='' read -r p || [[ -n "$p" ]]; do + kill -0 $p >/dev/null 2>&1 + if [ $? -eq 0 ]; then +active+=($p) + fi +done < "${pid}" -(start) -# Rotate log files -rotateLogFile $log -rotateLogFile $out - -# Print a warning if daemons are already running on host -if [ -f $pid ]; then - active=() - while IFS='' read -r p || [[ -n "$p" ]]; do -kill -0 $p >/dev/null 2>&1 -if [ $? -eq 0 ]; then - active+=($p) -fi - done < "${pid}" - - count="${#active[@]}" +count="${#active[@]}" - if [ ${count} -gt 0 ]; then -echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME." - fi +if [ ${count} -gt 0 ]; then + echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME." fi + fi + + if [[ $STARTSTOP == "start-foreground" ]]; then +echo "Starting $DAEMON as a foreground process on host $HOSTNAME." +$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null + fi + if [[ $STARTSTOP == "start" ]]; then echo "Starting $DAEMON daemon on host $HOSTNAME." 
-$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null & +nohup $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null & --- End diff -- This is excellent documentation. It looks like (from a `grep daemon` through the code) that Flink is daemonizing its threads internally. Perhaps @StephanEwen can look at this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2239: [FLINK-4208] Support Running Flink processes in foregroun...
Github user iemejia commented on the issue: https://github.com/apache/flink/pull/2239 I tried not to change the current daemon behavior, that's the reason why I took the decision to add an additional option. I am not sure if using wait may work for what I want but if it does, perfect, can you give me hints of how to test this ? I naively did this and it does not seem to work. ``` if [[ ${mypid} =~ ${IS_NUMBER} ]] && kill -0 $mypid > /dev/null 2>&1 ; then echo $mypid >> $pid + wait $mypid # I also tried with $pid and it does not work either else ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2249: 4166 zookeeper namespaces
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2249#discussion_r70876628 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java --- @@ -285,6 +291,18 @@ public static ZooKeeperCheckpointIDCounter createCheckpointIDCounter( } } + private static String generateZookeeperPath(String root, String namespace) { + if(!namespace.startsWith("/")) { --- End diff -- missing space after if, and a second time a few lines downwards
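For context, a minimal sketch of what the reviewed helper appears to do: join a ZooKeeper root and namespace while normalizing the slashes. Only the intent is taken from the diff; the details below are assumptions, and the style nit from the review (space after `if`) is applied.

```java
// Hypothetical reconstruction of the helper under review in PR #2249.
public class ZkPaths {

    public static String generateZookeeperPath(String root, String namespace) {
        if (!namespace.startsWith("/")) {
            namespace = "/" + namespace; // ensure leading slash on namespace
        }
        if (root.endsWith("/")) {
            root = root.substring(0, root.length() - 1); // avoid double slash
        }
        return root + namespace;
    }
}
```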
[GitHub] flink pull request #2249: 4166 zookeeper namespaces
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2249#discussion_r70876542 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java --- @@ -164,6 +169,7 @@ public static ZooKeeperLeaderElectionService createLeaderElectionService( * @param clientThe {@link CuratorFramework} ZooKeeper client to use * @param configuration {@link Configuration} object containing the configuration values * @return {@link ZooKeeperLeaderElectionService} instance. +* @return {@link ZooKeeperLeaderElectionService} instance. --- End diff -- duplicate `@return` entry
[GitHub] flink pull request #:
Github user zentol commented on the pull request: https://github.com/apache/flink/commit/2477161352e12e75e2f0f85b5833ad04dc6d31f2#commitcomment-18251972 In flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java on line 103: will this condition now ever be true?
[jira] [Commented] (FLINK-4029) Multi-field "sum" function just like "keyBy"
[ https://issues.apache.org/jira/browse/FLINK-4029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378271#comment-15378271 ] Ivan Mushketyk commented on FLINK-4029: --- Gabor, This should be very helpful! Thank you for your suggestion. > Multi-field "sum" function just like "keyBy" > > > Key: FLINK-4029 > URL: https://issues.apache.org/jira/browse/FLINK-4029 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Rami >Assignee: Ivan Mushketyk >Priority: Minor > > I can use keyBy as follows: > stream.keyBy(“pojo.field1”,”pojo.field2”,…) > Would make sense that I can use sum for example, to do its job for more than > one field: > stream.sum(“pojo.field1”,”pojo.field2”,…)
[jira] [Commented] (FLINK-4029) Multi-field "sum" function just like "keyBy"
[ https://issues.apache.org/jira/browse/FLINK-4029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378269#comment-15378269 ] Ivan Mushketyk commented on FLINK-4029: --- Rami, Thank you for your reply, it is clear now. > Multi-field "sum" function just like "keyBy" > > > Key: FLINK-4029 > URL: https://issues.apache.org/jira/browse/FLINK-4029 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Rami >Assignee: Ivan Mushketyk >Priority: Minor > > I can use keyBy as follows: > stream.keyBy(“pojo.field1”,”pojo.field2”,…) > Would make sense that I can use sum for example, to do its job for more than > one field: > stream.sum(“pojo.field1”,”pojo.field2”,…)
[jira] [Commented] (FLINK-4208) Support Running Flink processes in foreground mode
[ https://issues.apache.org/jira/browse/FLINK-4208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378263#comment-15378263 ] ASF GitHub Bot commented on FLINK-4208: --- Github user iemejia commented on a diff in the pull request: https://github.com/apache/flink/pull/2239#discussion_r70873347 --- Diff: flink-dist/src/main/flink-bin/bin/flink-daemon.sh --- @@ -77,31 +77,36 @@ if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then fi case $STARTSTOP in +(start|start-foreground) + # Rotate log files + rotateLogFile $log + rotateLogFile $out + + # Print a warning if daemons are already running on host + if [ -f $pid ]; then +active=() +while IFS='' read -r p || [[ -n "$p" ]]; do + kill -0 $p >/dev/null 2>&1 + if [ $? -eq 0 ]; then +active+=($p) + fi +done < "${pid}" -(start) -# Rotate log files -rotateLogFile $log -rotateLogFile $out - -# Print a warning if daemons are already running on host -if [ -f $pid ]; then - active=() - while IFS='' read -r p || [[ -n "$p" ]]; do -kill -0 $p >/dev/null 2>&1 -if [ $? -eq 0 ]; then - active+=($p) -fi - done < "${pid}" - - count="${#active[@]}" +count="${#active[@]}" - if [ ${count} -gt 0 ]; then -echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME." - fi +if [ ${count} -gt 0 ]; then + echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME." fi + fi + + if [[ $STARTSTOP == "start-foreground" ]]; then +echo "Starting $DAEMON as a foreground process on host $HOSTNAME." +$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null + fi + if [[ $STARTSTOP == "start" ]]; then echo "Starting $DAEMON daemon on host $HOSTNAME." 
-$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null & +nohup $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null & --- End diff -- I added this because I was trying to grasp what makes a daemon a daemon and I found a reference that convinced me that nohup was missing: https://stackoverflow.com/questions/3430330/best-way-to-make-a-shell-script-daemon Additionally when I looked for inspiration for my changes (the start-foreground name), I look at how they started the daemon in zookeeper and I noticed they use nohup too. https://github.com/apache/zookeeper/blob/trunk/bin/zkServer.sh#L219 This is an extra thing and not the core of the Pull Request, if you don't agree I can rebase and remove that commit, but I think it is worth the addition. > Support Running Flink processes in foreground mode > -- > > Key: FLINK-4208 > URL: https://issues.apache.org/jira/browse/FLINK-4208 > Project: Flink > Issue Type: Improvement >Reporter: Ismaël Mejía >Priority: Minor > > Flink clusters are started automatically in daemon mode, this is definitely > the default case, however if we want to start containers based on flinks, the > execution context gets lost. Running flink as foreground processes can fix > this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2239: [FLINK-4208] Support Running Flink processes in fo...
Github user iemejia commented on a diff in the pull request: https://github.com/apache/flink/pull/2239#discussion_r70873347 --- Diff: flink-dist/src/main/flink-bin/bin/flink-daemon.sh --- @@ -77,31 +77,36 @@ if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then fi case $STARTSTOP in +(start|start-foreground) + # Rotate log files + rotateLogFile $log + rotateLogFile $out + + # Print a warning if daemons are already running on host + if [ -f $pid ]; then +active=() +while IFS='' read -r p || [[ -n "$p" ]]; do + kill -0 $p >/dev/null 2>&1 + if [ $? -eq 0 ]; then +active+=($p) + fi +done < "${pid}" -(start) -# Rotate log files -rotateLogFile $log -rotateLogFile $out - -# Print a warning if daemons are already running on host -if [ -f $pid ]; then - active=() - while IFS='' read -r p || [[ -n "$p" ]]; do -kill -0 $p >/dev/null 2>&1 -if [ $? -eq 0 ]; then - active+=($p) -fi - done < "${pid}" - - count="${#active[@]}" +count="${#active[@]}" - if [ ${count} -gt 0 ]; then -echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME." - fi +if [ ${count} -gt 0 ]; then + echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME." fi + fi + + if [[ $STARTSTOP == "start-foreground" ]]; then +echo "Starting $DAEMON as a foreground process on host $HOSTNAME." +$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null + fi + if [[ $STARTSTOP == "start" ]]; then echo "Starting $DAEMON daemon on host $HOSTNAME." 
-$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null & +nohup $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null & --- End diff -- I added this because I was trying to grasp what makes a daemon a daemon and I found a reference that convinced me that nohup was missing: https://stackoverflow.com/questions/3430330/best-way-to-make-a-shell-script-daemon Additionally when I looked for inspiration for my changes (the start-foreground name), I look at how they started the daemon in zookeeper and I noticed they use nohup too. https://github.com/apache/zookeeper/blob/trunk/bin/zkServer.sh#L219 This is an extra thing and not the core of the Pull Request, if you don't agree I can rebase and remove that commit, but I think it is worth the addition. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Assigned] (FLINK-3874) Add a Kafka TableSink with JSON serialization
[ https://issues.apache.org/jira/browse/FLINK-3874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Mushketyk reassigned FLINK-3874: - Assignee: Ivan Mushketyk > Add a Kafka TableSink with JSON serialization > - > > Key: FLINK-3874 > URL: https://issues.apache.org/jira/browse/FLINK-3874 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk >Priority: Minor > > Add a TableSink that writes JSON serialized data to Kafka.
[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...
Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2244 I've updated the PR according to the review and fixed the build (I was using a method from JDK8).
[jira] [Commented] (FLINK-4212) Lock PID file when starting daemons
[ https://issues.apache.org/jira/browse/FLINK-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378248#comment-15378248 ] ASF GitHub Bot commented on FLINK-4212: --- GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/2251 [FLINK-4212] [scripts] Lock PID file when starting daemons You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 4212_lock_pid_file_when_starting_daemons Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2251.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2251 commit e7d59835ab011e38de61a0b304cd93f1ca7cefb7 Author: Greg Hogan Date: 2016-07-14T15:55:41Z [FLINK-4212] [scripts] Lock PID file when starting daemons > Lock PID file when starting daemons > --- > > Key: FLINK-4212 > URL: https://issues.apache.org/jira/browse/FLINK-4212 > Project: Flink > Issue Type: Improvement > Components: Startup Shell Scripts >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > > As noted on the mailing list (0), when multiple TaskManagers are started in > parallel (using pdsh) there is a race condition on updating the pid: 1) the > pid file is first read to parse the process' index, 2) the process is > started, and 3) on success the daemon pid is appended to the pid file. > We could use a tool such as {{flock}} to lock on the pid file while starting > the Flink daemon. > 0: > http://mail-archives.apache.org/mod_mbox/flink-user/201607.mbox/%3CCA%2BssbKXw954Bz_sBRwP6db0FntWyGWzTyP7wJZ5nhOeQnof3kg%40mail.gmail.com%3E
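The `flock`-based locking suggested in the ticket could look roughly like this (a hedged sketch under assumed placeholder file names; the actual change is in the pull request). Each starter takes an exclusive lock on a lock file before the read-then-append steps, so parallel launches cannot interleave between reading the index and writing the new pid:

```shell
# Pid-file locking sketch with flock (requires util-linux flock).
pidfile=/tmp/flink_sketch.pid
: > "$pidfile"
for i in 1 2 3 4 5; do
  (
    flock -x 200                               # block until we own the lock
    index=$(wc -l < "$pidfile")                # step 1: read the process index
    echo "pid-for-index-$index" >> "$pidfile"  # step 3: append atomically
  ) 200> "$pidfile.lock" &
done
wait
```

With the lock in place every launcher sees a distinct index; without it, two launchers racing between the read and the append could claim the same one.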
[GitHub] flink pull request #2251: [FLINK-4212] [scripts] Lock PID file when starting...
GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/2251 [FLINK-4212] [scripts] Lock PID file when starting daemons You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 4212_lock_pid_file_when_starting_daemons Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2251.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2251 commit e7d59835ab011e38de61a0b304cd93f1ca7cefb7 Author: Greg Hogan Date: 2016-07-14T15:55:41Z [FLINK-4212] [scripts] Lock PID file when starting daemons
[jira] [Commented] (FLINK-3874) Add a Kafka TableSink with JSON serialization
[ https://issues.apache.org/jira/browse/FLINK-3874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378227#comment-15378227 ] ASF GitHub Bot commented on FLINK-3874: --- Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2244 I've updated the PR according to the review and fixed the build (I was using a method from JDK8). > Add a Kafka TableSink with JSON serialization > - > > Key: FLINK-3874 > URL: https://issues.apache.org/jira/browse/FLINK-3874 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Priority: Minor > > Add a TableSink that writes JSON serialized data to Kafka.
[jira] [Commented] (FLINK-3874) Add a Kafka TableSink with JSON serialization
[ https://issues.apache.org/jira/browse/FLINK-3874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378225#comment-15378225 ] ASF GitHub Bot commented on FLINK-3874: --- Github user mushketyk commented on a diff in the pull request: https://github.com/apache/flink/pull/2244#discussion_r70870120 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroSerializationSchemaTest.java --- @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.reflect.ReflectDatumWriter; +import org.apache.flink.streaming.util.serialization.AvroSerializationSchema; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Objects; + +import static javafx.scene.input.KeyCode.T; +import static org.junit.Assert.assertEquals; + +public class AvroSerializationSchemaTest { +// @Test --- End diff -- Sorry, I accidentally added this file to this PR. I've already removed it. > Add a Kafka TableSink with JSON serialization > - > > Key: FLINK-3874 > URL: https://issues.apache.org/jira/browse/FLINK-3874 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Priority: Minor > > Add a TableSink that writes JSON serialized data to Kafka. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3874) Add a Kafka TableSink with JSON serialization
[ https://issues.apache.org/jira/browse/FLINK-3874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378226#comment-15378226 ] ASF GitHub Bot commented on FLINK-3874: --- Github user mushketyk commented on a diff in the pull request: https://github.com/apache/flink/pull/2244#discussion_r70870183 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroSerializationSchemaTest.java --- @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.reflect.ReflectDatumWriter; +import org.apache.flink.streaming.util.serialization.AvroSerializationSchema; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Objects; + +import static javafx.scene.input.KeyCode.T; --- End diff -- I've already removed this file. > Add a Kafka TableSink with JSON serialization > - > > Key: FLINK-3874 > URL: https://issues.apache.org/jira/browse/FLINK-3874 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Priority: Minor > > Add a TableSink that writes JSON serialized data to Kafka. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2244: [FLINK-3874] Add a Kafka TableSink with JSON seria...
Github user mushketyk commented on a diff in the pull request: https://github.com/apache/flink/pull/2244#discussion_r70870120 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroSerializationSchemaTest.java --- @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.reflect.ReflectDatumWriter; +import org.apache.flink.streaming.util.serialization.AvroSerializationSchema; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Objects; + +import static javafx.scene.input.KeyCode.T; +import static org.junit.Assert.assertEquals; + +public class AvroSerializationSchemaTest { +// @Test --- End diff -- Sorry, I accidentally added this file to this PR. I've already removed it. 
[jira] [Commented] (FLINK-2125) String delimiter for SocketTextStream
[ https://issues.apache.org/jira/browse/FLINK-2125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378219#comment-15378219 ] ASF GitHub Bot commented on FLINK-2125: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2233 Because this breaks the public API, it would be good to do the following: Add a new method to the `StreamExecutionEnvironment`, rather than changing the old method. Tag that new method as `@PublicEvolving`. Take the old method, delegate to the new method, and mark it as `@Deprecated`. Also add a proper deprecation comment. > String delimiter for SocketTextStream > - > > Key: FLINK-2125 > URL: https://issues.apache.org/jira/browse/FLINK-2125 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 0.9 >Reporter: Márton Balassi >Priority: Minor > Labels: starter > > The SocketTextStreamFunction uses a character delimiter, despite other parts > of the API using String delimiter. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2233: [FLINK-2125][streaming] Delimiter change from char to str...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2233 Because this breaks the public API, it would be good to do the following: Add a new method to the `StreamExecutionEnvironment`, rather than changing the old method. Tag that new method as `@PublicEvolving`. Take the old method, delegate to the new method, and mark it as `@Deprecated`. Also add a proper deprecation comment.
[jira] [Commented] (FLINK-3666) Remove Nephele references
[ https://issues.apache.org/jira/browse/FLINK-3666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378214#comment-15378214 ] ASF GitHub Bot commented on FLINK-3666: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2241 Good catch! +1 to merge > Remove Nephele references > - > > Key: FLINK-3666 > URL: https://issues.apache.org/jira/browse/FLINK-3666 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Trivial > > There still exist a few references to nephele which should be removed: > {code} > flink\docs\setup\local_setup.md: >79 $ tail log/flink-*-jobmanager-*.log >80 INFO ... - Initializing memory manager with 409 megabytes of memory >81: INFO ... - Trying to load > org.apache.flinknephele.jobmanager.scheduler.local.LocalScheduler as scheduler >82 INFO ... - Setting up web info server, using web-root directory ... >83: INFO ... - Web info server will display information about nephele > job-manager on localhost, port 8081. >84 INFO ... - Starting web info server for JobManager on port 8081 >85 ~~~ >.. > 118 $ cd flink > 119 $ bin/start-local.sh > 120: Starting Nephele job manager > 121 ~~~ > {code} > {code} > flink\flink-runtime\src\main\java\org\apache\flink\runtime\operators\TaskContext.java: >70:AbstractInvokable getOwningNepheleTask(); > {code} > {code} > flink\flink-runtime\src\main\java\org\apache\flink\runtime\operators\BatchTask.java: > 1149 * @param message The main message for the log. > 1150 * @param taskName The name of the task. > 1151: * @param parent The nephele task that contains the code > producing the message. > 1152 * > 1153 * @return The string for logging. 
> > 1254 */ > 1255 @SuppressWarnings("unchecked") > 1256:public static Collector initOutputs(AbstractInvokable > nepheleTask, ClassLoader cl, TaskConfig config, > 1257 > ListchainedTasksTarget, > 1258 > List eventualOutputs, > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2241: [FLINK-3666] Remove all remaining Nephele references
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2241 Good catch! +1 to merge
[GitHub] flink issue #2246: [hotfix] [doc] fixed example
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2246 Thanks, merging this!
[jira] [Commented] (FLINK-3163) Configure Flink for NUMA systems
[ https://issues.apache.org/jira/browse/FLINK-3163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378192#comment-15378192 ] Greg Hogan commented on FLINK-3163: --- I think we can achieve "good enough" without changing the format of {{masters}} and {{slaves}}. Mesos and YARN provide cluster management, and it might be best to keep the Flink configuration simple. What if we added * a configuration parameter to enable NUMA which would result in a TaskManager started on each NUMA node for each IP in {{slaves}} * a configuration parameter (one or two?) for the JobManager and ResourceManager to run in their own NUMA node, not shared with a TaskManager (would the JM and RM share a NUMA node if on the same IP?) These could be {{taskmanager.compute.numa}}, {{jobmanager.compute.numa}}, and {{resourcemanager.compute.numa}}. We could also add, as a related idea, {{taskmanager.compute.fraction}}. This would operate relative to {{taskmanager.numberOfTaskSlots}} as {{taskmanager.memory.fraction}} operates relative to {{taskmanager.memory.size}}. If set to {{1.0}} you would get one slot per (hyper-threaded) processor. As [~saliya] noted, binding processes is quite easy. Since I have only dealt with single-socket systems I have temporarily hard-coded the following in my build: {code} diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh index e579c0c..5f076d5 100755 --- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh +++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh @@ -96,4 +96,10 @@ if [[ $STARTSTOP == "start" ]]; then args=("--configDir" "${FLINK_CONF_DIR}") fi -"${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP taskmanager "${args[@]}" +command -v numactl >/dev/null 2>&1 +if [[ $? 
-ne 0 ]]; then +"${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP taskmanager "${args[@]}" +else +numactl --membind=0 --cpunodebind=0 -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP taskmanager "${args[@]}" +numactl --membind=1 --cpunodebind=1 -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP taskmanager "${args[@]}" +fi {code} > Configure Flink for NUMA systems > > > Key: FLINK-3163 > URL: https://issues.apache.org/jira/browse/FLINK-3163 > Project: Flink > Issue Type: Improvement > Components: Startup Shell Scripts >Affects Versions: 1.0.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > > On NUMA systems Flink can be pinned to a single physical processor ("node") > using {{numactl --membind=$node --cpunodebind=$node }}. Commonly > available NUMA systems include the largest AWS and Google Compute instances. > For example, on an AWS c4.8xlarge system with 36 hyperthreads the user could > configure a single TaskManager with 36 slots or have Flink create two > TaskManagers bound to each of the NUMA nodes, each with 18 slots. > There may be some extra overhead in transferring network buffers between > TaskManagers on the same system, though the fraction of data shuffled in this > manner decreases with the size of the cluster. The performance improvement > from only accessing local memory looks to be significant though difficult to > benchmark. > The JobManagers may fit into NUMA nodes rather than requiring full systems. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4210) Move close()/isClosed() out of MetricGroup interface
[ https://issues.apache.org/jira/browse/FLINK-4210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378189#comment-15378189 ] Stephan Ewen commented on FLINK-4210: - I don't think that is too bad. Worst thing that can happen is that a user closes the user-code-metricgroup. > Move close()/isClosed() out of MetricGroup interface > > > Key: FLINK-4210 > URL: https://issues.apache.org/jira/browse/FLINK-4210 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.1.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Minor > Fix For: 1.1.0 > > > The (user-facing) MetricGroup interface currently exposes a close() and > isClosed() method which generally users shouldn't need to call. They are an > internal thing, and thus should be moved into the AbstractMetricGroup class.
[jira] [Commented] (FLINK-4218) Sporadic "java.lang.RuntimeException: Error triggering a checkpoint..." causes task restarting
[ https://issues.apache.org/jira/browse/FLINK-4218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378170#comment-15378170 ] Stephan Ewen commented on FLINK-4218: - If I understand correctly, it is actually possible that some nodes see the files, and some do not? We can try different options: - When a file state handle is closed, the file system is queried for the existence of that file. Only after that is there, the checkpoint is acknowledged. There needs to be a somewhat tight timeout on that. - On restore, we can have some re-tries, with a reasonable timeout. What do you think about these options? > Sporadic "java.lang.RuntimeException: Error triggering a checkpoint..." > causes task restarting > -- > > Key: FLINK-4218 > URL: https://issues.apache.org/jira/browse/FLINK-4218 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.1.0 >Reporter: Sergii Koshel > > Sporadically see exception as below. And restart of task because of it. > {code:title=Exception|borderStyle=solid} > java.lang.RuntimeException: Error triggering a checkpoint as the result of > receiving checkpoint barrier > at > org.apache.flink.streaming.runtime.tasks.StreamTask$3.onEvent(StreamTask.java:785) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$3.onEvent(StreamTask.java:775) > at > org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203) > at > org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:265) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.io.FileNotFoundException: No such file or directory: > 
s3:///flink/checkpoints/ece317c26960464ba5de75f3bbc38cb2/chk-8810/96eebbeb-de14-45c7-8ebb-e7cde978d6d3 > at > org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:996) > at > org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:77) > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:351) > at > org.apache.flink.runtime.state.filesystem.AbstractFileStateHandle.getFileSize(AbstractFileStateHandle.java:93) > at > org.apache.flink.runtime.state.filesystem.FileStreamStateHandle.getStateSize(FileStreamStateHandle.java:58) > at > org.apache.flink.runtime.state.AbstractStateBackend$DataInputViewHandle.getStateSize(AbstractStateBackend.java:482) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskStateList.getStateSize(StreamTaskStateList.java:77) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:604) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$3.onEvent(StreamTask.java:779) > ... 8 more > {code} > File actually exists on S3. > I suppose it is related to some race conditions with S3 but would be good to > retry a few times before stop task execution. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
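The bounded-retry idea discussed above can be sketched in shell (a hedged illustration only; the real fix would live in the Java state-handle code, and the path and delays here are placeholders). A background job creates the file after a delay, standing in for S3's read-after-write lag, while the loop polls a bounded number of times:

```shell
# Retry-with-backoff sketch for an eventually-visible file.
path=/tmp/retry_sketch_file
rm -f "$path"
( sleep 0.3; touch "$path" ) &   # file appears "eventually", like on S3
found=1
for attempt in 1 2 3 4 5; do
  if [ -e "$path" ]; then
    found=0                      # success: stop retrying
    break
  fi
  sleep 0.2                      # back off between attempts
done
wait
```

A bounded loop like this turns a transient visibility gap into a short delay instead of a task failure, while still surfacing a genuine missing file once the attempts are exhausted.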
[jira] [Commented] (FLINK-4214) JobExceptionsHandler will return all exceptions
[ https://issues.apache.org/jira/browse/FLINK-4214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378166#comment-15378166 ] ASF GitHub Bot commented on FLINK-4214: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2242 Merging this, thanks for the patch! > JobExceptionsHandler will return all exceptions > --- > > Key: FLINK-4214 > URL: https://issues.apache.org/jira/browse/FLINK-4214 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Reporter: Sumit Chawla >Priority: Minor > > JobExceptionsHandler will return all exceptions and is not incrementing the > integer to track the exceptions being serialized
[GitHub] flink issue #2242: [FLINK-4214] ExceptionHandler keep count of exceptions
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2242 Merging this, thanks for the patch!
[GitHub] flink issue #2248: [FLINK-4213] [gelly] Provide CombineHint in Gelly algorit...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2248 Looks good to me. How much faster does it get with the hash combiner?
[jira] [Commented] (FLINK-4213) Provide CombineHint in Gelly algorithms
[ https://issues.apache.org/jira/browse/FLINK-4213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378165#comment-15378165 ] ASF GitHub Bot commented on FLINK-4213: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2248 Looks good to me. How much faster does it get with the hash combiner? > Provide CombineHint in Gelly algorithms > --- > > Key: FLINK-4213 > URL: https://issues.apache.org/jira/browse/FLINK-4213 > Project: Flink > Issue Type: Improvement > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > > Many graph algorithms will see better {{reduce}} performance with the > hash-combine compared with the still default sort-combine, e.g. HITS and > LocalClusteringCoefficient.
[jira] [Commented] (FLINK-4217) Gelly drivers should read CSV values as strings
[ https://issues.apache.org/jira/browse/FLINK-4217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15378145#comment-15378145 ] ASF GitHub Bot commented on FLINK-4217: --- GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/2250 [FLINK-4217] [gelly] Gelly drivers should read CSV values as strings You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 4217_gelly_drivers_should_read_csv_values_as_strings Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2250.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2250 commit f7674ee7126d20f8627c4007592c41b7f8a2bc39 Author: Greg Hogan Date: 2016-07-14T14:02:23Z [FLINK-4217] [gelly] Gelly drivers should read CSV values as strings The user must now select "integer" or "string" when reading a graph from a CSV file. > Gelly drivers should read CSV values as strings > --- > > Key: FLINK-4217 > URL: https://issues.apache.org/jira/browse/FLINK-4217 > Project: Flink > Issue Type: Improvement > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > > Gelly drivers ClusteringCoefficient, HITS, JaccardIndex, and TriangleListing > parse CSV files as {{LongValue}}. This works for anonymized data sets such as > SNAP but should be configurable as {{StringValue}} to handle the general case.
[GitHub] flink pull request #2250: [FLINK-4217] [gelly] Gelly drivers should read CSV...
GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/2250 [FLINK-4217] [gelly] Gelly drivers should read CSV values as strings You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 4217_gelly_drivers_should_read_csv_values_as_strings Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2250.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2250 commit f7674ee7126d20f8627c4007592c41b7f8a2bc39 Author: Greg Hogan Date: 2016-07-14T14:02:23Z [FLINK-4217] [gelly] Gelly drivers should read CSV values as strings The user must now select "integer" or "string" when reading a graph from a CSV file.
[jira] [Assigned] (FLINK-3630) Little mistake in documentation
[ https://issues.apache.org/jira/browse/FLINK-3630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan reassigned FLINK-3630: - Assignee: Greg Hogan (was: Riccardo Diomedi) > Little mistake in documentation > --- > > Key: FLINK-3630 > URL: https://issues.apache.org/jira/browse/FLINK-3630 > Project: Flink > Issue Type: Bug > Components: DataSet API, Documentation >Affects Versions: 1.0.0 >Reporter: Riccardo Diomedi >Assignee: Greg Hogan >Priority: Minor > Labels: documentation > > in section "GroupCombine on a Grouped DataSet" of the following link: > https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/dataset_transformations.html#groupreduce-on-grouped-dataset > there is a little mistake in java code in both combine and reduce method(it's > the same mistake). The variable "word" is defined in the scope of the for > loop so it cannot be used in collect method. > Possible solution could be to initialise the variable before the for and > assign a value inside the for. > Something like: > int count = 0; > String word; > for (String record : words) { > word = record; > count++; > } > out.collect(new Tuple2(word, count)); -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4219) Quote PDSH opts in start-cluster.sh
Greg Hogan created FLINK-4219: - Summary: Quote PDSH opts in start-cluster.sh Key: FLINK-4219 URL: https://issues.apache.org/jira/browse/FLINK-4219 Project: Flink Issue Type: Bug Components: Startup Shell Scripts Affects Versions: 1.1.0 Reporter: Greg Hogan Assignee: Greg Hogan Fix For: 1.1.0 Quote {{PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS}} in {{start-cluster.sh}} to prevent word splitting if the user configures multiple SSH options.
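The word-splitting problem described in the ticket is easy to demonstrate (illustrative values only, not the real start-cluster.sh; `count_args` is a helper invented for the demo). When the variable holding multiple SSH options is expanded unquoted, each whitespace-separated word becomes a separate argument; quoting keeps the whole assignment as one:

```shell
# Demo: unquoted vs. quoted expansion of a multi-word option string.
FLINK_SSH_OPTS="-o StrictHostKeyChecking=no -o ConnectTimeout=5"

count_args() { echo $#; }   # report how many arguments the caller passed

unquoted=$(count_args PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS)     # splits into 4 words
quoted=$(count_args "PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS")     # stays a single word
```

Only the quoted form hands pdsh a single, intact `PDSH_SSH_ARGS_APPEND=...` assignment.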
[GitHub] flink issue #2239: [FLINK-4208] Support Running Flink processes in foregroun...
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/2239 What if instead of changing how we start the daemon (so continue to always start as a background process), we instead add a `wait` after the PID file has been updated when starting a foreground process? If FLINK-4212 is accepted we would also need to release the file lock before calling `wait`.
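The approach suggested in the comment above can be sketched as follows. This is a hypothetical outline, not the actual flink-daemon.sh code; the PID file path and the `sleep` stand-in for the JVM launch are illustrative.

```shell
#!/usr/bin/env sh
# Sketch: always launch the process in the background and record its PID,
# exactly as the existing start path does, then block on it with `wait`
# when a foreground start was requested.
STARTSTOP=start-foreground
pid=/tmp/example-daemon.pid   # illustrative PID file location
: > "$pid"

sleep 1 &                     # stands in for launching the JVM
echo $! >> "$pid"             # append the PID, as the daemon script does

if [ "$STARTSTOP" = "start-foreground" ]; then
    # Any lock on the PID file (see FLINK-4212) would be released here,
    # before blocking, so other starts are not held up.
    wait $!                   # foreground mode: block until the process exits
fi
rm -f "$pid"
```

The appeal of this design is that the start path stays identical for both modes; only the final blocking step differs.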
[GitHub] flink pull request #2220: [FLINK-4184] [metrics] Replace invalid characters ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2220#discussion_r70845241 --- Diff: flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java --- @@ -74,6 +75,15 @@ protected ScheduledDropwizardReporter() { } // + // Getters + // + + // used for testing purposes + Map getCounters() { --- End diff -- could we move this into the TestingScheduledDropwizardReporter?
[jira] [Commented] (FLINK-4184) Ganglia and GraphiteReporter report metric names with invalid characters
[ https://issues.apache.org/jira/browse/FLINK-4184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15377314#comment-15377314 ] ASF GitHub Bot commented on FLINK-4184: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/2220 only one comment left, otherwise +1 > Ganglia and GraphiteReporter report metric names with invalid characters > > > Key: FLINK-4184 > URL: https://issues.apache.org/jira/browse/FLINK-4184 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.1.0 > > > Flink's {{GangliaReporter}} and {{GraphiteReporter}} report metrics with > names which contain invalid characters. For example, quotes are not filtered > out which can be problematic for Ganglia. Moreover, dots are not replaced > which causes Graphite to think that an IP address is actually a scoped metric > name.
[GitHub] flink issue #2220: [FLINK-4184] [metrics] Replace invalid characters in Sche...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/2220 only one comment left, otherwise +1
[jira] [Commented] (FLINK-4184) Ganglia and GraphiteReporter report metric names with invalid characters
[ https://issues.apache.org/jira/browse/FLINK-4184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15377304#comment-15377304 ] ASF GitHub Bot commented on FLINK-4184: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2220#discussion_r70845241 --- Diff: flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java --- @@ -74,6 +75,15 @@ protected ScheduledDropwizardReporter() { } // + // Getters + // + + // used for testing purposes + Map getCounters() { --- End diff -- could we move this into the TestingScheduledDropwizardReporter? > Ganglia and GraphiteReporter report metric names with invalid characters > > > Key: FLINK-4184 > URL: https://issues.apache.org/jira/browse/FLINK-4184 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.1.0 > > > Flink's {{GangliaReporter}} and {{GraphiteReporter}} report metrics with > names which contain invalid characters. For example, quotes are not filtered > out which can be problematic for Ganglia. Moreover, dots are not replaced > which causes Graphite to think that an IP address is actually a scoped metric > name.
[jira] [Commented] (FLINK-4208) Support Running Flink processes in foreground mode
[ https://issues.apache.org/jira/browse/FLINK-4208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15377297#comment-15377297 ] ASF GitHub Bot commented on FLINK-4208: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2239#discussion_r70844709 --- Diff: flink-dist/src/main/flink-bin/bin/flink-daemon.sh --- @@ -77,31 +77,36 @@ if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then fi case $STARTSTOP in +(start|start-foreground) + # Rotate log files + rotateLogFile $log + rotateLogFile $out + + # Print a warning if daemons are already running on host + if [ -f $pid ]; then +active=() +while IFS='' read -r p || [[ -n "$p" ]]; do + kill -0 $p >/dev/null 2>&1 + if [ $? -eq 0 ]; then +active+=($p) + fi +done < "${pid}" -(start) -# Rotate log files -rotateLogFile $log -rotateLogFile $out - -# Print a warning if daemons are already running on host -if [ -f $pid ]; then - active=() - while IFS='' read -r p || [[ -n "$p" ]]; do -kill -0 $p >/dev/null 2>&1 -if [ $? -eq 0 ]; then - active+=($p) -fi - done < "${pid}" - - count="${#active[@]}" +count="${#active[@]}" - if [ ${count} -gt 0 ]; then -echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME." - fi +if [ ${count} -gt 0 ]; then + echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME." fi + fi + + if [[ $STARTSTOP == "start-foreground" ]]; then +echo "Starting $DAEMON as a foreground process on host $HOSTNAME." +$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null + fi + if [[ $STARTSTOP == "start" ]]; then echo "Starting $DAEMON daemon on host $HOSTNAME." 
-$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null & +nohup $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null & --- End diff -- Under what circumstances is the `nohup` necessary? > Support Running Flink processes in foreground mode > -- > > Key: FLINK-4208 > URL: https://issues.apache.org/jira/browse/FLINK-4208 > Project: Flink > Issue Type: Improvement >Reporter: Ismaël Mejía >Priority: Minor > > Flink clusters are started automatically in daemon mode, this is definitely > the default case, however if we want to start containers based on flinks, the > execution context gets lost. Running flink as foreground processes can fix > this.
[GitHub] flink pull request #2239: [FLINK-4208] Support Running Flink processes in fo...
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/2239#discussion_r70844709 --- Diff: flink-dist/src/main/flink-bin/bin/flink-daemon.sh --- @@ -77,31 +77,36 @@ if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then fi case $STARTSTOP in +(start|start-foreground) + # Rotate log files + rotateLogFile $log + rotateLogFile $out + + # Print a warning if daemons are already running on host + if [ -f $pid ]; then +active=() +while IFS='' read -r p || [[ -n "$p" ]]; do + kill -0 $p >/dev/null 2>&1 + if [ $? -eq 0 ]; then +active+=($p) + fi +done < "${pid}" -(start) -# Rotate log files -rotateLogFile $log -rotateLogFile $out - -# Print a warning if daemons are already running on host -if [ -f $pid ]; then - active=() - while IFS='' read -r p || [[ -n "$p" ]]; do -kill -0 $p >/dev/null 2>&1 -if [ $? -eq 0 ]; then - active+=($p) -fi - done < "${pid}" - - count="${#active[@]}" +count="${#active[@]}" - if [ ${count} -gt 0 ]; then -echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME." - fi +if [ ${count} -gt 0 ]; then + echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME." fi + fi + + if [[ $STARTSTOP == "start-foreground" ]]; then +echo "Starting $DAEMON as a foreground process on host $HOSTNAME." +$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null + fi + if [[ $STARTSTOP == "start" ]]; then echo "Starting $DAEMON daemon on host $HOSTNAME." 
-$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null & +nohup $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null & --- End diff -- Under what circumstances is the `nohup` necessary?
[jira] [Commented] (FLINK-4186) Expose Kafka metrics through Flink metrics
[ https://issues.apache.org/jira/browse/FLINK-4186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15377290#comment-15377290 ] ASF GitHub Bot commented on FLINK-4186: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/2236 couldn't find any problem, +1 from my side. > Expose Kafka metrics through Flink metrics > -- > > Key: FLINK-4186 > URL: https://issues.apache.org/jira/browse/FLINK-4186 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.1.0 >Reporter: Robert Metzger >Assignee: Robert Metzger > > Currently, we expose the Kafka metrics through Flink's accumulators. > We can now use the metrics system in Flink to report Kafka metrics.
[GitHub] flink issue #2236: [FLINK-4186] Use Flink metrics to report Kafka metrics
Github user zentol commented on the issue: https://github.com/apache/flink/pull/2236 couldn't find any problem, +1 from my side.
[jira] [Commented] (FLINK-4186) Expose Kafka metrics through Flink metrics
[ https://issues.apache.org/jira/browse/FLINK-4186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15377282#comment-15377282 ] ASF GitHub Bot commented on FLINK-4186: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2236#discussion_r70843655 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java --- @@ -1235,15 +1236,127 @@ public void flatMap(Tuple2value, Collector out) throws JobExecutionResult result = tryExecute(env1, "Consume " + ELEMENT_COUNT + " elements from Kafka"); - Map accuResults = result.getAllAccumulatorResults(); - // kafka 0.9 consumer: 39 results - if (kafkaServer.getVersion().equals("0.9")) { - assertTrue("Not enough accumulators from Kafka Consumer: " + accuResults.size(), accuResults.size() > 38); + deleteTestTopic(topic); + } + + /** +* Test metrics reporting for consumer +* +* @throws Exception +*/ + public void runMetricsTest() throws Throwable { + + // create a stream with 5 topics + final String topic = "metricsStream"; + createTestTopic(topic, 5, 1); + + final Tuple1 error = new Tuple1<>(null); + Runnable job = new Runnable() { + @Override + public void run() { + try { + // start job writing & reading data. + final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env1.setParallelism(1); + env1.getConfig().setRestartStrategy(RestartStrategies.noRestart()); + env1.getConfig().disableSysoutLogging(); + env1.disableOperatorChaining(); // let the source read everything into the network buffers + + TypeInformationSerializationSchema > schema = new TypeInformationSerializationSchema<>(TypeInfoParser. 
>parse("Tuple2 "), env1.getConfig()); + DataStream > fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps)); + fromKafka.flatMap(new FlatMapFunction , Void>() { + @Override + public void flatMap(Tuple2 value, Collector out) throws Exception {// no op + } + }); + + DataStream > fromGen = env1.addSource(new RichSourceFunction >() { + boolean running = true; + + @Override + public void run(SourceContext > ctx) throws Exception { + int i = 0; + while (running) { + ctx.collect(Tuple2.of(i++, getRuntimeContext().getIndexOfThisSubtask())); + Thread.sleep(1); + } + } + + @Override + public void cancel() { + running = false; + } + }); + + fromGen.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null)); + + env1.execute("Metrics test job"); + } catch(Throwable t) { + LOG.warn("Got exception during execution", t); + if(!(t.getCause() instanceof JobCancellationException)) { // we'll cancel the job + error.f0 = t; + } + } +
[GitHub] flink pull request #2236: [FLINK-4186] Use Flink metrics to report Kafka met...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2236#discussion_r70843655 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java --- @@ -1235,15 +1236,127 @@ public void flatMap(Tuple2value, Collector out) throws JobExecutionResult result = tryExecute(env1, "Consume " + ELEMENT_COUNT + " elements from Kafka"); - Map accuResults = result.getAllAccumulatorResults(); - // kafka 0.9 consumer: 39 results - if (kafkaServer.getVersion().equals("0.9")) { - assertTrue("Not enough accumulators from Kafka Consumer: " + accuResults.size(), accuResults.size() > 38); + deleteTestTopic(topic); + } + + /** +* Test metrics reporting for consumer +* +* @throws Exception +*/ + public void runMetricsTest() throws Throwable { + + // create a stream with 5 topics + final String topic = "metricsStream"; + createTestTopic(topic, 5, 1); + + final Tuple1 error = new Tuple1<>(null); + Runnable job = new Runnable() { + @Override + public void run() { + try { + // start job writing & reading data. + final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env1.setParallelism(1); + env1.getConfig().setRestartStrategy(RestartStrategies.noRestart()); + env1.getConfig().disableSysoutLogging(); + env1.disableOperatorChaining(); // let the source read everything into the network buffers + + TypeInformationSerializationSchema > schema = new TypeInformationSerializationSchema<>(TypeInfoParser. 
>parse("Tuple2 "), env1.getConfig()); + DataStream > fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps)); + fromKafka.flatMap(new FlatMapFunction , Void>() { + @Override + public void flatMap(Tuple2 value, Collector out) throws Exception {// no op + } + }); + + DataStream > fromGen = env1.addSource(new RichSourceFunction >() { + boolean running = true; + + @Override + public void run(SourceContext > ctx) throws Exception { + int i = 0; + while (running) { + ctx.collect(Tuple2.of(i++, getRuntimeContext().getIndexOfThisSubtask())); + Thread.sleep(1); + } + } + + @Override + public void cancel() { + running = false; + } + }); + + fromGen.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null)); + + env1.execute("Metrics test job"); + } catch(Throwable t) { + LOG.warn("Got exception during execution", t); + if(!(t.getCause() instanceof JobCancellationException)) { // we'll cancel the job + error.f0 = t; + } + } + } + }; + Thread jobThread = new Thread(job); + jobThread.start(); + + try { + // connect to JMX + MBeanServer
[GitHub] flink pull request #2249: 4166 zookeeper namespaces
GitHub user StefanRRichter opened a pull request: https://github.com/apache/flink/pull/2249 4166 zookeeper namespaces Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [ ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/StefanRRichter/flink 4166-zookeeper-namespaces Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2249.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2249 commit 6e418c9ee6004cd13d4bfde158ff0b56cb0136aa Author: Stefan Richter Date: 2016-07-14T16:21:40Z [FLINK-4166] [Distributed Coordination] zookeeper namespaces (cli parameter -z) commit f343da4042a02aa86b5bde81f37d13200a081b41 Author: Stefan Richter Date: 2016-07-14T17:01:15Z [FLINK-4166] [Distributed Coordination] doku: zookeeper namespaces (cli parameter -z)
[jira] [Commented] (FLINK-3985) A metric with the name * was already registered
[ https://issues.apache.org/jira/browse/FLINK-3985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15377240#comment-15377240 ] Joshua Griffith commented on FLINK-3985: I'm also seeing around 3000 lines of this error repeatedly outputted to the log when running Flink 1.1-SNAPSHOT programs locally. Is there a way to turn off JMX reporting? > A metric with the name * was already registered > --- > > Key: FLINK-3985 > URL: https://issues.apache.org/jira/browse/FLINK-3985 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.1.0 >Reporter: Robert Metzger >Assignee: Stephan Ewen > Labels: test-stability > > The YARN tests detected the following failure while running WordCount. > {code} > 2016-05-27 21:50:48,230 INFO org.apache.flink.yarn.YarnTaskManager > - Received task CHAIN DataSource (at main(WordCount.java:70) > (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at > main(WordCount.java:80)) -> Combine(SUM(1), at main(WordCount.java:83) (1/2) > 2016-05-27 21:50:48,231 ERROR org.apache.flink.metrics.reporter.JMXReporter > - A metric with the name > org.apache.flink.metrics:key0=testing-worker-linux-docker-6e03e1e8-3385-linux-1,key1=taskmanager,key2=ee7c10183f32c9a96f8e7cfd873863d1,key3=WordCount_Example,key4=CHAIN_DataSource_(at_main(WordCount.java-70)_(org.apache.flink.api.java.io.TextInputFormat))_->_FlatMap_(FlatMap_at_main(WordCount.java-80))_->_Combine(SUM(1)-_at_main(WordCount.java-83),name=numBytesIn > was already registered. 
> javax.management.InstanceAlreadyExistsException: > org.apache.flink.metrics:key0=testing-worker-linux-docker-6e03e1e8-3385-linux-1,key1=taskmanager,key2=ee7c10183f32c9a96f8e7cfd873863d1,key3=WordCount_Example,key4=CHAIN_DataSource_(at_main(WordCount.java-70)_(org.apache.flink.api.java.io.TextInputFormat))_->_FlatMap_(FlatMap_at_main(WordCount.java-80))_->_Combine(SUM(1)-_at_main(WordCount.java-83),name=numBytesIn > at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) > at > com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) > at > org.apache.flink.metrics.reporter.JMXReporter.notifyOfAddedMetric(JMXReporter.java:76) > at > org.apache.flink.metrics.MetricRegistry.register(MetricRegistry.java:177) > at > org.apache.flink.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:191) > at > org.apache.flink.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:144) > at > org.apache.flink.metrics.groups.IOMetricGroup.(IOMetricGroup.java:40) > at > org.apache.flink.metrics.groups.TaskMetricGroup.(TaskMetricGroup.java:74) > at > org.apache.flink.metrics.groups.JobMetricGroup.addTask(JobMetricGroup.java:74) > at > org.apache.flink.metrics.groups.TaskManagerMetricGroup.addTaskForJob(TaskManagerMetricGroup.java:86) > at > org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.scala:1093) > at > 
org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:442) > at > org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:284) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) > at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) > at
[GitHub] flink pull request #2248: [FLINK-4213] [gelly] Provide CombineHint in Gelly ...
GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/2248 [FLINK-4213] [gelly] Provide CombineHint in Gelly algorithms `VertexDegrees` has changed from using a `ReduceFunction` to using a `GroupReduceFunction` as in directed `TriangleListing`. You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 4213_provide_combinehint_in_gelly_algorithms Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2248.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2248 commit 2a8e5c708acefa881223bb3c8a6371bd5bb6ea9b Author: Greg Hogan Date: 2016-07-14T13:39:02Z [FLINK-4213] [gelly] Provide CombineHint in Gelly algorithms
[jira] [Commented] (FLINK-4213) Provide CombineHint in Gelly algorithms
[ https://issues.apache.org/jira/browse/FLINK-4213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15377224#comment-15377224 ] ASF GitHub Bot commented on FLINK-4213: --- GitHub user greghogan opened a pull request: https://github.com/apache/flink/pull/2248 [FLINK-4213] [gelly] Provide CombineHint in Gelly algorithms `VertexDegrees` has changed from using a `ReduceFunction` to using a `GroupReduceFunction` as in directed `TriangleListing`. You can merge this pull request into a Git repository by running: $ git pull https://github.com/greghogan/flink 4213_provide_combinehint_in_gelly_algorithms Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2248.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2248 commit 2a8e5c708acefa881223bb3c8a6371bd5bb6ea9b Author: Greg Hogan Date: 2016-07-14T13:39:02Z [FLINK-4213] [gelly] Provide CombineHint in Gelly algorithms > Provide CombineHint in Gelly algorithms > --- > > Key: FLINK-4213 > URL: https://issues.apache.org/jira/browse/FLINK-4213 > Project: Flink > Issue Type: Improvement > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > > Many graph algorithms will see better {{reduce}} performance with the > hash-combine compared with the still default sort-combine, e.g. HITS and > LocalClusteringCoefficient.
[jira] [Commented] (FLINK-4218) Sporadic "java.lang.RuntimeException: Error triggering a checkpoint..." causes task restarting
[ https://issues.apache.org/jira/browse/FLINK-4218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15377211#comment-15377211 ] Robert Metzger commented on FLINK-4218: --- I think this an artifact of S3's consistency model. Enabling EMRFS on EMR will probably resolve this issue: https://www.infoq.com/news/2015/01/emrfs-s3-consistency > Sporadic "java.lang.RuntimeException: Error triggering a checkpoint..." > causes task restarting > -- > > Key: FLINK-4218 > URL: https://issues.apache.org/jira/browse/FLINK-4218 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.1.0 >Reporter: Sergii Koshel > > Sporadically see exception as below. And restart of task because of it. > {code:title=Exception|borderStyle=solid} > java.lang.RuntimeException: Error triggering a checkpoint as the result of > receiving checkpoint barrier > at > org.apache.flink.streaming.runtime.tasks.StreamTask$3.onEvent(StreamTask.java:785) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$3.onEvent(StreamTask.java:775) > at > org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203) > at > org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:265) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.io.FileNotFoundException: No such file or directory: > s3:///flink/checkpoints/ece317c26960464ba5de75f3bbc38cb2/chk-8810/96eebbeb-de14-45c7-8ebb-e7cde978d6d3 > at > org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:996) > at > org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:77) > at > 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:351) > at > org.apache.flink.runtime.state.filesystem.AbstractFileStateHandle.getFileSize(AbstractFileStateHandle.java:93) > at > org.apache.flink.runtime.state.filesystem.FileStreamStateHandle.getStateSize(FileStreamStateHandle.java:58) > at > org.apache.flink.runtime.state.AbstractStateBackend$DataInputViewHandle.getStateSize(AbstractStateBackend.java:482) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskStateList.getStateSize(StreamTaskStateList.java:77) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:604) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$3.onEvent(StreamTask.java:779) > ... 8 more > {code} > File actually exists on S3. > I suppose it is related to some race conditions with S3 but would be good to > retry a few times before stop task execution.
[jira] [Updated] (FLINK-4212) Lock PID file when starting daemons
[ https://issues.apache.org/jira/browse/FLINK-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Hogan updated FLINK-4212: -- Summary: Lock PID file when starting daemons (was: Lock on pid file when starting daemons) > Lock PID file when starting daemons > --- > > Key: FLINK-4212 > URL: https://issues.apache.org/jira/browse/FLINK-4212 > Project: Flink > Issue Type: Improvement > Components: Startup Shell Scripts >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > > As noted on the mailing list (0), when multiple TaskManagers are started in > parallel (using pdsh) there is a race condition on updating the pid: 1) the > pid file is first read to parse the process' index, 2) the process is > started, and 3) on success the daemon pid is appended to the pid file. > We could use a tool such as {{flock}} to lock on the pid file while starting > the Flink daemon. > 0: > http://mail-archives.apache.org/mod_mbox/flink-user/201607.mbox/%3CCA%2BssbKXw954Bz_sBRwP6db0FntWyGWzTyP7wJZ5nhOeQnof3kg%40mail.gmail.com%3E
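The three-step race described in the issue (read index, start process, append PID) could be serialized with {{flock}} roughly as follows. This is a sketch under stated assumptions, not the eventual implementation: the PID file path is illustrative, a `sleep` stands in for the daemon launch, and it assumes the util-linux flock(1) utility is available.

```shell
#!/usr/bin/env sh
# Sketch: hold an exclusive lock on the PID file for the whole
# read/start/append sequence so parallel (e.g. pdsh-driven) starts
# cannot interleave steps 1-3.
pid=/tmp/example-flink.pid    # illustrative PID file
: > "$pid"                    # create/truncate for the demo

(
    flock 200                 # block until the exclusive lock is held
    index=$(wc -l < "$pid")   # 1) read the next process index
    echo "starting daemon with index $index" >/dev/null
    sleep 1 &                 # 2) stand-in for launching the daemon
    echo $! >> "$pid"         # 3) append the daemon PID on success
) 200>>"$pid"
```

Opening file descriptor 200 on the PID file itself avoids a separate lock file; concurrent invocations of the same subshell would queue on `flock 200` instead of racing.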
[jira] [Commented] (FLINK-4186) Expose Kafka metrics through Flink metrics
[ https://issues.apache.org/jira/browse/FLINK-4186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15377134#comment-15377134 ] ASF GitHub Bot commented on FLINK-4186: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2236 I fixed the failing build and addressed all comments so far ;) (I was working on the code while you've reviewed it ) > Expose Kafka metrics through Flink metrics > -- > > Key: FLINK-4186 > URL: https://issues.apache.org/jira/browse/FLINK-4186 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.1.0 >Reporter: Robert Metzger >Assignee: Robert Metzger > > Currently, we expose the Kafka metrics through Flink's accumulators. > We can now use the metrics system in Flink to report Kafka metrics.
[jira] [Commented] (FLINK-4216) WordWithCount example with Java has wrong generics type
[ https://issues.apache.org/jira/browse/FLINK-4216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15377116#comment-15377116 ]

ASF GitHub Bot commented on FLINK-4216:
---------------------------------------

Github user serhiy commented on the issue:

    https://github.com/apache/flink/pull/2247

    Damn I was a little bit late :D! You guys are fast :D!

> WordWithCount example with Java has wrong generics type
> --------------------------------------------------------
>
>                 Key: FLINK-4216
>                 URL: https://issues.apache.org/jira/browse/FLINK-4216
>             Project: Flink
>          Issue Type: Bug
>          Components: Documentation
>            Reporter: Serhiy Boychenko
>            Priority: Trivial
>   Original Estimate: 10m
>  Remaining Estimate: 10m
>
> The Java example of the POJOs results in the:
> {code}
> Exception in thread "main" java.lang.Error: Unresolved compilation problem:
> {code}
> due to the wrong generics type of the DataStream.
> Currently it is {code}DataStream>{code}
> but should be {code}DataSource{code}.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-4186) Expose Kafka metrics through Flink metrics
[ https://issues.apache.org/jira/browse/FLINK-4186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15377131#comment-15377131 ]

ASF GitHub Bot commented on FLINK-4186:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2236#discussion_r70828051

    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
    @@ -1235,15 +1236,129 @@ public void flatMap(Tuple2 value, Collector out) throws
     		JobExecutionResult result = tryExecute(env1, "Consume " + ELEMENT_COUNT + " elements from Kafka");
    -		Map<String, Object> accuResults = result.getAllAccumulatorResults();
    -		// kafka 0.9 consumer: 39 results
    -		if (kafkaServer.getVersion().equals("0.9")) {
    -			assertTrue("Not enough accumulators from Kafka Consumer: " + accuResults.size(), accuResults.size() > 38);
    +		deleteTestTopic(topic);
    +	}
    +
    +	/**
    +	 * Test metrics reporting for consumer
    +	 *
    +	 * @throws Exception
    +	 */
    +	public void runMetricsTest() throws Throwable {
    +
    +		// create a stream with 5 topics
    +		final String topic = "metricsStream";
    +		createTestTopic(topic, 5, 1);
    +
    +		final Tuple1<Throwable> error = new Tuple1<>(null);
    +		Runnable job = new Runnable() {
    +			@Override
    +			public void run() {
    +				try {
    +					// start job writing & reading data.
    +					final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
    +					env1.setParallelism(1);
    +					env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
    +					env1.getConfig().disableSysoutLogging();
    +					env1.disableOperatorChaining(); // let the source read everything into the network buffers
    +
    +					TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
    +					DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
    +					fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
    +						@Override
    +						public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {
    +							// read slowly
    +							Thread.sleep(100);
    +						}
    +					});
    +
    +					DataStream<Tuple2<Integer, Integer>> fromGen = env1.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() {
    +						boolean running = true;
    +
    +						@Override
    +						public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
    +							int i = 0;
    +							while (running) {
    +								ctx.collect(Tuple2.of(i++, getRuntimeContext().getIndexOfThisSubtask()));
    +								Thread.sleep(1);
    +							}
    +						}
    +
    +						@Override
    +						public void cancel() {
    +							running = false;
    +						}
    +					});
    +
    +					fromGen.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null));
    +
    +					env1.execute("Metrics test job");
    +				} catch (Throwable t) {
    +					LOG.warn("Got exception during execution", t);
    +					if (!(t.getCause() instanceof JobCancellationException)) { // we'll cancel the job
    +						error.f0 = t;
    +					}
    +				}
    +			}
    +		};
    +		Thread jobThread = new Thread(job);
[jira] [Commented] (FLINK-4186) Expose Kafka metrics through Flink metrics
[ https://issues.apache.org/jira/browse/FLINK-4186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15377132#comment-15377132 ]

ASF GitHub Bot commented on FLINK-4186:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2236#discussion_r70828069

    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
[jira] [Commented] (FLINK-4186) Expose Kafka metrics through Flink metrics
[ https://issues.apache.org/jira/browse/FLINK-4186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15377130#comment-15377130 ]

ASF GitHub Bot commented on FLINK-4186:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2236#discussion_r70828017

    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
[jira] [Commented] (FLINK-4186) Expose Kafka metrics through Flink metrics
[ https://issues.apache.org/jira/browse/FLINK-4186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15377128#comment-15377128 ]

ASF GitHub Bot commented on FLINK-4186:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2236#discussion_r70827999

    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---