[jira] [Created] (FLINK-11568) Exception in Kinesis ShardConsumer hidden by InterruptedException
Shannon Carey created FLINK-11568:

Summary: Exception in Kinesis ShardConsumer hidden by InterruptedException
Key: FLINK-11568
URL: https://issues.apache.org/jira/browse/FLINK-11568
Project: Flink
Issue Type: Improvement
Components: Kinesis Connector
Affects Versions: 1.6.2
Reporter: Shannon Carey
Assignee: Shannon Carey

When the Kinesis ShardConsumer encounters an exception, for example due to a problem in the Deserializer, the root-cause exception is often hidden by an uninformative InterruptedException caused by the FlinkKinesisConsumer thread being interrupted. Ideally, the root-cause exception would be preserved and thrown so that the logs contain enough information to diagnose the issue. This probably affects all versions.

Here's an example of a log message with the unhelpful InterruptedException:

{code:java}
2019-02-05 13:29:31:383 thread=Source: Custom Source -> Filter -> Map -> Sink: Unnamed (1/8), level=WARN, logger=org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer, message="Error while closing Kinesis data fetcher"
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.awaitTermination(KinesisDataFetcher.java:450)
	at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:314)
	at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:323)
	at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:745)
{code}

And here's an example of the real exception that we're actually interested in, which is stored inside KinesisDataFetcher#error but is not thrown or logged:

{code:java}
org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:416)
org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:240)
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:135)
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper.deserialize(KinesisDeserializationSchemaWrapper.java:44)
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:332)
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:231)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
java.util.concurrent.FutureTask.run(FutureTask.java)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
{code}

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
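One way to preserve the root cause along the lines proposed above is to attach the stored error as a suppressed exception before the InterruptedException propagates. This is only a minimal sketch of that pattern: the class and field names below are illustrative stand-ins, not Flink's actual API.

```java
// Sketch: attach a stored root-cause error to the InterruptedException that
// would otherwise mask it. Names are hypothetical stand-ins for the real code.
public class ErrorPreservation {
    private static volatile Throwable storedError; // stand-in for KinesisDataFetcher#error

    static void awaitTermination() throws InterruptedException {
        // Simulates the shutdown path being interrupted.
        throw new InterruptedException("sleep interrupted");
    }

    static void close() throws Exception {
        try {
            awaitTermination();
        } catch (InterruptedException ie) {
            Throwable rootCause = storedError;
            if (rootCause != null) {
                ie.addSuppressed(rootCause); // keep the real failure in the log
            }
            throw ie;
        }
    }

    public static void main(String[] args) throws Exception {
        storedError = new RuntimeException("deserialization failed");
        try {
            close();
        } catch (InterruptedException ie) {
            // The log now contains the interesting exception, not just the interrupt.
            System.out.println(ie.getSuppressed()[0].getMessage());
        }
    }
}
```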
[jira] [Created] (FLINK-6805) Flink Cassandra connector dependency on Netty disagrees with Flink
Shannon Carey created FLINK-6805:

Summary: Flink Cassandra connector dependency on Netty disagrees with Flink
Key: FLINK-6805
URL: https://issues.apache.org/jira/browse/FLINK-6805
Project: Flink
Issue Type: Bug
Components: Cassandra Connector
Affects Versions: 1.2.1, 1.3.0
Reporter: Shannon Carey

The Flink Cassandra connector has a dependency on Netty libraries (via promotion of transitive dependencies by the Maven shade plugin) at version 4.0.33.Final, which disagrees with the version 4.0.27.Final included in Flink and managed by the parent POM via its dependency on netty-all. Because the dependency is on netty-all, the dependency management doesn't take effect on the individual libraries such as netty-handler, netty-codec, etc.

I suggest adding dependency management for all of the Netty libraries individually (netty-handler, etc.) so that all Flink modules use the same version, and similarly adding exclusions to the quickstart example POM for the individual Netty libraries so that fat JARs don't include conflicting versions of Netty.

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
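A sketch of what the suggested dependency management could look like; the version numbers come from the issue text, while the artifact list is illustrative rather than exhaustive:

```xml
<dependencyManagement>
  <dependencies>
    <!-- Pin the individual Netty artifacts, not just netty-all, so the
         Cassandra connector's transitive 4.0.33.Final cannot leak through. -->
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-handler</artifactId>
      <version>4.0.27.Final</version>
    </dependency>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-codec</artifactId>
      <version>4.0.27.Final</version>
    </dependency>
    <!-- ...and likewise for the other netty-* modules -->
  </dependencies>
</dependencyManagement>
```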
[jira] [Created] (FLINK-5608) Cancel button not always visible
Shannon Carey created FLINK-5608:

Summary: Cancel button not always visible
Key: FLINK-5608
URL: https://issues.apache.org/jira/browse/FLINK-5608
Project: Flink
Issue Type: Bug
Components: Webfrontend
Affects Versions: 1.1.4
Reporter: Shannon Carey
Assignee: Shannon Carey
Priority: Minor

When the window is not wide enough, or when the job name is too long, the "Cancel" button in the Job view of the web UI is not visible: it is the first element that gets wrapped down, and it gets covered by the secondary navbar (the tabs). This often forces us to resize the browser wider than our monitor in order to use the cancel button.

In general, the use of Bootstrap's ".navbar-fixed-top" is problematic if the content may wrap, especially if the content's horizontal width is not known & fixed. The ".navbar-fixed-top" uses fixed positioning, and therefore any unexpected change in height will result in overlap with the rest of the normal-flow content in the page. The Bootstrap docs explain this in their "Overflowing content" callout.

I am submitting a PR which does not attempt to resolve all issues with the fixed navbar approach, but attempts to improve the situation by using less horizontal space and by altering the layout approach of the Cancel button.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5542) YARN client incorrectly uses local YARN config to check vcore capacity
Shannon Carey created FLINK-5542:

Summary: YARN client incorrectly uses local YARN config to check vcore capacity
Key: FLINK-5542
URL: https://issues.apache.org/jira/browse/FLINK-5542
Project: Flink
Issue Type: Bug
Components: YARN
Affects Versions: 1.1.4
Reporter: Shannon Carey

See http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/1-1-4-on-YARN-vcores-change-td11016.html

When using bin/yarn-session.sh, AbstractYarnClusterDescriptor line 271 in 1.1.4 compares the user's selected number of vcores to the vcores configured in the local node's YARN config (from YarnConfiguration, e.g. yarn-site.xml and yarn-default.xml). It incorrectly prevents Flink from launching even if there is sufficient vcore capacity on the cluster. That is not correct, because the application will not necessarily run on the local node. For example, if running the yarn-session.sh client from the AWS EMR master node, the vcore count there may be different from the vcore count on the core nodes where Flink will actually run.

A reasonable way to fix this would probably be to reuse the logic from "yarn-session.sh -q" (FlinkYarnSessionCli line 550), which knows how to get vcore information from the real worker nodes. Alternatively, perhaps we could remove the check entirely and rely on YARN's Scheduler to determine whether sufficient resources exist.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
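The first suggested fix above can be sketched as a check against the capacities reported by the actual worker nodes rather than the local yarn-site.xml. This is a hypothetical illustration; the method name and the node list are invented for the example, not taken from Flink.

```java
import java.util.Arrays;
import java.util.List;

// Sketch: validate the requested vcores against the capacity of the real
// worker nodes (as "yarn-session.sh -q" reports them), not the local config.
public class VcoreCheck {
    static boolean fitsOnSomeNode(int requestedVcores, List<Integer> nodeVcoreCapacities) {
        // It is enough to find one worker node that can host the request.
        return nodeVcoreCapacities.stream().anyMatch(cap -> cap >= requestedVcores);
    }

    public static void main(String[] args) {
        // e.g. an EMR master reports 4 vcores locally, but core nodes have 16 each
        List<Integer> coreNodes = Arrays.asList(16, 16, 16);
        System.out.println(fitsOnSomeNode(8, coreNodes));  // fits on the cluster
        System.out.println(fitsOnSomeNode(32, coreNodes)); // genuinely too large
    }
}
```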
[jira] [Created] (FLINK-5425) JobManager replaced by IP in metrics
Shannon Carey created FLINK-5425:

Summary: JobManager replaced by IP in metrics
Key: FLINK-5425
URL: https://issues.apache.org/jira/browse/FLINK-5425
Project: Flink
Issue Type: Bug
Components: Metrics
Affects Versions: 1.1.3
Reporter: Shannon Carey
Priority: Minor

In metrics at the jobmanager level and below, the "<host>" scope variable is being replaced by the IP rather than the hostname. The taskmanager metrics, meanwhile, use the host name. You can see the jobmanager behavior at https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java#L147 compared to TaskManagerLocation#getHostname().

The problem with this is mainly that, due to the presence of "." (period) characters in the IP address and thereby in the metric name, the metric names show up strangely in Graphite/Grafana, where "." is the metric group separator. If it's not possible to make jobmanager metrics use the hostname, I suggest replacing "." with "-" in the "<host>" value.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
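The fallback suggested above is a one-line sanitization. A minimal sketch (the class and method names are hypothetical, not Flink's):

```java
// Sketch: sanitize a metric scope value so that "." in an IP address does not
// create extra levels in Graphite's dot-separated metric hierarchy.
public class MetricScope {
    static String sanitizeHost(String hostOrIp) {
        return hostOrIp.replace('.', '-');
    }

    public static void main(String[] args) {
        System.out.println(sanitizeHost("10.2.89.80"));
    }
}
```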
[jira] [Created] (FLINK-5424) Improve Restart Strategy Logging
Shannon Carey created FLINK-5424:

Summary: Improve Restart Strategy Logging
Key: FLINK-5424
URL: https://issues.apache.org/jira/browse/FLINK-5424
Project: Flink
Issue Type: Improvement
Components: Core
Reporter: Shannon Carey
Assignee: Shannon Carey
Priority: Minor

I'll be submitting a PR which includes some minor improvements to logging related to restart strategies. Specifically, I added a toString so that the log contains better info about the failure-rate restart strategy, and I added an explanation in the log when the restart strategy is responsible for preventing job restart (currently, there's no indication that the restart strategy had anything to do with it).

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-5322) yarn.taskmanager.env value does not appear in System.getenv
Shannon Carey created FLINK-5322:

Summary: yarn.taskmanager.env value does not appear in System.getenv
Key: FLINK-5322
URL: https://issues.apache.org/jira/browse/FLINK-5322
Project: Flink
Issue Type: Bug
Components: YARN
Environment: Flink 1.1.3 on AWS EMR emr-5.2.0 (Hadoop "Amazon 2.7.3")
Reporter: Shannon Carey
Priority: Trivial
Fix For: 1.1.3

The value I specified in flink-conf.yaml

{code}
yarn.taskmanager.env: MY_ENV: test
{code}

is not available in {{System.getenv("MY_ENV")}} from the plan execution (execution flow of the main method) nor from within execution of a streaming operator.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Flink survey by data Artisans
There's a newline that disrupts the URL. It should be:
http://www.surveygizmo.com/s3/3166399/181bdb611f22
not:
http://www.surveygizmo.com/s3/ 3166399/181bdb611f22
[jira] [Created] (FLINK-4803) Job Cancel can hang forever waiting for OutputFormat.close()
Shannon Carey created FLINK-4803:

Summary: Job Cancel can hang forever waiting for OutputFormat.close()
Key: FLINK-4803
URL: https://issues.apache.org/jira/browse/FLINK-4803
Project: Flink
Issue Type: Bug
Affects Versions: 1.1.1
Reporter: Shannon Carey

If the Flink job uses a badly-behaved OutputFormat (in this example, a HadoopOutputFormat containing a CqlBulkOutputFormat), where the close() method blocks forever, it is impossible to cancel the Flink job even though the blocked thread would respond to an interrupt.

The stack traces below show the state of the important threads when a job is canceled and the OutputFormat is blocking forever inside of close(). I suggest that the `DataSinkTask.cancel()` method be updated to add a timeout on `this.format.close()`. When the timeout is reached, the Task thread should be interrupted.

{code}
java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
	- waiting to lock <0x0006bae5f788> (a java.lang.Object)
	at org.apache.flink.runtime.operators.DataSinkTask.cancel(DataSinkTask.java:268)
	at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1149)
	at java.lang.Thread.run(Thread.java:745)

"DataSink (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" #6410 daemon prio=5 os_prio=0 tid=0x7fb7e79a4800 nid=0x2ad8 waiting on condition [0x7fb7bdf78000]
java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for <0x0006c5ab5e20> (a java.util.concurrent.SynchronousQueue$TransferStack)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
	at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
	at java.util.concurrent.SynchronousQueue.offer(SynchronousQueue.java:895)
	at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.put(SSTableSimpleUnsortedWriter.java:194)
	at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.sync(SSTableSimpleUnsortedWriter.java:180)
	at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.close(SSTableSimpleUnsortedWriter.java:156)
	at org.apache.cassandra.io.sstable.CQLSSTableWriter.close(CQLSSTableWriter.java:275)
	at org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:133)
	at org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:126)
	at org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
	- locked <0x0006bae5f788> (a java.lang.Object)
	at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:234)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
	at java.lang.Thread.run(Thread.java:745)
{code}

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
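The suggested close-with-timeout-then-interrupt behavior can be sketched with plain threads. This is a minimal, self-contained illustration of the pattern (the blocking close is simulated; none of the names below are Flink's):

```java
// Sketch: give a badly-behaved close() a bounded amount of time, then
// interrupt the stuck thread, which (as in the issue) responds to interrupt.
public class CancelWithTimeout {
    static volatile boolean interrupted = false;

    // Simulates an OutputFormat whose close() blocks forever.
    static void blockingClose() {
        try {
            Thread.sleep(Long.MAX_VALUE); // never returns on its own
        } catch (InterruptedException e) {
            interrupted = true; // the stuck close() responds to interrupt
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(CancelWithTimeout::blockingClose);
        worker.start();
        worker.join(500); // wait up to 500 ms for close() to finish normally
        if (worker.isAlive()) {
            worker.interrupt(); // timeout reached: interrupt the stuck thread
        }
        worker.join();
        System.out.println("interrupted=" + interrupted);
    }
}
```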
Re: [DISCUSS] Proposal for Asynchronous I/O in FLINK
Not to derail this thread onto another topic, but the problem with using a static instance is that there's no way to shut it down when the job stops. So if, for example, it starts threads, I don't think those threads will stop when the job stops. I'm not very well versed in how various Java 8 implementations perform unloading of classloaders & class definitions/statics therein, but it seems problematic unless the job provides a shutdown hook to which user code can subscribe.

On 9/21/16, 8:05 PM, "David Wang" <dwan...@gmail.com> wrote:

>Hi Shannon,
>
>That's right. This FLIP aims to boost TPS of the task workers with async
>i/o operation.
>
>As what Stephan has mentioned, by placing static attribute to shared
>resources (like event pool, connection), it is possible to share those
>resources among different slots in the same JVM.
>
>I will make a note in the FLIP about how to share resources ;D
>
>Thanks,
>David
>
>2016-09-22 1:46 GMT+08:00 Stephan Ewen <se...@apache.org>:
>
>> @Shannon: One could have a "static" broker to share the same netty across
>> slots in the same JVM. Implicitly, Flink does the same with broadcast
>> variables.
>>
>> On Wed, Sep 21, 2016 at 5:04 PM, Shannon Carey <sca...@expedia.com> wrote:
>>
>> > David,
>> >
>> > I just wanted to say "thanks" for making this proposal! I'm also
>> > interested in performing nonblocking I/O (multiplexing threads/reactive
>> > programming) within Flink operators so that we can, for example,
>> > communicate with external web services with Netty/RxNetty without
>> blocking
>> > an entire Flink slot (aka a thread) while we wait for the operation to
>> > complete. It looks like your FLIP will enable that use case.
>> >
>> > I'm not sure whether it will be possible to share one Netty
>> EventLoopGroup
>> > (or the equivalent for any other non-blocking framework, connection pool,
>> > etc.) among multiple slots in a single JVM though. Flink supports
>> > open/close operation on a RichFunction, but that's on a per-slot basis. I
>> > don't know of a way to open/close objects on a per-job-JVM basis. But I
>> > suppose that's an issue that should be discussed and resolved separately.
>> >
>> > -Shannon
Re: [DISCUSS] Proposal for Asynchronous I/O in FLINK
David,

I just wanted to say "thanks" for making this proposal! I'm also interested in performing nonblocking I/O (multiplexing threads/reactive programming) within Flink operators so that we can, for example, communicate with external web services with Netty/RxNetty without blocking an entire Flink slot (aka a thread) while we wait for the operation to complete. It looks like your FLIP will enable that use case.

I'm not sure whether it will be possible to share one Netty EventLoopGroup (or the equivalent for any other non-blocking framework, connection pool, etc.) among multiple slots in a single JVM though. Flink supports open/close operation on a RichFunction, but that's on a per-slot basis. I don't know of a way to open/close objects on a per-job-JVM basis. But I suppose that's an issue that should be discussed and resolved separately.

-Shannon
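One way to bridge the gap discussed in this thread (a JVM-wide shared resource with no per-job shutdown point) is reference counting: the last slot to release the resource also shuts it down. A minimal sketch under that assumption; "resource" here is just a stand-in String for something like a Netty EventLoopGroup, and none of this reflects an actual Flink API:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: reference-count a JVM-wide shared resource so the last slot to
// release it also shuts it down, instead of leaking it when the job stops.
public class SharedResourceHolder {
    private static final AtomicInteger refCount = new AtomicInteger();
    private static volatile String resource; // null means "shut down"

    static synchronized String acquire() {
        if (refCount.getAndIncrement() == 0) {
            resource = "event-loop-group"; // expensive init happens once per JVM
        }
        return resource;
    }

    static synchronized void release() {
        if (refCount.decrementAndGet() == 0) {
            resource = null; // last user: shut the shared resource down here
        }
    }

    public static void main(String[] args) {
        acquire(); // slot 1 opens
        acquire(); // slot 2 opens, reuses the same instance
        release(); // slot 1 closes; resource stays alive
        System.out.println(resource != null);
        release(); // slot 2 closes; resource is shut down
        System.out.println(resource != null);
    }
}
```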
[jira] [Created] (FLINK-4418) ClusterClient/ConnectionUtils#findConnectingAddress fails immediately if InetAddress.getLocalHost throws exception
Shannon Carey created FLINK-4418:

Summary: ClusterClient/ConnectionUtils#findConnectingAddress fails immediately if InetAddress.getLocalHost throws exception
Key: FLINK-4418
URL: https://issues.apache.org/jira/browse/FLINK-4418
Project: Flink
Issue Type: Bug
Components: Client
Affects Versions: 1.1.0
Reporter: Shannon Carey

When attempting to connect to a cluster with a ClusterClient, if the machine's hostname is not resolvable to an IP, an exception is thrown preventing success. This is the case if, for example, the hostname is not present & mapped to a local IP in /etc/hosts.

The exception is below. I suggest that findAddressUsingStrategy() should catch java.net.UnknownHostException thrown by InetAddress.getLocalHost() and return null, allowing alternative strategies to be attempted by findConnectingAddress(). I will open a PR to this effect. Ideally this could be included in both 1.2 and 1.1.2.

{code}
21:11:35 org.apache.flink.client.program.ProgramInvocationException: Failed to retrieve the JobManager gateway.
21:11:35 at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:430)
21:11:35 at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:90)
21:11:35 at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
21:11:35 at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:75)
21:11:35 at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:334)
21:11:35 at com.expedia.www.flink.job.scheduler.FlinkJobSubmitter.get(FlinkJobSubmitter.java:81)
21:11:35 at com.expedia.www.flink.job.scheduler.streaming.StreamingJobManager.run(StreamingJobManager.java:105)
21:11:35 at com.expedia.www.flink.job.scheduler.JobScheduler.runStreamingApp(JobScheduler.java:69)
21:11:35 at com.expedia.www.flink.job.scheduler.JobScheduler.main(JobScheduler.java:34)
21:11:35 Caused by: java.lang.RuntimeException: Failed to resolve JobManager address at /10.2.89.80:43126
21:11:35 at org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:189)
21:11:35 at org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:649)
21:11:35 at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:428)
21:11:35 ... 8 more
21:11:35 Caused by: java.net.UnknownHostException: ip-10-2-64-47: ip-10-2-64-47: unknown error
21:11:35 at java.net.InetAddress.getLocalHost(InetAddress.java:1505)
21:11:35 at org.apache.flink.runtime.net.ConnectionUtils.findAddressUsingStrategy(ConnectionUtils.java:232)
21:11:35 at org.apache.flink.runtime.net.ConnectionUtils.findConnectingAddress(ConnectionUtils.java:123)
21:11:35 at org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:187)
21:11:35 ... 10 more
21:11:35 Caused by: java.net.UnknownHostException: ip-10-2-64-47: unknown error
21:11:35 at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
21:11:35 at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928)
21:11:35 at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323)
21:11:35 at java.net.InetAddress.getLocalHost(InetAddress.java:1500)
21:11:35 ... 13 more
{code}

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
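The proposed fix (catch UnknownHostException and return null so the caller falls through to the next strategy) can be sketched in isolation. The strategy is passed in as a Callable here purely to make the example deterministic and self-contained; the real method has a different signature:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.Callable;

// Sketch: a strategy that fails with UnknownHostException yields null, so the
// caller can try the next strategy instead of aborting the whole search.
public class AddressStrategy {
    static InetAddress findAddressUsingStrategy(Callable<InetAddress> strategy) {
        try {
            return strategy.call();
        } catch (UnknownHostException e) {
            return null; // fall through to the next strategy
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // Simulates InetAddress.getLocalHost() on a host with no /etc/hosts entry.
        InetAddress result = findAddressUsingStrategy(() -> {
            throw new UnknownHostException("ip-10-2-64-47");
        });
        System.out.println(result == null ? "try next strategy" : result.toString());
    }
}
```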
Re: Flink 1.1.0 : Hadoop 1/2 compatibility mismatch
Works for me, thanks! -Shannon
[jira] [Created] (FLINK-4334) Shaded Hadoop1 jar not fully excluded in Quickstart
Shannon Carey created FLINK-4334:

Summary: Shaded Hadoop1 jar not fully excluded in Quickstart
Key: FLINK-4334
URL: https://issues.apache.org/jira/browse/FLINK-4334
Project: Flink
Issue Type: Bug
Components: Quickstarts
Affects Versions: 1.0.3, 1.0.2, 1.0.1, 1.1.0
Reporter: Shannon Carey

The Shaded Hadoop1 jar has had artifactId flink-shaded-hadoop1_2.10 since Flink 1.0.0 (see https://github.com/apache/flink/commit/2c4e4d1ffaf4107fb802c90858184fc10af66837), but the quickstart POMs both refer to it as flink-shaded-hadoop1. If using "-Pbuild-jar", the problem is not encountered.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3711) Scala fold() example syntax incorrect
Shannon Carey created FLINK-3711:

Summary: Scala fold() example syntax incorrect
Key: FLINK-3711
URL: https://issues.apache.org/jira/browse/FLINK-3711
Project: Flink
Issue Type: Bug
Components: Documentation
Affects Versions: 1.0.1, 1.0.0
Reporter: Shannon Carey
Priority: Minor

Scala's KeyedStream#fold, which accepts a scala.Function2, is defined as a partially applicable (curried) function. The documentation, however, is written as if it were a non-partial function.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)