[GitHub] flink pull request: [FLINK-3353] CSV-related tests may fail depend...
GitHub user stefanobaghino opened a pull request:

    https://github.com/apache/flink/pull/1598

    [FLINK-3353] CSV-related tests may fail depending on locale

    As the results are hard-coded, it makes sense to explicitly pass the US locale to render the results as strings. Should close [FLINK-3353](https://issues.apache.org/jira/browse/FLINK-3353).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/radicalbit/flink 3353

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1598.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1598

commit ef91c907105638fe4d839dc891d3af33ba35e2a8
Author: Stefano Baghino
Date:   2016-02-06T21:57:33Z

    [FLINK-3353] CSV-related tests may fail depending on locale

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3353) CSV-related tests may fail depending on locale
[ https://issues.apache.org/jira/browse/FLINK-3353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15136027#comment-15136027 ]

ASF GitHub Bot commented on FLINK-3353:
---

GitHub user stefanobaghino opened a pull request:

    https://github.com/apache/flink/pull/1598

    [FLINK-3353] CSV-related tests may fail depending on locale

    As the results are hard-coded, it makes sense to explicitly pass the US locale to render the results as strings. Should close [FLINK-3353](https://issues.apache.org/jira/browse/FLINK-3353).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/radicalbit/flink 3353

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1598.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1598

commit ef91c907105638fe4d839dc891d3af33ba35e2a8
Author: Stefano Baghino
Date:   2016-02-06T21:57:33Z

    [FLINK-3353] CSV-related tests may fail depending on locale

> CSV-related tests may fail depending on locale
> --
>
> Key: FLINK-3353
> URL: https://issues.apache.org/jira/browse/FLINK-3353
> Project: Flink
> Issue Type: Bug
> Components: Tests
> Affects Versions: 1.0.0
> Reporter: Stefano Baghino
> Assignee: Stefano Baghino
> Priority: Trivial
> Fix For: 1.0.0
>
> As I've been running some tests, three suites
> ({{KMeansWithBroadcastSetITCase.java}},
> {{ScalaCsvReaderWithPOJOITCase.scala}} and {{CsvReaderITCase.java}}) kept
> failing locally because the expected results (string literals) were matched
> against an object rendered as a string using the {{String.format}} method, a
> method whose result depends on the default Locale; as my Locale (Italian)
> renders doubles with a comma instead of a dot as the decimal separator, the
> representation of doubles diverged from the expected one, thus making my
> tests fail, despite the results actually being correct.
> As the result is hard-coded, it makes sense to explicitly use the US locale
> to represent those objects. I'll open a PR with my solution ASAP.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: FLINK-2213 Makes the number of vcores per YARN...
Github user kl0u commented on the pull request: https://github.com/apache/flink/pull/1588#issuecomment-180847040 Thanks a lot for the comments @rmetzger and @StephanEwen .
[jira] [Commented] (FLINK-2213) Configure number of vcores
[ https://issues.apache.org/jira/browse/FLINK-2213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15135974#comment-15135974 ] ASF GitHub Bot commented on FLINK-2213: --- Github user kl0u commented on the pull request: https://github.com/apache/flink/pull/1588#issuecomment-180847040 Thanks a lot for the comments @rmetzger and @StephanEwen . > Configure number of vcores > -- > > Key: FLINK-2213 > URL: https://issues.apache.org/jira/browse/FLINK-2213 > Project: Flink > Issue Type: Improvement > Components: YARN Client >Affects Versions: 0.10.0 >Reporter: Ufuk Celebi >Assignee: Kostas > Fix For: 1.0.0 > > > Currently, the number of vcores per YARN container is set to 1. > It is desirable to allow configuring this value. As a simple heuristic it > makes sense to at least set it to the number of slots per container.
[jira] [Assigned] (FLINK-2021) Rework examples to use ParameterTool
[ https://issues.apache.org/jira/browse/FLINK-2021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefano Baghino reassigned FLINK-2021: -- Assignee: Stefano Baghino > Rework examples to use ParameterTool > > > Key: FLINK-2021 > URL: https://issues.apache.org/jira/browse/FLINK-2021 > Project: Flink > Issue Type: Improvement > Components: Examples >Affects Versions: 0.9 >Reporter: Robert Metzger >Assignee: Stefano Baghino >Priority: Minor > Labels: starter > > In FLINK-1525, we introduced the {{ParameterTool}}. > We should port the examples to use the tool. > The examples could look like this (we should maybe discuss it first on the > mailing lists): > {code} > public static void main(String[] args) throws Exception { > ParameterTool pt = ParameterTool.fromArgs(args); > boolean fileOutput = pt.getNumberOfParameters() == 2; > String textPath = null; > String outputPath = null; > if(fileOutput) { > textPath = pt.getRequired("input"); > outputPath = pt.getRequired("output"); > } > // set up the execution environment > final ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > env.getConfig().setUserConfig(pt); > {code}
[jira] [Created] (FLINK-3352) RocksDB Backend cannot determine correct hdfs path
Gyula Fora created FLINK-3352: - Summary: RocksDB Backend cannot determine correct hdfs path Key: FLINK-3352 URL: https://issues.apache.org/jira/browse/FLINK-3352 Project: Flink Issue Type: Bug Components: Streaming Reporter: Gyula Fora Priority: Blocker The HDFSCopyFromLocal utility class instantiates the hadoop FileSystem object with the default configuration. This disregards the hadoop configurations pulled in by the Flink FileSystem wrappers causing an error if the hostname is not specified in the uri for instance.
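The failure mode in this report — a checkpoint URI without a hostname can only be resolved against the cluster's configured default filesystem, which a freshly constructed default Hadoop `Configuration` does not know about — can be illustrated with a self-contained sketch. `ResolveFs` and `resolve` are hypothetical names, and a plain `java.net.URI` stands in for Hadoop's `fs.defaultFS` machinery:

```java
import java.net.URI;

public class ResolveFs {

    // Hypothetical helper: qualify a URI that lacks an authority against a
    // configured default filesystem, the way Hadoop's fs.defaultFS does.
    // Building a fresh default Configuration (as HDFSCopyFromLocal does)
    // discards that default, so "hdfs:///..." URIs cannot be resolved.
    static URI resolve(URI uri, URI defaultFs) {
        if (uri.getScheme() != null && uri.getAuthority() != null) {
            return uri; // already fully qualified
        }
        String scheme = uri.getScheme() != null ? uri.getScheme() : defaultFs.getScheme();
        return URI.create(scheme + "://" + defaultFs.getAuthority() + uri.getPath());
    }

    public static void main(String[] args) {
        System.out.println(resolve(URI.create("hdfs:///checkpoints/op-1"),
                                   URI.create("hdfs://namenode:8020")));
    }
}
```

With the default configuration dropped, the `defaultFs` argument is effectively empty and the hostname-less URI cannot be completed — which matches the error described above.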
[jira] [Created] (FLINK-3351) RocksDB Backend cannot determine correct local db path
Gyula Fora created FLINK-3351: - Summary: RocksDB Backend cannot determine correct local db path Key: FLINK-3351 URL: https://issues.apache.org/jira/browse/FLINK-3351 Project: Flink Issue Type: Bug Components: Streaming Reporter: Gyula Fora The rocks db cannot handle if the String path to the local directory is given in the URI form of: "file://path/..." The easiest would be to convert to Path in the constructor instead of storing the raw string.
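A minimal sketch of the suggested normalization, using only `java.net.URI` (`LocalDbPath` and `toLocalPath` are hypothetical names, not Flink API). It also shows the pitfall behind the report: in `"file://path/..."` the first segment parses as the URI *authority* (a hostname), not as part of the path:

```java
import java.net.URI;

public class LocalDbPath {

    // Hypothetical helper: normalize the raw string to a plain local path
    // before handing it to RocksDB, instead of storing the raw string.
    static String toLocalPath(String raw) {
        URI uri = URI.create(raw);
        if (uri.getScheme() == null) {
            return raw; // already a plain path
        }
        if (!"file".equals(uri.getScheme())) {
            throw new IllegalArgumentException("Not a local filesystem path: " + raw);
        }
        // In "file://path/to/db", "path" parses as the authority, so stitch
        // authority and path back together to recover the intended directory.
        String authority = uri.getAuthority() == null ? "" : uri.getAuthority();
        return authority + uri.getPath();
    }

    public static void main(String[] args) {
        System.out.println(toLocalPath("file:///tmp/rocksdb")); // prints /tmp/rocksdb
    }
}
```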
[jira] [Created] (FLINK-3353) CSV-related tests may fail depending on locale
Stefano Baghino created FLINK-3353: -- Summary: CSV-related tests may fail depending on locale Key: FLINK-3353 URL: https://issues.apache.org/jira/browse/FLINK-3353 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.0.0 Reporter: Stefano Baghino Assignee: Stefano Baghino Priority: Trivial Fix For: 1.0.0 As I've been running some tests, three suites ({{KMeansWithBroadcastSetITCase.java}}, {{ScalaCsvReaderWithPOJOITCase.scala}} and {{CsvReaderITCase.java}}) kept failing locally because the expected results (string literals) were matched against an object rendered as a string using the {{String.format}} method, a method whose result depends on the default Locale; as my Locale (Italian) renders doubles with a comma instead of a dot as the decimal separator, the representation of doubles diverged from the expected one, thus making my tests fail, despite the results actually being correct. As the result is hard-coded, it makes sense to explicitly use the US locale to represent those objects. I'll open a PR with my solution ASAP.
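The locale dependence described in the issue can be reproduced directly with `String.format`; `LocaleFormat` is a hypothetical illustration class, not code from the PR:

```java
import java.util.Locale;

public class LocaleFormat {

    // String.format without a Locale uses the JVM default; under an Italian
    // locale this renders 0.5 as "0,5" and breaks hard-coded expected strings.
    static String formatDefault(double d) {
        return String.format("%.1f", d);
    }

    // The fix the issue proposes: pin the US locale so the rendering is deterministic.
    static String formatUs(double d) {
        return String.format(Locale.US, "%.1f", d);
    }

    public static void main(String[] args) {
        System.out.println(String.format(Locale.ITALY, "%.1f", 0.5)); // prints 0,5
        System.out.println(formatUs(0.5));                            // prints 0.5
    }
}
```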
[jira] [Created] (FLINK-3354) RocksDB should compute checkpoint size based on backup file size
Gyula Fora created FLINK-3354: - Summary: RocksDB should compute checkpoint size based on backup file size Key: FLINK-3354 URL: https://issues.apache.org/jira/browse/FLINK-3354 Project: Flink Issue Type: Improvement Components: Streaming Reporter: Gyula Fora Priority: Critical Currently the RocksDB backend returns 0 for state size, the actual state size could be computed using: fs.getContentSummary(path).getLength();
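As a rough sketch of what `fs.getContentSummary(path).getLength()` computes, here is a plain-Java analog that sums the sizes of all files under a backup directory using `java.nio.file` (the Hadoop call does the equivalent for HDFS in a single request; `BackupSize` is a hypothetical name):

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

public class BackupSize {

    // Plain-Java analog of fs.getContentSummary(path).getLength():
    // total size in bytes of every file under the backup directory.
    static long sizeOf(Path backupDir) throws IOException {
        final long[] total = {0L};
        Files.walkFileTree(backupDir, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
                total[0] += attrs.size();
                return FileVisitResult.CONTINUE;
            }
        });
        return total[0];
    }

    public static void main(String[] args) throws IOException {
        Path d = Files.createTempDirectory("ckpt");
        Files.write(d.resolve("a.sst"), new byte[42]);
        System.out.println(sizeOf(d)); // prints 42
    }
}
```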
[jira] [Created] (FLINK-3355) Allow passing RocksDB Option to RocksDBStateBackend
Gyula Fora created FLINK-3355: - Summary: Allow passing RocksDB Option to RocksDBStateBackend Key: FLINK-3355 URL: https://issues.apache.org/jira/browse/FLINK-3355 Project: Flink Issue Type: Improvement Components: Streaming Reporter: Gyula Fora Priority: Critical Currently the RocksDB state backend does not allow users to set the parameters of the created store which might lead to suboptimal performance on some workloads.
[jira] [Commented] (FLINK-3035) Redis as State Backend
[ https://issues.apache.org/jira/browse/FLINK-3035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15136075#comment-15136075 ] Subhobrata Dey commented on FLINK-3035: --- Hello [~mjsax], I had a look into the rocksdb state backend implementation & found that rocksdb is used in embedded mode. However, there are not many popular libraries which allow Redis to be used in embedded mode. One such project I found is: https://github.com/kstyrc/embedded-redis So, would you suggest using the above project to run Redis in embedded mode, or can we start the Redis server separately (externally)? Looking forward to your suggestions. > Redis as State Backend > -- > > Key: FLINK-3035 > URL: https://issues.apache.org/jira/browse/FLINK-3035 > Project: Flink > Issue Type: New Feature > Components: Streaming >Reporter: Matthias J. Sax >Assignee: Subhobrata Dey >Priority: Minor > > Add Redis as a state backend for distributed snapshots.
[jira] [Commented] (FLINK-3343) Exception while using Kafka 0.9 connector
[ https://issues.apache.org/jira/browse/FLINK-3343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15135743#comment-15135743 ] Robert Metzger commented on FLINK-3343: --- Where did you see that the default value is 200 ? The official [kafka documentation|http://kafka.apache.org/documentation.html#producerconfigs] says the default value is {{16384}}. Flink is not overwriting the configuration value from Kafka. > Exception while using Kafka 0.9 connector > -- > > Key: FLINK-3343 > URL: https://issues.apache.org/jira/browse/FLINK-3343 > Project: Flink > Issue Type: Improvement > Components: flink-contrib, Kafka Connector >Affects Versions: 1.00 >Reporter: Farouk Salem > > While running a job, without fault tolerance, producing data to Kafka, the > job failed due to "Batch Expired exception". I tried to increase the > "request.timeout.ms" and "max.block.ms" to 6 instead of 3 but still > the same problem. The only way to ride on this problem is using snapshotting. > 09:58:11,036 WARN org.apache.kafka.clients.producer.internals.Sender >- Got error produce response with correlation id 48106 on topic-partition > flinkWordCountNoFaultToleranceSmall > -2, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION > 09:58:11,036 WARN org.apache.kafka.clients.producer.internals.Sender >- Got error produce response with correlation id 48105 on topic-partition > flinkWordCountNoFaultToleranceSmall > -2, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION > 09:58:11,036 WARN org.apache.kafka.clients.producer.internals.Sender >- Got error produce response with correlation id 48104 on topic-partition > flinkWordCountNoFaultToleranceSmall > -2, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION > 09:58:11,068 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask >- Caught exception while processing timer. 
> java.lang.RuntimeException: Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:319) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:300) > at > org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:48) > at > org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:29) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at > org.apache.flink.streaming.runtime.operators.windowing.AggregatingKeyedTimePanes.evaluateWindow(AggregatingKeyedTimePanes.java:59) > at > org.apache.flink.streaming.runtime.operators.windowing.AbstractAlignedProcessingTimeWindowOperator.computeWindow(AbstractAlignedProcessingTimeWindowOperator.java:242) > at > org.apache.flink.streaming.runtime.operators.windowing.AbstractAlignedProcessingTimeWindowOperator.trigger(AbstractAlignedProcessingTimeWindowOperator.java:223) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:606) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:319) > at > 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:300) > at > org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:48) > at > org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:29) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:37) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:316) > ... 15 more > Caused by: java.lang.Exception: Failed to send data to Kafka: Batch Expired > at >
[jira] [Created] (FLINK-3350) Increase timeouts on Travis Builds
Stephan Ewen created FLINK-3350: --- Summary: Increase timeouts on Travis Builds Key: FLINK-3350 URL: https://issues.apache.org/jira/browse/FLINK-3350 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.0.0 Reporter: Stephan Ewen Priority: Critical Fix For: 1.0.0 We see a lot of failures on Travis because of timeouts. I think the problem is simply that the default ask timeout of "10 seconds" is too short to reliably execute tests in parallel on the small containers on Travis, especially when many services (like zookeeper mini clusters, etc.) are involved. The tests work most of the time, but with the large amount of tests we have, builds fail if 1 out of 1000 tests experiences a timeout. I suggest that we change the {{ForkableMiniCluster}} such that it multiplies the default timeout by a factor from an environment variable, which we set in the travis build scripts. Something like {{export TEST_TIMEOUT_FACTOR=3}}.
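The proposed environment-variable multiplier could be sketched like this; the class and method names are hypothetical, not actual `ForkableMiniCluster` API:

```java
public class TestTimeouts {

    // Sketch of the proposal: scale the default ask timeout by a factor read
    // from an environment variable (TEST_TIMEOUT_FACTOR in the suggestion).
    static int scaledTimeoutSeconds(int baseSeconds, String factorFromEnv) {
        int factor = 1; // variable not set: keep the default timeout
        if (factorFromEnv != null && !factorFromEnv.isEmpty()) {
            factor = Integer.parseInt(factorFromEnv);
        }
        return baseSeconds * factor;
    }

    public static void main(String[] args) {
        // e.g. `export TEST_TIMEOUT_FACTOR=3` in the Travis build scripts
        int timeout = scaledTimeoutSeconds(10, System.getenv("TEST_TIMEOUT_FACTOR"));
        System.out.println(timeout);
    }
}
```

Outside Travis the variable is unset, so local runs keep the normal 10-second timeout.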
[jira] [Commented] (FLINK-2213) Configure number of vcores
[ https://issues.apache.org/jira/browse/FLINK-2213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15135951#comment-15135951 ] ASF GitHub Bot commented on FLINK-2213: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/1588#discussion_r52106077 --- Diff: docs/setup/config.md --- @@ -211,6 +211,8 @@ The parameters define the behavior of tasks that create result files. yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native" +- `yarn.containers.vcores` The number of virtual cores (vcores) per YARN container. By default the number of `vcores` is set equal to the maximum between the number of slots per TaskManager, and the number of cores available to the Java runtime. --- End diff -- This was to have a fallback strategy in case the slots parameter is not set. But @StephanEwen 's comment probably solves it. The fallback will be set to the previous strategy where vcores=1. > Configure number of vcores > -- > > Key: FLINK-2213 > URL: https://issues.apache.org/jira/browse/FLINK-2213 > Project: Flink > Issue Type: Improvement > Components: YARN Client >Affects Versions: 0.10.0 >Reporter: Ufuk Celebi >Assignee: Kostas > Fix For: 1.0.0 > > > Currently, the number of vcores per YARN container is set to 1. > It is desirable to allow configuring this value. As a simple heuristic it > makes sense to at least set it to the number of slots per container.
[GitHub] flink pull request: [FLINK-3339] Make ValueState.update(null) act ...
Github user mbalassi commented on the pull request: https://github.com/apache/flink/pull/1594#issuecomment-180842858 LGTM
[jira] [Commented] (FLINK-3339) Checkpointing NPE when using filterWithState
[ https://issues.apache.org/jira/browse/FLINK-3339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15135960#comment-15135960 ] ASF GitHub Bot commented on FLINK-3339: --- Github user mbalassi commented on the pull request: https://github.com/apache/flink/pull/1594#issuecomment-180842858 LGTM > Checkpointing NPE when using filterWithState > > > Key: FLINK-3339 > URL: https://issues.apache.org/jira/browse/FLINK-3339 > Project: Flink > Issue Type: Bug >Reporter: Shikhar Bhushan >Assignee: Aljoscha Krettek > > (1.0-SNAPSHOT) > I am using the Scala API keyedStream.filterWithState(..), where the state is > an Option[Long] > I am seeing the following error which goes away if I remove the filter. > {noformat} > 02/04/2016 14:10:19 Job execution switched to status FAILING. > java.lang.RuntimeException: Error triggering a checkpoint as the result of > receiving checkpoint barrier > at > org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:651) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:644) > at > org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:201) > at > org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:127) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:173) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.NullPointerException > at > org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:63) > at > org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:27) > at > 
org.apache.flink.runtime.state.memory.AbstractMemState.snapshot(AbstractMemState.java:74) > at > org.apache.flink.runtime.state.AbstractStateBackend.snapshotPartitionedState(AbstractStateBackend.java:245) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotOperatorState(AbstractStreamOperator.java:174) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(AbstractUdfStreamOperator.java:119) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:470) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:648) > ... 8 more > {noformat}
[jira] [Commented] (FLINK-2213) Configure number of vcores
[ https://issues.apache.org/jira/browse/FLINK-2213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15135815#comment-15135815 ] ASF GitHub Bot commented on FLINK-2213: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1588#issuecomment-180789530 The fallback behavior here is now different than the original behavior. I think it would be good to make the fallback the same as before, meaning to use the number of slots as the number of vcores if possible, otherwise, use one vcore (unless configured). > Configure number of vcores > -- > > Key: FLINK-2213 > URL: https://issues.apache.org/jira/browse/FLINK-2213 > Project: Flink > Issue Type: Improvement > Components: YARN Client >Affects Versions: 0.10.0 >Reporter: Ufuk Celebi >Assignee: Kostas > Fix For: 1.0.0 > > > Currently, the number of vcores per YARN container is set to 1. > It is desirable to allow configuring this value. As a simple heuristic it > makes sense to at least set it to the number of slots per container.
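The fallback order discussed in this review can be captured as a small decision function. This is a hypothetical sketch of the policy, not the actual YARN client code; values less than or equal to zero stand for "not configured":

```java
public class VcoresFallback {

    // Fallback discussed in the review: an explicitly configured
    // yarn.containers.vcores wins; otherwise use the slots per TaskManager;
    // otherwise keep the original behavior of one vcore per container.
    static int vcoresPerContainer(int configuredVcores, int slotsPerTaskManager) {
        if (configuredVcores > 0) {
            return configuredVcores;
        }
        if (slotsPerTaskManager > 0) {
            return slotsPerTaskManager;
        }
        return 1;
    }

    public static void main(String[] args) {
        System.out.println(vcoresPerContainer(-1, 4)); // prints 4
    }
}
```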
[GitHub] flink pull request: FLINK-2213 Makes the number of vcores per YARN...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1588#issuecomment-180789530 The fallback behavior here is now different than the original behavior. I think it would be good to make the fallback the same as before, meaning to use the number of slots as the number of vcores if possible, otherwise, use one vcore (unless configured).
[jira] [Commented] (FLINK-3341) Kafka connector's 'auto.offset.reset' inconsistent with Kafka
[ https://issues.apache.org/jira/browse/FLINK-3341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15135818#comment-15135818 ] ASF GitHub Bot commented on FLINK-3341: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1597#issuecomment-180790019 Good fix, pretty important to get this into 1.0 > Kafka connector's 'auto.offset.reset' inconsistent with Kafka > - > > Key: FLINK-3341 > URL: https://issues.apache.org/jira/browse/FLINK-3341 > Project: Flink > Issue Type: Bug >Reporter: Shikhar Bhushan >Assignee: Robert Metzger >Priority: Minor > > Kafka docs talk of valid "auto.offset.reset" values being "smallest" or > "largest" > https://kafka.apache.org/08/configuration.html > The {{LegacyFetcher}} looks for "latest" and otherwise defaults to "smallest" > cc [~rmetzger]
[GitHub] flink pull request: [FLINK-3341] Make 'auto.offset.reset' compatib...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1597#issuecomment-180790019 Good fix, pretty important to get this into 1.0
[GitHub] flink pull request: FLINK-2213 Makes the number of vcores per YARN...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/1588#discussion_r52106077 --- Diff: docs/setup/config.md --- @@ -211,6 +211,8 @@ The parameters define the behavior of tasks that create result files. yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native" +- `yarn.containers.vcores` The number of virtual cores (vcores) per YARN container. By default the number of `vcores` is set equal to the maximum between the number of slots per TaskManager, and the number of cores available to the Java runtime. --- End diff -- This was to have a fallback strategy in case the slots parameter is not set. But @StephanEwen 's comment probably solves it. The fallback will be set to the previous strategy where vcores=1.
[jira] [Commented] (FLINK-3128) Add Isotonic Regression To ML Library
[ https://issues.apache.org/jira/browse/FLINK-3128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15135852#comment-15135852 ]

ASF GitHub Bot commented on FLINK-3128:
---

Github user f-sander commented on the pull request:

    https://github.com/apache/flink/pull/1565#issuecomment-180798486

    Sorry for the long delay. I still don't really have time for this, but I want to describe it anyway. That's why the writing and formatting is pretty sloppy; I hope you bear with me:

    We only consider isotonic regression on weighted, two-dimensional data. Thus, data points are tuples of three doubles: `(y, x, w)`. PAV assumes the data to be sorted by `x`. It starts on the left and goes to the right. Whenever two points (or more) are found that are descending in order of `x`, it "pools" them, which means all `y` values (multiplied by their weight) in that pool are averaged by the sum of all weights. Any point in the pool then looks like this: `(y_weighted_pool_avg, x, w)`. Because the `y` values were changed, we have to look back in `x` order to check whether the new pool average is lower than the value before the pool. If that's the case, we have to pool again until no higher `y` value is present before the pool.

    Any sequence of data points from `i` to `j` sharing the same `y` value is compressed in the following way: `(y, x_i, sum_of_weights), (y, x_j, 0)`. The hope of Spark's implementation is that enough data gets compressed that way that all remaining data fits into one node in the last step. However, there are of course cases where this simply doesn't work.

    Our approach (not implemented in this PR) works like this:

    ```
    compare two consecutive data points i and j:
      if y_i < y_j, leave them untouched
      if y_i > y_j, replace both with ((y_i * w_i + y_j * w_j) / (w_i + w_j), x_i, w_i + w_j). Also remember x_j
      if y_i = y_j, replace both with (y_i, x_i, w_i + w_j). Also remember x_j
    Repeat that until no pairs are combined into one
    ```

    After the iteration terminated: for each point that has a "remembered" `x_j`, add another `(y, x_j, 0)` directly behind it.

    We are able to compare each point with its successor by attaching each point with an index (zipWithIndex) and a "next-pointer" (index+1) and then doing a `set.join(set).where(next).equalTo(index)`. However, because of the weight summation, we must avoid that a point appears in multiple join pairs. Otherwise a point's weight might be summed into multiple combined points. We worked around that by doing two joins in each iteration step:

    ```
    step 1: left join side has only points with even indices, right side only with odd
    step 2: left join side has only points with odd indices, right side only with even
    if nothing happened during these two runs, we are done
    ```

    Unfortunately, because of the merging, the indices are not incrementing by 1 anymore. That's why we wanted to apply another zipWithIndex after the two joins, but the join repartitioned the data, so we lose our range-partitioning. But this is required to get indices representing the total order of the data.

    I hope you can understand the problem. Again, sorry for the sloppy writing.

> Add Isotonic Regression To ML Library
> -
>
> Key: FLINK-3128
> URL: https://issues.apache.org/jira/browse/FLINK-3128
> Project: Flink
> Issue Type: New Feature
> Components: Machine Learning Library
> Reporter: Fridtjof Sander
> Assignee: Fridtjof Sander
> Priority: Minor
>
> Isotonic Regression fits a monotonically increasing function (also called
> isotonic function) to a plane of datapoints.
[GitHub] flink pull request: [FLINK-3128] [flink-ml] Add Isotonic Regressio...
Github user f-sander commented on the pull request:

    https://github.com/apache/flink/pull/1565#issuecomment-180798486

    Sorry for the long delay. I still don't really have time for this, but I want to describe it anyway. That's why the writing and formatting is pretty sloppy; I hope you bear with me:

    We only consider isotonic regression on weighted, two-dimensional data. Thus, data points are tuples of three doubles: `(y, x, w)`. PAV assumes the data to be sorted by `x`. It starts on the left and goes to the right. Whenever two points (or more) are found that are descending in order of `x`, it "pools" them, which means all `y` values (multiplied by their weight) in that pool are averaged by the sum of all weights. Any point in the pool then looks like this: `(y_weighted_pool_avg, x, w)`. Because the `y` values were changed, we have to look back in `x` order to check whether the new pool average is lower than the value before the pool. If that's the case, we have to pool again until no higher `y` value is present before the pool.

    Any sequence of data points from `i` to `j` sharing the same `y` value is compressed in the following way: `(y, x_i, sum_of_weights), (y, x_j, 0)`. The hope of Spark's implementation is that enough data gets compressed that way that all remaining data fits into one node in the last step. However, there are of course cases where this simply doesn't work.

    Our approach (not implemented in this PR) works like this:

    ```
    compare two consecutive data points i and j:
      if y_i < y_j, leave them untouched
      if y_i > y_j, replace both with ((y_i * w_i + y_j * w_j) / (w_i + w_j), x_i, w_i + w_j). Also remember x_j
      if y_i = y_j, replace both with (y_i, x_i, w_i + w_j). Also remember x_j
    Repeat that until no pairs are combined into one
    ```

    After the iteration terminated: for each point that has a "remembered" `x_j`, add another `(y, x_j, 0)` directly behind it.

    We are able to compare each point with its successor by attaching each point with an index (zipWithIndex) and a "next-pointer" (index+1) and then doing a `set.join(set).where(next).equalTo(index)`. However, because of the weight summation, we must avoid that a point appears in multiple join pairs. Otherwise a point's weight might be summed into multiple combined points. We worked around that by doing two joins in each iteration step:

    ```
    step 1: left join side has only points with even indices, right side only with odd
    step 2: left join side has only points with odd indices, right side only with even
    if nothing happened during these two runs, we are done
    ```

    Unfortunately, because of the merging, the indices are not incrementing by 1 anymore. That's why we wanted to apply another zipWithIndex after the two joins, but the join repartitioned the data, so we lose our range-partitioning. But this is required to get indices representing the total order of the data.

    I hope you can understand the problem. Again, sorry for the sloppy writing.
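For reference, the sequential pooling that the comment takes as its starting point can be written compactly as a stack-based pool-adjacent-violators pass. This is a plain-Java sketch of the sequential algorithm only, not of the distributed join-based variant the comment describes; `Pav` and `fit` are hypothetical names:

```java
import java.util.Arrays;

public class Pav {

    // Pool-adjacent-violators on x-sorted data: y[i] are the values, w[i] the
    // weights. Returns the fitted (monotonically non-decreasing) y values.
    static double[] fit(double[] y, double[] w) {
        int n = y.length;
        double[] level = new double[n];  // pooled weighted average per block
        double[] weight = new double[n]; // total weight per block
        int[] size = new int[n];         // number of points per block
        int top = -1;                    // top of the block stack

        for (int i = 0; i < n; i++) {
            top++;
            level[top] = y[i];
            weight[top] = w[i];
            size[top] = 1;
            // merge blocks while the newest one violates monotonicity
            while (top > 0 && level[top - 1] > level[top]) {
                double tw = weight[top - 1] + weight[top];
                level[top - 1] = (level[top - 1] * weight[top - 1] + level[top] * weight[top]) / tw;
                weight[top - 1] = tw;
                size[top - 1] += size[top];
                top--;
            }
        }

        // expand the blocks back into one fitted value per input point
        double[] out = new double[n];
        int idx = 0;
        for (int b = 0; b <= top; b++) {
            for (int k = 0; k < size[b]; k++) {
                out[idx++] = level[b];
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // (1, 3, 2, 4): the 3 and 2 violate monotonicity and get pooled to 2.5
        System.out.println(Arrays.toString(fit(new double[]{1, 3, 2, 4},
                                               new double[]{1, 1, 1, 1})));
        // prints [1.0, 2.5, 2.5, 4.0]
    }
}
```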
[jira] [Commented] (FLINK-2131) Add Initialization schemes for K-means clustering
[ https://issues.apache.org/jira/browse/FLINK-2131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15136131#comment-15136131 ] ASF GitHub Bot commented on FLINK-2131: --- Github user sachingoel0101 commented on the pull request: https://github.com/apache/flink/pull/757#issuecomment-180943918 Thanks for the review. I will push a fix soon. > Add Initialization schemes for K-means clustering > - > > Key: FLINK-2131 > URL: https://issues.apache.org/jira/browse/FLINK-2131 > Project: Flink > Issue Type: Task > Components: Machine Learning Library >Reporter: Sachin Goel >Assignee: Sachin Goel > > The Lloyd's [KMeans] algorithm takes initial centroids as its input. However, > in case the user doesn't provide the initial centers, they may ask for a > particular initialization scheme to be followed. The most commonly used are > these: > 1. Random initialization: Self-explanatory > 2. kmeans++ initialization: http://ilpubs.stanford.edu:8090/778/1/2006-13.pdf > 3. kmeans|| : http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf > For very large data sets, or for large values of k, the kmeans|| method is > preferred as it provides the same approximation guarantees as kmeans++ and > requires lesser number of passes over the input data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3349) PlanVisualizer doesn't work
[ https://issues.apache.org/jira/browse/FLINK-3349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15135744#comment-15135744 ] Riccardo Diomedi commented on FLINK-3349: - If you download the Flink source from http://www.apache.org/dyn/closer.lua/flink/flink-0.10.1/flink-0.10.1-src.tgz, planVisualizer.html is there, but the related CSS and JS files are not. If you download the Flink release from https://www.apache.org/dyn/closer.lua/flink/flink-0.10.1/flink-0.10.1-bin-hadoop1-scala_2.10.tgz, everything works! I needed a working planVisualizer and found one, so my issue is solved! > PlanVisualizer doesn't work > --- > > Key: FLINK-3349 > URL: https://issues.apache.org/jira/browse/FLINK-3349 > Project: Flink > Issue Type: Bug >Affects Versions: 1.0.0 >Reporter: Riccardo Diomedi >Priority: Minor > > the planVisualizer.html doesn't work! > I try to paste the json but nothing happen! -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2721) Add Tuple meta information
[ https://issues.apache.org/jira/browse/FLINK-2721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15135762#comment-15135762 ] ASF GitHub Bot commented on FLINK-2721: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1591#discussion_r52102060 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java --- @@ -229,9 +229,24 @@ private void translateTopology() { boolean makeProgress = true; while (bolts.size() > 0) { if (!makeProgress) { - throw new RuntimeException( - "Unable to build Topology. Could not connect the following bolts: " - + bolts.keySet()); + StringBuilder strBld = new StringBuilder(); + strBld.append("Unable to build Topology. Could not connect the following bolts:"); + for (String boltId : bolts.keySet()) { + strBld.append("\n "); + strBld.append(boltId); + strBld.append(": missing input streams ["); + for (Entrystreams : unprocessdInputsPerBolt + .get(boltId)) { + strBld.append("'"); + strBld.append(streams.getKey().get_streamId()); + strBld.append("' from '"); + strBld.append(streams.getKey().get_componentId()); + strBld.append("'; "); + } + strBld.append("]"); --- End diff -- Nice. That's helpful for debugging. > Add Tuple meta information > -- > > Key: FLINK-2721 > URL: https://issues.apache.org/jira/browse/FLINK-2721 > Project: Flink > Issue Type: New Feature > Components: Storm Compatibility >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Minor > > In {{Bolt.execute(Tuple input)}} the given input tuple contains meta > information about its origin (like source component name, stream id, source > task ID). > This meta information in currently not provided by Flink and the > corresponding methods throw an {{UnsupportedOperationException}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request:
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/commit/21a715867d655bb61df9a9f7eef37e42b99e206a#commitcomment-15932805 In the current master, the `DataStream` class imports the correct `o.a.f.api.common.operators.Keys`. https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L43 Is it possible that your Maven cache has mixed versions? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
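A hypothetical way to rule out the mixed-version Maven cache Stephan mentions, assuming the default local repository location:

```shell
# Remove cached Flink artifacts so Maven must re-resolve them on the next build.
rm -rf ~/.m2/repository/org/apache/flink
# Then rebuild, forcing an update check for snapshot artifacts:
# mvn -U clean package
```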
[jira] [Commented] (FLINK-2213) Configure number of vcores
[ https://issues.apache.org/jira/browse/FLINK-2213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15135750#comment-15135750 ] ASF GitHub Bot commented on FLINK-2213: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1588#discussion_r52101633 --- Diff: docs/setup/config.md --- @@ -211,6 +211,8 @@ The parameters define the behavior of tasks that create result files. yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native" +- `yarn.containers.vcores` The number of virtual cores (vcores) per YARN container. By default the number of `vcores` is set equal to the maximum between the number of slots per TaskManager, and the number of cores available to the Java runtime. --- End diff -- what's the rationale for using the max(slots, #cpus) ? I think in most cases users use fewer slots than CPU cores available on the physical machine. > Configure number of vcores > -- > > Key: FLINK-2213 > URL: https://issues.apache.org/jira/browse/FLINK-2213 > Project: Flink > Issue Type: Improvement > Components: YARN Client >Affects Versions: 0.10.0 >Reporter: Ufuk Celebi >Assignee: Kostas > Fix For: 1.0.0 > > > Currently, the number of vcores per YARN container is set to 1. > It is desirable to allow configuring this value. As a simple heuristic it > makes sense to at least set it to the number of slots per container. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: FLINK-2213 Makes the number of vcores per YARN...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1588#discussion_r52101633 --- Diff: docs/setup/config.md --- @@ -211,6 +211,8 @@ The parameters define the behavior of tasks that create result files. yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native" +- `yarn.containers.vcores` The number of virtual cores (vcores) per YARN container. By default the number of `vcores` is set equal to the maximum between the number of slots per TaskManager, and the number of cores available to the Java runtime. --- End diff -- what's the rationale for using the max(slots, #cpus) ? I think in most cases users use fewer slots than CPU cores available on the physical machine. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
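For users, the option being documented in this diff would be set in `flink-conf.yaml`; the value below is only an example:

```yaml
# Request 3 vCores per YARN container. If unset, Flink derives the value
# from the number of TaskManager slots, as described in the diff above.
yarn.containers.vcores: 3
```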
[GitHub] flink pull request: [FLINK-2721] [Storm Compatibility] Add Tuple m...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1591#discussion_r52102026 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java --- @@ -41,55 +39,61 @@ */ public class StormTuple implements backtype.storm.tuple.Tuple { - /** The Storm representation of the original Flink tuple */ + /** The Storm representation of the original Flink tuple. */ private final Values stormTuple; - /** The schema (ie, ordered field names) of the tuple */ + /** The schema (ie, ordered field names) of this tuple. */ private final Fields schema; - - /** The task id where this tuple is processed */ - private final int taskId; - /** The producer of this tuple */ + /** The task ID where this tuple was produced. */ + private final int producerTaskId; + /** The input stream from which this tuple was received. */ private final String producerStreamId; - /** The producer's component id of this tuple */ + /** The producer's component ID of this tupl.e */ --- End diff -- typo --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2213) Configure number of vcores
[ https://issues.apache.org/jira/browse/FLINK-2213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15135752#comment-15135752 ] ASF GitHub Bot commented on FLINK-2213: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1588#issuecomment-180753220 I hope it's a coincidence that the YARN tests failed in this PR. If they fail again after your next push, we have to check whether your changes caused the failure > Configure number of vcores > -- > > Key: FLINK-2213 > URL: https://issues.apache.org/jira/browse/FLINK-2213 > Project: Flink > Issue Type: Improvement > Components: YARN Client >Affects Versions: 0.10.0 >Reporter: Ufuk Celebi >Assignee: Kostas > Fix For: 1.0.0 > > > Currently, the number of vcores per YARN container is set to 1. > It is desirable to allow configuring this value. As a simple heuristic it > makes sense to at least set it to the number of slots per container. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: FLINK-2213 Makes the number of vcores per YARN...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1588#issuecomment-180753220 I hope it's a coincidence that the YARN tests failed in this PR. If they fail again after your next push, we have to check whether your changes caused the failure --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3234] [dataSet] Add KeySelector support...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1585#discussion_r52099909 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java --- @@ -79,16 +112,33 @@ public SortPartitionOperator(DataSet dataSet, String sortField, Order sortOrd * local partition sorting of the DataSet. * * @param field The field expression referring to the field of the additional sort order of -* the local partition sorting. -* @param order The order of the additional sort order of the local partition sorting. +* the local partition sorting. +* @param order The order of the additional sort order of the local partition sorting. * @return The DataSet with sorted local partitions. */ public SortPartitionOperator sortPartition(String field, Order order) { + if (useKeySelector) { + throw new InvalidProgramException("Expression keys cannot be appended after selector function keys"); + } + int[] flatOrderKeys = getFlatFields(field); this.appendSorting(flatOrderKeys, order); return this; } + /** +* Appends an additional sort order with the specified field in the specified order to the +* local partition sorting of the DataSet. +* +* @param keyExtractor The KeySelector function which extracts the key value of the additional +* sort order of the local partition sorting. +* @param orderThe order of the additional sort order of the local partition sorting. +* @return The DataSet with sorted local partitions. +*/ + public SortPartitionOperator sortPartition(KeySelectorkeyExtractor, Order order) { --- End diff -- Oh yes, you are right! Can you update the JavaDocs of this method and explain that chaining is not possible and how to work around it? Thanks --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3349) PlanVisualizer doesn't work
[ https://issues.apache.org/jira/browse/FLINK-3349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15135738#comment-15135738 ] Robert Metzger commented on FLINK-3349: --- So the issue has been resolved? > PlanVisualizer doesn't work > --- > > Key: FLINK-3349 > URL: https://issues.apache.org/jira/browse/FLINK-3349 > Project: Flink > Issue Type: Bug >Affects Versions: 1.0.0 >Reporter: Riccardo Diomedi >Priority: Minor > > the planVisualizer.html doesn't work! > I try to paste the json but nothing happen! -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-3341) Kafka connector's 'auto.offset.reset' inconsistent with Kafka
[ https://issues.apache.org/jira/browse/FLINK-3341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger reassigned FLINK-3341: - Assignee: Robert Metzger > Kafka connector's 'auto.offset.reset' inconsistent with Kafka > - > > Key: FLINK-3341 > URL: https://issues.apache.org/jira/browse/FLINK-3341 > Project: Flink > Issue Type: Bug >Reporter: Shikhar Bhushan >Assignee: Robert Metzger >Priority: Minor > > Kafka docs talk of valid "auto.offset.reset" values being "smallest" or > "largest" > https://kafka.apache.org/08/configuration.html > The {{LegacyFetcher}} looks for "latest" and otherwise defaults to "smallest" > cc [~rmetzger] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: FLINK-2213 Makes the number of vcores per YARN...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1588#issuecomment-180753150 I didn't test this myself, but this diff could be sufficient for testing your change: ```diff diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java index 8c9a9c7..999b5be 100644 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java +++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java @@ -180,6 +180,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { "-n", "1", "-jm", "768", "-tm", "1024", + "-s", "3", // set the slots 3 to check if the vCores are set properly! "-nm", "customName", "-Dfancy-configuration-value=veryFancy", "-Dyarn.maximum-failed-containers=3"}, @@ -268,6 +269,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { String command = Joiner.on(" ").join(entry.getValue().getLaunchContext().getCommands()); if(command.contains(YarnTaskManagerRunner.class.getSimpleName())) { taskManagerContainer = entry.getKey(); + Assert.assertEquals(3,entry.getValue().getResource().getVirtualCores()); nodeManager = nm; nmIdent = new NMTokenIdentifier(taskManagerContainer.getApplicationAttemptId(), null, "",0); // allow myself to do stuff with the container ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2213) Configure number of vcores
[ https://issues.apache.org/jira/browse/FLINK-2213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15135751#comment-15135751 ] ASF GitHub Bot commented on FLINK-2213: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1588#issuecomment-180753150 I didn't test this myself, but this diff could be sufficient for testing your change: ```diff diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java index 8c9a9c7..999b5be 100644 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java +++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java @@ -180,6 +180,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { "-n", "1", "-jm", "768", "-tm", "1024", + "-s", "3", // set the slots 3 to check if the vCores are set properly! "-nm", "customName", "-Dfancy-configuration-value=veryFancy", "-Dyarn.maximum-failed-containers=3"}, @@ -268,6 +269,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { String command = Joiner.on(" ").join(entry.getValue().getLaunchContext().getCommands()); if(command.contains(YarnTaskManagerRunner.class.getSimpleName())) { taskManagerContainer = entry.getKey(); + Assert.assertEquals(3,entry.getValue().getResource().getVirtualCores()); nodeManager = nm; nmIdent = new NMTokenIdentifier(taskManagerContainer.getApplicationAttemptId(), null, "",0); // allow myself to do stuff with the container ``` > Configure number of vcores > -- > > Key: FLINK-2213 > URL: https://issues.apache.org/jira/browse/FLINK-2213 > Project: Flink > Issue Type: Improvement > Components: YARN Client >Affects Versions: 0.10.0 >Reporter: Ufuk Celebi >Assignee: Kostas > Fix For: 1.0.0 > > > Currently, the number of vcores per YARN container is set to 1. > It is desirable to allow configuring this value. 
As a simple heuristic it > makes sense to at least set it to the number of slots per container. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2721) Add Tuple meta information
[ https://issues.apache.org/jira/browse/FLINK-2721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15135761#comment-15135761 ] ASF GitHub Bot commented on FLINK-2721: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1591#discussion_r52102057 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java --- @@ -75,12 +75,71 @@ public void testSpoutStormCollector() throws InstantiationException, IllegalAcce } } + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testSpoutStormCollectorWithTaskId() throws InstantiationException, IllegalAccessException { + for (int numberOfAttributes = 0; numberOfAttributes < 25; ++numberOfAttributes) { + final SourceContext flinkCollector = mock(SourceContext.class); + final int taskId = 42; + final String streamId = "streamId"; + + HashMapattributes = new HashMap (); + attributes.put(streamId, numberOfAttributes); + + SpoutCollector collector = new SpoutCollector(attributes, taskId, flinkCollector); + + final Values tuple = new Values(); + final Tuple flinkTuple = Tuple.getTupleClass(numberOfAttributes + 1).newInstance(); + + for (int i = 0; i < numberOfAttributes; ++i) { + tuple.add(new Integer(this.r.nextInt())); + flinkTuple.setField(tuple.get(i), i); + } + flinkTuple.setField(taskId, numberOfAttributes); + + final List taskIds; + final Object messageId = new Integer(this.r.nextInt()); + + taskIds = collector.emit(streamId, tuple, messageId); + + Assert.assertNull(taskIds); + + verify(flinkCollector).collect(flinkTuple); + } + } + + @SuppressWarnings("unchecked") + @Test(expected = UnsupportedOperationException.class) + public void testToManyAttributes() { + HashMap attributes = new HashMap (); + attributes.put("", 26); + + new SpoutCollector(attributes, -1, mock(SourceContext.class)); + } + + @SuppressWarnings("unchecked") + @Test(expected = UnsupportedOperationException.class) + public void 
testToManyAttributesWithTaskId() { --- End diff -- typo > Add Tuple meta information > -- > > Key: FLINK-2721 > URL: https://issues.apache.org/jira/browse/FLINK-2721 > Project: Flink > Issue Type: New Feature > Components: Storm Compatibility >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Minor > > In {{Bolt.execute(Tuple input)}} the given input tuple contains meta > information about its origin (like source component name, stream id, source > task ID). > This meta information in currently not provided by Flink and the > corresponding methods throw an {{UnsupportedOperationException}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2721] [Storm Compatibility] Add Tuple m...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1591#discussion_r52102060 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java --- @@ -229,9 +229,24 @@ private void translateTopology() { boolean makeProgress = true; while (bolts.size() > 0) { if (!makeProgress) { - throw new RuntimeException( - "Unable to build Topology. Could not connect the following bolts: " - + bolts.keySet()); + StringBuilder strBld = new StringBuilder(); + strBld.append("Unable to build Topology. Could not connect the following bolts:"); + for (String boltId : bolts.keySet()) { + strBld.append("\n "); + strBld.append(boltId); + strBld.append(": missing input streams ["); + for (Entrystreams : unprocessdInputsPerBolt + .get(boltId)) { + strBld.append("'"); + strBld.append(streams.getKey().get_streamId()); + strBld.append("' from '"); + strBld.append(streams.getKey().get_componentId()); + strBld.append("'; "); + } + strBld.append("]"); --- End diff -- Nice. That's helpful for debugging. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2721] [Storm Compatibility] Add Tuple m...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1591#discussion_r52102056 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java --- @@ -75,12 +75,71 @@ public void testSpoutStormCollector() throws InstantiationException, IllegalAcce } } + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testSpoutStormCollectorWithTaskId() throws InstantiationException, IllegalAccessException { + for (int numberOfAttributes = 0; numberOfAttributes < 25; ++numberOfAttributes) { + final SourceContext flinkCollector = mock(SourceContext.class); + final int taskId = 42; + final String streamId = "streamId"; + + HashMapattributes = new HashMap (); + attributes.put(streamId, numberOfAttributes); + + SpoutCollector collector = new SpoutCollector(attributes, taskId, flinkCollector); + + final Values tuple = new Values(); + final Tuple flinkTuple = Tuple.getTupleClass(numberOfAttributes + 1).newInstance(); + + for (int i = 0; i < numberOfAttributes; ++i) { + tuple.add(new Integer(this.r.nextInt())); + flinkTuple.setField(tuple.get(i), i); + } + flinkTuple.setField(taskId, numberOfAttributes); + + final List taskIds; + final Object messageId = new Integer(this.r.nextInt()); + + taskIds = collector.emit(streamId, tuple, messageId); + + Assert.assertNull(taskIds); + + verify(flinkCollector).collect(flinkTuple); + } + } + + @SuppressWarnings("unchecked") + @Test(expected = UnsupportedOperationException.class) + public void testToManyAttributes() { --- End diff -- typo --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2721) Add Tuple meta information
[ https://issues.apache.org/jira/browse/FLINK-2721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15135760#comment-15135760 ] ASF GitHub Bot commented on FLINK-2721: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1591#discussion_r52102056 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java --- @@ -75,12 +75,71 @@ public void testSpoutStormCollector() throws InstantiationException, IllegalAcce } } + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testSpoutStormCollectorWithTaskId() throws InstantiationException, IllegalAccessException { + for (int numberOfAttributes = 0; numberOfAttributes < 25; ++numberOfAttributes) { + final SourceContext flinkCollector = mock(SourceContext.class); + final int taskId = 42; + final String streamId = "streamId"; + + HashMapattributes = new HashMap (); + attributes.put(streamId, numberOfAttributes); + + SpoutCollector collector = new SpoutCollector(attributes, taskId, flinkCollector); + + final Values tuple = new Values(); + final Tuple flinkTuple = Tuple.getTupleClass(numberOfAttributes + 1).newInstance(); + + for (int i = 0; i < numberOfAttributes; ++i) { + tuple.add(new Integer(this.r.nextInt())); + flinkTuple.setField(tuple.get(i), i); + } + flinkTuple.setField(taskId, numberOfAttributes); + + final List taskIds; + final Object messageId = new Integer(this.r.nextInt()); + + taskIds = collector.emit(streamId, tuple, messageId); + + Assert.assertNull(taskIds); + + verify(flinkCollector).collect(flinkTuple); + } + } + + @SuppressWarnings("unchecked") + @Test(expected = UnsupportedOperationException.class) + public void testToManyAttributes() { --- End diff -- typo > Add Tuple meta information > -- > > Key: FLINK-2721 > URL: https://issues.apache.org/jira/browse/FLINK-2721 > Project: Flink > Issue Type: New Feature > Components: Storm Compatibility >Reporter: Matthias J. 
Sax >Assignee: Matthias J. Sax >Priority: Minor > > In {{Bolt.execute(Tuple input)}} the given input tuple contains meta > information about its origin (like source component name, stream id, source > task ID). > This meta information in currently not provided by Flink and the > corresponding methods throw an {{UnsupportedOperationException}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2721] [Storm Compatibility] Add Tuple m...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1591#discussion_r52102057 --- Diff: flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java --- @@ -75,12 +75,71 @@ public void testSpoutStormCollector() throws InstantiationException, IllegalAcce } } + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testSpoutStormCollectorWithTaskId() throws InstantiationException, IllegalAccessException { + for (int numberOfAttributes = 0; numberOfAttributes < 25; ++numberOfAttributes) { + final SourceContext flinkCollector = mock(SourceContext.class); + final int taskId = 42; + final String streamId = "streamId"; + + HashMapattributes = new HashMap (); + attributes.put(streamId, numberOfAttributes); + + SpoutCollector collector = new SpoutCollector(attributes, taskId, flinkCollector); + + final Values tuple = new Values(); + final Tuple flinkTuple = Tuple.getTupleClass(numberOfAttributes + 1).newInstance(); + + for (int i = 0; i < numberOfAttributes; ++i) { + tuple.add(new Integer(this.r.nextInt())); + flinkTuple.setField(tuple.get(i), i); + } + flinkTuple.setField(taskId, numberOfAttributes); + + final List taskIds; + final Object messageId = new Integer(this.r.nextInt()); + + taskIds = collector.emit(streamId, tuple, messageId); + + Assert.assertNull(taskIds); + + verify(flinkCollector).collect(flinkTuple); + } + } + + @SuppressWarnings("unchecked") + @Test(expected = UnsupportedOperationException.class) + public void testToManyAttributes() { + HashMap attributes = new HashMap (); + attributes.put("", 26); + + new SpoutCollector(attributes, -1, mock(SourceContext.class)); + } + + @SuppressWarnings("unchecked") + @Test(expected = UnsupportedOperationException.class) + public void testToManyAttributesWithTaskId() { --- End diff -- typo --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2721) Add Tuple meta information
[ https://issues.apache.org/jira/browse/FLINK-2721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15135758#comment-15135758 ] ASF GitHub Bot commented on FLINK-2721: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1591#discussion_r52102026 --- Diff: flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java --- @@ -41,55 +39,61 @@ */ public class StormTuple implements backtype.storm.tuple.Tuple { - /** The Storm representation of the original Flink tuple */ + /** The Storm representation of the original Flink tuple. */ private final Values stormTuple; - /** The schema (ie, ordered field names) of the tuple */ + /** The schema (ie, ordered field names) of this tuple. */ private final Fields schema; - - /** The task id where this tuple is processed */ - private final int taskId; - /** The producer of this tuple */ + /** The task ID where this tuple was produced. */ + private final int producerTaskId; + /** The input stream from which this tuple was received. */ private final String producerStreamId; - /** The producer's component id of this tuple */ + /** The producer's component ID of this tupl.e */ --- End diff -- typo > Add Tuple meta information > -- > > Key: FLINK-2721 > URL: https://issues.apache.org/jira/browse/FLINK-2721 > Project: Flink > Issue Type: New Feature > Components: Storm Compatibility >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Minor > > In {{Bolt.execute(Tuple input)}} the given input tuple contains meta > information about its origin (like source component name, stream id, source > task ID). > This meta information in currently not provided by Flink and the > corresponding methods throw an {{UnsupportedOperationException}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3349) PlanVisualizer doesn't work
[ https://issues.apache.org/jira/browse/FLINK-3349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15135714#comment-15135714 ] Riccardo Diomedi commented on FLINK-3349: - I now realize that I have neither the .css nor the .js files in the Flink source folder... so I downloaded the release version, and all those files are there! > PlanVisualizer doesn't work > --- > > Key: FLINK-3349 > URL: https://issues.apache.org/jira/browse/FLINK-3349 > Project: Flink > Issue Type: Bug >Affects Versions: 1.0.0 >Reporter: Riccardo Diomedi >Priority: Minor > > the planVisualizer.html doesn't work! > I try to paste the json but nothing happen! -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request:
Github user alexeyegorov commented on the pull request: https://github.com/apache/flink/commit/21a715867d655bb61df9a9f7eef37e42b99e206a#commitcomment-15932065 @uce yes, I've already added _2.10 to my dependencies that are scala dependent... but I still get the build problem. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3297) Streaming connector for ZeroMQ
[ https://issues.apache.org/jira/browse/FLINK-3297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15135732#comment-15135732 ] Márton Balassi commented on FLINK-3297: --- Hey [~mohitsethi], we used to have a ZeroMQ connector back when the project was called Stratosphere, but had to remove it when moving to the ASF due to licensing issues. [1] Since then it seems that the ZeroMQ licensing has become more Apache friendly, but a double check is needed as it is still LGPL-based. [1] http://www.apache.org/foundation/license-faq.html [2] http://zeromq.org/area:licensing > Streaming connector for ZeroMQ > -- > > Key: FLINK-3297 > URL: https://issues.apache.org/jira/browse/FLINK-3297 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Mohit Sethi >Priority: Minor
[jira] [Commented] (FLINK-3034) Redis Sink Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15135739#comment-15135739 ] ASF GitHub Bot commented on FLINK-3034: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1580#issuecomment-180746275 How do you want to implement this? I'm asking because I don't see an obvious way to expose this to the user. > Redis Sink Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033
[GitHub] flink pull request: [FLINK-3034][Streaming Connectors] Redis Sink ...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1580#issuecomment-180746275 How do you want to implement this? I'm asking because I don't see an obvious way to expose this to the user.
[GitHub] flink pull request: [FLINK-2721] [Storm Compatibility] Add Tuple m...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/1591#issuecomment-180765494 Looks like the meta data is now completely available :). +1 for merging this.
[jira] [Commented] (FLINK-2721) Add Tuple meta information
[ https://issues.apache.org/jira/browse/FLINK-2721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15135764#comment-15135764 ] ASF GitHub Bot commented on FLINK-2721: --- Github user mxm commented on the pull request: https://github.com/apache/flink/pull/1591#issuecomment-180765494 Looks like the meta data is now completely available :). +1 for merging this. > Add Tuple meta information > -- > > Key: FLINK-2721 > URL: https://issues.apache.org/jira/browse/FLINK-2721 > Project: Flink > Issue Type: New Feature > Components: Storm Compatibility >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Minor > > In {{Bolt.execute(Tuple input)}} the given input tuple contains meta > information about its origin (like source component name, stream id, source > task ID). > This meta information is currently not provided by Flink and the > corresponding methods throw an {{UnsupportedOperationException}}.
[jira] [Commented] (FLINK-3234) SortPartition does not support KeySelectorFunctions
[ https://issues.apache.org/jira/browse/FLINK-3234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15135695#comment-15135695 ] ASF GitHub Bot commented on FLINK-3234: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1585#discussion_r52099968 --- Diff: flink-java/src/test/java/org/apache/flink/api/java/operator/SortPartitionTest.java --- @@ -169,6 +169,72 @@ public void testSortPartitionWithExpressionKeys4() { tupleDs.sortPartition("f3", Order.ASCENDING); } + @Test + public void testSortPartitionWithKeySelector1() { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = env.fromCollection(tupleWithCustomData, tupleWithCustomInfo); + + // should work + try { + tupleDs.sortPartition(new KeySelector<Tuple4<Integer, Long, CustomType, Long[]>, Integer>() { + @Override + public Integer getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws Exception { + return value.f0; + } + }, Order.ASCENDING); + } catch (Exception e) { + Assert.fail(); + } + } + + @Test(expected = InvalidProgramException.class) + public void testSortPartitionWithKeySelector2() { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple4<Integer, Long, CustomType, Long[]>> tupleDs = env.fromCollection(tupleWithCustomData, tupleWithCustomInfo); + + // must not work + tupleDs.sortPartition(new KeySelector<Tuple4<Integer, Long, CustomType, Long[]>, Long[]>() { + @Override + public Long[] getKey(Tuple4<Integer, Long, CustomType, Long[]> value) throws Exception { + return value.f3; + } + }, Order.ASCENDING); + } + + @Test(expected = InvalidProgramException.class) --- End diff -- Can you add a test that first uses KeySelectors and then ExpressionKeys?
> SortPartition does not support KeySelectorFunctions > --- > > Key: FLINK-3234 > URL: https://issues.apache.org/jira/browse/FLINK-3234 > Project: Flink > Issue Type: Improvement > Components: DataSet API >Affects Versions: 1.0.0, 0.10.1 >Reporter: Fabian Hueske >Assignee: Chiwan Park > Fix For: 1.0.0 > > > The following is not supported by the DataSet API: > {code} > DataSet<MyObject> data = ... > DataSet<MyObject> data.sortPartition( > new KeySelector<MyObject, Long>() { > public Long getKey(MyObject v) { > ... > } > }, > Order.ASCENDING); > {code}
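The KeySelector pattern in the snippet above boils down to extracting a comparable key from each element and sorting the partition by it. A minimal plain-Java sketch of that idea, with no Flink dependency (the sortByKey helper and its list-based "partition" are illustrative stand-ins, not Flink API):

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class KeySelectorSortSketch {

    // Sort a "partition" (modeled as a plain list) by a key extracted from each
    // element, mirroring what sortPartition(KeySelector, Order.ASCENDING) does.
    public static <T, K extends Comparable<K>> List<T> sortByKey(
            List<T> partition, Function<T, K> keySelector) {
        return partition.stream()
                .sorted(Comparator.comparing(keySelector))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // The key is extracted per element, just like KeySelector.getKey(value).
        List<String> sorted = sortByKey(Arrays.asList("ccc", "a", "bb"), String::length);
        System.out.println(sorted); // [a, bb, ccc]
    }
}
```

The design point of the KeySelector variant is the same as `Comparator.comparing`: the caller supplies only the key extraction, and the framework owns the ordering.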
[jira] [Resolved] (FLINK-3349) PlanVisualizer doesn't work
[ https://issues.apache.org/jira/browse/FLINK-3349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-3349. --- Resolution: Fixed Great. I'm closing the JIRA then. > PlanVisualizer doesn't work > --- > > Key: FLINK-3349 > URL: https://issues.apache.org/jira/browse/FLINK-3349 > Project: Flink > Issue Type: Bug >Affects Versions: 1.0.0 >Reporter: Riccardo Diomedi >Priority: Minor > > the planVisualizer.html doesn't work! > I try to paste the JSON but nothing happens!
[GitHub] flink pull request: [FLINK-3341] Make 'auto.offset.reset' compatib...
GitHub user rmetzger opened a pull request: https://github.com/apache/flink/pull/1597 [FLINK-3341] Make 'auto.offset.reset' compatible with Kafka 0.8 and 0.9 You can merge this pull request into a Git repository by running: $ git pull https://github.com/rmetzger/flink flink3341 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1597.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1597 commit 5c7d6a4a0df015073c4e9e21d2d3f597631ffba0 Author: Robert Metzger Date: 2016-02-06T12:27:06Z [FLINK-3341] Make 'auto.offset.reset' compatible with Kafka 0.8 and 0.9
[jira] [Commented] (FLINK-3341) Kafka connector's 'auto.offset.reset' inconsistent with Kafka
[ https://issues.apache.org/jira/browse/FLINK-3341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15135749#comment-15135749 ] ASF GitHub Bot commented on FLINK-3341: --- GitHub user rmetzger opened a pull request: https://github.com/apache/flink/pull/1597 [FLINK-3341] Make 'auto.offset.reset' compatible with Kafka 0.8 and 0.9 You can merge this pull request into a Git repository by running: $ git pull https://github.com/rmetzger/flink flink3341 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1597.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1597 commit 5c7d6a4a0df015073c4e9e21d2d3f597631ffba0 Author: Robert Metzger Date: 2016-02-06T12:27:06Z [FLINK-3341] Make 'auto.offset.reset' compatible with Kafka 0.8 and 0.9 > Kafka connector's 'auto.offset.reset' inconsistent with Kafka > - > > Key: FLINK-3341 > URL: https://issues.apache.org/jira/browse/FLINK-3341 > Project: Flink > Issue Type: Bug >Reporter: Shikhar Bhushan >Assignee: Robert Metzger >Priority: Minor > > Kafka docs talk of valid "auto.offset.reset" values being "smallest" or > "largest" > https://kafka.apache.org/08/configuration.html > The {{LegacyFetcher}} looks for "latest" and otherwise defaults to "smallest" > cc [~rmetzger]
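For background on why a mapping is needed at all: Kafka 0.8's high-level consumer expects "smallest"/"largest" for 'auto.offset.reset', while the 0.9 consumer introduced the spellings "earliest"/"latest". A hedged sketch of the kind of normalization such a compatibility fix requires (illustrative only, not the actual code from pull request #1597):

```java
public class OffsetResetMapper {

    // Translate an 'auto.offset.reset' value to the vocabulary of the Kafka 0.8
    // high-level consumer ("smallest"/"largest"), accepting the Kafka 0.9
    // spellings ("earliest"/"latest") as synonyms.
    public static String toKafka08(String value) {
        switch (value.toLowerCase()) {
            case "smallest":
            case "earliest":
                return "smallest";
            case "largest":
            case "latest":
                return "largest";
            default:
                throw new IllegalArgumentException(
                        "Unknown auto.offset.reset value: " + value);
        }
    }

    public static void main(String[] args) {
        System.out.println(toKafka08("latest"));   // largest
        System.out.println(toKafka08("earliest")); // smallest
    }
}
```

Rejecting unknown values loudly, rather than silently defaulting to "smallest" as the issue describes, is what makes the behavior consistent with Kafka's own validation.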
[jira] [Commented] (FLINK-2721) Add Tuple meta information
[ https://issues.apache.org/jira/browse/FLINK-2721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15135763#comment-15135763 ] ASF GitHub Bot commented on FLINK-2721: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1591#discussion_r52102118 --- Diff: flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/VerifyMetaDataBolt.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.storm.tests.operators; + +import java.util.Map; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; + +public class VerifyMetaDataBolt extends BaseRichBolt { + private static final long serialVersionUID = 1353222852073800478L; + + public static final String STREAM_ID = "boltMeta"; + + private OutputCollector collector; + private TopologyContext context; + + public static boolean errorOccured = false; + + @SuppressWarnings("rawtypes") + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + this.context = context; + } + + @Override + public void execute(Tuple input) { + if (!input.getSourceComponent().equals(input.getString(0)) + || !input.getSourceStreamId().equals(input.getString(1)) + || !input.getSourceGlobalStreamid().get_componentId().equals(input.getString(0)) + || !input.getSourceGlobalStreamid().get_streamId().equals(input.getString(1)) + || input.getSourceTask() != input.getInteger(2).intValue()) { + errorOccured = true; --- End diff -- Should the message id also be verified here since it belongs to the meta data? > Add Tuple meta information > -- > > Key: FLINK-2721 > URL: https://issues.apache.org/jira/browse/FLINK-2721 > Project: Flink > Issue Type: New Feature > Components: Storm Compatibility >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Minor > > In {{Bolt.execute(Tuple input)}} the given input tuple contains meta > information about its origin (like source component name, stream id, source > task ID). > This meta information is currently not provided by Flink and the > corresponding methods throw an {{UnsupportedOperationException}}.
[GitHub] flink pull request: [FLINK-2721] [Storm Compatibility] Add Tuple m...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1591#discussion_r52102118 --- Diff: flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/VerifyMetaDataBolt.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.storm.tests.operators; + +import java.util.Map; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; + +public class VerifyMetaDataBolt extends BaseRichBolt { + private static final long serialVersionUID = 1353222852073800478L; + + public static final String STREAM_ID = "boltMeta"; + + private OutputCollector collector; + private TopologyContext context; + + public static boolean errorOccured = false; + + @SuppressWarnings("rawtypes") + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + this.context = context; + } + + @Override + public void execute(Tuple input) { + if (!input.getSourceComponent().equals(input.getString(0)) + || !input.getSourceStreamId().equals(input.getString(1)) + || !input.getSourceGlobalStreamid().get_componentId().equals(input.getString(0)) + || !input.getSourceGlobalStreamid().get_streamId().equals(input.getString(1)) + || input.getSourceTask() != input.getInteger(2).intValue()) { + errorOccured = true; --- End diff -- Should the message id also be verified here since it belongs to the meta data?
[jira] [Resolved] (FLINK-3210) Unnecessary call to deserializer#deserialize() in LegacyFetcher#SimpleConsumerThread#run()
[ https://issues.apache.org/jira/browse/FLINK-3210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu resolved FLINK-3210. --- Resolution: Later > Unnecessary call to deserializer#deserialize() in > LegacyFetcher#SimpleConsumerThread#run() > -- > > Key: FLINK-3210 > URL: https://issues.apache.org/jira/browse/FLINK-3210 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > Here is related code: > {code} > byte[] valueBytes; > if (payload == null) { > deletedMessages++; > valueBytes = null; > } else { > ... > final T value = deserializer.deserialize(keyBytes, > valueBytes, fp.topic, offset); > {code} > When valueBytes is null, there is no need to call deserializer#deserialize()
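The issue's suggestion amounts to short-circuiting before deserialization when the payload is null. A simplified sketch of that control flow (plain Java, with a `Function` standing in for Flink's deserialization schema; not the actual LegacyFetcher code):

```java
import java.util.function.Function;

public class NullPayloadGuard {

    // Deserialize only when the payload bytes are present; a null payload
    // (e.g. a deleted/tombstone message) short-circuits to null, so the
    // deserializer is never invoked on it. Simplified stand-in for the
    // LegacyFetcher logic discussed in the issue.
    public static <T> T deserializeIfPresent(byte[] valueBytes,
                                             Function<byte[], T> deserializer) {
        if (valueBytes == null) {
            return null; // skip the unnecessary deserialize() call
        }
        return deserializer.apply(valueBytes);
    }

    public static void main(String[] args) {
        System.out.println(deserializeIfPresent("abc".getBytes(), String::new)); // abc
        // The deserializer body must never run for a null payload:
        System.out.println(deserializeIfPresent(null,
                b -> { throw new AssertionError("deserializer must not run"); }));
    }
}
```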
[jira] [Commented] (FLINK-3234) SortPartition does not support KeySelectorFunctions
[ https://issues.apache.org/jira/browse/FLINK-3234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15135692#comment-15135692 ] ASF GitHub Bot commented on FLINK-3234: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1585#discussion_r52099909 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java --- @@ -79,16 +112,33 @@ public SortPartitionOperator(DataSet<T> dataSet, String sortField, Order sortOrd * local partition sorting of the DataSet. * * @param field The field expression referring to the field of the additional sort order of -* the local partition sorting. -* @param order The order of the additional sort order of the local partition sorting. +* the local partition sorting. +* @param order The order of the additional sort order of the local partition sorting. * @return The DataSet with sorted local partitions. */ public SortPartitionOperator<T> sortPartition(String field, Order order) { + if (useKeySelector) { + throw new InvalidProgramException("Expression keys cannot be appended after selector function keys"); + } + int[] flatOrderKeys = getFlatFields(field); this.appendSorting(flatOrderKeys, order); return this; } + /** +* Appends an additional sort order with the specified field in the specified order to the +* local partition sorting of the DataSet. +* +* @param keyExtractor The KeySelector function which extracts the key value of the additional +* sort order of the local partition sorting. +* @param order The order of the additional sort order of the local partition sorting. +* @return The DataSet with sorted local partitions. +*/ + public <K> SortPartitionOperator<T> sortPartition(KeySelector<T, K> keyExtractor, Order order) { --- End diff -- Oh yes, you are right! Can you update the JavaDocs of this method and explain that chaining is not possible and how to work around it?
Thanks > SortPartition does not support KeySelectorFunctions > --- > > Key: FLINK-3234 > URL: https://issues.apache.org/jira/browse/FLINK-3234 > Project: Flink > Issue Type: Improvement > Components: DataSet API >Affects Versions: 1.0.0, 0.10.1 >Reporter: Fabian Hueske >Assignee: Chiwan Park > Fix For: 1.0.0 > > > The following is not supported by the DataSet API: > {code} > DataSet<MyObject> data = ... > DataSet<MyObject> data.sortPartition( > new KeySelector<MyObject, Long>() { > public Long getKey(MyObject v) { > ... > } > }, > Order.ASCENDING); > {code}
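Regarding the chaining restriction discussed in the review above (expression keys cannot be appended after a selector-function key): one workaround is to fold all sort fields into a single composite comparison up front. A plain-Java sketch of that idea, with no Flink dependency (the two-element int[] records and both key positions are illustrative):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class CompositeKeySortSketch {

    // Instead of appending a second sort key after a selector-function key,
    // combine both extractions into one comparator before sorting.
    public static List<int[]> sortByTwoKeys(List<int[]> partition) {
        List<int[]> copy = new ArrayList<>(partition);
        copy.sort(Comparator.<int[]>comparingInt(p -> p[0])   // primary key
                            .thenComparingInt(p -> p[1]));    // secondary key
        return copy;
    }

    public static void main(String[] args) {
        List<int[]> sorted = sortByTwoKeys(Arrays.asList(
                new int[]{2, 1}, new int[]{1, 2}, new int[]{1, 1}));
        for (int[] p : sorted) {
            System.out.println(p[0] + "," + p[1]);
        }
        // 1,1
        // 1,2
        // 2,1
    }
}
```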
[jira] [Commented] (FLINK-3343) Exception while using Kafka 0.9 connector
[ https://issues.apache.org/jira/browse/FLINK-3343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15135756#comment-15135756 ] Farouk Salem commented on FLINK-3343: - Sorry, I checked an old version of the Kafka documentation (Kafka 0.7). When I set the batch size to zero, a "GC overhead limit exceeded" exception is thrown. I tried to set it to 100 but still the same problem, "Batch expired". It works fine with Kafka 0.8.2 > Exception while using Kafka 0.9 connector > -- > > Key: FLINK-3343 > URL: https://issues.apache.org/jira/browse/FLINK-3343 > Project: Flink > Issue Type: Improvement > Components: flink-contrib, Kafka Connector >Affects Versions: 1.0.0 >Reporter: Farouk Salem > > While running a job, without fault tolerance, producing data to Kafka, the > job failed due to a "Batch Expired" exception. I tried to increase the > "request.timeout.ms" and "max.block.ms" to 6 instead of 3 but still > the same problem. The only way to get around this problem is using snapshotting. > 09:58:11,036 WARN org.apache.kafka.clients.producer.internals.Sender >- Got error produce response with correlation id 48106 on topic-partition > flinkWordCountNoFaultToleranceSmall > -2, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION > 09:58:11,036 WARN org.apache.kafka.clients.producer.internals.Sender >- Got error produce response with correlation id 48105 on topic-partition > flinkWordCountNoFaultToleranceSmall > -2, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION > 09:58:11,036 WARN org.apache.kafka.clients.producer.internals.Sender >- Got error produce response with correlation id 48104 on topic-partition > flinkWordCountNoFaultToleranceSmall > -2, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION > 09:58:11,068 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask >- Caught exception while processing timer.
> java.lang.RuntimeException: Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:319) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:300) > at > org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:48) > at > org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:29) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at > org.apache.flink.streaming.runtime.operators.windowing.AggregatingKeyedTimePanes.evaluateWindow(AggregatingKeyedTimePanes.java:59) > at > org.apache.flink.streaming.runtime.operators.windowing.AbstractAlignedProcessingTimeWindowOperator.computeWindow(AbstractAlignedProcessingTimeWindowOperator.java:242) > at > org.apache.flink.streaming.runtime.operators.windowing.AbstractAlignedProcessingTimeWindowOperator.trigger(AbstractAlignedProcessingTimeWindowOperator.java:223) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:606) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:319) > at > 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:300) > at > org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:48) > at > org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:29) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:37) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:316) > ... 15 more > Caused by: java.lang.Exception: Failed to send data to Kafka: Batch Expired > at >
[jira] [Commented] (FLINK-3337) mvn test fails on flink-runtime because curator classes not found
[ https://issues.apache.org/jira/browse/FLINK-3337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15135792#comment-15135792 ] ASF GitHub Bot commented on FLINK-3337: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1596#issuecomment-180775810 Seems to work. The test failures are unrelated. > mvn test fails on flink-runtime because curator classes not found > - > > Key: FLINK-3337 > URL: https://issues.apache.org/jira/browse/FLINK-3337 > Project: Flink > Issue Type: Bug > Components: Distributed Runtime >Affects Versions: 1.0.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Attachments: mvn.txt > > > This has been reported before. I am running {{mvn test}} on an AWS > c4.2xlarge, Flink HEAD (version 69f7f6d9...) and seeing the missing curator > classes. For example, > {code} > testMultipleLeaders(org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest) > Time elapsed: 1.042 sec <<< ERROR! > java.lang.NoClassDefFoundError: org/apache/curator/RetryPolicy > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > at > org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionTest.testMultipleLeaders(ZooKeeperLeaderElectionTest.java:291) > {code} > {code} > Tests in error: > > JobManagerLeaderElectionTest.testLeaderElection:99->createJobManagerProps:166 > NoClassDefFound > > JobManagerLeaderElectionTest.testLeaderReelection:130->createJobManagerProps:166 > NoClassDefFound > ZooKeeperLeaderElectionTest.testEphemeralZooKeeperNodes:444 NoClassDefFound > or... > ZooKeeperLeaderElectionTest.testExceptionForwarding:372 NoClassDefFound > org/ap... > ZooKeeperLeaderElectionTest.testMultipleLeaders:291 NoClassDefFound > org/apache... 
> ZooKeeperLeaderElectionTest.testZooKeeperLeaderElectionRetrieval:94 > NoClassDefFound > ZooKeeperLeaderElectionTest.testZooKeeperReelection:137 » NoClassDefFound > org/... > ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement:207 > NoClassDefFound > > ZooKeeperLeaderRetrievalTest.testConnectingAddressRetrievalWithDelayedLeaderElection:96 > NoClassDefFound > ZooKeeperLeaderRetrievalTest.testTimeoutOfFindConnectingAddress:187 » > NoClassDefFound > ZooKeeperUtilTest.testZooKeeperEnsembleConnectStringConfiguration:40 > NoClassDefFound > {code} > The issue is resolved when removing the curator excludes from > {{flink-runtime/pom.xml}}: > {code} > <excludes> > <exclude>org.apache.curator:curator-recipes</exclude> > <exclude>org.apache.curator:curator-client</exclude> > <exclude>org.apache.curator:curator-framework</exclude> > </excludes> > {code}
[GitHub] flink pull request: [FLINK-3337] [runtime] mvn test fails on flink...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1596#issuecomment-180775810 Seems to work. The test failures are unrelated.