[jira] [Created] (FLINK-11419) StreamingFileSink fails to recover after taskmanager failure
Edward Rojas created FLINK-11419:
Summary: StreamingFileSink fails to recover after taskmanager failure
Key: FLINK-11419
URL: https://issues.apache.org/jira/browse/FLINK-11419
Project: Flink
Issue Type: Bug
Components: filesystem-connector
Affects Versions: 1.7.1
Reporter: Edward Rojas

If a job with a StreamingFileSink writing data to HDFS is running in a cluster with multiple taskmanagers and the taskmanager executing the job goes down (for some reason), the job fails to recover with "Missing data in tmp file" because it is not able to truncate the file. Here is the full stack trace:

{code:java}
java.io.IOException: Missing data in tmp file: hdfs://path/to/hdfs/2019-01-20/.part-0-0.inprogress.823f9c20-3594-4fe3-ae8c-f57b6c35e191
	at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.<init>(HadoopRecoverableFsDataOutputStream.java:93)
	at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.recover(HadoopRecoverableWriter.java:72)
	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restoreInProgressFile(Bucket.java:140)
	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:127)
	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:396)
	at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64)
	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177)
	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165)
	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149)
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to TRUNCATE_FILE /path/to/hdfs/2019-01-20/.part-0-0.inprogress.823f9c20-3594-4fe3-ae8c-f57b6c35e191 for DFSClient_NONMAPREDUCE_-2103482360_62 on x.xxx.xx.xx because this file lease is currently owned by DFSClient_NONMAPREDUCE_1834204750_59 on x.xx.xx.xx
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:3190)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncateInternal(FSNamesystem.java:2282)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncateInt(FSNamesystem.java:2228)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncate(FSNamesystem.java:2198)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.truncate(NameNodeRpcServer.java:1056)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.truncate(ClientNamenodeProtocolServerSideTranslatorPB.java:622)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)
	at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1489)
	at org.apache.hadoop.ipc.Client.call(Client.java:1435)
	at org.apache.hadoop.ipc.Client.call(Client.java:1345)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngin
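The "Caused by" part of the trace suggests the NameNode still attributes the file's lease to the DFSClient of the failed taskmanager, so the truncate issued during recovery is rejected. One way around this, sketched here under assumptions, is to ask HDFS to revoke the old lease and wait until the file is closed before truncating. Hadoop's DistributedFileSystem does offer recoverLease(Path) and isFileClosed(Path); in this sketch they sit behind a hypothetical LeaseOps interface so it runs without a cluster, and waitUntilLeaseIsRevoked, timeout and pollInterval are illustrative names, not Flink's API.

```java
import java.time.Duration;

final class LeaseRecoverySketch {

    /**
     * Hypothetical stand-in for the two HDFS calls used below. On a real cluster these
     * would delegate to DistributedFileSystem.recoverLease(Path) and isFileClosed(Path).
     */
    interface LeaseOps {
        /** Starts lease recovery; returns true if the file is already closed. */
        boolean recoverLease(String path);

        /** Returns true once the NameNode reports the file as closed. */
        boolean isFileClosed(String path);
    }

    /**
     * Triggers lease recovery for a file left open by a dead writer, then polls until the
     * file is closed or the timeout expires. Returns true if it should be safe to truncate.
     */
    static boolean waitUntilLeaseIsRevoked(
            LeaseOps ops, String path, Duration timeout, Duration pollInterval)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        boolean closed = ops.recoverLease(path);
        // Overflow-safe "still before the deadline" comparison for nanoTime values.
        while (!closed && System.nanoTime() - deadline < 0) {
            Thread.sleep(pollInterval.toMillis());
            closed = ops.isFileClosed(path);
        }
        return closed;
    }
}
```

Only after this returns true would the recovery path issue the truncate; if it returns false, failing the restore and letting the job retry is probably safer than truncating while the NameNode still assigns the lease to the old client.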
[jira] [Created] (FLINK-11337) Incorrect watermark in StreamingFileSink BucketAssigner.Context when used in connected stream
Edward Rojas created FLINK-11337:
Summary: Incorrect watermark in StreamingFileSink BucketAssigner.Context when used in connected stream
Key: FLINK-11337
URL: https://issues.apache.org/jira/browse/FLINK-11337
Project: Flink
Issue Type: Bug
Components: filesystem-connector
Affects Versions: 1.7.0
Reporter: Edward Rojas

When StreamingFileSink is used as the sink of a connected stream, the "invoke" method of the sink may be called before the "combinedWatermark" is updated with the timestamp of the element currently being processed. As a result, the context contains an incorrect watermark value (Long.MIN_VALUE for the first events in the stream when using "AssignerWithPeriodicWatermarks").

I reproduced this using a broadcast stream connected to a data stream. The broadcast stream uses a custom timestamp extractor that always returns Watermark.MAX_VALUE, as is done in a training example here: [https://github.com/dataArtisans/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/solutions/datastream_java/broadcast/OngoingRidesSolution.java#L143].

This is problematic because the watermark cannot be used reliably to compute the bucket ID based on event time.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
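One way the observed Long.MIN_VALUE can arise is visible in a small model of a two-input operator that keeps one watermark per input and exposes their minimum. This is a hypothetical sketch (class and method names are invented, not Flink's internals), assuming the data input emits periodic watermarks and the broadcast input emits the maximum watermark right away: until the first periodic watermark arrives on the data side, every element processed sees Long.MIN_VALUE.

```java
final class CombinedWatermarkSketch {

    // One watermark per input; both start at Long.MIN_VALUE ("no watermark yet").
    private long input1Watermark = Long.MIN_VALUE; // data stream (periodic watermarks)
    private long input2Watermark = Long.MIN_VALUE; // broadcast stream
    private long combinedWatermark = Long.MIN_VALUE;

    void processWatermark1(long watermark) {
        input1Watermark = watermark;
        updateCombinedWatermark();
    }

    void processWatermark2(long watermark) {
        input2Watermark = watermark;
        updateCombinedWatermark();
    }

    private void updateCombinedWatermark() {
        // The operator can only advance as far as its slowest input, and never moves back.
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
        }
    }

    /** What a BucketAssigner.Context-style accessor would report to the sink. */
    long currentWatermark() {
        return combinedWatermark;
    }

    public static void main(String[] args) {
        CombinedWatermarkSketch op = new CombinedWatermarkSketch();
        op.processWatermark2(Long.MAX_VALUE); // broadcast side jumps to the maximum
        System.out.println(op.currentWatermark() == Long.MIN_VALUE); // true: data side has none yet
        op.processWatermark1(1_000L);         // first periodic watermark on the data side
        System.out.println(op.currentWatermark()); // 1000
    }
}
```

A bucket assigner that derives the bucket ID from the watermark could treat Long.MIN_VALUE as "no watermark yet" and fall back to the element's own timestamp; whether that is acceptable depends on how early events should be bucketed.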
[jira] [Created] (FLINK-11318) [Regression] StreamingFileSink can overwrite existing files
Edward Rojas created FLINK-11318:
Summary: [Regression] StreamingFileSink can overwrite existing files
Key: FLINK-11318
URL: https://issues.apache.org/jira/browse/FLINK-11318
Project: Flink
Issue Type: Bug
Components: filesystem-connector
Affects Versions: 1.7.1
Reporter: Edward Rojas

StreamingFileSink does not check whether a file with the same name as the new part file already exists, which can result in an existing file being overwritten. The BucketingSink performs this kind of validation in its "openNewPartFile" method here: https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L549-L561

Since this works in the "old" BucketingSink, this seems to be a regression. It can be a problem, for example, when migrating a job from the BucketingSink to the StreamingFileSink: existing files could be overwritten.
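The kind of check the linked BucketingSink lines perform can be sketched as "keep incrementing the part counter while a file with that name already exists". This sketch uses java.nio.file on a local directory instead of Flink's FileSystem abstraction, and the part-&lt;subtask&gt;-&lt;counter&gt; naming and the method names are assumptions for illustration; a real sink would presumably also have to check the in-progress and pending file names.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class PartFileNaming {

    /** Hypothetical naming scheme: part-<subtaskIndex>-<partCounter>. */
    static Path partPath(Path bucketDir, int subtaskIndex, long partCounter) {
        return bucketDir.resolve("part-" + subtaskIndex + "-" + partCounter);
    }

    /** Returns the first part counter, starting at startCounter, whose file does not exist yet. */
    static long nextFreePartCounter(Path bucketDir, int subtaskIndex, long startCounter) {
        long counter = startCounter;
        while (Files.exists(partPath(bucketDir, subtaskIndex, counter))) {
            counter++; // skip names that are already taken instead of overwriting them
        }
        return counter;
    }

    public static void main(String[] args) throws IOException {
        Path bucket = Files.createTempDirectory("bucket");
        // Files left behind by a previous job, e.g. one that used the BucketingSink.
        Files.createFile(partPath(bucket, 0, 0));
        Files.createFile(partPath(bucket, 0, 1));
        System.out.println(nextFreePartCounter(bucket, 0, 0)); // 2
        System.out.println(nextFreePartCounter(bucket, 1, 0)); // 0
    }
}
```

Checking and then creating is not atomic; if several writers could target the same bucket directory, creating the file with an exclusive-create call (Files.createFile fails if the file already exists) closes that gap.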
[jira] [Created] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
Edward Rojas created FLINK-11087:
Summary: Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
Key: FLINK-11087
URL: https://issues.apache.org/jira/browse/FLINK-11087
Project: Flink
Issue Type: Bug
Components: State Backends, Checkpointing
Affects Versions: 1.7.0
Environment: Migration from Flink 1.5.3 to Flink 1.7.0
Reporter: Edward Rojas

When upgrading from Flink 1.5.3 to Flink 1.7.0, the migration of broadcast state throws the following error:

{noformat}
org.apache.flink.util.StateMigrationException: The new key serializer for broadcast state must not be incompatible.
	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:238)
	at org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
	at java.lang.Thread.run(Thread.java:745)
{noformat}

The broadcast state uses a MapState with StringSerializer as the key serializer and a custom JsonSerializer as the value serializer. There were no changes to the TypeSerializers used; only the Flink version was upgraded.

While debugging, I see that when the DefaultOperatorStateBackend class validates state compatibility, "*registeredBroadcastStates*", which holds the metadata of the 'old' state, associates the key and value serializers the wrong way round: JsonSerializer appears as the key serializer and StringSerializer as the value serializer (it should be the opposite).

After more digging, I see that the "OperatorBackendStateMetaInfoReaderV2V3" class is responsible for this swap, here: https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165
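How a single ordering mistake in a metadata reader turns into this exception can be shown with a toy model: the snapshot stores the key serializer first and the value serializer second, and a reader that assigns them in the opposite order makes an unchanged pair of serializers look incompatible. Everything here (the layout, the class and method names, and plain serializer names standing in for serializer snapshots) is hypothetical and does not reproduce LegacyStateMetaInfoReaders.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class SerializerOrderSketch {

    /** Restored metadata: which serializer was registered for keys and which for values. */
    static final class MetaInfo {
        final String keySerializer;
        final String valueSerializer;

        MetaInfo(String keySerializer, String valueSerializer) {
            this.keySerializer = keySerializer;
            this.valueSerializer = valueSerializer;
        }
    }

    /** Hypothetical snapshot layout: key serializer name, then value serializer name. */
    static byte[] write(MetaInfo info) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(info.keySerializer);
            out.writeUTF(info.valueSerializer);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the snapshot back; buggyOrder=true models a reader that swaps the two fields. */
    static MetaInfo read(byte[] snapshot, boolean buggyOrder) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(snapshot))) {
            String first = in.readUTF();
            String second = in.readUTF();
            return buggyOrder ? new MetaInfo(second, first) : new MetaInfo(first, second);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Simplified compatibility check: the new serializers must match the restored ones. */
    static boolean isCompatible(MetaInfo restored, MetaInfo requested) {
        return restored.keySerializer.equals(requested.keySerializer)
                && restored.valueSerializer.equals(requested.valueSerializer);
    }
}
```

With the swapped reader, a state registered with StringSerializer keys restores with JsonSerializer as its key serializer, which matches what the debugging session above found in "registeredBroadcastStates".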
Re: Flink 1.5 - Job fails to execute in multiple taskmanagers (parallelism > 1)
I just tested the workaround and it works. Thank you -- Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
Re: Flink 1.5 - Job fails to execute in multiple taskmanagers (parallelism > 1)
Regarding heap, the only settings I configure explicitly are `jobmanager.heap.mb`, `taskmanager.heap.mb` and `taskmanager.memory.preallocate: false`. All other memory settings have their default values. I just tested it, and it fails only when SSL is enabled.
Flink 1.5 - Job fails to execute in multiple taskmanagers (parallelism > 1)
Hi all,

I was testing Flink 1.5 rc5 and found this issue. I'm running a cluster in HA mode with one jobmanager and several taskmanagers, each with two task slots, and the default parallelism set to 2.

I'm running two jobs: a simple one with a Kafka consumer, a filter and a sink, and a slightly more complex one with a Kafka consumer, filters, flatmaps, keyed process functions and sinks. Both jobs run correctly when they are assigned to the 2 slots of the same taskmanager. But when one slot is in one taskmanager and the other in a different one, the simpler job runs correctly while the complex one fails with the following error:

org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: java.lang.UnsupportedOperationException: Heap buffer
	at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:170)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
	at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
	at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
	at org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerAdapter.exceptionCaught(ChannelHandlerAdapter.java:79)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
	at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.exceptionCaught(SslHandler.java:697)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:809)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:341)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.shaded.netty4.io.netty.handler.codec.DecoderException: java.lang.UnsupportedOperationException: Heap buffer
	at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:346)
	at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:229)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
	... 9 more
Caused by: java.lang.UnsupportedOperationException: Heap buffer
	at org.apache.flink.runtime.io.network.netty.NettyBufferPool.heapBuffer(NettyBufferPool.java:236)
	at
[jira] [Created] (FLINK-9261) Regression - Flink CLI and Web UI not working when SSL is enabled
Edward Rojas created FLINK-9261:
Summary: Regression - Flink CLI and Web UI not working when SSL is enabled
Key: FLINK-9261
URL: https://issues.apache.org/jira/browse/FLINK-9261
Project: Flink
Issue Type: Bug
Components: Client, Network, Web Client
Affects Versions: 1.5.0
Reporter: Edward Rojas

When *security.ssl.enabled* is set to true, the Web UI is no longer reachable and there are no logs on the jobmanager. When setting *web.ssl.enabled* to false (keeping security.ssl.enabled set to true), the dashboard is still not reachable and the jobmanager logs the following exception:

{code:java}
WARN org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Unhandled exception
org.apache.flink.shaded.netty4.io.netty.handler.ssl.NotSslRecordException: not an SSL/TLS record: 474554202f20485454502f312e310d0a486f73743a206c6f63616c686f73743a383038310d0a436f6e6e656374696f6e3a206b6565702d616c6976650d0a557067726164652d496e7365637572652d52657175657374733a20310d0a557365722d4167656e743a204d6f7a696c6c612f352e3020284d6163696e746f73683b20496e74656c204d6163204f5320582031305f31335f3329204170706c655765624b69742f3533372e333620284b48544d4c2c206c696b65204765636b6f29204368726f6d652f36352e302e32352e313831205361666172692f3533372e33360d0a4163636570743a20746578742f68746d6c2c6170706c69636174696f6e2f7868746d6c2b786d6c2c6170706c69636174696f6e2f786d6c3b713d302e392c696d6167652f776562702c696d6167652f61706e672c2a2f2a3b713d302e380d0a4163636570742d456e636f64696e673a20677a69702c206465666c6174652c2062720d0a4163636570742d4c616e67756167653a20656e2c656e2d47423b713d302e392c65732d3431393b713d302e382c65733b713d302e372c66722d46523b713d302e362c66723b713d302e350d0a436f6f6b69653a20496465612d39326365626136363d39396464633637632d613838382d346439332d396166612d3737396631373636326264320d0a49662d4d6f6469666965642d53696e63653a205468752c2032362041707220323031382031313a30313a313520474d540d0a0d0a
	at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:940)
	at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:315)
	at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:229)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
	at java.lang.Thread.run(Thread.java:745)
{code}

Also, when trying to use the Flink CLI, it gets stuck on "Waiting for response..." and there are no error messages on the jobmanager. None of the commands work (list, run, etc.).

Taskmanagers are able to register with the jobmanager, so the SSL configuration is good.
SSL configuration:

security.ssl.enabled: true
security.ssl.keystore: /path/to/keystore
security.ssl.keystore-password:
security.ssl.key-password:
security.ssl.truststore: /path/to/truststore
security.ssl.truststore-password:
web.ssl.enabled: false

This same configuration works perfectly on Flink 1.4.
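Decoding the start of the hex dump in the NotSslRecordException message shows what actually reached the REST endpoint: a plain-text HTTP request from the browser ("GET / HTTP/1.1", then "Host: localhost:8081"). That suggests the endpoint still expected TLS even with web.ssl.enabled set to false. The helper is a minimal sketch; decodeHexAscii is a hypothetical name, and only the first two header lines of the dump are decoded.

```java
import java.nio.charset.StandardCharsets;

final class HexRecordDecoder {

    /** Decodes a hex string (two hex digits per byte) into US-ASCII text. */
    static String decodeHexAscii(String hex) {
        if (hex.length() % 2 != 0) {
            throw new IllegalArgumentException("hex string must have an even length");
        }
        byte[] bytes = new byte[hex.length() / 2];
        for (int i = 0; i < bytes.length; i++) {
            bytes[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return new String(bytes, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        // First two lines of the "not an SSL/TLS record" payload from the jobmanager log.
        String prefix = "474554202f20485454502f312e310d0a"
                + "486f73743a206c6f63616c686f73743a383038310d0a";
        // Prints "GET / HTTP/1.1" and "Host: localhost:8081" on separate lines.
        System.out.print(decodeHexAscii(prefix));
    }
}
```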
[jira] [Created] (FLINK-9255) Regression - Flink CLI -m,--jobmanager option not working
Edward Rojas created FLINK-9255:
Summary: Regression - Flink CLI -m,--jobmanager option not working
Key: FLINK-9255
URL: https://issues.apache.org/jira/browse/FLINK-9255
Project: Flink
Issue Type: Bug
Components: Client
Affects Versions: 1.5.0
Environment: Local environment - Branch release-1.5
Reporter: Edward Rojas

Every time the Flink CLI is called with the -m option to specify the jobmanager address, the CLI gets stuck on "Waiting for response..." and the jobmanager reports the following error:

WARN akka.remote.transport.netty.NettyTransport - Remote connection to [/x.x.x.x:] failed with org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 10485760: 1195725860 - discarded

The error occurs even when running locally with something like "flink list -m localhost:6123", whereas "flink list" works as expected. I'm using the version from the "release-1.5" branch. The commit that introduced the regression seems to be [47909f4|https://github.com/apache/flink/commit/47909f466b9c9ee1f4caf94e9f6862a21b628817].
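The reported frame length is not random. Read as a 4-byte big-endian integer, the ASCII bytes "GET " equal 1195725856, and adding the 4 bytes of the length prefix itself gives exactly 1195725860. This suggests the CLI sent an HTTP request to the Akka RPC port (6123 in the example), whose frame decoder then read the first four bytes of the request as a frame length. The sketch assumes a decoder with a 4-byte length field at offset 0 and no extra length adjustment; the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class FrameLengthSketch {

    /**
     * Interprets the first 4 bytes as a length prefix and returns the "adjusted" frame
     * length, i.e. the length field plus the 4 bytes of the prefix itself (assumption).
     */
    static long adjustedFrameLength(byte[] firstBytes) {
        int lengthField = ByteBuffer.wrap(firstBytes, 0, 4).getInt(); // big-endian by default
        return (long) lengthField + 4;
    }

    public static void main(String[] args) {
        byte[] httpRequestStart = "GET / HTTP/1.1\r\n".getBytes(StandardCharsets.US_ASCII);
        System.out.println(adjustedFrameLength(httpRequestStart)); // 1195725860
    }
}
```

Since 1195725860 is far above the 10485760-byte (10 MiB) limit in the log, the connection is dropped before any reply is sent, which would be consistent with the CLI waiting forever.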
[jira] [Created] (FLINK-9103) SSL verification on TaskManager when parallelism > 1
Edward Rojas created FLINK-9103:
Summary: SSL verification on TaskManager when parallelism > 1
Key: FLINK-9103
URL: https://issues.apache.org/jira/browse/FLINK-9103
Project: Flink
Issue Type: Bug
Components: Docker, Security
Affects Versions: 1.4.0
Reporter: Edward Rojas
Attachments: job.log, task0.log

In dynamic environments like Kubernetes, SSL certificates may be generated to use only DNS names to validate server identity, since IP addresses can change over time. In these cases, when executing jobs with parallelism set to 1, SSL validation succeeds and the jobmanager can communicate with the taskmanager and vice versa. But with parallelism greater than 1, SSL validation fails when taskmanagers communicate with each other, as it seems to validate against the IP address:

Caused by: java.security.cert.CertificateException: No subject alternative names matching IP address 172.xx.xxx.xxx found
	at sun.security.util.HostnameChecker.matchIP(HostnameChecker.java:168)
	at sun.security.util.HostnameChecker.match(HostnameChecker.java:94)
	at sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:455)
	at sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:436)
	at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:252)
	at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:136)
	at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1601)
	... 21 more

From the logs, it seems the taskmanagers successfully register their full address with Netty, but the IP is still used. Attached are the pertinent logs from the JobManager and a TaskManager.
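The trace ends in HostnameChecker.matchIP, i.e. the JDK compared the certificate against an IP address rather than a DNS name. With the JDK's "HTTPS" endpoint identification, the peer host string supplied when the client SSLEngine is created is, roughly, what gets verified: a DNS name is matched against DNS subject alternative names, an IP literal against IP ones. The sketch is plain JDK usage, not Flink's code, and the host name and port are hypothetical.

```java
import java.security.NoSuchAlgorithmException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;

final class HostnameVerificationSketch {

    /**
     * Creates a client-side SSLEngine that verifies the server certificate against peerHost.
     * Passing a DNS name here means DNS SANs are checked; passing an IP literal means IP SANs are.
     */
    static SSLEngine clientEngine(SSLContext context, String peerHost, int peerPort) {
        SSLEngine engine = context.createSSLEngine(peerHost, peerPort);
        engine.setUseClientMode(true);
        SSLParameters params = engine.getSSLParameters();
        params.setEndpointIdentificationAlgorithm("HTTPS"); // enable hostname verification
        engine.setSSLParameters(params);
        return engine;
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        // Hypothetical Kubernetes service DNS name and port.
        SSLEngine engine = clientEngine(SSLContext.getDefault(), "taskmanager-1.flink.svc", 12345);
        System.out.println(engine.getPeerHost()); // taskmanager-1.flink.svc
    }
}
```

If taskmanagers address each other by IP, certificates would need IP SANs, which is impractical when pod IPs change; that is why the address used when opening the connection matters in this setup.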