[jira] [Updated] (FLINK-4205) Implement stratified sampling for DataSet
[ https://issues.apache.org/jira/browse/FLINK-4205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4205: -- Component/s: DataSet API > Implement stratified sampling for DataSet > - > > Key: FLINK-4205 > URL: https://issues.apache.org/jira/browse/FLINK-4205 > Project: Flink > Issue Type: New Feature > Components: DataSet API > Reporter: Do Le Quoc > > A DataSet may consist of data from disparate sources. As such, every > data source should be treated fairly so that each is represented in a sample. For > this, stratified sampling is needed to ensure that data from every source > (stratum) is selected and none of the minority strata are excluded. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
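A minimal sketch of the per-stratum sampling idea described above, assuming the input has already been grouped by source; the class and method names are illustrative, not the eventual DataSet API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

public class StratifiedSample {
    // Bernoulli-sample each stratum independently with the same fraction,
    // so small strata stay represented instead of being drowned out by
    // larger sources.
    static <T> List<T> sample(Map<String, List<T>> strata, double fraction, long seed) {
        Random rnd = new Random(seed);
        List<T> out = new ArrayList<>();
        for (List<T> stratum : strata.values()) {
            for (T element : stratum) {
                if (rnd.nextDouble() < fraction) {
                    out.add(element);
                }
            }
        }
        return out;
    }
}
```

Sampling per stratum rather than over the union is exactly what guarantees that a one-element stratum still has the same selection probability as elements of a million-element stratum.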
[jira] [Commented] (FLINK-4387) Instability in KvStateClientTest.testClientServerIntegration()
[ https://issues.apache.org/jira/browse/FLINK-4387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420708#comment-15420708 ] Robert Metzger commented on FLINK-4387: --- I saw this assertion error also in another test {code} Exception in thread "globalEventExecutor-1-1" java.lang.AssertionError at io.netty.util.concurrent.AbstractScheduledEventExecutor.pollScheduledTask(AbstractScheduledEventExecutor.java:83) at io.netty.util.concurrent.GlobalEventExecutor.fetchFromScheduledTaskQueue(GlobalEventExecutor.java:110) at io.netty.util.concurrent.GlobalEventExecutor.takeTask(GlobalEventExecutor.java:95) at io.netty.util.concurrent.GlobalEventExecutor$TaskRunner.run(GlobalEventExecutor.java:226) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137) at java.lang.Thread.run(Thread.java:745) {code} https://s3.amazonaws.com/archive.travis-ci.org/jobs/152241331/log.txt But I wonder why this error occurs now. Did we update the netty version recently? > Instability in KvStateClientTest.testClientServerIntegration() > -- > > Key: FLINK-4387 > URL: https://issues.apache.org/jira/browse/FLINK-4387 > Project: Flink > Issue Type: Bug >Affects Versions: 1.1.0 >Reporter: Robert Metzger >Assignee: Ufuk Celebi > Labels: test-stability > > According to this log: > https://s3.amazonaws.com/archive.travis-ci.org/jobs/151491745/log.txt > the {{KvStateClientTest}} didn't complete. 
> {code} > "main" #1 prio=5 os_prio=0 tid=0x7fb2b400a000 nid=0x29dc in Object.wait() > [0x7fb2bcb3b000] >java.lang.Thread.State: WAITING (on object monitor) > at java.lang.Object.wait(Native Method) > - waiting on <0xf7c049a0> (a > io.netty.util.concurrent.DefaultPromise) > at java.lang.Object.wait(Object.java:502) > at > io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:254) > - locked <0xf7c049a0> (a > io.netty.util.concurrent.DefaultPromise) > at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:32) > at > org.apache.flink.runtime.query.netty.KvStateServer.shutDown(KvStateServer.java:185) > at > org.apache.flink.runtime.query.netty.KvStateClientTest.testClientServerIntegration(KvStateClientTest.java:680) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > {code} > and > {code} > Exception in thread "globalEventExecutor-1-3" java.lang.AssertionError > at > io.netty.util.concurrent.AbstractScheduledEventExecutor.pollScheduledTask(AbstractScheduledEventExecutor.java:83) > at > io.netty.util.concurrent.GlobalEventExecutor.fetchFromScheduledTaskQueue(GlobalEventExecutor.java:110) > at > io.netty.util.concurrent.GlobalEventExecutor.takeTask(GlobalEventExecutor.java:95) > at > io.netty.util.concurrent.GlobalEventExecutor$TaskRunner.run(GlobalEventExecutor.java:226) > at > io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137) > at java.lang.Thread.run(Thread.java:745) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4394) RMQSource: The QueueName is not accessible to subclasses
[ https://issues.apache.org/jira/browse/FLINK-4394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420750#comment-15420750 ] Robert Metzger commented on FLINK-4394: --- Thank you for opening a pull request. I gave you contributor permissions in our JIRA so that you can assign JIRAs to yourself in the future. I assigned this one to you. > RMQSource: The QueueName is not accessible to subclasses > > > Key: FLINK-4394 > URL: https://issues.apache.org/jira/browse/FLINK-4394 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors > Affects Versions: 1.1.1 > Reporter: Dominik Bruhn > > In version 1.1.0 we made the RMQSource extensible so that subclasses can > configure how their queue for RabbitMQ/AMQP is created. Subclasses > can override > {code} > protected void setupQueue() throws IOException { > channel.queueDeclare(queueName, true, false, false, null); > } > {code} > The problem is that the queueName property is private, so when overriding the > setupQueue() method you don't know which actual queueName was provided. A > simple change of the queueName property to protected fixes this. > A PR will follow -- This message was sent by Atlassian JIRA (v6.3.4#6332)
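A self-contained sketch of the proposed fix. The Channel interface here is a minimal stand-in for com.rabbitmq.client.Channel (which is much larger and whose methods throw IOException); the subclass and its TTL argument are just examples of what a protected queueName enables:

```java
import java.util.HashMap;
import java.util.Map;

public class RmqQueueSetup {
    // Minimal stand-in for com.rabbitmq.client.Channel so the sketch compiles
    // standalone; only the one call used by setupQueue() is modeled.
    interface Channel {
        void queueDeclare(String queue, boolean durable, boolean exclusive,
                          boolean autoDelete, Map<String, Object> arguments);
    }

    // Sketch of RMQSource with queueName made protected instead of private.
    static class Source {
        protected final String queueName;
        protected final Channel channel;

        Source(String queueName, Channel channel) {
            this.queueName = queueName;
            this.channel = channel;
        }

        protected void setupQueue() {
            channel.queueDeclare(queueName, true, false, false, null);
        }
    }

    // A subclass can now see which queue was configured and declare it with
    // custom arguments (the message TTL is a hypothetical example).
    static class CustomSource extends Source {
        CustomSource(String queueName, Channel channel) {
            super(queueName, channel);
        }

        @Override
        protected void setupQueue() {
            Map<String, Object> args = new HashMap<>();
            args.put("x-message-ttl", 60_000);
            channel.queueDeclare(queueName, false, false, false, args);
        }
    }
}
```

With a private queueName, the override above would have no way to know which queue the source was configured to consume from.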
[jira] [Updated] (FLINK-4394) RMQSource: The QueueName is not accessible to subclasses
[ https://issues.apache.org/jira/browse/FLINK-4394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4394: -- Assignee: Dominik Bruhn > RMQSource: The QueueName is not accessible to subclasses > > > Key: FLINK-4394 > URL: https://issues.apache.org/jira/browse/FLINK-4394 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors > Affects Versions: 1.1.1 > Reporter: Dominik Bruhn > Assignee: Dominik Bruhn > > In version 1.1.0 we made the RMQSource extensible so that subclasses can > configure how their queue for RabbitMQ/AMQP is created. Subclasses > can override > {code} > protected void setupQueue() throws IOException { > channel.queueDeclare(queueName, true, false, false, null); > } > {code} > The problem is that the queueName property is private, so when overriding the > setupQueue() method you don't know which actual queueName was provided. A > simple change of the queueName property to protected fixes this. > A PR will follow -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4393) Failed to serialize accumulators for task
[ https://issues.apache.org/jira/browse/FLINK-4393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4393: -- Description: Dear Team, I am getting the below exception while trying to use the Table API by looping through the DataSet using the collect() method. {code} 2016-08-15 07:18:52,503 WARN org.apache.flink.runtime.accumulators.AccumulatorRegistry - Failed to serialize accumulators for task. java.lang.OutOfMemoryError at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123) at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117) at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877) at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301) at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52) at org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58) at org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:75) at org.apache.flink.runtime.taskmanager.TaskManager.unregisterTaskAndNotifyFinalState(TaskManager.scala:1248) at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:446) at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:292) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:124) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) at akka.dispatch.Mailbox.run(Mailbox.scala:221) at akka.dispatch.Mailbox.exec(Mailbox.scala:231) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Suppressed: java.lang.OutOfMemoryError at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123) at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117) at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877) at 
java.io.ObjectOutputStream$BlockDataOutputStream.flush(ObjectOutputStream.java:1822) at java.io.ObjectOutputStream.flush(ObjectOutputStream.java:719) at java.io.ObjectOutputStream.close(ObjectOutputStream.java:740) at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:303) ... 28 more 2016-08-15 07:18:52,508 ERROR org.apache.flink.runtime.executiongraph.ExecutionGraph- Failed to deserialize final accumulator results. java.lang.NullPointerException at
[jira] [Commented] (FLINK-4393) Failed to serialize accumulators for task
[ https://issues.apache.org/jira/browse/FLINK-4393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420868#comment-15420868 ] Robert Metzger commented on FLINK-4393: --- What's the DataSet size you are trying to collect()? The problem is that the amount of data you can transfer from the cluster to your client is limited by a) the amount of heap space at the client and b) the amount of data the RPC system can transfer. IIRC, the RPC system's limit is 10MB with the default configuration. > Failed to serialize accumulators for task > - > > Key: FLINK-4393 > URL: https://issues.apache.org/jira/browse/FLINK-4393 > Project: Flink > Issue Type: Bug > Components: Table API & SQL > Affects Versions: 1.1.0 > Environment: Redhat 6 > Reporter: Sajeev Ramakrishnan > > Dear Team, > I am getting the below exception while trying to use the Table API by > looping through the DataSet using the collect() method. > {code} > 2016-08-15 07:18:52,503 WARN > org.apache.flink.runtime.accumulators.AccumulatorRegistry - Failed to > serialize accumulators for task. 
> java.lang.OutOfMemoryError > at > java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123) > at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117) > at > java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) > at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) > at > org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301) > at > org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52) > at > org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58) > at > org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:75) > at > org.apache.flink.runtime.taskmanager.TaskManager.unregisterTaskAndNotifyFinalState(TaskManager.scala:1248) > at > org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:446) > at > org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:292) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) > at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > at > org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at > org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:124) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > at akka.dispatch.Mailbox.run(Mailbox.scala:221) > at akka.dispatch.Mailbox.exec(Mailbox.scala:231) > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Suppressed: java.lang.OutOfMemoryError > at > java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123) > at > java.io.ByteArrayOutputStr
[jira] [Resolved] (FLINK-4394) RMQSource: The QueueName is not accessible to subclasses
[ https://issues.apache.org/jira/browse/FLINK-4394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-4394. --- Resolution: Fixed Fix Version/s: 1.2.0 Thank you for the contribution. Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/5ccd9071 > RMQSource: The QueueName is not accessible to subclasses > > > Key: FLINK-4394 > URL: https://issues.apache.org/jira/browse/FLINK-4394 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors > Affects Versions: 1.1.1 > Reporter: Dominik Bruhn > Assignee: Dominik Bruhn > Fix For: 1.2.0 > > > In version 1.1.0 we made the RMQSource extensible so that subclasses can > configure how their queue for RabbitMQ/AMQP is created. Subclasses > can override > {code} > protected void setupQueue() throws IOException { > channel.queueDeclare(queueName, true, false, false, null); > } > {code} > The problem is that the queueName property is private, so when overriding the > setupQueue() method you don't know which actual queueName was provided. A > simple change of the queueName property to protected fixes this. > A PR will follow -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
[ https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15421044#comment-15421044 ] Robert Metzger commented on FLINK-4035: --- Thank you for trying this out. I opened a pull request yesterday that extends the Kafka 0.9 connector, but uses the 0.10.0.0 Kafka dependency (I'll update to Kafka 0.10.0.1 before merging). The missing backwards compatibility in Kafka is indeed an issue for systems like ours. > Bump Kafka producer in Kafka sink to Kafka 0.10.0.0 > --- > > Key: FLINK-4035 > URL: https://issues.apache.org/jira/browse/FLINK-4035 > Project: Flink > Issue Type: Bug > Components: Kafka Connector > Affects Versions: 1.0.3 > Reporter: Elias Levy > Assignee: Robert Metzger > Priority: Minor > > Kafka 0.10.0.0 introduced protocol changes related to the producer. > Published messages now include timestamps and compressed messages now include > relative offsets. As it is now, brokers must decompress publisher compressed > messages, assign offsets to them, and recompress them, which is wasteful and > makes it less likely that compression will be used at all. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4393) Failed to serialize accumulators for task
[ https://issues.apache.org/jira/browse/FLINK-4393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422367#comment-15422367 ] Robert Metzger commented on FLINK-4393: --- 3.6 GB is too much data to transfer using collect(). I think you have to work around this limitation. You could, for example, write the data into a file and read the file from the client. > Failed to serialize accumulators for task > - > > Key: FLINK-4393 > URL: https://issues.apache.org/jira/browse/FLINK-4393 > Project: Flink > Issue Type: Bug > Components: Table API & SQL > Affects Versions: 1.1.0 > Environment: Redhat 6 > Reporter: Sajeev Ramakrishnan > > Dear Team, > I am getting the below exception while trying to use the Table API by > looping through the DataSet using the collect() method. > {code} > 2016-08-15 07:18:52,503 WARN > org.apache.flink.runtime.accumulators.AccumulatorRegistry - Failed to > serialize accumulators for task. > java.lang.OutOfMemoryError > at > java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123) > at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117) > at > java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) > at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) > at > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877) > at > java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786) > at > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) > at > org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301) > at > org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52) > at > org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58) > at > 
org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:75) > at > org.apache.flink.runtime.taskmanager.TaskManager.unregisterTaskAndNotifyFinalState(TaskManager.scala:1248) > at > org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:446) > at > org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:292) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) > at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > at > org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at > org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:124) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > at akka.dispatch.Mailbox.run(Mailbox.scala:221) > at akka.dispatch.Mailbox.exec(Mailbox.scala:231) > at > 
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Suppressed: java.lang.OutOfMemoryError > at > java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123) > at > java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117) > at > java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStre
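The RPC limit mentioned in the comments above can be illustrated with a small standalone sketch that measures the Java-serialized size of an object against the default frame size (10485760 bytes, the "max allowed size" value that appears in oversized-payload errors elsewhere in this thread). The class and method names are hypothetical, not Flink internals:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class FrameSizeCheck {
    // Default RPC frame size: 10485760 bytes (10 MB).
    static final int MAX_FRAME_BYTES = 10 * 1024 * 1024;

    // Java-serialized size of an object, analogous to how accumulator
    // snapshots are serialized before being sent over RPC.
    static int serializedSize(Serializable o) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.size();
    }

    static boolean fitsInFrame(Serializable o) {
        return serializedSize(o) <= MAX_FRAME_BYTES;
    }
}
```

A 3.6 GB result set exceeds this frame budget by orders of magnitude, which is why writing to a file and reading it from the client is the suggested workaround.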
[jira] [Commented] (FLINK-4341) Kinesis connector does not emit maximum watermark properly
[ https://issues.apache.org/jira/browse/FLINK-4341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15426231#comment-15426231 ] Robert Metzger commented on FLINK-4341: --- The reason why the Kinesis consumer is not emitting the {{Long.MAX_VALUE}} watermark is to support transparent resharding. So if the number of shards changes, inactive consumer instances might start consuming records. If we send a {{Long.MAX_VALUE}}, the watermarks would be messed up. This means that we need to fail the Kinesis consumer if a resharding has occurred. > Kinesis connector does not emit maximum watermark properly > -- > > Key: FLINK-4341 > URL: https://issues.apache.org/jira/browse/FLINK-4341 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors > Affects Versions: 1.1.0, 1.1.1 > Reporter: Scott Kidder > Assignee: Robert Metzger > Priority: Blocker > Fix For: 1.2.0, 1.1.2 > > > **Previously reported as "Checkpoint state size grows unbounded when task > parallelism not uniform"** > This issue was first encountered with Flink release 1.1.0 (commit 45f7825). I > was previously using a 1.1.0 snapshot (commit 18995c8) which performed as > expected. This issue was introduced somewhere between those commits. > I've got a Flink application that uses the Kinesis Stream Consumer to read > from a Kinesis stream with 2 shards. I've got 2 task managers with 2 slots > each, providing a total of 4 slots. When running the application with a > parallelism of 4, the Kinesis consumer uses 2 slots (one per Kinesis shard) > and 4 slots for subsequent tasks that process the Kinesis stream data. I use > an in-memory store for checkpoint data. 
> Yesterday I upgraded to Flink 1.1.0 (45f7825) and noticed that checkpoint > states were growing unbounded when running with a parallelism of 4, > checkpoint interval of 10 seconds: > {code} > ID State Size > 1 11.3 MB > 2 20.9 MB > 3 30.6 MB > 4 41.4 MB > 5 52.6 MB > 6 62.5 MB > 7 71.5 MB > 8 83.3 MB > 9 93.5 MB > {code} > The first 4 checkpoints generally succeed, but then fail with an exception > like the following: > {code} > java.lang.RuntimeException: Error triggering a checkpoint as the result of > receiving checkpoint barrier at > org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:768) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:758) > at > org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203) > at > org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.io.IOException: Size of the state is larger than the maximum > permitted memory-backed state. Size=12105407 , maxSize=5242880 . Consider > using a different state backend, like the File System State backend. 
> at > org.apache.flink.runtime.state.memory.MemoryStateBackend.checkSize(MemoryStateBackend.java:146) > at > org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetBytes(MemoryStateBackend.java:200) > at > org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetHandle(MemoryStateBackend.java:190) > at > org.apache.flink.runtime.state.AbstractStateBackend$CheckpointStateOutputView.closeAndGetHandle(AbstractStateBackend.java:447) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.snapshotOperatorState(WindowOperator.java:879) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:598) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:762) > ... 8 more > {code} > Or: > {code} > 2016-08-09 17:44:43,626 INFO > org.apache.flink.streaming.runtime.tasks.StreamTask - Restoring > checkpointed state to task Fold: property_id, player -> 10-minute > Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4) > 2016-08-09 17:44:51,236 ERROR akka.remote.EndpointWriter- > Transient association error (association remains live) > akka.remote.OversizedPayloadException: Discarding oversized payload sent to > Actor[akka.tcp://flink@10.55.2.212:6123/user/jobmanager#510517238]: max > allowed size 10485760 bytes, actual size of encoded class > org.apache.f
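Why emitting {{Long.MAX_VALUE}} from a temporarily idle consumer instance is unsafe follows from watermark monotonicity: once the maximum watermark has been emitted, later (smaller) watermarks can have no effect, so records picked up after a reshard would all be late. A toy sketch of that invariant (class and method names are illustrative, not the connector's API):

```java
public class WatermarkTracker {
    // Watermarks are monotonically non-decreasing: a smaller candidate
    // never rolls the current watermark back.
    private long current = Long.MIN_VALUE;

    public long advance(long candidate) {
        current = Math.max(current, candidate);
        return current;
    }
}
```

After advance(Long.MAX_VALUE), every subsequent call returns Long.MAX_VALUE regardless of its argument, which is exactly the "messed up" state described in the comment.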
[jira] [Created] (FLINK-4429) Move Redis Sink from Flink to Bahir
Robert Metzger created FLINK-4429: - Summary: Move Redis Sink from Flink to Bahir Key: FLINK-4429 URL: https://issues.apache.org/jira/browse/FLINK-4429 Project: Flink Issue Type: Task Components: Streaming Connectors Reporter: Robert Metzger Assignee: Robert Metzger As per [1] the Flink community decided to move the Redis connector from Flink to Bahir. [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Move-Redis-and-Flume-connectors-to-Apache-Bahir-and-redirect-contributions-there-td13102.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4425) "Out Of Memory" during savepoint deserialization
[ https://issues.apache.org/jira/browse/FLINK-4425?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4425: -- Component/s: State Backends, Checkpointing > "Out Of Memory" during savepoint deserialization > > > Key: FLINK-4425 > URL: https://issues.apache.org/jira/browse/FLINK-4425 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing > Affects Versions: 1.1.1 > Reporter: Sergii Koshel > Attachments: savepoint-c25e4b360a7d.zip > > > I've created a savepoint and am trying to start the job from it (via the -s param), and I get an exception like the one below: > {code:title=Exception|borderStyle=solid} > java.lang.OutOfMemoryError: Java heap space > at > org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer.deserialize(SavepointV1Serializer.java:167) > at > org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer.deserialize(SavepointV1Serializer.java:42) > at > org.apache.flink.runtime.checkpoint.savepoint.FsSavepointStore.loadSavepoint(FsSavepointStore.java:133) > at > org.apache.flink.runtime.checkpoint.savepoint.SavepointCoordinator.restoreSavepoint(SavepointCoordinator.java:201) > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.restoreSavepoint(ExecutionGraph.java:983) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1302) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1291) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1291) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > {code} > jobmanager.heap.mb: 1280 > taskmanager.heap.mb: 1024 > java 1.8 > savepoint + checkpoint size < 1 MB in total -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4418) ClusterClient/ConnectionUtils#findConnectingAddress fails immediately if InetAddress.getLocalHost throws exception
[ https://issues.apache.org/jira/browse/FLINK-4418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15428147#comment-15428147 ] Robert Metzger commented on FLINK-4418: --- Thank you for working on this. I gave you contributor permissions in our JIRA, so that you can assign JIRAs to yourself in the future. > ClusterClient/ConnectionUtils#findConnectingAddress fails immediately if > InetAddress.getLocalHost throws exception > -- > > Key: FLINK-4418 > URL: https://issues.apache.org/jira/browse/FLINK-4418 > Project: Flink > Issue Type: Bug > Components: Client >Affects Versions: 1.1.0 >Reporter: Shannon Carey >Assignee: Shannon Carey > > When attempting to connect to a cluster with a ClusterClient, if the > machine's hostname is not resolvable to an IP, an exception is thrown > preventing success. > This is the case if, for example, the hostname is not present & mapped to a > local IP in /etc/hosts. > The exception is below. I suggest that findAddressUsingStrategy() should > catch java.net.UnknownHostException thrown by InetAddress.getLocalHost() and > return null, allowing alternative strategies to be attempted by > findConnectingAddress(). I will open a PR to this effect. Ideally this could > be included in both 1.2 and 1.1.2. > In the stack trace below, "ip-10-2-64-47" is the internal host name of an AWS > EC2 instance. > {code} > 21:11:35 org.apache.flink.client.program.ProgramInvocationException: Failed > to retrieve the JobManager gateway. 
> 21:11:35 at > org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:430) > 21:11:35 at > org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:90) > 21:11:35 at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389) > 21:11:35 at > org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:75) > 21:11:35 at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:334) > 21:11:35 at > com.expedia.www.flink.job.scheduler.FlinkJobSubmitter.get(FlinkJobSubmitter.java:81) > 21:11:35 at > com.expedia.www.flink.job.scheduler.streaming.StreamingJobManager.run(StreamingJobManager.java:105) > 21:11:35 at > com.expedia.www.flink.job.scheduler.JobScheduler.runStreamingApp(JobScheduler.java:69) > 21:11:35 at > com.expedia.www.flink.job.scheduler.JobScheduler.main(JobScheduler.java:34) > 21:11:35 Caused by: java.lang.RuntimeException: Failed to resolve JobManager > address at /10.2.89.80:43126 > 21:11:35 at > org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:189) > 21:11:35 at > org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:649) > 21:11:35 at > org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:428) > 21:11:35 ... 8 more > 21:11:35 Caused by: java.net.UnknownHostException: ip-10-2-64-47: > ip-10-2-64-47: unknown error > 21:11:35 at java.net.InetAddress.getLocalHost(InetAddress.java:1505) > 21:11:35 at > org.apache.flink.runtime.net.ConnectionUtils.findAddressUsingStrategy(ConnectionUtils.java:232) > 21:11:35 at > org.apache.flink.runtime.net.ConnectionUtils.findConnectingAddress(ConnectionUtils.java:123) > 21:11:35 at > org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:187) > 21:11:35 ... 
10 more > 21:11:35 Caused by: java.net.UnknownHostException: ip-10-2-64-47: unknown > error > 21:11:35 at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method) > 21:11:35 at > java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928) > 21:11:35 at > java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323) > 21:11:35 at java.net.InetAddress.getLocalHost(InetAddress.java:1500) > 21:11:35 ... 13 more > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4418) ClusterClient/ConnectionUtils#findConnectingAddress fails immediately if InetAddress.getLocalHost throws exception
[ https://issues.apache.org/jira/browse/FLINK-4418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4418: -- Assignee: Shannon Carey > ClusterClient/ConnectionUtils#findConnectingAddress fails immediately if > InetAddress.getLocalHost throws exception > -- > > Key: FLINK-4418 > URL: https://issues.apache.org/jira/browse/FLINK-4418 > Project: Flink > Issue Type: Bug > Components: Client >Affects Versions: 1.1.0 >Reporter: Shannon Carey >Assignee: Shannon Carey > > When attempting to connect to a cluster with a ClusterClient, if the > machine's hostname is not resolvable to an IP, an exception is thrown > preventing success. > This is the case if, for example, the hostname is not present & mapped to a > local IP in /etc/hosts. > The exception is below. I suggest that findAddressUsingStrategy() should > catch java.net.UnknownHostException thrown by InetAddress.getLocalHost() and > return null, allowing alternative strategies to be attempted by > findConnectingAddress(). I will open a PR to this effect. Ideally this could > be included in both 1.2 and 1.1.2. > In the stack trace below, "ip-10-2-64-47" is the internal host name of an AWS > EC2 instance. > {code} > 21:11:35 org.apache.flink.client.program.ProgramInvocationException: Failed > to retrieve the JobManager gateway. 
> 21:11:35 at > org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:430) > 21:11:35 at > org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:90) > 21:11:35 at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389) > 21:11:35 at > org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:75) > 21:11:35 at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:334) > 21:11:35 at > com.expedia.www.flink.job.scheduler.FlinkJobSubmitter.get(FlinkJobSubmitter.java:81) > 21:11:35 at > com.expedia.www.flink.job.scheduler.streaming.StreamingJobManager.run(StreamingJobManager.java:105) > 21:11:35 at > com.expedia.www.flink.job.scheduler.JobScheduler.runStreamingApp(JobScheduler.java:69) > 21:11:35 at > com.expedia.www.flink.job.scheduler.JobScheduler.main(JobScheduler.java:34) > 21:11:35 Caused by: java.lang.RuntimeException: Failed to resolve JobManager > address at /10.2.89.80:43126 > 21:11:35 at > org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:189) > 21:11:35 at > org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:649) > 21:11:35 at > org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:428) > 21:11:35 ... 8 more > 21:11:35 Caused by: java.net.UnknownHostException: ip-10-2-64-47: > ip-10-2-64-47: unknown error > 21:11:35 at java.net.InetAddress.getLocalHost(InetAddress.java:1505) > 21:11:35 at > org.apache.flink.runtime.net.ConnectionUtils.findAddressUsingStrategy(ConnectionUtils.java:232) > 21:11:35 at > org.apache.flink.runtime.net.ConnectionUtils.findConnectingAddress(ConnectionUtils.java:123) > 21:11:35 at > org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:187) > 21:11:35 ... 
10 more > 21:11:35 Caused by: java.net.UnknownHostException: ip-10-2-64-47: unknown > error > 21:11:35 at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method) > 21:11:35 at > java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928) > 21:11:35 at > java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323) > 21:11:35 at java.net.InetAddress.getLocalHost(InetAddress.java:1500) > 21:11:35 ... 13 more > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
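The change proposed in FLINK-4418 can be sketched as follows. This is a hypothetical, simplified stand-in for findAddressUsingStrategy() (the real Flink method takes a strategy and a target address, which are omitted here); it only illustrates the suggested behavior of catching java.net.UnknownHostException from InetAddress.getLocalHost() and returning null, so that findConnectingAddress() can fall back to the next address-detection strategy instead of failing outright:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class LocalHostResolution {

    /**
     * Sketch of the proposed fix: instead of letting UnknownHostException
     * propagate out of the local-host strategy, catch it and return null
     * so the caller can try the remaining strategies.
     */
    static InetAddress findAddressUsingLocalHostStrategy() {
        try {
            return InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            // Hostname is not resolvable (e.g. no /etc/hosts entry, as on the
            // EC2 instance in the stack trace above); signal "no result"
            // instead of aborting the whole connection attempt.
            return null;
        }
    }

    public static void main(String[] args) {
        InetAddress addr = findAddressUsingLocalHostStrategy();
        System.out.println(addr == null ? "falling back to next strategy" : addr.getHostAddress());
    }
}
```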
[jira] [Updated] (FLINK-4426) Unable to create proxy to the ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4426?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4426: -- Description: We have a Mapr cluster on which I am trying to run a single flink job (from examples) on YARN Running the example (./bin/flink run -m yarn-cluster -yn 4 ./examples/batch/WordCount.jar) fails with an "Unable to create proxy to the ResourceManager null" error: More detailed logs from the flink run below (server addresses removed): {code} = 2016-08-18 23:02:32,249 DEBUG org.apache.hadoop.metrics2.lib.MutableMetricsFactory - field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess with annotation @org.apache.hadoop.metrics2.annotation.Metric(value=[Rate of successful kerberos logins and latency (milliseconds)], valueName=Time, about=, type=DEFAULT, always=false, sampleName=Ops) 2016-08-18 23:02:32,261 DEBUG org.apache.hadoop.metrics2.lib.MutableMetricsFactory - field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure with annotation @org.apache.hadoop.metrics2.annotation.Metric(value=[Rate of failed kerberos logins and latency (milliseconds)], valueName=Time, about=, type=DEFAULT, always=false, sampleName=Ops) 2016-08-18 23:02:32,261 DEBUG org.apache.hadoop.metrics2.lib.MutableMetricsFactory - field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.getGroups with annotation @org.apache.hadoop.metrics2.annotation.Metric(value=[GetGroups], valueName=Time, about=, type=DEFAULT, always=false, sampleName=Ops) 2016-08-18 23:02:32,263 DEBUG org.apache.hadoop.metrics2.impl.MetricsSystemImpl - UgiMetrics, User and group related metrics 2016-08-18 23:02:33,777 DEBUG com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils - init 2016-08-18 23:02:33,793 DEBUG com.mapr.baseutils.JVMProperties - Setting JVM property zookeeper.saslprovider to 
com.mapr.security.simplesasl.SimpleSaslProvider 2016-08-18 23:02:33,794 DEBUG com.mapr.baseutils.JVMProperties - Setting JVM property zookeeper.sasl.clientconfig to Client_simple 2016-08-18 23:02:33,794 DEBUG com.mapr.baseutils.JVMProperties - Setting JVM property java.security.auth.login.config to /opt/mapr/conf/mapr.login.conf 2016-08-18 23:02:33,797 DEBUG org.apache.hadoop.conf.Configuration - Loaded org.apache.hadoop.conf.CoreDefaultProperties 2016-08-18 23:02:33,805 DEBUG org.apache.hadoop.security.UserGroupInformation - HADOOP_SECURITY_AUTHENTICATION is set to: SIMPLE 2016-08-18 23:02:33,805 DEBUG org.apache.hadoop.security.UserGroupInformation - Login configuration entry is hadoop_simple 2016-08-18 23:02:33,806 DEBUG org.apache.hadoop.security.UserGroupInformation - authenticationMethod from JAAS configuration:SIMPLE 2016-08-18 23:02:33,867 DEBUG org.apache.hadoop.conf.Configuration - Loaded org.apache.hadoop.conf.CoreDefaultProperties 2016-08-18 23:02:33,875 DEBUG org.apache.hadoop.security.Groups - Creating new Groups object 2016-08-18 23:02:33,878 DEBUG org.apache.hadoop.util.PerformanceAdvisory - Falling back to shell based 2016-08-18 23:02:33,879 DEBUG org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback - Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping 2016-08-18 23:02:33,934 DEBUG org.apache.hadoop.conf.Configuration - Loaded org.apache.hadoop.conf.CoreDefaultProperties 2016-08-18 23:02:34,002 DEBUG org.apache.hadoop.conf.Configuration - Loaded org.apache.hadoop.yarn.conf.YarnDefaultProperties 2016-08-18 23:02:34,021 DEBUG org.apache.hadoop.util.Shell - setsid exited with exit code 0 2016-08-18 23:02:34,047 DEBUG org.apache.hadoop.security.Groups - Group mapping impl=org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback; cacheTimeout=30; warningDeltaMs=5000 2016-08-18 23:02:34,058 DEBUG org.apache.hadoop.security.login.HadoopLoginModule- Priority principal search list is [class 
javax.security.auth.kerberos.KerberosPrincipal] 2016-08-18 23:02:34,058 DEBUG org.apache.hadoop.security.login.HadoopLoginModule- Additional principal search list is [class com.sun.security.auth.UnixPrincipal] 2016-08-18 23:02:34,058 DEBUG org.apache.hadoop.security.login.HadoopLoginModule- hadoop login 2016-08-18 23:02:34,059 DEBUG org.apache.hadoop.security.login.HadoopLoginModule- hadoop login commit 2016-08-18 23:02:34,098
[jira] [Commented] (FLINK-4426) Unable to create proxy to the ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15428158#comment-15428158 ] Robert Metzger commented on FLINK-4426: --- Hi, It seems that the MapR cluster is secured, and using some custom MapR code for the security: {code} 2016-08-18 23:02:34,594 DEBUG com.mapr.login.client.MapRLoginHttpsClient - Entering authenticate if needed. 2016-08-18 23:02:34,594 DEBUG com.mapr.login.client.MapRLoginHttpsClient - Kerberos not configured for this cluster. 2016-08-18 23:02:34,594 DEBUG com.mapr.login.client.MapRLoginHttpsClient - security appears to be off 2016-08-18 23:02:38,613 DEBUG com.mapr.fs.MapRFileSystem - User Info object initialized for user hsawhney with user ID 10031 {code} and: {code} 2016-08-18 23:02:38,635 INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=[ZOOKEEPERS] sessionTimeout=3 watcher=com.mapr.util.zookeeper.ZKDataRetrieval@545c1b34 2016-08-18 23:02:38,641 DEBUG org.apache.zookeeper.ClientCnxn - zookeeper.disableAutoWatchReset is false 2016-08-18 23:02:38,668 DEBUG org.apache.zookeeper.client.ZooKeeperSaslClient - JAAS loginContext is: Client_simple 2016-08-18 23:02:38,673 INFO org.apache.zookeeper.Login - successfully logged in. 2016-08-18 23:02:38,685 INFO org.apache.zookeeper.client.ZooKeeperSaslClient - Client will use GSSAPI as SASL mechanism. 
2016-08-18 23:02:38,685 DEBUG org.apache.zookeeper.client.ZooKeeperSaslClient - creating sasl client: client=hsawhney;service=zookeeper;serviceHostname=[ZOOKEEPER] 2016-08-18 23:02:38,700 ERROR org.apache.zookeeper.client.ZooKeeperSaslClient - Exception while trying to create SASL client java.security.PrivilegedActionException: javax.security.sasl.SaslException: Failure to initialize security context [Caused by GSSException: Invalid name provided (Mechanism level: KrbException: Cannot locate default realm)] at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.zookeeper.client.ZooKeeperSaslClient.createSaslClient(ZooKeeperSaslClient.java:283) at org.apache.zookeeper.client.ZooKeeperSaslClient.(ZooKeeperSaslClient.java:131) at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:949) at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1003) Caused by: javax.security.sasl.SaslException: Failure to initialize security context [Caused by GSSException: Invalid name provided (Mechanism level: KrbException: Cannot locate default realm)] at com.sun.security.sasl.gsskerb.GssKrb5Client.(GssKrb5Client.java:150) at com.sun.security.sasl.gsskerb.FactoryImpl.createSaslClient(FactoryImpl.java:63) at javax.security.sasl.Sasl.createSaslClient(Sasl.java:372) at org.apache.zookeeper.client.ZooKeeperSaslClient$1.run(ZooKeeperSaslClient.java:288) at org.apache.zookeeper.client.ZooKeeperSaslClient$1.run(ZooKeeperSaslClient.java:283) ... 
6 more Caused by: GSSException: Invalid name provided (Mechanism level: KrbException: Cannot locate default realm) at sun.security.jgss.krb5.Krb5NameElement.getInstance(Krb5NameElement.java:129) at sun.security.jgss.krb5.Krb5MechFactory.getNameElement(Krb5MechFactory.java:95) at sun.security.jgss.GSSManagerImpl.getNameElement(GSSManagerImpl.java:202) at sun.security.jgss.GSSNameImpl.getElement(GSSNameImpl.java:476) at sun.security.jgss.GSSNameImpl.init(GSSNameImpl.java:201) at sun.security.jgss.GSSNameImpl.(GSSNameImpl.java:170) at sun.security.jgss.GSSManagerImpl.createName(GSSManagerImpl.java:137) at com.sun.security.sasl.gsskerb.GssKrb5Client.(GssKrb5Client.java:108) ... 10 more {code} I wonder if the MapR support can help you resolving the issue. Maybe the {{MapRLoginHttpsClient}} is initializing some custom auth our Zookeeper client can't use? Maybe you need to compile Flink yourself with a custom MapR Zookeeper version, supporting their authentication mechanisms? [~mxm] do you have an idea? > Unable to create proxy to the ResourceManager > - > > Key: FLINK-4426 > URL: https://issues.apache.org/jira/browse/FLINK-4426 > Project: Flink > Issue Type: Bug > Components: ResourceManager >Affects Versions: 1.0.3, 1.1.1 > Environment: Flink 1.0.3 built with MapR (2.7.0-mapr-1602) >Reporter: Harpreet Sawhney > > We have a Mapr cluster on which I am trying to run a single flink job (from > exampl
[jira] [Resolved] (FLINK-4402) Wrong metrics parameter names in documentation
[ https://issues.apache.org/jira/browse/FLINK-4402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-4402. --- Resolution: Resolved Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/3be9a285 Thank you for the contribution! > Wrong metrics parameter names in documentation > --- > > Key: FLINK-4402 > URL: https://issues.apache.org/jira/browse/FLINK-4402 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.1.1 > Environment: all >Reporter: RWenden >Assignee: Neelesh Srinivas Salian >Priority: Trivial > Fix For: 1.1.2 > > Original Estimate: 4h > Remaining Estimate: 4h > > On the page > https://ci.apache.org/projects/flink/flink-docs-master/apis/metrics.html > the following metrics parameters are faulty: > metrics.scope.tm.task should be metrics.scope.task > metrics.scope.tm.operator should be metrics.scope.operator > to make it work on Flink 1.1.1. > Alternatively, the constants in ConfigConstants.java can be changed to > match the documentation. Either way... -- This message was sent by Atlassian JIRA (v6.3.4#6332)
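For reference, the corrected keys as they would appear in flink-conf.yaml; the scope format values shown are the documented defaults and are included only for illustration:

```yaml
# Correct keys (the docs wrongly listed metrics.scope.tm.task / metrics.scope.tm.operator)
metrics.scope.task: <host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
metrics.scope.operator: <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>
```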
[jira] [Created] (FLINK-4432) (YARN) JobManager web port is no longer available in configuration page in web interface
Robert Metzger created FLINK-4432: - Summary: (YARN) JobManager web port is no longer available in configuration page in web interface Key: FLINK-4432 URL: https://issues.apache.org/jira/browse/FLINK-4432 Project: Flink Issue Type: Task Components: YARN Client Affects Versions: 1.1.1 Reporter: Robert Metzger http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/1-1-1-JobManager-config-endpoint-no-longer-supplies-port-td8498.html {{jobmanager.web.port}} is not set anymore in the config object. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4439) Error message KafkaConsumer08 when all 'bootstrap.servers' are invalid
[ https://issues.apache.org/jira/browse/FLINK-4439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15429735#comment-15429735 ] Robert Metzger commented on FLINK-4439: --- In my opinion, the logging is pretty good. There's a log message at WARN level: {code} 2016-08-21 15:22:30 WARN FlinkKafkaConsumerBase:290 - Error communicating with broker inexistentKafkHost:9092 to find partitions for [testTopic].class java.nio.channels.ClosedChannelException. Message: null {code} and the stack trace is at debug level. I'm not sure if failing fast is a good solution: maybe it's just a temporary issue with the broker, or the client cannot contact the broker. > Error message KafkaConsumer08 when all 'bootstrap.servers' are invalid > -- > > Key: FLINK-4439 > URL: https://issues.apache.org/jira/browse/FLINK-4439 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Affects Versions: 1.0.3 >Reporter: Gheorghe Gheorghe >Priority: Minor > > The "flink-connector-kafka-0.8_2" is logging the following error when all > 'bootstrap.servers' are invalid when passed to the FlinkKafkaConsumer08. > See stacktrace: > {code:title=stacktrace|borderStyle=solid} > 2016-08-21 15:22:30 WARN FlinkKafkaConsumerBase:290 - Error communicating > with broker inexistentKafkHost:9092 to find partitions for [testTopic].class > java.nio.channels.ClosedChannelException. 
Message: null > 2016-08-21 15:22:30 DEBUG FlinkKafkaConsumerBase:292 - Detailed trace > java.nio.channels.ClosedChannelException > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78) > at > kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) > at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:91) > at kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:68) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:264) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.(FlinkKafkaConsumer08.java:193) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.(FlinkKafkaConsumer08.java:164) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.(FlinkKafkaConsumer08.java:131) > at MetricsFromKafka$.main(MetricsFromKafka.scala:38) > at MetricsFromKafka.main(MetricsFromKafka.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at sbt.Run.invokeMain(Run.scala:67) > at sbt.Run.run0(Run.scala:61) > at sbt.Run.sbt$Run$$execute$1(Run.scala:51) > at sbt.Run$$anonfun$run$1.apply$mcV$sp(Run.scala:55) > at sbt.Run$$anonfun$run$1.apply(Run.scala:55) > at sbt.Run$$anonfun$run$1.apply(Run.scala:55) > at sbt.Logger$$anon$4.apply(Logger.scala:84) > at sbt.TrapExit$App.run(TrapExit.scala:248) > at java.lang.Thread.run(Thread.java:745) > {code} > In the above stackrace it is hard to figure out that the actual servers > provided as a config cannot be resolved to a valid ip address. 
Moreover, the > Flink Kafka consumer will try all of those servers one by one, failing to > get partition information. > The suggested improvement is to fail fast and inform the user that the > servers provided in the 'bootstrap.servers' config are invalid. If at least > one server is valid, the exception should not be thrown. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
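The fail-fast check suggested in FLINK-4439 could look roughly like the following. This is a hypothetical standalone helper, not the connector's actual code: it resolves each configured "host:port" entry and reports failure only when none of them resolves, matching the "at least one valid server" semantics described above.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;

public class BootstrapServerCheck {

    /** Returns true if at least one "host:port" entry has a resolvable host. */
    static boolean atLeastOneResolvable(List<String> bootstrapServers) {
        for (String server : bootstrapServers) {
            // Strip the port, if present, to get the bare hostname.
            String host = server.contains(":")
                    ? server.substring(0, server.lastIndexOf(':'))
                    : server;
            try {
                InetAddress.getByName(host);
                return true; // one resolvable server is enough; do not fail
            } catch (UnknownHostException e) {
                // keep trying the remaining servers
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> servers = List.of("inexistentKafkHost:9092", "localhost:9092");
        if (!atLeastOneResolvable(servers)) {
            throw new IllegalArgumentException(
                    "None of the configured 'bootstrap.servers' could be resolved: " + servers);
        }
        System.out.println("at least one bootstrap server is resolvable");
    }
}
```

Note that DNS resolvability is only a proxy for broker reachability; a resolvable host can still be down, which is why the comment above argues against failing fast on transient errors.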
[jira] [Resolved] (FLINK-4222) Allow Kinesis configuration to get credentials from AWS Metadata
[ https://issues.apache.org/jira/browse/FLINK-4222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-4222. --- Resolution: Fixed Fix Version/s: 1.2.0 Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/a497ebc8 > Allow Kinesis configuration to get credentials from AWS Metadata > > > Key: FLINK-4222 > URL: https://issues.apache.org/jira/browse/FLINK-4222 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Affects Versions: 1.0.3 >Reporter: Nick Chadwick >Priority: Minor > Labels: easyfix > Fix For: 1.2.0 > > Original Estimate: 1h > Remaining Estimate: 1h > > When deploying Flink TaskManagers in an EC2 environment, it would be nice to > be able to use the EC2 IAM Role credentials provided by the EC2 Metadata > service. > This allows for credentials to be automatically discovered by services > running on EC2 instances at runtime, and removes the need to explicitly > create and assign credentials to TaskManagers. > This should be a fairly small change to the configuration of the > flink-connector-kinesis, which will greatly improve the ease of deployment to > Amazon EC2 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
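With the FLINK-4222 fix in place, configuration along these lines should let the Kinesis consumer pick up EC2 instance-profile credentials. The property keys ("aws.region", "aws.credentials.provider") and the "AUTO" value are assumptions based on the connector's Properties-based configuration; check the flink-connector-kinesis documentation for the release you use.

```java
import java.util.Properties;

public class KinesisAutoCredentials {

    /** Builds a consumer configuration that relies on the AWS default credential chain. */
    static Properties buildConsumerConfig() {
        Properties config = new Properties();
        // Region of the Kinesis stream (example value).
        config.setProperty("aws.region", "us-east-1");
        // "AUTO" defers to the default AWS credential provider chain, which falls
        // back to the EC2 instance metadata service (IAM role credentials), so no
        // explicit access key / secret key has to be configured on the TaskManagers.
        config.setProperty("aws.credentials.provider", "AUTO");
        return config;
    }

    public static void main(String[] args) {
        // The Properties object would then be passed to the FlinkKinesisConsumer constructor.
        System.out.println(buildConsumerConfig().getProperty("aws.credentials.provider"));
    }
}
```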
[jira] [Updated] (FLINK-2055) Implement Streaming HBaseSink
[ https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-2055: -- Assignee: (was: Hilmi Yildirim) > Implement Streaming HBaseSink > - > > Key: FLINK-2055 > URL: https://issues.apache.org/jira/browse/FLINK-2055 > Project: Flink > Issue Type: New Feature > Components: Streaming, Streaming Connectors >Affects Versions: 0.9 >Reporter: Robert Metzger > > As per : > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-2055) Implement Streaming HBaseSink
[ https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-2055: -- Assignee: Erli Ding > Implement Streaming HBaseSink > - > > Key: FLINK-2055 > URL: https://issues.apache.org/jira/browse/FLINK-2055 > Project: Flink > Issue Type: New Feature > Components: Streaming, Streaming Connectors >Affects Versions: 0.9 >Reporter: Robert Metzger >Assignee: Erli Ding > > As per : > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4446) Move Flume Sink from Flink to Bahir
Robert Metzger created FLINK-4446: - Summary: Move Flume Sink from Flink to Bahir Key: FLINK-4446 URL: https://issues.apache.org/jira/browse/FLINK-4446 Project: Flink Issue Type: Task Components: Streaming Connectors Reporter: Robert Metzger Assignee: Robert Metzger As per [1] the Flink community decided to move the Flume connector from Flink to Bahir. [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Move-Redis-and-Flume-connectors-to-Apache-Bahir-and-redirect-contributions-there-td13102.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4341) Kinesis connector does not emit maximum watermark properly
[ https://issues.apache.org/jira/browse/FLINK-4341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15433114#comment-15433114 ] Robert Metzger commented on FLINK-4341: --- For the short term: I think we have to disable the transparent resharding support in Kinesis. Before we can add support for partition discovery in Kafka (FLINK-4022) and before activating resharding in kinesis again, we need to implement a low watermark service in the JobManager. > Kinesis connector does not emit maximum watermark properly > -- > > Key: FLINK-4341 > URL: https://issues.apache.org/jira/browse/FLINK-4341 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.1.0, 1.1.1 >Reporter: Scott Kidder >Assignee: Robert Metzger >Priority: Blocker > Fix For: 1.2.0, 1.1.2 > > > **Prevously reported as "Checkpoint state size grows unbounded when task > parallelism not uniform"** > This issue was first encountered with Flink release 1.1.0 (commit 45f7825). I > was previously using a 1.1.0 snapshot (commit 18995c8) which performed as > expected. This issue was introduced somewhere between those commits. > I've got a Flink application that uses the Kinesis Stream Consumer to read > from a Kinesis stream with 2 shards. I've got 2 task managers with 2 slots > each, providing a total of 4 slots. When running the application with a > parallelism of 4, the Kinesis consumer uses 2 slots (one per Kinesis shard) > and 4 slots for subsequent tasks that process the Kinesis stream data. I use > an in-memory store for checkpoint data. 
> Yesterday I upgraded to Flink 1.1.0 (45f7825) and noticed that checkpoint > states were growing unbounded when running with a parallelism of 4, > checkpoint interval of 10 seconds: > {code} > ID State Size > 1 11.3 MB > 220.9 MB > 3 30.6 MB > 4 41.4 MB > 5 52.6 MB > 6 62.5 MB > 7 71.5 MB > 8 83.3 MB > 9 93.5 MB > {code} > The first 4 checkpoints generally succeed, but then fail with an exception > like the following: > {code} > java.lang.RuntimeException: Error triggering a checkpoint as the result of > receiving checkpoint barrier at > org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:768) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:758) > at > org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203) > at > org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.io.IOException: Size of the state is larger than the maximum > permitted memory-backed state. Size=12105407 , maxSize=5242880 . Consider > using a different state backend, like the File System State backend. 
> at > org.apache.flink.runtime.state.memory.MemoryStateBackend.checkSize(MemoryStateBackend.java:146) > at > org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetBytes(MemoryStateBackend.java:200) > at > org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetHandle(MemoryStateBackend.java:190) > at > org.apache.flink.runtime.state.AbstractStateBackend$CheckpointStateOutputView.closeAndGetHandle(AbstractStateBackend.java:447) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.snapshotOperatorState(WindowOperator.java:879) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:598) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:762) > ... 8 more > {code} > Or: > {code} > 2016-08-09 17:44:43,626 INFO > org.apache.flink.streaming.runtime.tasks.StreamTask - Restoring > checkpointed state to task Fold: property_id, player -> 10-minute > Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4) > 2016-08-09 17:44:51,236 ERROR akka.remote.EndpointWriter- > Transient association error (association remains live) > akka.remote.OversizedPayloadException: Discarding oversized payload sent to > Actor[akka.tcp://flink@10.55.2.212:6123/user/jobmanager#510517238]: max > allowed size 10485760 bytes, actual size of encoded class > org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint was > 10891825 bytes. > {code} > T
[jira] [Closed] (FLINK-3481) KafkaShortRetention08ITCase.testAutoOffsetReset failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-3481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger closed FLINK-3481. - Resolution: Cannot Reproduce I'm closing this one since the issue hasn't occurred since February. > KafkaShortRetention08ITCase.testAutoOffsetReset failed on Travis > > > Key: FLINK-3481 > URL: https://issues.apache.org/jira/browse/FLINK-3481 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.0 >Reporter: Till Rohrmann >Assignee: Robert Metzger >Priority: Critical > Labels: test-stability > > The KafkaShortRetention08ITCase.testAutoOffsetReset test case failed on > Travis [1]. > [1] https://s3.amazonaws.com/archive.travis-ci.org/jobs/66400/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-3343) Exception while using Kafka 0.9 connector
[ https://issues.apache.org/jira/browse/FLINK-3343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger closed FLINK-3343. - Resolution: Invalid > Exception while using Kafka 0.9 connector > -- > > Key: FLINK-3343 > URL: https://issues.apache.org/jira/browse/FLINK-3343 > Project: Flink > Issue Type: Improvement > Components: flink-contrib, Kafka Connector >Affects Versions: 1.0.0 >Reporter: Farouk Salem > > While running a job, without fault tolerance, producing data to Kafka, the > job failed due to "Batch Expired exception". I tried to increase the > "request.timeout.ms" and "max.block.ms" to 6 instead of 3 but still > the same problem. The only way to ride on this problem is using snapshotting. > 09:58:11,036 WARN org.apache.kafka.clients.producer.internals.Sender >- Got error produce response with correlation id 48106 on topic-partition > flinkWordCountNoFaultToleranceSmall > -2, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION > 09:58:11,036 WARN org.apache.kafka.clients.producer.internals.Sender >- Got error produce response with correlation id 48105 on topic-partition > flinkWordCountNoFaultToleranceSmall > -2, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION > 09:58:11,036 WARN org.apache.kafka.clients.producer.internals.Sender >- Got error produce response with correlation id 48104 on topic-partition > flinkWordCountNoFaultToleranceSmall > -2, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION > 09:58:11,068 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask >- Caught exception while processing timer. 
> java.lang.RuntimeException: Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:319) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:300) > at > org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:48) > at > org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:29) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at > org.apache.flink.streaming.runtime.operators.windowing.AggregatingKeyedTimePanes.evaluateWindow(AggregatingKeyedTimePanes.java:59) > at > org.apache.flink.streaming.runtime.operators.windowing.AbstractAlignedProcessingTimeWindowOperator.computeWindow(AbstractAlignedProcessingTimeWindowOperator.java:242) > at > org.apache.flink.streaming.runtime.operators.windowing.AbstractAlignedProcessingTimeWindowOperator.trigger(AbstractAlignedProcessingTimeWindowOperator.java:223) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:606) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:319) > at > 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:300) > at > org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:48) > at > org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:29) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:37) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:316) > ... 15 more > Caused by: java.lang.Exception: Failed to send data to Kafka: Batch Expired > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:282) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:249) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:37) >
[jira] [Resolved] (FLINK-3683) Unstable tests: in the Kafka09ITCase, the testMultipleSourcesOnePartition() and the testOneToOneSources()
[ https://issues.apache.org/jira/browse/FLINK-3683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-3683. --- Resolution: Cannot Reproduce I'm closing this one since it hasn't occurred since March, and the Kafka tests have been hardened in the meantime. Please reopen if the error occurs again. > Unstable tests: in the Kafka09ITCase, the testMultipleSourcesOnePartition() > and the testOneToOneSources() > - > > Key: FLINK-3683 > URL: https://issues.apache.org/jira/browse/FLINK-3683 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Reporter: Kostas Kloudas > Labels: test-stability > > The aforementioned tests fail sometimes. To reproduce the behavior put them > in a for-loop and let them run 100 times. In this case the problem seems to > be that the topic was not deleted before being recreated for the next run. > And for a trace on Travis, look here: > https://api.travis-ci.org/jobs/119493332/log.txt?deansi=true > (although this was not on the master branch) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
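The reproduction recipe above (run the flaky tests in a loop, e.g. 100 times) can be sketched as a tiny stand-alone harness. `RepeatRunner` and `TestBody` are illustrative names, not Flink test utilities:

```java
// Minimal sketch of rerunning a flaky test body many times to surface
// non-deterministic failures; names are illustrative, not Flink code.
public class RepeatRunner {

    interface TestBody {
        void run() throws Exception;
    }

    // Runs the body `times` times and returns how many iterations failed
    // (any Throwable, including assertion errors, counts as a failure).
    static int repeat(TestBody body, int times) {
        int failures = 0;
        for (int i = 0; i < times; i++) {
            try {
                body.run();
            } catch (Throwable t) {
                failures++;
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        // The actual test body, e.g. testOneToOneSources(), would go here.
        int failures = repeat(() -> { /* test body */ }, 100);
        System.out.println(failures + " of 100 iterations failed");
    }
}
```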
[jira] [Closed] (FLINK-3823) Kafka08ITCase.testOffsetInZookeeper failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-3823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger closed FLINK-3823. - Resolution: Cannot Reproduce I'm closing this one since it hasn't occurred since April, and the Kafka tests have been hardened in the meantime. Please reopen if the error occurs again. > Kafka08ITCase.testOffsetInZookeeper failed on Travis > > > Key: FLINK-3823 > URL: https://issues.apache.org/jira/browse/FLINK-3823 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > > The test case {{Kafka08ITCase.testOffsetInZookeeper}} failed on Travis. > https://s3.amazonaws.com/archive.travis-ci.org/jobs/125795678/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-3531) KafkaShortRetention09ITCase.testAutoOffsetReset fails on Travis
[ https://issues.apache.org/jira/browse/FLINK-3531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-3531. --- Resolution: Cannot Reproduce I'm closing this one since it hasn't occurred since February, and the Kafka tests have been hardened in the meantime. Please reopen if the error occurs again. > KafkaShortRetention09ITCase.testAutoOffsetReset fails on Travis > --- > > Key: FLINK-3531 > URL: https://issues.apache.org/jira/browse/FLINK-3531 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.0 >Reporter: Till Rohrmann > Labels: test-stability > > The test case {{KafkaShortRetention09ITCase.testAutoOffsetReset}} failed on > Travis. > https://s3.amazonaws.com/archive.travis-ci.org/jobs/112049279/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-2049) KafkaSink sporadically fails to send message
[ https://issues.apache.org/jira/browse/FLINK-2049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-2049. --- Resolution: Cannot Reproduce I'm closing this one since it hasn't occurred since May 2015, and the Kafka tests have been hardened in the meantime. Please reopen if the error occurs again. > KafkaSink sporadically fails to send message > > > Key: FLINK-2049 > URL: https://issues.apache.org/jira/browse/FLINK-2049 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Streaming >Affects Versions: 0.9 >Reporter: Robert Metzger > > This test https://travis-ci.org/StephanEwen/incubator-flink/jobs/63147661 > failed with: > {code} > 10:38:22,415 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask >- StreamOperator failed due to: java.lang.RuntimeException: > java.lang.RuntimeException: kafka.common.FailedToSendMessageException: Failed > to send messages after 10 tries. > at > org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:142) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:34) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:139) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: > kafka.common.FailedToSendMessageException: Failed to send messages after 10 > tries. 
> at > org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:142) > at > org.apache.flink.streaming.api.operators.ChainableStreamOperator.collect(ChainableStreamOperator.java:54) > at > org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:39) > at > org.apache.flink.streaming.connectors.kafka.KafkaITCase$3.run(KafkaITCase.java:326) > at > org.apache.flink.streaming.api.operators.StreamSource.callUserFunction(StreamSource.java:40) > at > org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137) > ... 4 more > Caused by: kafka.common.FailedToSendMessageException: Failed to send messages > after 10 tries. > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) > at kafka.producer.Producer.send(Producer.scala:77) > at kafka.javaapi.producer.Producer.send(Producer.scala:33) > at > org.apache.flink.streaming.connectors.kafka.api.KafkaSink.invoke(KafkaSink.java:183) > at > org.apache.flink.streaming.api.operators.StreamSink.callUserFunction(StreamSink.java:39) > at > org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137) > ... 9 more > {code} > I've extracted the relevant logs: > https://gist.github.com/rmetzger/ddbb0fead5efdd58a539. > The error comes from Kafka's producer code. We are not doing much in our > Kafka Sink, so I really think this is not really a flink issue. > When the issue occurs again, I'll write to the Kafka list so seek for help. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-2735) KafkaProducerITCase.testCustomPartitioning sporadically fails
[ https://issues.apache.org/jira/browse/FLINK-2735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-2735. --- Resolution: Cannot Reproduce I'm closing this one since it hasn't occurred since November 15, and the Kafka tests have been hardened in the meantime. Please reopen if the error occurs again. > KafkaProducerITCase.testCustomPartitioning sporadically fails > - > > Key: FLINK-2735 > URL: https://issues.apache.org/jira/browse/FLINK-2735 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 0.10.0 >Reporter: Robert Metzger > Labels: test-stability > > In the following test run: > https://s3.amazonaws.com/archive.travis-ci.org/jobs/8158/log.txt > there was the following failure > {code} > Caused by: java.lang.Exception: Unable to get last offset for topic > customPartitioningTestTopic and partitions [FetchPartition {partition=2, > offset=-915623761776}]. > Exception for partition 2: kafka.common.UnknownException > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:408) > at java.lang.Class.newInstance(Class.java:438) > at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86) > at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala) > at > org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getLastOffset(LegacyFetcher.java:521) > at > org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:370) > at > org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242) > at > 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:168) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: Unable to get last offset for topic > customPartitioningTestTopic and partitions [FetchPartition {partition=2, > offset=-915623761776}]. > Exception for partition 2: kafka.common.UnknownException > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:408) > at java.lang.Class.newInstance(Class.java:438) > at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86) > at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala) > at > org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getLastOffset(LegacyFetcher.java:521) > at > org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:370) > at > org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getLastOffset(LegacyFetcher.java:524) > at > org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:370) > Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 17.455 sec > <<< FAILURE! 
- in > org.apache.flink.streaming.connectors.kafka.KafkaProducerITCase > testCustomPartitioning(org.apache.flink.streaming.connectors.kafka.KafkaProducerITCase) > Time elapsed: 7.809 sec <<< FAILURE! > java.lang.AssertionError: Test failed: The program execution failed: Job > execution failed. > at org.junit.Assert.fail(Assert.java:88) > at > org.apache.flink.streaming.connectors.kafka.KafkaTestBase.tryExecute(KafkaTestBase.java:313) > at > org.apache.flink.streaming.connectors.kafka.KafkaProducerITCase.testCustomPartitioning(KafkaProducerITCase.java:155) > {code} > From the broker logs it seems to be an issue in the Kafka broker > {code} > 14:43:03,328 INFO kafka.network.Processor >- Closing socket connection to /127.0.0.1. > 14:43:03,334 WARN kafka.server.KafkaApis
[jira] [Commented] (FLINK-3146) Expose the Kafka consumer lag as a metric in the web interface for Kafka 0.8.x
[ https://issues.apache.org/jira/browse/FLINK-3146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15434565#comment-15434565 ] Robert Metzger commented on FLINK-3146: --- FLINK-4186 is related, as it exposes the current read offset through Flink's metrics. External systems can use that to determine the lag. > Expose the Kafka consumer lag as a metric in the web interface for Kafka 0.8.x > -- > > Key: FLINK-3146 > URL: https://issues.apache.org/jira/browse/FLINK-3146 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Robert Metzger > > Right now the Kafka consumer is not reporting any metrics to the web frontend. > For the Kafka 0.9.0.0 I'll probably use the metrics reported by Kafka, for > Kafka 0.8. we need to manually request the current ("latest") offset and > compare it to the last read offset. > I've added this JIRA to see how many Flink users are interested in this. If > you want this feature, please comment into the JIRA! -- This message was sent by Atlassian JIRA (v6.3.4#6332)
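The 0.8 approach described above — manually request the broker's current ("latest") offset and compare it to the last read offset — boils down to a per-partition difference. A minimal stand-alone sketch, with illustrative names rather than actual Flink or Kafka API:

```java
// Sketch of the lag computation for Kafka 0.8: per partition,
// lag = broker's latest offset - consumer's last read offset.
// Names are illustrative, not actual Flink/Kafka classes.
import java.util.HashMap;
import java.util.Map;

public class ConsumerLag {

    // latestOffsets: per-partition "latest" offset fetched from the broker
    // readOffsets:   per-partition offset of the last record the consumer read
    static Map<Integer, Long> computeLag(Map<Integer, Long> latestOffsets,
                                         Map<Integer, Long> readOffsets) {
        Map<Integer, Long> lag = new HashMap<>();
        for (Map.Entry<Integer, Long> e : latestOffsets.entrySet()) {
            long read = readOffsets.getOrDefault(e.getKey(), 0L);
            // Clamp at zero: the broker snapshot may be slightly stale.
            lag.put(e.getKey(), Math.max(0L, e.getValue() - read));
        }
        return lag;
    }

    public static void main(String[] args) {
        Map<Integer, Long> latest = new HashMap<>();
        latest.put(0, 1500L);
        Map<Integer, Long> read = new HashMap<>();
        read.put(0, 1200L);
        System.out.println(computeLag(latest, read)); // partition 0 lags by 300
    }
}
```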
[jira] [Created] (FLINK-4479) Replace trademark (tm) with registered trademark (R) sign on Flink website
Robert Metzger created FLINK-4479: - Summary: Replace trademark (tm) with registered trademark (R) sign on Flink website Key: FLINK-4479 URL: https://issues.apache.org/jira/browse/FLINK-4479 Project: Flink Issue Type: Bug Components: Project Website Reporter: Robert Metzger Assignee: Robert Metzger Flink is now a registered trademark, so we should reflect that on our website. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3823) Kafka08ITCase.testOffsetInZookeeper failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-3823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15436839#comment-15436839 ] Robert Metzger commented on FLINK-3823: --- I pushed a commit to fix the log upload so that I can see what's going on. http://git-wip-us.apache.org/repos/asf/flink/commit/444315a1 > Kafka08ITCase.testOffsetInZookeeper failed on Travis > > > Key: FLINK-3823 > URL: https://issues.apache.org/jira/browse/FLINK-3823 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > > The test case {{Kafka08ITCase.testOffsetInZookeeper}} failed on Travis. > https://s3.amazonaws.com/archive.travis-ci.org/jobs/125795678/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3704) JobManagerHAProcessFailureBatchRecoveryITCase.testJobManagerProcessFailure unstable
[ https://issues.apache.org/jira/browse/FLINK-3704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15436844#comment-15436844 ] Robert Metzger commented on FLINK-3704: --- The issue still persists: https://s3.amazonaws.com/archive.travis-ci.org/jobs/154771942/log.txt {code} testJobManagerProcessFailure[1](org.apache.flink.test.recovery.JobManagerHAProcessFailureBatchRecoveryITCase) Time elapsed: 300.276 sec <<< ERROR! java.util.concurrent.TimeoutException: Futures timed out after [278414 nanoseconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at scala.concurrent.Await.result(package.scala) at org.apache.flink.runtime.testutils.JobManagerActorTestUtils.waitForTaskManagers(JobManagerActorTestUtils.java:152) at org.apache.flink.test.recovery.JobManagerHAProcessFailureBatchRecoveryITCase.testJobManagerProcessFailure(JobManagerHAProcessFailureBatchRecoveryITCase.java:294) {code} > JobManagerHAProcessFailureBatchRecoveryITCase.testJobManagerProcessFailure > unstable > --- > > Key: FLINK-3704 > URL: https://issues.apache.org/jira/browse/FLINK-3704 > Project: Flink > Issue Type: Bug > Components: JobManager >Reporter: Robert Metzger > Labels: test-stability > > https://s3.amazonaws.com/archive.travis-ci.org/jobs/120882840/log.txt > {code} > testJobManagerProcessFailure[1](org.apache.flink.test.recovery.JobManagerHAProcessFailureBatchRecoveryITCase) > Time elapsed: 9.302 sec <<< ERROR! > java.io.IOException: Actor at > akka.tcp://flink@127.0.0.1:55591/user/jobmanager not reachable. Please make > sure that the actor is running and its port is reachable. 
> at > org.apache.flink.runtime.akka.AkkaUtils$.getActorRef(AkkaUtils.scala:384) > at org.apache.flink.runtime.akka.AkkaUtils.getActorRef(AkkaUtils.scala) > at > org.apache.flink.test.recovery.JobManagerHAProcessFailureBatchRecoveryITCase.testJobManagerProcessFailure(JobManagerHAProcessFailureBatchRecoveryITCase.java:290) > Caused by: akka.actor.ActorNotFound: Actor not found for: > ActorSelection[Anchor(akka.tcp://flink@127.0.0.1:55591/), > Path(/user/jobmanager)] > at > akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65) > at > akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) > at > akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67) > at > akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82) > at > akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) > at > akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) > at > scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) > at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58) > at > akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74) > at > akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110) > at > akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73) > at > scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) > at > scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) > at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267) > at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508) > at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541) > at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531) > at > 
akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87) > at akka.remote.EndpointWriter.postStop(Endpoint.scala:561) > at akka.actor.Actor$class.aroundPostStop(Actor.scala:475) > at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415) > at > akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) > at > akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) > at akka.actor.ActorCell.terminate(ActorCell.scala:369) > at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462) > at akka.actor.ActorCell.systemInv
[jira] [Updated] (FLINK-4433) Refactor the StreamSource.
[ https://issues.apache.org/jira/browse/FLINK-4433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4433: -- Component/s: DataStream API > Refactor the StreamSource. > -- > > Key: FLINK-4433 > URL: https://issues.apache.org/jira/browse/FLINK-4433 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > > With the addition of continuous file monitoring, apart from the > {{StreamSource}} also the {{ContinuousFileReaderOperator}} uses a > {{SourceContext}}. Given this, all the implementations of the > {{SourceContext}} should be removed from the {{StreamSource}} and become > independent classes. > In addition, the {{AsyncExceptionChecker}} interface should be removed as its > functionality can be replaced by the {{task.failExternally()}} method. This > also implies slight changes in the source context implementations. > Also in the {{trigger()}} method of the {{WatermarkEmittingTask}}, all the > {{owner.getCurrentProcessingTime()}} could be replaced by the {{timestamp}} > argument of that method. This will remove some of the calls to the > {{getCurrentProcessingTime()}} which can be expensive. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4433) Refactor the StreamSource.
[ https://issues.apache.org/jira/browse/FLINK-4433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4433: -- Issue Type: Improvement (was: Bug) > Refactor the StreamSource. > -- > > Key: FLINK-4433 > URL: https://issues.apache.org/jira/browse/FLINK-4433 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > > With the addition of continuous file monitoring, apart from the > {{StreamSource}} also the {{ContinuousFileReaderOperator}} uses a > {{SourceContext}}. Given this, all the implementations of the > {{SourceContext}} should be removed from the {{StreamSource}} and become > independent classes. > In addition, the {{AsyncExceptionChecker}} interface should be removed as its > functionality can be replaced by the {{task.failExternally()}} method. This > also implies slight changes in the source context implementations. > Also in the {{trigger()}} method of the {{WatermarkEmittingTask}}, all the > {{owner.getCurrentProcessingTime()}} could be replaced by the {{timestamp}} > argument of that method. This will remove some of the calls to the > {{getCurrentProcessingTime()}} which can be expensive. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4479) Replace trademark (tm) with registered trademark (R) sign on Flink website
[ https://issues.apache.org/jira/browse/FLINK-4479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15437181#comment-15437181 ] Robert Metzger commented on FLINK-4479: --- PR open: https://github.com/apache/flink-web/pull/32 > Replace trademark (tm) with registered trademark (R) sign on Flink website > -- > > Key: FLINK-4479 > URL: https://issues.apache.org/jira/browse/FLINK-4479 > Project: Flink > Issue Type: Bug > Components: Project Website >Reporter: Robert Metzger >Assignee: Robert Metzger > > Flink is now a registered trademark, so we should reflect that on our website. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input
[ https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-3679: -- Component/s: Kafka Connector > DeserializationSchema should handle zero or more outputs for every input > > > Key: FLINK-3679 > URL: https://issues.apache.org/jira/browse/FLINK-3679 > Project: Flink > Issue Type: Bug > Components: DataStream API, Kafka Connector >Reporter: Jamie Grier > > There are a couple of issues with the DeserializationSchema API that I think > should be improved. This request has come to me via an existing Flink user. > The main issue is simply that the API assumes that there is a one-to-one > mapping between input and outputs. In reality there are scenarios where one > input message (say from Kafka) might actually map to zero or more logical > elements in the pipeline. > Particularly important here is the case where you receive a message from a > source (such as Kafka) and say the raw bytes don't deserialize properly. > Right now the only recourse is to throw IOException and therefore fail the > job. > This is definitely not good since bad data is a reality and failing the job > is not the right option. If the job fails we'll just end up replaying the > bad data and the whole thing will start again. > Instead in this case it would be best if the user could just return the empty > set. > The other case is where one input message should logically be multiple output > messages. This case is probably less important since there are other ways to > do this but in general it might be good to make the > DeserializationSchema.deserialize() method return a collection rather than a > single element. > Maybe we need to support a DeserializationSchema variant that has semantics > more like that of FlatMap. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
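A flatMap-style schema as proposed would let deserialize() emit zero elements for corrupt input and several elements for composite messages, instead of being forced into a one-to-one mapping. A self-contained sketch of what such a variant could look like; the interface and class names here are hypothetical, not the actual Flink API:

```java
// Sketch of the flatMap-like variant discussed above: deserialize() receives
// a Collector and may emit zero records (bad data skipped) or several.
// All names are hypothetical, not the real Flink DeserializationSchema API.
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class FlatDeserialization {

    interface Collector<T> {
        void collect(T record);
    }

    interface FlatDeserializationSchema<T> {
        void deserialize(byte[] message, Collector<T> out);
    }

    // Example: parse one int per message; on corrupt input emit nothing
    // instead of throwing IOException and failing the whole job.
    static final FlatDeserializationSchema<Integer> INT_SCHEMA = (bytes, out) -> {
        try {
            out.collect(Integer.parseInt(new String(bytes, StandardCharsets.UTF_8)));
        } catch (NumberFormatException ignored) {
            // Zero outputs for this input: the bad record is dropped,
            // the job keeps running and will not replay it on restart.
        }
    };

    static List<Integer> deserializeAll(List<byte[]> messages) {
        List<Integer> out = new ArrayList<>();
        for (byte[] m : messages) {
            INT_SCHEMA.deserialize(m, out::add);
        }
        return out;
    }
}
```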
[jira] [Created] (FLINK-4524) JobManagerProcessReapingTest.testReapProcessOnFailure unstable
Robert Metzger created FLINK-4524: - Summary: JobManagerProcessReapingTest.testReapProcessOnFailure unstable Key: FLINK-4524 URL: https://issues.apache.org/jira/browse/FLINK-4524 Project: Flink Issue Type: Bug Components: JobManager Reporter: Robert Metzger In this travis build: https://s3.amazonaws.com/archive.travis-ci.org/jobs/155355651/log.txt the {{JobManagerProcessReapingTest.testReapProcessOnFailure}} test failed with {code} testReapProcessOnFailure(org.apache.flink.runtime.jobmanager.JobManagerProcessReapingTest) Time elapsed: 8.642 sec <<< FAILURE! java.lang.AssertionError: JobManager process did not terminate upon actor death at org.junit.Assert.fail(Assert.java:88) at org.junit.Assert.assertTrue(Assert.java:41) at org.junit.Assert.assertFalse(Assert.java:64) at org.apache.flink.runtime.jobmanager.JobManagerProcessReapingTest.testReapProcessOnFailure(JobManagerProcessReapingTest.java:155) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3704) JobManagerHAProcessFailureBatchRecoveryITCase.testJobManagerProcessFailure unstable
[ https://issues.apache.org/jira/browse/FLINK-3704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15445309#comment-15445309 ] Robert Metzger commented on FLINK-3704: --- Another failure: https://s3.amazonaws.com/archive.travis-ci.org/jobs/155395229/log.txt It now also contains logs again :) > JobManagerHAProcessFailureBatchRecoveryITCase.testJobManagerProcessFailure > unstable > --- > > Key: FLINK-3704 > URL: https://issues.apache.org/jira/browse/FLINK-3704 > Project: Flink > Issue Type: Bug > Components: JobManager >Reporter: Robert Metzger > Labels: test-stability > > https://s3.amazonaws.com/archive.travis-ci.org/jobs/120882840/log.txt > {code} > testJobManagerProcessFailure[1](org.apache.flink.test.recovery.JobManagerHAProcessFailureBatchRecoveryITCase) > Time elapsed: 9.302 sec <<< ERROR! > java.io.IOException: Actor at > akka.tcp://flink@127.0.0.1:55591/user/jobmanager not reachable. Please make > sure that the actor is running and its port is reachable. 
> at > org.apache.flink.runtime.akka.AkkaUtils$.getActorRef(AkkaUtils.scala:384) > at org.apache.flink.runtime.akka.AkkaUtils.getActorRef(AkkaUtils.scala) > at > org.apache.flink.test.recovery.JobManagerHAProcessFailureBatchRecoveryITCase.testJobManagerProcessFailure(JobManagerHAProcessFailureBatchRecoveryITCase.java:290) > Caused by: akka.actor.ActorNotFound: Actor not found for: > ActorSelection[Anchor(akka.tcp://flink@127.0.0.1:55591/), > Path(/user/jobmanager)] > at > akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65) > at > akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) > at > akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67) > at > akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82) > at > akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) > at > akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) > at > scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) > at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58) > at > akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74) > at > akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110) > at > akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73) > at > scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) > at > scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) > at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267) > at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508) > at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541) > at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531) > at > 
akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87) > at akka.remote.EndpointWriter.postStop(Endpoint.scala:561) > at akka.actor.Actor$class.aroundPostStop(Actor.scala:475) > at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415) > at > akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) > at > akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) > at akka.actor.ActorCell.terminate(ActorCell.scala:369) > at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462) > at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) > at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279) > at akka.dispatch.Mailbox.run(Mailbox.scala:220) > at akka.dispatch.Mailbox.exec(Mailbox.scala:231) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-4341) Kinesis connector does not emit maximum watermark properly
[ https://issues.apache.org/jira/browse/FLINK-4341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-4341. --- Resolution: Fixed Assignee: Tzu-Li (Gordon) Tai (was: Robert Metzger) Resolved for master in http://git-wip-us.apache.org/repos/asf/flink/commit/7b574cf5 Resolved for 1.1.2 in http://git-wip-us.apache.org/repos/asf/flink/commit/81f30c5e > Kinesis connector does not emit maximum watermark properly > -- > > Key: FLINK-4341 > URL: https://issues.apache.org/jira/browse/FLINK-4341 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.1.0, 1.1.1 >Reporter: Scott Kidder >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.2.0, 1.1.2 > > > **Previously reported as "Checkpoint state size grows unbounded when task > parallelism not uniform"** > This issue was first encountered with Flink release 1.1.0 (commit 45f7825). I > was previously using a 1.1.0 snapshot (commit 18995c8) which performed as > expected. This issue was introduced somewhere between those commits. > I've got a Flink application that uses the Kinesis Stream Consumer to read > from a Kinesis stream with 2 shards. I've got 2 task managers with 2 slots > each, providing a total of 4 slots. When running the application with a > parallelism of 4, the Kinesis consumer uses 2 slots (one per Kinesis shard) > and 4 slots for subsequent tasks that process the Kinesis stream data. I use > an in-memory store for checkpoint data. 
> Yesterday I upgraded to Flink 1.1.0 (45f7825) and noticed that checkpoint > states were growing unbounded when running with a parallelism of 4, > checkpoint interval of 10 seconds: > {code} > ID State Size > 1 11.3 MB > 2 20.9 MB > 3 30.6 MB > 4 41.4 MB > 5 52.6 MB > 6 62.5 MB > 7 71.5 MB > 8 83.3 MB > 9 93.5 MB > {code} > The first 4 checkpoints generally succeed, but then fail with an exception > like the following: > {code} > java.lang.RuntimeException: Error triggering a checkpoint as the result of > receiving checkpoint barrier at > org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:768) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:758) > at > org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203) > at > org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.io.IOException: Size of the state is larger than the maximum > permitted memory-backed state. Size=12105407 , maxSize=5242880 . Consider > using a different state backend, like the File System State backend. 
> at > org.apache.flink.runtime.state.memory.MemoryStateBackend.checkSize(MemoryStateBackend.java:146) > at > org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetBytes(MemoryStateBackend.java:200) > at > org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetHandle(MemoryStateBackend.java:190) > at > org.apache.flink.runtime.state.AbstractStateBackend$CheckpointStateOutputView.closeAndGetHandle(AbstractStateBackend.java:447) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.snapshotOperatorState(WindowOperator.java:879) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:598) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:762) > ... 8 more > {code} > Or: > {code} > 2016-08-09 17:44:43,626 INFO > org.apache.flink.streaming.runtime.tasks.StreamTask - Restoring > checkpointed state to task Fold: property_id, player -> 10-minute > Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4) > 2016-08-09 17:44:51,236 ERROR akka.remote.EndpointWriter- > Transient association error (association remains live) > akka.remote.OversizedPayloadException: Discarding oversized payload sent to > Actor[akka.tcp://flink@10.55.2.212:6123/user/jobmanager#510517238]: max > allowed size 10485760 bytes, actual size of encoded class > org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint was > 10891825 bytes. > {code} > This can be fixed by simply submitting the job with a parallelism of 2. I > suspect
[jira] [Commented] (FLINK-4485) Finished jobs in yarn session fill /tmp filesystem
[ https://issues.apache.org/jira/browse/FLINK-4485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15445493#comment-15445493 ] Robert Metzger commented on FLINK-4485: --- I looked a bit into the issue because I'm wondering whether we want to include the fix in the 1.1.2 release. To me it seems that the {{BlobLibraryCacheManager}} is doing everything as expected. I wonder whether we need to close the URLClassLoader when removing the job from the JobManager. > Finished jobs in yarn session fill /tmp filesystem > -- > > Key: FLINK-4485 > URL: https://issues.apache.org/jira/browse/FLINK-4485 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.1.0 >Reporter: Niels Basjes >Priority: Blocker > > On a Yarn cluster I start a yarn-session with a few containers and task slots. > Then I fire a 'large' number of Flink batch jobs in sequence against this > yarn session. It is the exact same job (java code) yet it gets different > parameters. > In this scenario it is exporting HBase tables to files in HDFS and the > parameters are about which data from which tables and the name of the target > directory. > After running several dozen jobs the job submissions started to fail and we > investigated. > We found that the cause was that on the Yarn node which was hosting the > jobmanager the /tmp file system was full (4GB was 100% full). > However, the output of {{du -hcs /tmp}} showed only 200MB in use. > We found that a very large file (we guess it is the jar of the job) was put > in /tmp, used, and deleted, yet the file handle was not closed by the jobmanager. > As soon as we killed the jobmanager the disk space was freed. > The summary of the impact of this is that a yarn-session that receives enough > jobs brings down the Yarn node for all users. > See parts of the output we got from {{lsof}} below. 
> {code} > COMMAND PID USER FD TYPE DEVICE SIZE > NODE NAME > java 15034 nbasjes 550r REG 253,17 66219695 > 245 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0003 > (deleted) > java 15034 nbasjes 551r REG 253,17 66219695 > 252 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0007 > (deleted) > java 15034 nbasjes 552r REG 253,17 66219695 > 267 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0012 > (deleted) > java 15034 nbasjes 553r REG 253,17 66219695 > 250 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0005 > (deleted) > java 15034 nbasjes 554r REG 253,17 66219695 > 288 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0018 > (deleted) > java 15034 nbasjes 555r REG 253,17 66219695 > 298 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0025 > (deleted) > java 15034 nbasjes 557r REG 253,17 66219695 > 254 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0008 > (deleted) > java 15034 nbasjes 558r REG 253,17 66219695 > 292 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0019 > (deleted) > java 15034 nbasjes 559r REG 253,17 66219695 > 275 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0013 > (deleted) > java 15034 nbasjes 560r REG 253,17 66219695 > 159 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0002 > (deleted) > java 15034 nbasjes 562r REG 253,17 66219695 > 238 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0001 > (deleted) > java 15034 nbasjes 568r REG 253,17 66219695 > 246 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0004 > (deleted) > java 15034 nbasjes 569r REG 253,17 66219695 > 255 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0009 > (deleted) > java 15034 nbasjes 571r REG 253,17 66219695 > 299 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0026 > (deleted) > java 15034 nbasjes 
572r REG 253,17 66219695 > 293 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0020 > (deleted) > java 15034 nbasjes 574r REG 253,17 66219695 > 256 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0010
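The comment above floats closing the job's URLClassLoader when the JobManager removes the job. A minimal sketch of why that would release the pinned disk space, using only the JDK; the `releaseJobClassLoader` helper is hypothetical, not Flink's actual API:

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;

public class CloseLoaderSketch {

    // Hypothetical cleanup hook: closing a job's URLClassLoader releases the
    // open file handles on its blob jars. Until close() is called, jars that
    // were already deleted from /tmp stay pinned on disk, which is exactly
    // what the "(deleted)" entries in the lsof output above show.
    static boolean releaseJobClassLoader(URLClassLoader jobClassLoader) {
        try {
            jobClassLoader.close(); // URLClassLoader implements Closeable since Java 7
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        URLClassLoader loader = new URLClassLoader(new URL[0]);
        System.out.println("released: " + releaseJobClassLoader(loader)); // prints released: true
    }
}
```

This only works on Java 7+, where `URLClassLoader` became `Closeable`; on earlier JVMs the handles are held until the process exits, matching the "killing the jobmanager freed the space" observation.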
[jira] [Commented] (FLINK-4485) Finished jobs in yarn session fill /tmp filesystem
[ https://issues.apache.org/jira/browse/FLINK-4485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15445495#comment-15445495 ] Robert Metzger commented on FLINK-4485: --- Sorry, I didn't refresh before writing a comment. > Finished jobs in yarn session fill /tmp filesystem > -- > > Key: FLINK-4485 > URL: https://issues.apache.org/jira/browse/FLINK-4485 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.1.0 >Reporter: Niels Basjes >Priority: Blocker > > On a Yarn cluster I start a yarn-session with a few containers and task slots. > Then I fire a 'large' number of Flink batch jobs in sequence against this > yarn session. It is the exact same job (java code) yet it gets different > parameters. > In this scenario it is exporting HBase tables to files in HDFS and the > parameters are about which data from which tables and the name of the target > directory. > After running several dozen jobs the job submissions started to fail and we > investigated. > We found that the cause was that on the Yarn node which was hosting the > jobmanager the /tmp file system was full (4GB was 100% full). > However, the output of {{du -hcs /tmp}} showed only 200MB in use. > We found that a very large file (we guess it is the jar of the job) was put > in /tmp, used, and deleted, yet the file handle was not closed by the jobmanager. > As soon as we killed the jobmanager the disk space was freed. > The summary of the impact of this is that a yarn-session that receives enough > jobs brings down the Yarn node for all users. > See parts of the output we got from {{lsof}} below. 
> {code} > COMMAND PID USER FD TYPE DEVICE SIZE > NODE NAME > java 15034 nbasjes 550r REG 253,17 66219695 > 245 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0003 > (deleted) > java 15034 nbasjes 551r REG 253,17 66219695 > 252 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0007 > (deleted) > java 15034 nbasjes 552r REG 253,17 66219695 > 267 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0012 > (deleted) > java 15034 nbasjes 553r REG 253,17 66219695 > 250 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0005 > (deleted) > java 15034 nbasjes 554r REG 253,17 66219695 > 288 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0018 > (deleted) > java 15034 nbasjes 555r REG 253,17 66219695 > 298 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0025 > (deleted) > java 15034 nbasjes 557r REG 253,17 66219695 > 254 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0008 > (deleted) > java 15034 nbasjes 558r REG 253,17 66219695 > 292 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0019 > (deleted) > java 15034 nbasjes 559r REG 253,17 66219695 > 275 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0013 > (deleted) > java 15034 nbasjes 560r REG 253,17 66219695 > 159 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0002 > (deleted) > java 15034 nbasjes 562r REG 253,17 66219695 > 238 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0001 > (deleted) > java 15034 nbasjes 568r REG 253,17 66219695 > 246 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0004 > (deleted) > java 15034 nbasjes 569r REG 253,17 66219695 > 255 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0009 > (deleted) > java 15034 nbasjes 571r REG 253,17 66219695 > 299 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0026 > (deleted) > java 15034 nbasjes 
572r REG 253,17 66219695 > 293 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0020 > (deleted) > java 15034 nbasjes 574r REG 253,17 66219695 > 256 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0010 > (deleted) > java 15034 nbasjes 575r REG 253,17 66219695 > 302 > /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0029 > (deleted) > java 15034 nbasjes 576r REG 253
[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input
[ https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15445795#comment-15445795 ] Robert Metzger commented on FLINK-3679: --- Two users were affected by this recently: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Handle-deserialization-error-td8724.html#a8725 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Handling-Kafka-DeserializationSchema-exceptions-td8700.html I think we need to fix this issue. > DeserializationSchema should handle zero or more outputs for every input > > > Key: FLINK-3679 > URL: https://issues.apache.org/jira/browse/FLINK-3679 > Project: Flink > Issue Type: Bug > Components: DataStream API, Kafka Connector >Reporter: Jamie Grier > > There are a couple of issues with the DeserializationSchema API that I think > should be improved. This request has come to me via an existing Flink user. > The main issue is simply that the API assumes that there is a one-to-one > mapping between input and outputs. In reality there are scenarios where one > input message (say from Kafka) might actually map to zero or more logical > elements in the pipeline. > Particularly important here is the case where you receive a message from a > source (such as Kafka) and say the raw bytes don't deserialize properly. > Right now the only recourse is to throw IOException and therefore fail the > job. > This is definitely not good since bad data is a reality and failing the job > is not the right option. If the job fails we'll just end up replaying the > bad data and the whole thing will start again. > Instead in this case it would be best if the user could just return the empty > set. > The other case is where one input message should logically be multiple output > messages. 
This case is probably less important since there are other ways to > do this but in general it might be good to make the > DeserializationSchema.deserialize() method return a collection rather than a > single element. > Maybe we need to support a DeserializationSchema variant that has semantics > more like that of FlatMap. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
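The FlatMap-like variant suggested above could look roughly like this. The `FlatDeserializationSchema` interface here is a hypothetical sketch, not Flink's actual API; the point is that a collector-style signature lets one input produce zero elements (bad data) or several:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical collector-style schema: one input record may yield zero or
// more output elements, so malformed input can simply produce nothing.
interface FlatDeserializationSchema<T> {
    void deserialize(byte[] message, List<T> out);
}

public class FlatSchemaSketch {

    // Parses UTF-8 integers; malformed records are dropped instead of
    // throwing IOException and failing (and endlessly replaying) the job.
    static final FlatDeserializationSchema<Integer> INT_SCHEMA = (message, out) -> {
        try {
            out.add(Integer.parseInt(new String(message, StandardCharsets.UTF_8)));
        } catch (NumberFormatException ignored) {
            // bad data: emit the empty set rather than failing the pipeline
        }
    };

    static List<Integer> deserializeAll(byte[]... messages) {
        List<Integer> result = new ArrayList<>();
        for (byte[] m : messages) {
            INT_SCHEMA.deserialize(m, result);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(deserializeAll(
            "1".getBytes(StandardCharsets.UTF_8),
            "oops".getBytes(StandardCharsets.UTF_8),
            "2".getBytes(StandardCharsets.UTF_8))); // prints [1, 2]
    }
}
```

The same signature also covers the one-to-many case: the schema can call `out.add(...)` several times for a single message.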
[jira] [Commented] (FLINK-4496) Refactor the TimeServiceProvider to take a Trigerable instead of a Runnable.
[ https://issues.apache.org/jira/browse/FLINK-4496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15445978#comment-15445978 ] Robert Metzger commented on FLINK-4496: --- What's the reasoning behind this change? > Refactor the TimeServiceProvider to take a Trigerable instead of a Runnable. > > > Key: FLINK-4496 > URL: https://issues.apache.org/jira/browse/FLINK-4496 > Project: Flink > Issue Type: Sub-task >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1914) Wrong FS while starting YARN session without correct HADOOP_HOME
[ https://issues.apache.org/jira/browse/FLINK-1914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15448470#comment-15448470 ] Robert Metzger commented on FLINK-1914: --- Thank you for your interest in contributing to Flink. I'm not sure if this issue is still valid, because the YARN code has been refactored a lot since April 2015. There were not many users complaining about this recently, so if it's okay with you, I would rather vote to close this issue. > Wrong FS while starting YARN session without correct HADOOP_HOME > > > Key: FLINK-1914 > URL: https://issues.apache.org/jira/browse/FLINK-1914 > Project: Flink > Issue Type: Bug > Components: YARN Client >Reporter: Zoltán Zvara >Priority: Trivial > Labels: yarn, yarn-client > > When the YARN session is invoked ({{yarn-session.sh}}) without a correct > {{HADOOP_HOME}} (the AM is still deployed, for example to {{0.0.0.0:8032}}), > the deployed AM fails with an {{IllegalArgumentException}}: > {code} > java.lang.IllegalArgumentException: Wrong FS: > file:/home/.../flink-dist-0.9-SNAPSHOT.jar, expected: hdfs://localhost:9000 > at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:642) > at > org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:181) > at > org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:92) > at > org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1106) > at > org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1102) > at > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > at > org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1102) > at org.apache.flink.yarn.Utils.registerLocalResource(Utils.java:105) > at > org.apache.flink.yarn.ApplicationMasterActor$$anonfun$org$apache$flink$yarn$ApplicationMasterActor$$startYarnSession$2.apply(ApplicationMasterActor.scala:436) > 
at > org.apache.flink.yarn.ApplicationMasterActor$$anonfun$org$apache$flink$yarn$ApplicationMasterActor$$startYarnSession$2.apply(ApplicationMasterActor.scala:371) > at scala.util.Try$.apply(Try.scala:161) > at > org.apache.flink.yarn.ApplicationMasterActor$class.org$apache$flink$yarn$ApplicationMasterActor$$startYarnSession(ApplicationMasterActor.scala:371) > at > org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:155) > at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:37) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:30) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:30) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at > org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94) > {code} > IMO this {{IllegalArgumentException}} should get handled in > {{org.apache.flink.yarn.Utils.registerLocalResource}} or at a higher level to > provide a better error message. This needs to be looked up from the YARN logs at > the moment, which is painful for a trivial mistake like missing > {{HADOOP_HOME}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
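One way the suggested handling could look. This is only a hedged sketch: `withFsHint` and the message wording are hypothetical, not the actual Flink YARN client code, but it shows catching the raw "Wrong FS" exception and rethrowing it with an actionable client-side hint:

```java
import java.util.concurrent.Callable;

public class WrongFsHintSketch {

    // Hypothetical wrapper around the failing registration call: catch the
    // raw "Wrong FS" IllegalArgumentException and rethrow with a hint about
    // the likely cause, so the user doesn't have to dig through YARN logs.
    static <T> T withFsHint(Callable<T> registerCall) throws Exception {
        try {
            return registerCall.call();
        } catch (IllegalArgumentException e) {
            if (e.getMessage() != null && e.getMessage().startsWith("Wrong FS:")) {
                throw new RuntimeException(
                    "The local and cluster file systems differ. Is HADOOP_HOME "
                        + "(or HADOOP_CONF_DIR) set correctly? Original error: "
                        + e.getMessage(), e);
            }
            throw e; // unrelated IllegalArgumentException: pass through
        }
    }

    public static void main(String[] args) {
        try {
            withFsHint(() -> {
                throw new IllegalArgumentException(
                    "Wrong FS: file:/tmp/flink-dist.jar, expected: hdfs://localhost:9000");
            });
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}
```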
[jira] [Created] (FLINK-4555) Explicitly kill TaskManager on YARN when ApplicationMaster is shutting down
Robert Metzger created FLINK-4555: - Summary: Explicitly kill TaskManager on YARN when ApplicationMaster is shutting down Key: FLINK-4555 URL: https://issues.apache.org/jira/browse/FLINK-4555 Project: Flink Issue Type: Bug Components: YARN Client Affects Versions: 1.1.0 Reporter: Robert Metzger Priority: Critical It seems that Flink is not explicitly destroying the TaskManager JVM when the ApplicationMaster is shutting down (when the YARN application is stopping). Since this behavior was present in Flink 1.0.x, we should add a test case to ensure the feature stays in the code. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3930) Implement Service-Level Authorization
[ https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-3930: -- Component/s: Security > Implement Service-Level Authorization > - > > Key: FLINK-3930 > URL: https://issues.apache.org/jira/browse/FLINK-3930 > Project: Flink > Issue Type: New Feature > Components: Security >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Service-level authorization is the initial authorization mechanism to ensure > clients (or servers) connecting to the Flink cluster are authorized to do so. > The purpose is to prevent a cluster from being used by an unauthorized > user, whether to execute jobs, disrupt cluster functionality, or gain access > to secrets stored within the cluster. > Implement service-level authorization as described in the design doc. > - Introduce a shared secret cookie > - Enable Akka security cookie > - Implement data transfer authentication > - Secure the web dashboard -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4569) JobRetrievalITCase.testJobRetrieval() does not forward exceptions to parent thread.
Robert Metzger created FLINK-4569: - Summary: JobRetrievalITCase.testJobRetrieval() does not forward exceptions to parent thread. Key: FLINK-4569 URL: https://issues.apache.org/jira/browse/FLINK-4569 Project: Flink Issue Type: Bug Components: Client Reporter: Robert Metzger The mentioned test seems to fail frequently, without being detected, because the Assert.fail() is called in a separate thread which doesn't forward exceptions. https://s3.amazonaws.com/archive.travis-ci.org/jobs/156177995/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
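The usual pattern for fixing this class of test bug is to capture the worker thread's failure and rethrow it in the parent thread, so the test actually fails instead of silently swallowing the `Assert.fail()`. A self-contained sketch (names are illustrative, not the actual test code):

```java
import java.util.concurrent.atomic.AtomicReference;

public class ForwardingThreadSketch {

    // Runs the body in a worker thread, captures any Throwable, and rethrows
    // it in the calling thread so a JUnit test would actually report it.
    static void runAndRethrow(Runnable body) throws Exception {
        AtomicReference<Throwable> error = new AtomicReference<>();
        Thread worker = new Thread(() -> {
            try {
                body.run();
            } catch (Throwable t) {
                error.set(t); // would otherwise be lost with the thread
            }
        });
        worker.start();
        worker.join();
        if (error.get() != null) {
            throw new Exception("Worker thread failed", error.get());
        }
    }

    public static void main(String[] args) throws Exception {
        try {
            runAndRethrow(() -> { throw new AssertionError("Assert.fail() in worker"); });
        } catch (Exception e) {
            System.out.println("forwarded: " + e.getCause().getMessage());
            // prints forwarded: Assert.fail() in worker
        }
    }
}
```

Without the `AtomicReference` hand-off, an `AssertionError` thrown in the worker only kills that thread; the test method returns normally and the failure goes undetected, which matches the behavior described above.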
[jira] [Updated] (FLINK-4513) Kafka connector documentation refers to Flink 1.1-SNAPSHOT
[ https://issues.apache.org/jira/browse/FLINK-4513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4513: -- Fix Version/s: (was: 1.1.2) 1.1.3 > Kafka connector documentation refers to Flink 1.1-SNAPSHOT > -- > > Key: FLINK-4513 > URL: https://issues.apache.org/jira/browse/FLINK-4513 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.1.1 >Reporter: Fabian Hueske >Priority: Trivial > Fix For: 1.1.3 > > > The Kafka connector documentation: > https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/connectors/kafka.html > of Flink 1.1 refers to a Flink 1.1-SNAPSHOT Maven version. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4396) GraphiteReporter class not found at startup of jobmanager
[ https://issues.apache.org/jira/browse/FLINK-4396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4396: -- Fix Version/s: (was: 1.1.2) 1.1.3 > GraphiteReporter class not found at startup of jobmanager > - > > Key: FLINK-4396 > URL: https://issues.apache.org/jira/browse/FLINK-4396 > Project: Flink > Issue Type: Improvement > Components: Build System, Metrics >Affects Versions: 1.1.1 > Environment: Windows and Unix >Reporter: RWenden > Fix For: 1.1.3 > > Original Estimate: 4h > Remaining Estimate: 4h > > For Flink 1.1.1 we configured Graphite metrics settings on the > flink-conf.yaml (for job manager (and taskmanager)). > We see the following error in the log: > 2016-08-15 14:20:34,167 ERROR org.apache.flink.runtime.metrics.MetricRegistry > - Could not instantiate metrics reporter my_reporter. Metrics > might not be exposed/reported. > java.lang.ClassNotFoundException: > org.apache.flink.metrics.graphite.GraphiteReporter > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at > sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Class.java:264) > at > org.apache.flink.runtime.metrics.MetricRegistry.<init>(MetricRegistry.java:119) > We found out that this class is not packaged inside flink-dist_2.11-1.1.1.jar. > Long story short: we had to install/provide the following jars into the lib > folder to make Graphite metrics work: > flink-metrics-graphite-1.1.1.jar > flink-metrics-dropwizard-1.1.1.jar > metrics-graphite-3.1.0.jar (from dropwizard) > We think these libraries (and the ones for Ganglia,StatsD,...) should be > included in flink-dist_2.11-1.1.1.jar, for these are needed at manager > startup time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4329: -- Fix Version/s: (was: 1.1.2) 1.1.3 > Fix Streaming File Source Timestamps/Watermarks Handling > > > Key: FLINK-4329 > URL: https://issues.apache.org/jira/browse/FLINK-4329 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.1.0 >Reporter: Aljoscha Krettek >Assignee: Kostas Kloudas > Fix For: 1.2.0, 1.1.3 > > > The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks, > i.e. they are just passed through. This means that when the > {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}} > that watermark can "overtake" the records that are to be emitted in the > {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness" > setting in window operator this can lead to elements being dropped as late. > Also, {{ContinuousFileReaderOperator}} does not correctly assign ingestion > timestamps since it is not technically a source but looks like one to the > user. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-4479) Replace trademark (tm) with registered trademark (R) sign on Flink website
[ https://issues.apache.org/jira/browse/FLINK-4479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-4479. --- Resolution: Fixed Resolved with https://github.com/apache/flink-web/commit/f2053d28b12390539275af8d3a6e14941baeba98 > Replace trademark (tm) with registered trademark (R) sign on Flink website > -- > > Key: FLINK-4479 > URL: https://issues.apache.org/jira/browse/FLINK-4479 > Project: Flink > Issue Type: Bug > Components: Project Website >Reporter: Robert Metzger >Assignee: Robert Metzger > > Flink is now a registered trademark, so we should reflect that on our website. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3427) Add watermark monitoring to JobManager web frontend
[ https://issues.apache.org/jira/browse/FLINK-3427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15466863#comment-15466863 ] Robert Metzger commented on FLINK-3427: --- Hey [~ivan.mushketyk], how do you plan to visualize the watermarks in the front end? > Add watermark monitoring to JobManager web frontend > --- > > Key: FLINK-3427 > URL: https://issues.apache.org/jira/browse/FLINK-3427 > Project: Flink > Issue Type: Improvement > Components: Streaming, Webfrontend >Reporter: Robert Metzger >Assignee: Ivan Mushketyk > > Currently, its quite hard to figure out issues with the watermarks. > I think we can improve the situation by reporting the following information > through the metrics system: > - Report the current low watermark for each operator (this way, you can see > if one operator is preventing the watermarks to rise) > - Report the number of events arrived after the low watermark (users can see > how accurate the watermarks are) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3427) Add watermark monitoring to JobManager web frontend
[ https://issues.apache.org/jira/browse/FLINK-3427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15468520#comment-15468520 ] Robert Metzger commented on FLINK-3427: --- Afaik there is already a gauge for getting the low watermark. > Add watermark monitoring to JobManager web frontend > --- > > Key: FLINK-3427 > URL: https://issues.apache.org/jira/browse/FLINK-3427 > Project: Flink > Issue Type: Improvement > Components: Streaming, Webfrontend >Reporter: Robert Metzger >Assignee: Ivan Mushketyk > > Currently, its quite hard to figure out issues with the watermarks. > I think we can improve the situation by reporting the following information > through the metrics system: > - Report the current low watermark for each operator (this way, you can see > if one operator is preventing the watermarks to rise) > - Report the number of events arrived after the low watermark (users can see > how accurate the watermarks are) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
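If such a gauge exists, it presumably has this shape. The `Gauge` interface below only mirrors the shape of Flink's metrics gauge (a single `getValue()` method); the operator code around it is illustrative, not Flink internals:

```java
// Mirrors the shape of a metrics gauge: the metrics system polls getValue().
interface Gauge<T> {
    T getValue();
}

public class WatermarkGaugeSketch {

    // The operator keeps its current low watermark in a volatile field and
    // exposes it through a gauge. Comparing this value across operators shows
    // which one is holding the watermark back.
    private volatile long currentLowWatermark = Long.MIN_VALUE;

    Gauge<Long> lowWatermarkGauge() {
        return () -> currentLowWatermark;
    }

    void processWatermark(long timestamp) {
        currentLowWatermark = timestamp;
    }

    public static void main(String[] args) {
        WatermarkGaugeSketch op = new WatermarkGaugeSketch();
        Gauge<Long> gauge = op.lowWatermarkGauge();
        op.processWatermark(1234L);
        System.out.println(gauge.getValue()); // prints 1234
    }
}
```

The web frontend part of the issue would then reduce to reading this gauge per operator and rendering it, rather than introducing a new reporting channel.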
[jira] [Commented] (FLINK-3427) Add watermark monitoring to JobManager web frontend
[ https://issues.apache.org/jira/browse/FLINK-3427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15468531#comment-15468531 ] Robert Metzger commented on FLINK-3427: --- [~ivan.mushketyk] did you already start working on this issue? I'm asking because I have somebody who wanted to implement this soon, and we have some concrete plans for how to do it. If you're already half-way through with the implementation, you can keep the task and finish it. However, if you did not start yet, I'm wondering whether it would be okay with you if I take over the task? I'm sorry that I didn't see your comment from three days ago earlier. > Add watermark monitoring to JobManager web frontend > --- > > Key: FLINK-3427 > URL: https://issues.apache.org/jira/browse/FLINK-3427 > Project: Flink > Issue Type: Improvement > Components: Streaming, Webfrontend >Reporter: Robert Metzger >Assignee: Ivan Mushketyk > > Currently, its quite hard to figure out issues with the watermarks. > I think we can improve the situation by reporting the following information > through the metrics system: > - Report the current low watermark for each operator (this way, you can see > if one operator is preventing the watermarks to rise) > - Report the number of events arrived after the low watermark (users can see > how accurate the watermarks are) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4590) Some Table API tests are failing when debug lvl is set to DEBUG
Robert Metzger created FLINK-4590: - Summary: Some Table API tests are failing when debug lvl is set to DEBUG Key: FLINK-4590 URL: https://issues.apache.org/jira/browse/FLINK-4590 Project: Flink Issue Type: Bug Components: Table API & SQL Affects Versions: 1.2.0 Reporter: Robert Metzger For debugging another issue, I've set the log level on travis to DEBUG. After that, the Table API tests started failing {code} Failed tests: SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while applying rule DataSetScanRule SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while applying rule DataSetScanRule SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while applying rule DataSetScanRule SetOperatorsITCase.testMinusAll:156 Internal error: Error occurred while applying rule DataSetScanRule SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error occurred while applying rule DataSetScanRule SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error occurred while applying rule DataSetScanRule SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error occurred while applying rule DataSetScanRule SetOperatorsITCase.testMinusDifferentFieldNames:204 Internal error: Error occurred while applying rule DataSetScanRule SetOperatorsITCase.testMinus:175 Internal error: Error occurred while applying rule DataSetScanRule SetOperatorsITCase.testMinus:175 Internal error: Error occurred while applying rule DataSetScanRule SetOperatorsITCase.testMinus:175 Internal error: Error occurred while applying rule DataSetScanRule SetOperatorsITCase.testMinus:175 Internal error: Error occurred while applying rule DataSetScanRule {code} Probably Calcite is executing additional assertions depending on the debug level. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-4585) Fix broken links in index.md
[ https://issues.apache.org/jira/browse/FLINK-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-4585. --- Resolution: Fixed Resolved in http://git-wip-us.apache.org/repos/asf/flink-web/commit/bb6d820f > Fix broken links in index.md > > > Key: FLINK-4585 > URL: https://issues.apache.org/jira/browse/FLINK-4585 > Project: Flink > Issue Type: Bug > Components: Project Website >Reporter: Alexander Pivovarov >Priority: Minor > > The following links are broken > DataSet API > link: > http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html > correct link: > https://ci.apache.org/projects/flink/flink-docs-master/dev/batch/index.html > Table API > link: > http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/table.html > correct link: > https://ci.apache.org/projects/flink/flink-docs-master/dev/table_api.html > Gelly > link: > http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/gelly.html > correct link: > https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/gelly/index.html > The following links show "Page 'X' Has Moved to" for 1-2 sec and then > redirect to another page > DataStream API > link: > http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html > redirects-to: > https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_api.html > programming guide > link: > http://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html > redirects-to DataSet API: > https://ci.apache.org/projects/flink/flink-docs-master/dev/batch/index.html > probably it should be "Basic API Concepts" > https://ci.apache.org/projects/flink/flink-docs-master/dev/api_concepts.html > or Quick Start - > https://ci.apache.org/projects/flink/flink-docs-master/quickstart/setup_quickstart.html > CEP > link: > http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/libs/cep.html > redirects-to: > 
https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html > ML > link: > http://ci.apache.org/projects/flink/flink-docs-master/apis/batch/libs/ml/index.html > redirects-to: > https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/ml/index.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4634) TaskStopTest.testStopExecution() times out
Robert Metzger created FLINK-4634: - Summary: TaskStopTest.testStopExecution() times out Key: FLINK-4634 URL: https://issues.apache.org/jira/browse/FLINK-4634 Project: Flink Issue Type: Bug Components: TaskManager Reporter: Robert Metzger The mentioned test failed in: https://s3.amazonaws.com/archive.travis-ci.org/jobs/160407887/log.txt {code} testStopExecution(org.apache.flink.runtime.taskmanager.TaskStopTest) Time elapsed: 10.619 sec <<< ERROR! java.lang.Exception: test timed out after 1 milliseconds at org.junit.internal.runners.MethodRoadie$1.run(MethodRoadie.java:77) at org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:96) at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.executeTest(PowerMockJUnit44RunnerDelegateImpl.java:294) at org.powermock.modules.junit4.internal.impl.PowerMockJUnit47RunnerDelegateImpl$PowerMockJUnit47MethodRunner.executeTestInSuper(PowerMockJUnit47RunnerDelegateImpl.java:127) at org.powermock.modules.junit4.internal.impl.PowerMockJUnit47RunnerDelegateImpl$PowerMockJUnit47MethodRunner.executeTest(PowerMockJUnit47RunnerDelegateImpl.java:82) at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.runBeforesThenTestThenAfters(PowerMockJUnit44RunnerDelegateImpl.java:282) at org.junit.internal.runners.MethodRoadie.runWithTimeout(MethodRoadie.java:57) at org.junit.internal.runners.MethodRoadie.run(MethodRoadie.java:47) at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.invokeTestMethod(PowerMockJUnit44RunnerDelegateImpl.java:207) at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.runMethods(PowerMockJUnit44RunnerDelegateImpl.java:146) at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$1.run(PowerMockJUnit44RunnerDelegateImpl.java:120) at 
org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.java:33) at org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:45) at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.run(PowerMockJUnit44RunnerDelegateImpl.java:118) at org.powermock.modules.junit4.common.internal.impl.JUnit4TestSuiteChunkerImpl.run(JUnit4TestSuiteChunkerImpl.java:104) at org.powermock.modules.junit4.common.internal.impl.AbstractCommonPowerMockRunner.run(AbstractCommonPowerMockRunner.java:53) at org.powermock.modules.junit4.PowerMockRunner.run(PowerMockRunner.java:53) at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283) at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173) at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153) at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128) at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203) at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155) at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5998) Un-fat Hadoop from Flink fat jar
[ https://issues.apache.org/jira/browse/FLINK-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904837#comment-15904837 ] Robert Metzger commented on FLINK-5998: --- [~wheat9] Thanks a lot for assigning this issue to yourself. If we are lucky, this change is relatively easy to do and requires just some changes to the shading / assembly plugin. Please let me know if there's anything that's unclear so that we can discuss it. > Un-fat Hadoop from Flink fat jar > > > Key: FLINK-5998 > URL: https://issues.apache.org/jira/browse/FLINK-5998 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Robert Metzger >Assignee: Haohui Mai > > As a first step towards FLINK-2268, I would suggest putting all Hadoop > dependencies into a jar separate from Flink's fat jar. > This would allow users to put a custom Hadoop jar in there, or even deploy > Flink without a Hadoop fat jar at all in environments where Hadoop is > provided (EMR). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6022) Improve support for Avro GenericRecord
Robert Metzger created FLINK-6022: - Summary: Improve support for Avro GenericRecord Key: FLINK-6022 URL: https://issues.apache.org/jira/browse/FLINK-6022 Project: Flink Issue Type: Improvement Components: Type Serialization System Reporter: Robert Metzger Currently, Flink is serializing the schema for each Avro GenericRecord in the stream. This leads to a lot of overhead over the wire/disk + high serialization costs. Therefore, I'm proposing to improve the support for GenericRecord in Flink by shipping the schema to each serializer through the AvroTypeInformation. Then, we can only support GenericRecords with the same type per stream, but the performance will be much better. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
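The overhead described above can be made concrete with a small sketch. This is a schematic illustration in plain Java, not Flink's actual Avro serializer, and all names are hypothetical: it contrasts embedding the schema in every serialized record with shipping the schema to the serializer once, which is what the proposal suggests doing via the type information.

```java
import java.nio.charset.StandardCharsets;

/** Schematic illustration (hypothetical names, not Flink's Avro code):
 * embedding the schema in every record vs. shipping it once. */
public class SchemaOverheadSketch {
    // A shortened Avro-style schema; under the proposal it would travel with
    // the type information instead of with each record.
    static final String SCHEMA =
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}";

    /** Current behavior per the issue: the schema is repeated in every record. */
    static byte[] withEmbeddedSchema(String payload) {
        return (SCHEMA + "|" + payload).getBytes(StandardCharsets.UTF_8);
    }

    /** Proposed behavior: the serializer already knows the schema, so the
     *  record carries the payload only. */
    static byte[] payloadOnly(String payload) {
        return payload.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        long embedded = 0, bare = 0;
        for (int i = 0; i < 1000; i++) {
            embedded += withEmbeddedSchema("user-" + i).length;
            bare += payloadOnly("user-" + i).length;
        }
        System.out.println("schema per record:   " + embedded + " bytes on the wire");
        System.out.println("schema shipped once: " + bare + " bytes (+ " + SCHEMA.length() + " bytes once)");
    }
}
```

The per-record cost grows with the record count, while the shipped-once cost is paid a single time per serializer instance, which is why the proposal expects much better performance.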
[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904920#comment-15904920 ] Robert Metzger commented on FLINK-6022: --- I think protobuf always works with generated serializers with a fixed schema (that's the common case for Avro as well), so I don't think there's a need to add support there. For Thrift I don't know. I haven't heard any complaints for Thrift and Protobuf. For Avro this issue has come up with at least two users. > Improve support for Avro GenericRecord > -- > > Key: FLINK-6022 > URL: https://issues.apache.org/jira/browse/FLINK-6022 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System >Reporter: Robert Metzger > > Currently, Flink is serializing the schema for each Avro GenericRecord in the > stream. > This leads to a lot of overhead over the wire/disk + high serialization costs. > Therefore, I'm proposing to improve the support for GenericRecord in Flink by > shipping the schema to each serializer through the AvroTypeInformation. > Then, we can only support GenericRecords with the same type per stream, but > the performance will be much better. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6031) Add parameter for per job yarn clusters to control whether the user code jar is included into the system classloader.
Robert Metzger created FLINK-6031: - Summary: Add parameter for per job yarn clusters to control whether the user code jar is included into the system classloader. Key: FLINK-6031 URL: https://issues.apache.org/jira/browse/FLINK-6031 Project: Flink Issue Type: Bug Components: YARN Reporter: Robert Metzger Priority: Critical FLINK-4913 added the user jar into the system classloader when starting a Flink per job YARN cluster. Some users were experiencing issues with the changed behavior. I suggest introducing a new YARN-specific configuration parameter (for the flink-conf.yaml file) to control whether the user jar is added to the system classloader. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6022) Improve support for Avro GenericRecord
[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924180#comment-15924180 ] Robert Metzger commented on FLINK-6022: --- There is actually a way to register anything serializable with the execution config: "setGlobalJobParameters(GlobalJobParameters params)". The main use case for that is showing the job parameters in the web frontend (the ParameterTool has support for that as well). Also, the GlobalJobParameters are accessible everywhere in the user code (when using the Rich* variants). Having said all this, I would NOT recommend using the GlobalJobParameters for the Avro serializer. The much more appropriate place for shipping some serialized data (that is specific to a serializer) from the user APIs to the cluster is the TypeInformation. By putting the schema of the generic records into the {{AvroTypeInfo}} (or something similar for GenericAvroRecords), you'll have the schema available on all serializers. > Improve support for Avro GenericRecord > -- > > Key: FLINK-6022 > URL: https://issues.apache.org/jira/browse/FLINK-6022 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System >Reporter: Robert Metzger > > Currently, Flink is serializing the schema for each Avro GenericRecord in the > stream. > This leads to a lot of overhead over the wire/disk + high serialization costs. > Therefore, I'm proposing to improve the support for GenericRecord in Flink by > shipping the schema to each serializer through the AvroTypeInformation. > Then, we can only support GenericRecords with the same type per stream, but > the performance will be much better. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6065) Make TransportClient for ES5 pluggable
Robert Metzger created FLINK-6065: - Summary: Make TransportClient for ES5 pluggable Key: FLINK-6065 URL: https://issues.apache.org/jira/browse/FLINK-6065 Project: Flink Issue Type: Improvement Components: ElasticSearch Connector, Streaming Connectors Reporter: Robert Metzger This JIRA is based on a user request: http://stackoverflow.com/questions/42807454/flink-xpack-elasticsearch-5-elasticsearchsecurityexception-missing-autentication?noredirect=1#comment72728053_42807454 Currently, in the {{Elasticsearch5ApiCallBridge}} the {{PreBuiltTransportClient}} is hardcoded. It would be nice to make this client pluggable to allow using other clients such as the {{PreBuiltXPackTransportClient}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5048) Kafka Consumer (0.9/0.10) threading model leads to problematic cancellation behavior
[ https://issues.apache.org/jira/browse/FLINK-5048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15926820#comment-15926820 ] Robert Metzger commented on FLINK-5048: --- I'm not aware of any user on 1.1 affected by this. The change is quite involved and could potentially break existing code. Therefore, I would not backport that change. > Kafka Consumer (0.9/0.10) threading model leads to problematic cancellation > behavior > - > > Key: FLINK-5048 > URL: https://issues.apache.org/jira/browse/FLINK-5048 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.1.3 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.2.0, 1.1.5 > > > The {{FlinkKafkaConsumer}} (0.9 / 0.10) spawns a separate thread that > operates the KafkaConsumer. That thread is shielded from interrupts, because > the Kafka Consumer has not been handling thread interrupts well. > Since that thread is also the thread that emits records, it may block in the > network stack (backpressure) or in chained operators. The latter case leads to > situations where cancellations get very slow unless that thread would be > interrupted (which it cannot be). > I propose to change the thread model as follows: > - A spawned consumer thread pulls from the KafkaConsumer and pushes its > pulled batch of records into a blocking queue (size one) > - The main thread of the task will pull the record batches from the > blocking queue and emit the records. > This actually allows for some additional I/O overlap while limiting the > additional memory consumption - only two batches are ever held, one being > fetched and one being emitted. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
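The proposed handover between the consumer thread and the task's main thread can be sketched as follows. This is a hypothetical illustration of the size-one blocking queue pattern, not Flink's actual connector code; the class and record names are invented for the sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch (not Flink's code): a fetcher thread pushes record
 * batches into a size-one handover queue, and the task's main thread takes
 * batches and emits the records, so the emitting thread stays interruptible
 * for cancellation. */
public class HandoverSketch {
    static List<String> run() {
        BlockingQueue<List<String>> handover = new ArrayBlockingQueue<>(1);
        List<String> endMarker = Collections.emptyList(); // signals end of input
        Thread fetcher = new Thread(() -> {
            try {
                for (int batch = 0; batch < 3; batch++) {
                    List<String> records = new ArrayList<>();
                    for (int i = 0; i < 2; i++) {
                        records.add("r" + batch + "-" + i);
                    }
                    handover.put(records); // blocks until the previous batch was taken
                }
                handover.put(endMarker);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        fetcher.start();
        List<String> emitted = new ArrayList<>();
        try {
            for (List<String> batch = handover.take(); !batch.isEmpty(); batch = handover.take()) {
                emitted.addAll(batch); // main thread emits and can react to cancellation here
            }
            fetcher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run()); // all six records arrive, in order
    }
}
```

Because the queue holds at most one batch, at most two batches exist at any time (one being fetched, one being emitted), matching the memory bound described in the issue.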
[jira] [Created] (FLINK-6084) Cassandra connector does not declare all dependencies
Robert Metzger created FLINK-6084: - Summary: Cassandra connector does not declare all dependencies Key: FLINK-6084 URL: https://issues.apache.org/jira/browse/FLINK-6084 Project: Flink Issue Type: Bug Components: Cassandra Connector Affects Versions: 1.2.0 Reporter: Robert Metzger Assignee: Robert Metzger Priority: Critical This has been reported by a user: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-1-2-and-Cassandra-Connector-td11546.html The Cassandra client defines metrics-core as a dependency, but the shading is dropping the dependency when building the dependency-reduced pom. To resolve the issue, we need to add the following line into the shading config of the cassandra module: {{<promoteTransitiveDependencies>true</promoteTransitiveDependencies>}} This makes the metrics dependency appear again in the dependency-reduced pom. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (FLINK-6121) Include TravisCI build status to README.md
[ https://issues.apache.org/jira/browse/FLINK-6121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-6121. --- Resolution: Duplicate Assignee: (was: Bowen Li) Fix Version/s: (was: 1.2.1) (was: 1.3.0) Closed as duplicate. > Include TravisCI build status to README.md > -- > > Key: FLINK-6121 > URL: https://issues.apache.org/jira/browse/FLINK-6121 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.2.0 >Reporter: Bowen Li >Priority: Minor > > Add TravisCI build status to README in github repo. Expectation is to have > something like https://github.com/apache/incubator-airflow -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6122) add TravisCI build status to README.md
[ https://issues.apache.org/jira/browse/FLINK-6122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15932329#comment-15932329 ] Robert Metzger commented on FLINK-6122: --- I'm not sure if we want to merge this change. We once had the travis build status badge in our readme, but decided to remove it, because it often shows "Build failed" due to travis issues etc. This gives people the impression that our builds are very unstable. > add TravisCI build status to README.md > -- > > Key: FLINK-6122 > URL: https://issues.apache.org/jira/browse/FLINK-6122 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.2.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Minor > Fix For: 1.3.0, 1.2.1 > > > Add TravisCI build status to README in github repo. Expectation is to have > something like https://github.com/apache/incubator-airflow -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6125) Commons httpclient is not shaded anymore in Flink 1.2
Robert Metzger created FLINK-6125: - Summary: Commons httpclient is not shaded anymore in Flink 1.2 Key: FLINK-6125 URL: https://issues.apache.org/jira/browse/FLINK-6125 Project: Flink Issue Type: Bug Components: Build System, Kinesis Connector Reporter: Robert Metzger Priority: Critical This has been reported by a user: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Return-of-Flink-shading-problems-in-1-2-0-td12257.html The Kinesis connector requires Flink to not expose any httpclient dependencies. Since Flink 1.2 it seems that we are exposing that dependency again -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (FLINK-6125) Commons httpclient is not shaded anymore in Flink 1.2
[ https://issues.apache.org/jira/browse/FLINK-6125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger reassigned FLINK-6125: - Assignee: Robert Metzger > Commons httpclient is not shaded anymore in Flink 1.2 > - > > Key: FLINK-6125 > URL: https://issues.apache.org/jira/browse/FLINK-6125 > Project: Flink > Issue Type: Bug > Components: Build System, Kinesis Connector >Reporter: Robert Metzger >Assignee: Robert Metzger >Priority: Critical > > This has been reported by a user: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Return-of-Flink-shading-problems-in-1-2-0-td12257.html > The Kinesis connector requires Flink to not expose any httpclient > dependencies. Since Flink 1.2 it seems that we are exposing that dependency > again -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5998) Un-fat Hadoop from Flink fat jar
[ https://issues.apache.org/jira/browse/FLINK-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15932403#comment-15932403 ] Robert Metzger commented on FLINK-5998: --- [~wheat9] what's the status on this issue? > Un-fat Hadoop from Flink fat jar > > > Key: FLINK-5998 > URL: https://issues.apache.org/jira/browse/FLINK-5998 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Robert Metzger >Assignee: Haohui Mai > > As a first step towards FLINK-2268, I would suggest putting all Hadoop > dependencies into a jar separate from Flink's fat jar. > This would allow users to put a custom Hadoop jar in there, or even deploy > Flink without a Hadoop fat jar at all in environments where Hadoop is > provided (EMR). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (FLINK-6020) Blob Server cannot hanlde multiple job sumits(with same content) parallelly
[ https://issues.apache.org/jira/browse/FLINK-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger reassigned FLINK-6020: - Assignee: Robert Metzger (was: Tao Wang) > Blob Server cannot hanlde multiple job sumits(with same content) parallelly > --- > > Key: FLINK-6020 > URL: https://issues.apache.org/jira/browse/FLINK-6020 > Project: Flink > Issue Type: Bug >Reporter: Tao Wang >Assignee: Robert Metzger >Priority: Critical > > In yarn-cluster mode, if we submit the same job multiple times in parallel, > the tasks will encounter class loading problems and lease occupation. > Because the blob server stores user jars under names generated from the sha1sum of their > contents, it first writes a temp file and then moves it into place to finalize. For recovery it also > puts them to HDFS under the same file names. > At the same time, when multiple clients submit the same job with the same jar, the local > jar files in the blob server and the files on HDFS are handled by multiple > threads (BlobServerConnection) and impact each other. > It would be good to have a way to handle this; two ideas come to mind: > 1. lock the write operation, or > 2. use some unique identifier as the file name instead of (or in addition to) the > sha1sum of the file contents. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6020) Blob Server cannot hanlde multiple job sumits(with same content) parallelly
[ https://issues.apache.org/jira/browse/FLINK-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-6020: -- Component/s: Distributed Coordination > Blob Server cannot hanlde multiple job sumits(with same content) parallelly > --- > > Key: FLINK-6020 > URL: https://issues.apache.org/jira/browse/FLINK-6020 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Reporter: Tao Wang >Assignee: Robert Metzger >Priority: Critical > > In yarn-cluster mode, if we submit the same job multiple times in parallel, > the tasks will encounter class loading problems and lease occupation. > Because the blob server stores user jars under names generated from the sha1sum of their > contents, it first writes a temp file and then moves it into place to finalize. For recovery it also > puts them to HDFS under the same file names. > At the same time, when multiple clients submit the same job with the same jar, the local > jar files in the blob server and the files on HDFS are handled by multiple > threads (BlobServerConnection) and impact each other. > It would be good to have a way to handle this; two ideas come to mind: > 1. lock the write operation, or > 2. use some unique identifier as the file name instead of (or in addition to) the > sha1sum of the file contents. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (FLINK-6020) Blob Server cannot hanlde multiple job sumits(with same content) parallelly
[ https://issues.apache.org/jira/browse/FLINK-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger reassigned FLINK-6020: - Assignee: Tao Wang (was: Robert Metzger) > Blob Server cannot hanlde multiple job sumits(with same content) parallelly > --- > > Key: FLINK-6020 > URL: https://issues.apache.org/jira/browse/FLINK-6020 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Reporter: Tao Wang >Assignee: Tao Wang >Priority: Critical > > In yarn-cluster mode, if we submit the same job multiple times in parallel, > the tasks will encounter class loading problems and lease occupation. > Because the blob server stores user jars under names generated from the sha1sum of their > contents, it first writes a temp file and then moves it into place to finalize. For recovery it also > puts them to HDFS under the same file names. > At the same time, when multiple clients submit the same job with the same jar, the local > jar files in the blob server and the files on HDFS are handled by multiple > threads (BlobServerConnection) and impact each other. > It would be good to have a way to handle this; two ideas come to mind: > 1. lock the write operation, or > 2. use some unique identifier as the file name instead of (or in addition to) the > sha1sum of the file contents. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6020) Blob Server cannot handle multiple job submits (with same content) parallelly
[ https://issues.apache.org/jira/browse/FLINK-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-6020: -- Summary: Blob Server cannot handle multiple job submits (with same content) parallelly (was: Blob Server cannot hanlde multiple job sumits(with same content) parallelly) > Blob Server cannot handle multiple job submits (with same content) parallelly > - > > Key: FLINK-6020 > URL: https://issues.apache.org/jira/browse/FLINK-6020 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Reporter: Tao Wang >Assignee: Tao Wang >Priority: Critical > > In yarn-cluster mode, if we submit the same job multiple times in parallel, > the tasks will encounter class loading problems and lease occupation. > Because the blob server stores user jars under names generated from the sha1sum of their > contents, it first writes a temp file and then moves it into place to finalize. For recovery it also > puts them to HDFS under the same file names. > At the same time, when multiple clients submit the same job with the same jar, the local > jar files in the blob server and the files on HDFS are handled by multiple > threads (BlobServerConnection) and impact each other. > It would be good to have a way to handle this; two ideas come to mind: > 1. lock the write operation, or > 2. use some unique identifier as the file name instead of (or in addition to) the > sha1sum of the file contents. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
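The second idea from the description (a unique identifier in addition to the sha1sum) can be sketched with NIO's atomic rename. This is hypothetical illustration code, not the actual BlobServer implementation: each writer stages the upload under a per-writer unique temp name, so concurrent submits of the same jar never share a temp file, and then moves it into its sha1-named final location in one atomic step.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.UUID;

/** Hypothetical sketch of idea 2 from the issue (not the BlobServer code):
 * unique temp name per writer, then an atomic rename to the sha1-based name. */
public class UniqueTempWriteSketch {
    static Path store(Path dir, String sha1, byte[] content) throws Exception {
        // Unique temp name per writer: concurrent uploads cannot clobber each other.
        Path tmp = dir.resolve(sha1 + ".tmp-" + UUID.randomUUID());
        Files.write(tmp, content);
        Path target = dir.resolve(sha1);
        // Atomic rename: readers either see no file or a complete one.
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        return target;
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("blobs");
        byte[] jar = "jar-bytes".getBytes(StandardCharsets.UTF_8);
        // Two "clients" storing the same jar under the same sha1-based name:
        // on POSIX filesystems the second rename simply replaces the first,
        // identical, file, so neither writer observes a partial file.
        store(dir, "abc123", jar);
        Path stored = store(dir, "abc123", jar);
        System.out.println(new String(Files.readAllBytes(stored), StandardCharsets.UTF_8));
    }
}
```

Idea 1 (locking the write operation) would serialize the writers instead; the rename approach avoids the lock but relies on the content for a given sha1 name being identical.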
[jira] [Updated] (FLINK-5931) Make Flink highly available even if defaultFS is unavailable
[ https://issues.apache.org/jira/browse/FLINK-5931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-5931: -- Component/s: Distributed Coordination > Make Flink highly available even if defaultFS is unavailable > > > Key: FLINK-5931 > URL: https://issues.apache.org/jira/browse/FLINK-5931 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Reporter: Haohui Mai >Assignee: Haohui Mai > > In order to use Flink in mission-critical environments, Flink must be > available even if the {{defaultFS}} is unavailable. > We have deployed HDFS in HA mode in our production environment. In our > experience, there have been performance degradations and downtime when the > HDFS cluster is being expanded or is under maintenance. In this case it is > desirable to deploy jobs through an alternative filesystem (e.g., S3). > This jira is to track the improvements that enable Flink to continue > to operate even if {{defaultFS}} is unavailable. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-5685) Connection leak in Taskmanager
[ https://issues.apache.org/jira/browse/FLINK-5685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-5685: -- Component/s: Distributed Coordination > Connection leak in Taskmanager > -- > > Key: FLINK-5685 > URL: https://issues.apache.org/jira/browse/FLINK-5685 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Reporter: Andrey >Priority: Critical > > Steps to reproduce: > * set up a cluster with the following configuration: 1 job manager, 2 task > managers > * job manager starts rejecting connection attempts from task manager. > {code} > 2017-01-30 03:24:42,908 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Trying to > register at JobManager akka.tcp://flink@ip:6123/user/jobmanager (attempt > 4326, timeout: 30 seconds) > 2017-01-30 03:24:42,913 WARN Remoting > - Tried to associate with unreachable remote address > [akka.tcp://flink@ip:6123]. Address is now gated for 5000 ms, all messages to > this > address will be delivered to dead letters. Reason: The remote system has > quarantined this system. No further associations to the remote system are > possible until this system is restarted. > {code} > * task manager tries multiple times. (it looks like it doesn't close the connection > after failure) > * job manager is unable to process any messages. In logs: > {code} > 2017-01-30 03:25:12,932 WARN > org.jboss.netty.channel.socket.nio.AbstractNioSelector- Failed to > accept a connection. 
> java.io.IOException: Too many open files > at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method) > at > sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422) > at > sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250) > at > org.jboss.netty.channel.socket.nio.NioServerBoss.process(NioServerBoss.java:100) > at > org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318) > at > org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-5761) ClassNotFoundException during cancel job
[ https://issues.apache.org/jira/browse/FLINK-5761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-5761: -- Component/s: State Backends, Checkpointing > ClassNotFoundException during cancel job > > > Key: FLINK-5761 > URL: https://issues.apache.org/jira/browse/FLINK-5761 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.1.4 >Reporter: Andrey > > Steps to reproduce: > * setup flink cluster in HA mode > * submit job with rocksdb state backend and enableFullyAsyncSnapshots > * send some load to the job > * in the middle of processing cancel job using the command: ./flink cancel > > In the JobManager logs: > {code} > 2017-02-09 13:55:49,511 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping > checkpoint coordinator for job e140ad8a3deeae991a9bbe080222d3f6 > 2017-02-09 13:55:49,517 INFO > org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore - > Removed job graph e140ad8a3deeae991a9bbe080222d3f6 from ZooKeeper. > 2017-02-09 13:55:49,519 WARN > org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - > Failed to discard checkpoint 1. > java.lang.Exception: Could not discard the completed checkpoint Checkpoint 1 > @ 1486648542769 for e140ad8a3deeae991a9bbe080222d3f6. 
> at > org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore$1.processResult(ZooKeeperCompletedCheckpointStore.java:308) > at > org.apache.flink.shaded.org.apache.curator.framework.imps.Backgrounding$1$1.run(Backgrounding.java:109) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.ClassNotFoundException: > org.apache.flink.contrib.streaming.state.RocksDBStateBackend$FinalFullyAsyncSnapshot > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Class.java:348) > at > org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:65) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620) > at > java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) > at java.util.HashMap.readObject(HashMap.java:1396) > at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) > at > 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) > at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1714) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:291) > at > org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58) > at > org.apache.flink.runtime.checkpoint.SubtaskState.discard(SubtaskState.java:85) > at > org.apache.flink.runtime.checkpoint.TaskState.discard(TaskState.java:147) > at > org.apache.flin
[jira] [Updated] (FLINK-5724) Error in the 'Zipping Elements' docs
[ https://issues.apache.org/jira/browse/FLINK-5724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-5724: -- Component/s: DataSet API > Error in the 'Zipping Elements' docs > > > Key: FLINK-5724 > URL: https://issues.apache.org/jira/browse/FLINK-5724 > Project: Flink > Issue Type: Bug > Components: DataSet API, Python API >Reporter: Fokko Driesprong > > The tab for the Python documentation isn't working because there are two tabs > pointing at the Scala example. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-5961) Queryable State is broken for HeapKeyedStateBackend
[ https://issues.apache.org/jira/browse/FLINK-5961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-5961: -- Component/s: Queryable State > Queryable State is broken for HeapKeyedStateBackend > --- > > Key: FLINK-5961 > URL: https://issues.apache.org/jira/browse/FLINK-5961 > Project: Flink > Issue Type: Bug > Components: Queryable State >Affects Versions: 1.2.0 >Reporter: Stefan Richter > > The current implementation of queryable state on `HeapKeyedStateBackend` > attempts to handle concurrency by using `ConcurrentHashMap`s as the data structure. > However, the implementation has at least two issues: > 1) Concurrent modifications of state objects: state can be modified > concurrently with a query, e.g. an element being removed from a list. This can > result in exceptions or incorrect results. > 2) The StateDescriptor indicates whether a `ConcurrentHashMap` is required > because queryable state is active. On restore, this information is unknown at > first and the implementation always uses plain hash maps. When the state is > then finally registered, all previously existing maps are not thread-safe. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
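Issue (1) can be demonstrated deterministically in a few lines: a `ConcurrentHashMap` protects the map itself, but not the state objects stored in it. The sketch below is illustrative plain Java, not the backend's code; it simulates a query iterating a stored list while the processing side mutates it.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative sketch (not Flink's backend code): a ConcurrentHashMap does
 * not make the state objects inside it safe for concurrent modification. */
public class StateRaceSketch {
    static String demo() {
        Map<String, List<Integer>> state = new ConcurrentHashMap<>();
        state.put("key", new ArrayList<>(List.of(1, 2, 3)));
        Iterator<Integer> query = state.get("key").iterator(); // query-side read begins
        state.get("key").remove(0);                            // processing-side update
        try {
            query.next(); // the iterator detects the concurrent structural change
            return "ok";
        } catch (ConcurrentModificationException e) {
            return "ConcurrentModificationException";
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints ConcurrentModificationException
    }
}
```

This is exactly the "element being removed from a list" failure mode the issue describes; fixing it requires either copy-on-read/copy-on-write state objects or coordinating queries with the processing thread, not just a concurrent map.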
[jira] [Updated] (FLINK-5839) Flink Security problem collection
[ https://issues.apache.org/jira/browse/FLINK-5839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-5839: -- Component/s: Security > Flink Security problem collection > - > > Key: FLINK-5839 > URL: https://issues.apache.org/jira/browse/FLINK-5839 > Project: Flink > Issue Type: Improvement > Components: Security >Reporter: shijinkui > > This issue collects security problems found in Huawei's use cases. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-5987) Upgrade zookeeper dependency to 3.4.8
[ https://issues.apache.org/jira/browse/FLINK-5987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-5987: -- Component/s: Build System > Upgrade zookeeper dependency to 3.4.8 > - > > Key: FLINK-5987 > URL: https://issues.apache.org/jira/browse/FLINK-5987 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Ted Yu > > zookeeper 3.4.8 has been released. > Among the fixes the following are desirable: > ZOOKEEPER-706 large numbers of watches can cause session re-establishment to > fail > ZOOKEEPER-1797 PurgeTxnLog may delete data logs during roll > This issue upgrades zookeeper dependency to 3.4.8 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6060) Non-existent class reference in the Scala function annotation
[ https://issues.apache.org/jira/browse/FLINK-6060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-6060: -- Component/s: Scala API > Non-existent class reference in the Scala function annotation > -- > > Key: FLINK-6060 > URL: https://issues.apache.org/jira/browse/FLINK-6060 > Project: Flink > Issue Type: Wish > Components: Scala API >Reporter: shijinkui > > TaskMessages.scala > ConnectedStreams.scala > DataStream.scala > Who can fix it? -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5987) Upgrade zookeeper dependency to 3.4.8
[ https://issues.apache.org/jira/browse/FLINK-5987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15932681#comment-15932681 ] Robert Metzger commented on FLINK-5987: --- Our current zookeeper version is 3.4.6; since this is only a minor upgrade, we COULD do it. However, in general I'm against just upgrading a dependency unless there are users running into problems. > Upgrade zookeeper dependency to 3.4.8 > - > > Key: FLINK-5987 > URL: https://issues.apache.org/jira/browse/FLINK-5987 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Ted Yu > > zookeeper 3.4.8 has been released. > Among the fixes the following are desirable: > ZOOKEEPER-706 large numbers of watches can cause session re-establishment to > fail > ZOOKEEPER-1797 PurgeTxnLog may delete data logs during roll > This issue upgrades the zookeeper dependency to 3.4.8 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
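If the upgrade goes ahead, the change itself is a one-line version bump. Assuming the build manages the ZooKeeper version through a Maven property (the property name below is hypothetical; the actual pom may differ), it would look roughly like:

```xml
<!-- hypothetical property in the root pom; the real property name may differ -->
<properties>
  <zookeeper.version>3.4.8</zookeeper.version>
</properties>

<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>${zookeeper.version}</version>
</dependency>
```

Since 3.4.x releases are patch-level compatible, the main verification effort is re-running the HA and recovery test suites against the new client.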
[jira] [Updated] (FLINK-6120) Implement heartbeat logic between JobManager and ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-6120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-6120: -- Component/s: Distributed Coordination > Implement heartbeat logic between JobManager and ResourceManager > > > Key: FLINK-6120 > URL: https://issues.apache.org/jira/browse/FLINK-6120 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Reporter: zhijiang >Assignee: zhijiang > > It is part of the work for FLIP-6. > The HeartbeatManager is mainly used for monitoring heartbeat targets and > reporting payloads. > On the {{ResourceManager}} side, it starts monitoring the > {{HeartbeatTarget}} when it receives a registration from the {{JobManager}}, and > schedules a task to {{requestHeartbeat}} at a fixed interval. If no heartbeat > response is received within the timeout, the {{HeartbeatListener}} will > be notified of the heartbeat timeout, and the {{ResourceManager}} should then remove the > internally registered {{JobManager}}. > On the {{JobManager}} side, it starts monitoring the {{HeartbeatTarget}} > when it receives a registration acknowledgement from the {{ResourceManager}}. It > will likewise be notified of a heartbeat timeout if it does not receive a heartbeat request from > the {{ResourceManager}} within the timeout. > The current implementation does not exchange payloads via heartbeats; this > can be added if needed in the future. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
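The timeout bookkeeping described above can be sketched with plain JDK code. This is a hypothetical illustration of the contract (class and method names invented; not the actual FLIP-6 HeartbeatManager API):

```java
// Hypothetical sketch of heartbeat timeout bookkeeping: the monitoring
// side records when the last heartbeat arrived and declares a timeout
// once the configured duration elapses without one. A scheduled task
// would call isTimedOut() at the heartbeat interval and, on true,
// notify the listener so the target can be unregistered.
public class HeartbeatMonitorSketch {

    private final long timeoutMillis;
    private long lastHeartbeatMillis;

    public HeartbeatMonitorSketch(long timeoutMillis, long nowMillis) {
        this.timeoutMillis = timeoutMillis;
        this.lastHeartbeatMillis = nowMillis;
    }

    // Called when a heartbeat response (RM side) or request (JM side) arrives.
    public void reportHeartbeat(long nowMillis) {
        this.lastHeartbeatMillis = nowMillis;
    }

    public boolean isTimedOut(long nowMillis) {
        return nowMillis - lastHeartbeatMillis > timeoutMillis;
    }

    public static void main(String[] args) {
        HeartbeatMonitorSketch monitor = new HeartbeatMonitorSketch(5000, 0);
        monitor.reportHeartbeat(3000);
        System.out.println(monitor.isTimedOut(4000)); // within the timeout
        System.out.println(monitor.isTimedOut(9001)); // >5s without a heartbeat
    }
}
```

Passing the clock in explicitly keeps the logic deterministic and testable, which matters for code that both sides of the RM/JM pair must agree on.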
[jira] [Updated] (FLINK-6118) Chained operators forward watermark without checking
[ https://issues.apache.org/jira/browse/FLINK-6118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-6118: -- Component/s: DataStream API > Chained operators forward watermark without checking > > > Key: FLINK-6118 > URL: https://issues.apache.org/jira/browse/FLINK-6118 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0, 1.1.4, 1.3.0 >Reporter: Aljoscha Krettek > > For operators that read from the network we have checks in place that verify > that the input watermark only moves forwards. If an operator is directly > chained to another operator, then any {{Output.emitWatermark()}} of the first > operator directly invokes {{processWatermark()}} on the chained operator, > meaning that there are no verification steps in-between. > This only becomes visible when a non-keyed, chained operator is checking the > current watermark. Only keyed operators can have timers and for those the > watermark always comes from the network, i.e. it behaves correctly. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
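For network inputs the check amounts to forwarding a watermark only if it advances. A minimal sketch of the missing guard on the chained path (class name hypothetical, not Flink's actual implementation):

```java
// Hypothetical sketch of the monotonicity check that exists for network
// inputs but is skipped on the direct chained path: a watermark is only
// forwarded to the chained operator's processWatermark() if it is larger
// than the last watermark seen.
public class WatermarkGuard {

    private long currentWatermark = Long.MIN_VALUE;

    // Returns true if the watermark advanced and should be forwarded.
    public boolean accept(long watermark) {
        if (watermark > currentWatermark) {
            currentWatermark = watermark;
            return true;
        }
        return false; // non-advancing watermark, must not be forwarded
    }

    public static void main(String[] args) {
        WatermarkGuard guard = new WatermarkGuard();
        System.out.println(guard.accept(10)); // advances
        System.out.println(guard.accept(5));  // would move backwards
        System.out.println(guard.accept(20)); // advances again
    }
}
```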
[jira] [Commented] (FLINK-5900) Add non-partial merge Aggregates and unit tests
[ https://issues.apache.org/jira/browse/FLINK-5900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15932687#comment-15932687 ] Robert Metzger commented on FLINK-5900: --- To which component does this belong? Is it the table API? > Add non-partial merge Aggregates and unit tests > --- > > Key: FLINK-5900 > URL: https://issues.apache.org/jira/browse/FLINK-5900 > Project: Flink > Issue Type: Improvement >Reporter: Shaoxuan Wang >Assignee: Shaoxuan Wang > > Current built-in aggregates all support partial-merge. We have no coverage > here and are not sure whether non-partial-merge aggregates work. We should add > non-partial-merge aggregates and unit tests. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6068) Support If() as a built-in function of TableAPI
[ https://issues.apache.org/jira/browse/FLINK-6068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-6068: -- Component/s: Table API & SQL > Support If() as a built-in function of TableAPI > --- > > Key: FLINK-6068 > URL: https://issues.apache.org/jira/browse/FLINK-6068 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Zhuoluo Yang >Assignee: Zhuoluo Yang > > Most SQL systems support if() as a built-in UDF. However, we didn't register > if() in the function category. A great many of our users use the syntax > 'if(a, b, c)', and most SQL systems support it as well. > MySQL: > https://dev.mysql.com/doc/refman/5.7/en/control-flow-functions.html#function_if > Hive: > https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-ConditionalFunctions -- This message was sent by Atlassian JIRA (v6.3.15#6346)
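The requested semantics mirror MySQL's IF(expr1, expr2, expr3): return the second argument if the condition is true, the third otherwise, with a NULL condition counting as not true. A plain-Java sketch of those semantics, purely illustrative (the method name is invented, not part of any Flink API):

```java
public class SqlIfSketch {

    // Mirrors MySQL IF(expr1, expr2, expr3): expr2 if expr1 is TRUE,
    // otherwise expr3. A NULL condition is not TRUE, so expr3 is returned.
    public static <T> T sqlIf(Boolean condition, T thenValue, T elseValue) {
        return Boolean.TRUE.equals(condition) ? thenValue : elseValue;
    }

    public static void main(String[] args) {
        System.out.println(sqlIf(true, "yes", "no"));  // yes
        System.out.println(sqlIf(false, "yes", "no")); // no
        System.out.println(sqlIf(null, "yes", "no"));  // no (NULL is not true)
    }
}
```

Pinning down the three-valued NULL handling up front matters, since it is the main way if(a, b, c) differs from a plain ternary.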
[jira] [Updated] (FLINK-6093) Implement and turn on retraction for table sink
[ https://issues.apache.org/jira/browse/FLINK-6093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-6093: -- Component/s: Table API & SQL > Implement and turn on retraction for table sink > > > Key: FLINK-6093 > URL: https://issues.apache.org/jira/browse/FLINK-6093 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shaoxuan Wang > > Add sink tableInsert and NeedRetract property, and consider table sink in > optimizer RetractionRule -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6094) Implement and turn on retraction for stream-stream inner join
[ https://issues.apache.org/jira/browse/FLINK-6094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-6094: -- Component/s: Table API & SQL > Implement and turn on retraction for stream-stream inner join > - > > Key: FLINK-6094 > URL: https://issues.apache.org/jira/browse/FLINK-6094 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shaoxuan Wang > > This includes: > Modify the RetractionRule to consider stream-stream inner join > Implement the retract process logic for join -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6090) Implement optimizer for retraction and turn on retraction for over window aggregate
[ https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-6090: -- Component/s: Table API & SQL > Implement optimizer for retraction and turn on retraction for over window > aggregate > --- > > Key: FLINK-6090 > URL: https://issues.apache.org/jira/browse/FLINK-6090 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shaoxuan Wang > > Implement optimizer for retraction and turn on the retraction for over window > as the first prototype example: > 1. Add RetractionRule at the stage of decoration, which can derive the > replace table/append table and NeedRetraction properties. > 2. Match the NeedRetraction and replace table, and mark the accumulating mode; > add the necessary retract generating function at the replace table, and add the > retract processing logic at the retract consumer. > 3. Turn on retraction for over window aggregate. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6033) Support UNNEST query in the stream SQL API
[ https://issues.apache.org/jira/browse/FLINK-6033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-6033: -- Component/s: Table API & SQL > Support UNNEST query in the stream SQL API > -- > > Key: FLINK-6033 > URL: https://issues.apache.org/jira/browse/FLINK-6033 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Haohui Mai >Assignee: Haohui Mai > > It would be nice to support the {{UNNEST}} keyword in the stream SQL API. > The keyword is widely used in queries that relate to nested fields. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
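Semantically, UNNEST turns one row with an array-valued column into one output row per array element, e.g. {{SELECT u.name, t.tag FROM users u, UNNEST(u.tags) AS t (tag)}}. The relational behavior can be sketched in plain Java (hypothetical data model, not the Flink implementation):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class UnnestSketch {

    // Expands each (key, array) row into one "key,element" row per array
    // element, which is what UNNEST does to an array-valued column.
    public static List<String> unnest(Map<String, List<String>> rows) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> row : rows.entrySet()) {
            for (String element : row.getValue()) {
                out.add(row.getKey() + "," + element);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // One user with two tags yields two output rows.
        System.out.println(unnest(Map.of("alice", List.of("a", "b"))));
    }
}
```

In streaming SQL this is effectively a correlated flat-map over each incoming row, which is why it fits the streaming model without extra state.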
[jira] [Updated] (FLINK-5724) Error in the 'Zipping Elements' docs
[ https://issues.apache.org/jira/browse/FLINK-5724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-5724: -- Component/s: Python API > Error in the 'Zipping Elements' docs > > > Key: FLINK-5724 > URL: https://issues.apache.org/jira/browse/FLINK-5724 > Project: Flink > Issue Type: Bug > Components: DataSet API, Python API >Reporter: Fokko Driesprong > > The tab for the Python documentation isn't working because there are two tabs > pointing at the Scala example. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6047) Add support for Retraction in Table API / SQL
[ https://issues.apache.org/jira/browse/FLINK-6047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-6047: -- Component/s: Table API & SQL > Add support for Retraction in Table API / SQL > - > > Key: FLINK-6047 > URL: https://issues.apache.org/jira/browse/FLINK-6047 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shaoxuan Wang > > [Design doc]: > https://docs.google.com/document/d/18XlGPcfsGbnPSApRipJDLPg5IFNGTQjnz7emkVpZlkw > [Introduction]: > "Retraction" is an important building block for data streaming to refine the > early fired results in streaming. “Early firing” is very common and widely > used in many streaming scenarios, for instance “window-less” or unbounded > aggregate and stream-stream inner join, windowed (with early firing) > aggregate and stream-stream inner join. There are mainly two cases that > require retractions: 1) update on the keyed table (the key is either a > primaryKey (PK) on source table, or a groupKey/partitionKey in an aggregate); > 2) When dynamic windows (e.g., session window) are in use, the new value may > be replacing more than one previous window due to window merging. > To the best of our knowledge, the retraction for the early fired streaming > results has never been practically solved before. In this proposal, we > develop a retraction solution and explain how it works for the problem of > “update on the keyed table”. The same solution can be easily extended for the > dynamic windows merging, as the key component of retraction - how to refine > an early fired result - is the same across different problems. 
> [Proposed Jiras]: > Implement decoration phase for rewriting predicated logical plan after > volcano optimization phase > Implement optimizer for retraction and turn on retraction for over window > aggregate > Implement and turn on the retraction for grouping window aggregate > Implement and turn on retraction for table source > Implement and turn on retraction for table sink > Implement and turn on retraction for stream-stream inner join > Implement the retraction for the early firing window > Implement the retraction for the dynamic window with early firing -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6092) Implement and turn on retraction for table source
[ https://issues.apache.org/jira/browse/FLINK-6092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-6092: -- Component/s: Table API & SQL > Implement and turn on retraction for table source > -- > > Key: FLINK-6092 > URL: https://issues.apache.org/jira/browse/FLINK-6092 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shaoxuan Wang > > Add the Primary Key and replace/append properties for table source, and > consider table source in optimizer RetractionRule -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6091) Implement and turn on the retraction for grouping window aggregate
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-6091: -- Component/s: Table API & SQL > Implement and turn on the retraction for grouping window aggregate > -- > > Key: FLINK-6091 > URL: https://issues.apache.org/jira/browse/FLINK-6091 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shaoxuan Wang > > Implement the functions for processing retract message for grouping window > aggregate. No retract generating function needed as for now, as the current > grouping window aggregates are all executed at “without early firing mode”. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6103) LocalFileSystem rename() uses File.renameTo()
[ https://issues.apache.org/jira/browse/FLINK-6103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-6103: -- Component/s: Local Runtime > LocalFileSystem rename() uses File.renameTo() > - > > Key: FLINK-6103 > URL: https://issues.apache.org/jira/browse/FLINK-6103 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Reporter: Flavio Pompermaier > Labels: filesystem > > I've tried to move a directory to another on the LocalFilesystem and it > doesn't work (in my case fs is an instance of java.io.UnixFileSystem). > As for Flink-1840 (there was a PR to fix the issue - > https://github.com/apache/flink/pull/578) the problem is that > {{File.renameTo()}} is not reliable. > Indeed, the Javadoc says: > bq. Renames the file denoted by this abstract pathname. Many aspects of the > behavior of this method are inherently platform-dependent: The rename > operation might not be able to move a file from one filesystem to another, it > might not be atomic, and it might not succeed if a file with the destination > abstract pathname already exists. The return value should always be checked > to make sure that the rename operation was successful. Note that the > java.nio.file.Files class defines the move method to move or rename a file in > a platform independent manner -- This message was sent by Atlassian JIRA (v6.3.15#6346)
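As the quoted Javadoc suggests, java.nio.file.Files.move is the platform-independent replacement. A minimal sketch of what a fixed rename() could do instead of File.renameTo() (class and method names invented for illustration, not the actual LocalFileSystem patch):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class NioRenameSketch {

    // Moves src to dst using NIO, which reports failures via exceptions
    // instead of File.renameTo()'s easily-ignored boolean return value,
    // and can fall back to copy+delete across filesystems.
    public static boolean rename(Path src, Path dst) {
        try {
            Files.move(src, dst, StandardCopyOption.REPLACE_EXISTING);
            return true;
        } catch (IOException e) {
            return false; // a real implementation would propagate this
        }
    }

    public static void main(String[] args) throws IOException {
        Path src = Files.createTempDirectory("rename-src");
        Files.createFile(src.resolve("data"));
        Path dst = src.resolveSibling(src.getFileName() + "-moved");
        System.out.println(rename(src, dst));
        System.out.println(Files.exists(dst.resolve("data")));
    }
}
```

Note that StandardCopyOption.ATOMIC_MOVE can be requested instead when atomicity is required, at the cost of failing where the filesystem cannot provide it.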
[jira] [Updated] (FLINK-6083) [TaskManager] Support readiness/liveness probes
[ https://issues.apache.org/jira/browse/FLINK-6083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-6083: -- Component/s: Distributed Coordination > [TaskManager] Support readiness/liveness probes > --- > > Key: FLINK-6083 > URL: https://issues.apache.org/jira/browse/FLINK-6083 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination >Affects Versions: 1.2.0, 1.3.0 >Reporter: Andrey > > Currently there is no way to tell if TaskManager is operating or not. > Operating means "its connected to JobManager and ready to process requests". > TaskManager should provide metric "jobManagerConnection": > * 0 - disconnected > * 1 - connected > Or any other API which could expose connection status. > Expected usage: > * run task manager in managed environment > * configure readiness and liveness probes (periodically poll jmx or REST) > * restart task manager if liveness probe returns 0 (disconnected) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
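The proposed metric is just a 0/1 gauge that an external probe polls. A hypothetical sketch of that contract (names invented for illustration; not an existing Flink metric):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ConnectionGaugeSketch {

    // 1 = connected to the JobManager, 0 = disconnected.
    private final AtomicInteger jobManagerConnection = new AtomicInteger(0);

    public void onConnected()    { jobManagerConnection.set(1); }
    public void onDisconnected() { jobManagerConnection.set(0); }

    // What a readiness/liveness probe would poll via JMX or REST; the
    // orchestrator restarts the TaskManager when this stays at 0.
    public int gaugeValue() {
        return jobManagerConnection.get();
    }

    public static void main(String[] args) {
        ConnectionGaugeSketch gauge = new ConnectionGaugeSketch();
        System.out.println(gauge.gaugeValue()); // before registration
        gauge.onConnected();
        System.out.println(gauge.gaugeValue()); // after registration succeeds
    }
}
```

An AtomicInteger keeps the gauge safely readable from the metrics reporter thread while the connection state is flipped by the registration logic.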